
Commit 464bea6

Merge pull request #30 from databricks-industry-solutions/neuralforecast-update

neuralforecast models update
ryuta-yoshimatsu authored May 19, 2024
2 parents 61e89d0 + 5eaed4e commit 464bea6
Showing 14 changed files with 458 additions and 299 deletions.
10 changes: 6 additions & 4 deletions forecasting_sa/Forecaster.py
@@ -353,7 +353,8 @@ def objective(params, train_df_path, val_df_path):
             best_model.fit(pd.concat([train_df, val_df]))
             return best_model, best_params
         else:
-            model.fit(pd.concat([train_df, val_df]))
+            #model.fit(x=pd.concat([train_df, val_df]), spark=self.spark)
+            model.fit(x=pd.concat([train_df, val_df]))
             return model, model_conf

def backtest_global_model(
@@ -580,15 +581,16 @@ def evaluate_global_model(self, model_conf):
         print(f"Champion alias assigned to the new model")

     def score_models(self):
-        print("starting run_scoring")
+        print("Starting run_scoring")
         for model_name in self.model_registry.get_active_model_keys():
             model_conf = self.model_registry.get_model_conf(model_name)
+            print(f"Started scoring with {model_name}")
             if model_conf["model_type"] == "global":
                 self.score_global_model(model_conf)
             elif model_conf["model_type"] == "local":
                 self.score_local_model(model_conf)
-            print(f"finished scoring with {model_name}")
-        print("finished run_scoring")
+            print(f"Finished scoring with {model_name}")
+        print("Finished run_scoring")

     def score_local_model(self, model_conf):
         src_df = self.resolve_source("train_data")
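Note: the switch to a keyword argument (x=) lines up with the widened fit(self, x, y=None, spark=None) signature in abstract_model.py below, and the commented-out line shows where this is headed: handing the active SparkSession to pipelines that can train distributed. A minimal sketch of the enabled call, assuming the Forecaster keeps a SparkSession in self.spark (an assumption for illustration, not something this commit wires up):

    # Hypothetical sketch (not part of this commit): the commented-out call, enabled.
    # Assumes the Forecaster was constructed with an active SparkSession in self.spark.
    import pandas as pd

    def train_final_model(self, model, model_conf, train_df, val_df):
        combined = pd.concat([train_df, val_df])  # train on train + validation, as above
        # Models that ignore Spark still work: the abstract fit() defaults spark=None.
        model.fit(x=combined, spark=self.spark)
        return model, model_conf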
2 changes: 1 addition & 1 deletion forecasting_sa/base_forecasting_conf.yaml
@@ -1,6 +1,6 @@
 output_format: "delta"
 accelerator: cpu
-temp_path: /dbfs/tmp
+temp_path: /Volumes/solacc_uc/mmf/partitions

 metric: smape
 scoring_model_stage: Production
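Note: the scratch location moves off DBFS and into a Unity Catalog Volume, presumably to give distributed training a shared, governed place to write partitions. It is the same path the commented-out DistributedConfig in NeuralForecastPipeline.py below would consume as partitions_path. A sketch of how the value flows through, assuming the YAML has been loaded into a plain params dict (the loading step is illustrative):

    # Illustrative only: temp_path feeding NeuralForecast's DistributedConfig.
    from neuralforecast import DistributedConfig

    params = {"temp_path": "/Volumes/solacc_uc/mmf/partitions"}  # from the YAML above

    dist_cfg = DistributedConfig(
        partitions_path=params["temp_path"],  # where data partitions get written
        num_nodes=1,                          # worker machines in the cluster
        devices=4,                            # GPUs per machine
    )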
2 changes: 1 addition & 1 deletion forecasting_sa/models/abstract_model.py
@@ -25,7 +25,7 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
         return df

     @abstractmethod
-    def fit(self, x, y=None):
+    def fit(self, x, y=None, spark=None):
         pass

     @abstractmethod
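Note: adding spark=None keeps the interface backward compatible: existing local models can ignore the session, while distributed-capable pipelines may use it. A minimal conforming subclass, for illustration only (the class and its logic are hypothetical, and other abstract members of ForecastingRegressor are omitted):

    # Hypothetical subclass showing the new fit() contract.
    import pandas as pd
    from forecasting_sa.models.abstract_model import ForecastingRegressor

    class NaiveMeanModel(ForecastingRegressor):
        def fit(self, x: pd.DataFrame, y=None, spark=None):
            # Purely local model: the optional SparkSession is simply unused.
            self.mean_ = x.groupby("unique_id")["y"].mean()
            return self

        def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):
            # Repeat each series' historical mean over the horizon (sketch).
            ...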
86 changes: 71 additions & 15 deletions forecasting_sa/models/neuralforecast/NeuralForecastPipeline.py
@@ -1,7 +1,7 @@
 import pandas as pd
 import numpy as np
 from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
-from neuralforecast import NeuralForecast
+from neuralforecast import NeuralForecast, DistributedConfig
 from forecasting_sa.models.abstract_model import ForecastingRegressor
 from neuralforecast.auto import (
     RNN,
@@ -104,21 +104,32 @@ def prepare_static_features(self, df: pd.DataFrame) -> pd.DataFrame:
         return None

     def fit(self, x, y=None):
+        #self.spark = spark
         if isinstance(self.model, NeuralForecast):
-            _df = self.prepare_data(x)
-            _static_df = self.prepare_static_features(x)
-            self.model.fit(df=_df, static_df=_static_df)
+            pdf = self.prepare_data(x)
+            static_pdf = self.prepare_static_features(x)
+
+            #self.model.fit(
+            #    df=self.spark.createDataFrame(pdf),
+            #    static_df=self.spark.createDataFrame(static_pdf) if static_pdf else None,
+            #    distributed_config=self.dist_cfg
+            #    )
+
+            self.model.fit(df=pdf, static_df=static_pdf)

     def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):
         _df = self.prepare_data(hist_df)
         _dynamic_future = self.prepare_data(val_df, future=True)
         _dynamic_future = None if _dynamic_future.empty else _dynamic_future
         _static_df = self.prepare_static_features(hist_df)
-        forecast_df = self.model.predict(
-            df=_df,
-            static_df=_static_df,
-            futr_df=_dynamic_future
-        )
+
+        #forecast_df = self.model.predict(
+        #    df=self.spark.createDataFrame(_df),
+        #    static_df=self.spark.createDataFrame(_static_df) if _static_df else None,
+        #    futr_df=self.spark.createDataFrame(_dynamic_future) if _dynamic_future else None
+        #).toPandas()
+
+        forecast_df = self.model.predict(df=_df, static_df=_static_df, futr_df=_dynamic_future)
         target = [col for col in forecast_df.columns.to_list()
                   if col not in ["unique_id", "ds"]][0]
         forecast_df = forecast_df.reset_index(drop=False).rename(
@@ -149,15 +160,18 @@ def forecast(self, df: pd.DataFrame):
         # Check if dynamic futures for all unique_id are provided.
         # If not, drop unique_id without dynamic futures from scoring.
         if (_dynamic_future is not None) and \
-            (not set(_df["unique_id"].unique().flatten()) \
+            (not set(_df["unique_id"].unique().flatten())
                 .issubset(set(_dynamic_future["unique_id"].unique().flatten()))):
             _df = _df[_df["unique_id"].isin(list(_dynamic_future["unique_id"].unique()))]

-        forecast_df = self.model.predict(
-            df=_df,
-            static_df=_static_df,
-            futr_df=_dynamic_future
-        )
+        #forecast_df = self.model.predict(
+        #    df=self.spark.createDataFrame(_df),
+        #    static_df=self.spark.createDataFrame(_static_df),
+        #    futr_df=self.spark.createDataFrame(_dynamic_future)
+        #).toPandas()
+
+        forecast_df = self.model.predict(df=_df, static_df=_static_df, futr_df=_dynamic_future)
+
         target = [col for col in forecast_df.columns.to_list()
                   if col not in ["unique_id", "ds"]][0]
         forecast_df = forecast_df.reset_index(drop=False).rename(
@@ -350,6 +364,7 @@ def __init__(self, params):
         super().__init__(params)
         self.params = params
         self.loss = get_loss_function(self.params.loss)
+        self.cpus = 0 if self.params.accelerator == 'gpu' else -1
         self.gpus = -1 if self.params.accelerator == 'gpu' else 0
         self.config = dict(
             encoder_n_layers=self.params.encoder_n_layers,
@@ -373,6 +388,7 @@
                     h=int(self.params["prediction_length"]),
                     loss=self.loss,
                     config=self.config,
+                    #cpus=self.cpus,
                     gpus=self.gpus,
                     search_alg=HyperOptSearch(),
                     num_samples=int(self.params["num_samples"]),
@@ -390,6 +406,7 @@ def __init__(self, params):
         super().__init__(params)
         self.params = params
         self.loss = get_loss_function(self.params.loss)
+        self.cpus = 0 if self.params.accelerator == 'gpu' else -1
         self.gpus = -1 if self.params.accelerator == 'gpu' else 0
         self.config = dict(
             encoder_n_layers=self.params.encoder_n_layers,
@@ -412,6 +429,7 @@
                     h=int(self.params["prediction_length"]),
                     loss=self.loss,
                     config=self.config,
+                    #cpus=self.cpus,
                     gpus=self.gpus,
                     search_alg=HyperOptSearch(),
                     num_samples=int(self.params["num_samples"]),
@@ -429,6 +447,7 @@ def __init__(self, params):
         super().__init__(params)
         self.params = params
         self.loss = get_loss_function(self.params.loss)
+        self.cpus = 0 if self.params.accelerator == 'gpu' else -1
         self.gpus = -1 if self.params.accelerator == 'gpu' else 0
         self.config = dict(
             input_size=self.params.input_size_factor * self.params.prediction_length,
@@ -443,16 +462,51 @@ def __init__(self, params):
             learning_rate=tune.loguniform(1e-5, 1e-1),
             batch_size=tune.choice([16, 32]),
         )
+        #self.dist_cfg = DistributedConfig(
+        #    partitions_path=f'{self.params.get("temp_path")}', # path where the partitions will be saved
+        #    num_nodes=1, # number of nodes to use during training (machines)
+        #    devices=4, # number of GPUs in each machine
+        #)
+        # pytorch lightning configuration
+        # the executors don't have permission to write on the filesystem, so we disable saving artifacts
+        #self.distributed_kwargs = dict(
+        #    accelerator='gpu',
+        #    enable_progress_bar=False,
+        #    logger=False,
+        #    enable_checkpointing=False,
+        #)
+        # exogenous features
+        #self.exogs = {
+        #    'futr_exog_list': list(self.params.get("dynamic_future", [])),
+        #    'stat_exog_list': list(self.params.get("static_features", [])),
+        #}
+        #def config(trial):
+        #    return dict(
+        #        input_size=self.params.input_size_factor * self.params.prediction_length,
+        #        max_steps=self.params.max_steps,
+        #        learning_rate=tune.loguniform(1e-5, 1e-1),
+        #        **self.exogs,
+        #        **self.distributed_kwargs,
+        #    )
+
         self.model = NeuralForecast(
             models=[
                 AutoNBEATSx(
                     h=int(self.params["prediction_length"]),
                     loss=self.loss,
                     config=self.config,
+                    #cpus=self.cpus,
                     gpus=self.gpus,
                     search_alg=HyperOptSearch(),
                     num_samples=int(self.params["num_samples"]),
                 ),
+                #AutoNBEATSx(
+                #    h=int(self.params["prediction_length"]),
+                #    loss=self.loss,
+                #    config=config,
+                #    backend='optuna',
+                #    num_samples=int(self.params["num_samples"]),
+                #),
             ],
             freq=self.params["freq"]
         )
@@ -466,6 +520,7 @@ def __init__(self, params):
         super().__init__(params)
         self.params = params
         self.loss = get_loss_function(self.params.loss)
+        self.cpus = 0 if self.params.accelerator == 'gpu' else -1
         self.gpus = -1 if self.params.accelerator == 'gpu' else 0
         self.config = dict(
             input_size=self.params.input_size_factor * self.params.prediction_length,
@@ -490,6 +545,7 @@
                     h=int(self.params["prediction_length"]),
                     loss=self.loss,
                     config=self.config,
+                    #cpus=self.cpus,
                     gpus=self.gpus,
                     search_alg=HyperOptSearch(),
                     num_samples=int(self.params["num_samples"]),
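Note: read together, the commented-out blocks sketch NeuralForecast's distributed path: fit and predict receive Spark DataFrames, fit additionally takes a distributed_config, and predictions come back as a Spark DataFrame converted with .toPandas(). An end-to-end sketch of what enabling it could look like, under stated assumptions (an active SparkSession, a GPU cluster, and a concrete NBEATSx standing in for the Auto models; none of this is switched on by this commit):

    # Hedged sketch of the distributed path the commented-out code points at.
    from neuralforecast import NeuralForecast, DistributedConfig
    from neuralforecast.models import NBEATSx
    from neuralforecast.losses.pytorch import MAE

    def fit_predict_distributed(spark, pdf, static_pdf, params):
        dist_cfg = DistributedConfig(
            partitions_path=params["temp_path"],  # e.g. the UC Volume configured above
            num_nodes=1,
            devices=4,
        )
        model = NeuralForecast(
            models=[NBEATSx(
                h=int(params["prediction_length"]),
                input_size=2 * int(params["prediction_length"]),
                loss=MAE(),
                # Mirrors the commented-out distributed_kwargs: executors may not
                # be allowed to write artifacts, so logging/checkpointing are off.
                accelerator="gpu",
                enable_progress_bar=False,
                logger=False,
                enable_checkpointing=False,
            )],
            freq=params["freq"],
        )
        model.fit(
            df=spark.createDataFrame(pdf),
            static_df=spark.createDataFrame(static_pdf) if static_pdf is not None else None,
            distributed_config=dist_cfg,
        )
        # In distributed mode predict returns a Spark DataFrame.
        return model.predict(df=spark.createDataFrame(pdf)).toPandas()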
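The second, fully commented-out AutoNBEATSx shows a parallel experiment: swapping Ray Tune's HyperOptSearch for Optuna via backend='optuna', with config as a callable over a trial rather than a static dict. A compact version of that variant, following the shapes in the commented code rather than a tested setup (the exogs and distributed_kwargs dicts are the ones sketched in the diff above):

    # Hedged sketch of the Optuna-backend variant from the commented code.
    from neuralforecast import NeuralForecast
    from neuralforecast.auto import AutoNBEATSx
    from ray import tune

    def build_optuna_nbeatsx(params, loss, exogs, distributed_kwargs):
        def config(trial):
            # With backend='optuna', config is a callable over an Optuna trial;
            # the commented-out source reuses tune.* search spaces here as-is.
            return dict(
                input_size=params.input_size_factor * params.prediction_length,
                max_steps=params.max_steps,
                learning_rate=tune.loguniform(1e-5, 1e-1),
                **exogs,
                **distributed_kwargs,
            )

        return NeuralForecast(
            models=[AutoNBEATSx(
                h=int(params["prediction_length"]),
                loss=loss,
                config=config,
                backend="optuna",
                num_samples=int(params["num_samples"]),
            )],
            freq=params["freq"],
        )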
(Diffs for the remaining 10 changed files are not shown.)

