diff --git a/01_mmf_univariate_daily_demo.py b/demo_global_daily.py
similarity index 79%
rename from 01_mmf_univariate_daily_demo.py
rename to demo_global_daily.py
index 89d7b69..837da39 100644
--- a/01_mmf_univariate_daily_demo.py
+++ b/demo_global_daily.py
@@ -77,28 +77,7 @@ def create_m4_df():
 # COMMAND ----------

 active_models = [
-    "StatsForecastBaselineWindowAverage",
-    #"StatsForecastBaselineSeasonalWindowAverage",
-    #"StatsForecastBaselineNaive",
-    #"StatsForecastBaselineSeasonalNaive",
-    "StatsForecastAutoArima",
-    #"StatsForecastAutoETS",
-    #"StatsForecastAutoCES",
-    #"StatsForecastAutoTheta",
-    #"StatsForecastTSB",
-    #"StatsForecastADIDA",
-    #"StatsForecastIMAPA",
-    #"StatsForecastCrostonClassic",
-    #"StatsForecastCrostonOptimized",
-    #"StatsForecastCrostonSBA",
-    "RFableArima",
-    #"RFableETS",
-    #"RFableNNETAR",
-    #"RFableEnsemble",
-    #"RDynamicHarmonicRegression",
-    "SKTimeTBats",
-    #"SKTimeLgbmDsDt",
-    #"NeuralForecastRNN",
+    "NeuralForecastRNN",
     #"NeuralForecastLSTM",
     #"NeuralForecastNBEATSx",
     #"NeuralForecastNHITS",
@@ -128,7 +107,8 @@ def create_m4_df():
     train_data="mmf_train",
     scoring_data="mmf_train",
     scoring_output=f"{catalog}.{db}.daily_scoring_output",
-    metrics_output=f"{catalog}.{db}.daily_metrics_output",
+    evaluation_output=f"{catalog}.{db}.daily_evaluation_output",
+    model_output=f"{catalog}.{db}",
     group_id="unique_id",
     date_col="ds",
     target="y",
@@ -147,16 +127,17 @@ def create_m4_df():
     active_models=active_models,
     experiment_path=f"/Shared/mmf_experiment",
     use_case_name="mmf",
+    accelerator="gpu",
 )

 # COMMAND ----------

-# MAGIC %md ### Metrics output
-# MAGIC In the metrics output table, the metrics for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast.
+# MAGIC %md ### Evaluation output
+# MAGIC In the evaluation output table, the evaluations for all backtest windows and all models are stored. This info can be used to monitor model performance or to decide which models should be taken into the final aggregated forecast.
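# Aside (not part of the patch itself): the new model_output parameter points run_forecast at a Unity Catalog
# schema. As the Forecaster.py changes later in this diff show, each trainable global model is registered
# there as f"{model_output}.{model_name}" and the winning version is given the "champion" alias. A minimal
# sketch of loading such a champion model afterwards -- the model name below is just an example from this
# demo, and setting the registry URI may or may not be needed depending on the workspace defaults:
import mlflow
from mlflow.tracking import MlflowClient

mlflow.set_registry_uri("databricks-uc")  # use the Unity Catalog model registry
client = MlflowClient()
name = "solacc_uc.mmf.NeuralForecastRNN"  # f"{model_output}.{model_name}"
champion = client.get_model_version_by_alias(name, "champion")
model = mlflow.sklearn.load_model(f"models:/{name}/{champion.version}")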
# COMMAND ---------- -# MAGIC %sql select * from solacc_uc.mmf.daily_metrics_output order by unique_id, model, backtest_window_start_date +# MAGIC %sql select * from solacc_uc.mmf.daily_evaluation_output order by unique_id, model, backtest_window_start_date # COMMAND ---------- @@ -178,7 +159,11 @@ def create_m4_df(): # COMMAND ---------- -# MAGIC %sql delete from solacc_uc.mmf.daily_metrics_output +# MAGIC %md ### Delete Tables + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.daily_evaluation_output # COMMAND ---------- diff --git a/02_mm_rossmann_forecasting.py b/demo_global_external_regressors_daily.py similarity index 79% rename from 02_mm_rossmann_forecasting.py rename to demo_global_external_regressors_daily.py index 0f5aa8f..1cbe306 100644 --- a/02_mm_rossmann_forecasting.py +++ b/demo_global_external_regressors_daily.py @@ -59,28 +59,7 @@ from forecasting_sa import run_forecast active_models = [ - #"StatsForecastBaselineWindowAverage", - #"StatsForecastBaselineSeasonalWindowAverage", - #"StatsForecastBaselineNaive", - #"StatsForecastBaselineSeasonalNaive", - #"StatsForecastAutoArima", - #"StatsForecastAutoETS", - #"StatsForecastAutoCES", - #"StatsForecastAutoTheta", - #"StatsForecastTSB", - #"StatsForecastADIDA", - #"StatsForecastIMAPA", - #"StatsForecastCrostonClassic", - #"StatsForecastCrostonOptimized", - #"StatsForecastCrostonSBA", - #"RFableArima", - #"RFableETS", - #"RFableNNETAR", - #"RFableEnsemble", - #"RDynamicHarmonicRegression", - "SKTimeLgbmDsDt", - #"SKTimeTBats", - #"NeuralForecastRNN", + "NeuralForecastRNN", #"NeuralForecastLSTM", #"NeuralForecastNBEATSx", #"NeuralForecastNHITS", @@ -95,7 +74,8 @@ train_data=f"{catalog}.{db}.rossmann_train", scoring_data=f"{catalog}.{db}.rossmann_test", scoring_output=f"{catalog}.{db}.rossmann_scoring_output", - metrics_output=f"{catalog}.{db}.rossmann_metrics_output", + evaluation_output=f"{catalog}.{db}.rossmann_evaluation_output", + model_output=f"{catalog}.{db}", group_id="Store", date_col="Date", target="Sales", @@ -120,7 +100,7 @@ # COMMAND ---------- -# MAGIC %sql select * from solacc_uc.mmf.rossmann_metrics_output order by Store, model, backtest_window_start_date +# MAGIC %sql select * from solacc_uc.mmf.rossmann_evaluation_output order by Store, model, backtest_window_start_date # COMMAND ---------- @@ -132,7 +112,11 @@ # COMMAND ---------- -# MAGIC %sql delete from solacc_uc.mmf.rossmann_metrics_output +# MAGIC %md ### Delete Tables + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.rossmann_evaluation_output # COMMAND ---------- diff --git a/01_mmf_univariate_monthly_demo.py b/demo_global_monthly.py similarity index 79% rename from 01_mmf_univariate_monthly_demo.py rename to demo_global_monthly.py index c86b4e8..40a0e40 100644 --- a/01_mmf_univariate_monthly_demo.py +++ b/demo_global_monthly.py @@ -88,28 +88,7 @@ def transform_group(df): # COMMAND ---------- active_models = [ - "StatsForecastBaselineWindowAverage", - #"StatsForecastBaselineSeasonalWindowAverage", - #"StatsForecastBaselineNaive", - #"StatsForecastBaselineSeasonalNaive", - "StatsForecastAutoArima", - #"StatsForecastAutoETS", - #"StatsForecastAutoCES", - #"StatsForecastAutoTheta", - #"StatsForecastTSB", - #"StatsForecastADIDA", - #"StatsForecastIMAPA", - #"StatsForecastCrostonClassic", - #"StatsForecastCrostonOptimized", - #"StatsForecastCrostonSBA", - "RFableArima", - #"RFableETS", - #"RFableNNETAR", - #"RFableEnsemble", - #"RDynamicHarmonicRegression", - "SKTimeTBats", - #"SKTimeLgbmDsDt", - #"NeuralForecastRNN", + 
"NeuralForecastRNN", #"NeuralForecastLSTM", #"NeuralForecastNBEATSx", #"NeuralForecastNHITS", @@ -139,7 +118,8 @@ def transform_group(df): train_data="mmf_train", scoring_data="mmf_train", scoring_output=f"{catalog}.{db}.monthly_scoring_output", - metrics_output=f"{catalog}.{db}.monthly_metrics_output", + evaluation_output=f"{catalog}.{db}.monthly_evaluation_output", + model_output=f"{catalog}.{db}", group_id="unique_id", date_col="date", target="y", @@ -162,16 +142,16 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Metrics output -# MAGIC In the metrics output table, the metrics for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast. +# MAGIC %md ### Evaluation Output +# MAGIC In the evaluation output table, the evaluation for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast. # COMMAND ---------- -# MAGIC %sql select * from solacc_uc.mmf.monthly_metrics_output order by unique_id, model, backtest_window_start_date +# MAGIC %sql select * from solacc_uc.mmf.monthly_evaluation_output order by unique_id, model, backtest_window_start_date # COMMAND ---------- -# MAGIC %md ### Forecast output +# MAGIC %md ### Forecast Output # MAGIC In the Forecast output table, the final forecast for each model and each time series is stored. # COMMAND ---------- @@ -180,7 +160,7 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Final Ensemble Output +# MAGIC %md ### Ensemble Output # MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration # COMMAND ---------- @@ -189,7 +169,11 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %sql delete from solacc_uc.mmf.monthly_metrics_output +# MAGIC %md ### Delete Tables + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.monthly_evaluation_output # COMMAND ---------- diff --git a/demo_local_univariate_daily.py b/demo_local_univariate_daily.py new file mode 100644 index 0000000..837b2ac --- /dev/null +++ b/demo_local_univariate_daily.py @@ -0,0 +1,189 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Many Models Forecasting SA (MMFSA) Demo +# MAGIC This demo highlights how to configure MMF SA to use M4 competition data + +# COMMAND ---------- + +# MAGIC %pip install -r requirements.txt --quiet +dbutils.library.restartPython() +# COMMAND ---------- + +import logging +from tqdm.autonotebook import tqdm +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import pathlib +import pandas as pd +from datasetsforecast.m4 import M4 +from forecasting_sa import run_forecast + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Data preparation steps +# MAGIC We are using `datasetsforecast` package to download M4 data. +# MAGIC +# MAGIC M4 dataset contains a set of time series which we use for testing of MMF SA. +# MAGIC +# MAGIC Below we have developed a number of functions to convert M4 time series to the expected format. 
+ +# COMMAND ---------- + +def transform_group(df): + unique_id = df.unique_id.iloc[0] + _start = pd.Timestamp("2020-01-01") + _end = _start + pd.DateOffset(days=int(df.count()[0]) - 1) + date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds") + res_df = pd.DataFrame(data=[], index=date_idx).reset_index() + res_df["unique_id"] = unique_id + res_df["y"] = df.y.values + return res_df + + +def create_m4_df(): + y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Daily") + _ids = [f"D{i}" for i in range(1, 100)] + y_df = ( + y_df.groupby("unique_id") + .filter(lambda x: x.unique_id.iloc[0] in _ids) + .groupby("unique_id") + .apply(transform_group) + .reset_index(drop=True) + ) + return y_df + +# COMMAND ---------- + +# MAGIC %md ### Now the dataset looks in the following way: + +# COMMAND ---------- + +m4_df = spark.createDataFrame(create_m4_df()) +m4_df.createOrReplaceTempView("mmf_train") + +# COMMAND ---------- + +# MAGIC %sql select * from mmf_train where unique_id in ('D1', 'D2', 'D6', 'D7', 'D10') + +# COMMAND ---------- + +# MAGIC %md ### Let's configure the list of models we are going to use for training: + +# COMMAND ---------- + +active_models = [ + "StatsForecastBaselineWindowAverage", + "StatsForecastBaselineSeasonalWindowAverage", + "StatsForecastBaselineNaive", + "StatsForecastBaselineSeasonalNaive", + "StatsForecastAutoArima", + "StatsForecastAutoETS", + "StatsForecastAutoCES", + "StatsForecastAutoTheta", + "StatsForecastTSB", + "StatsForecastADIDA", + "StatsForecastIMAPA", + "StatsForecastCrostonClassic", + "StatsForecastCrostonOptimized", + "StatsForecastCrostonSBA", + "RFableArima", + "RFableETS", + "RFableNNETAR", + "RFableEnsemble", + "RDynamicHarmonicRegression", + "SKTimeTBats", + "SKTimeLgbmDsDt", +] + +# COMMAND ---------- + +# MAGIC %md ### Now we can run the forecasting process using `run_forecast` function. + +# COMMAND ---------- + +# Make sure that the catalog and the schema exist +catalog = "solacc_uc" # Name of the catalog we use to manage our assets +db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) + +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +# COMMAND ---------- + +run_forecast( + spark=spark, + train_data="mmf_train", + scoring_data="mmf_train", + scoring_output=f"{catalog}.{db}.daily_scoring_output", + evaluation_output=f"{catalog}.{db}.daily_evaluation_output", + group_id="unique_id", + date_col="ds", + target="y", + freq="D", + prediction_length=10, + backtest_months=1, + stride=10, + train_predict_ratio=2, + data_quality_check=True, + resample=False, + ensemble=True, + ensemble_metric="smape", + ensemble_metric_avg=0.3, + ensemble_metric_max=0.5, + ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", + active_models=active_models, + experiment_path=f"/Shared/mmf_experiment", + use_case_name="mmf", +) + +# COMMAND ---------- + +# MAGIC %md ### Evaluation Output +# MAGIC In the evaluation output table, the evaluations for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast. + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.daily_evaluation_output order by unique_id, model, backtest_window_start_date + +# COMMAND ---------- + +# MAGIC %md ### Forecast Output +# MAGIC In the Forecast output table, the final forecast for each model and each time series is stored. 
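# To act on the evaluation table described above (for instance, to shortlist models before ensembling),
# the per-window metrics can be aggregated per model. A minimal PySpark sketch, assuming the table and
# column names this notebook writes (model, metric_name, metric_value) and the smape metric used here:
from pyspark.sql import functions as F

eval_df = spark.table("solacc_uc.mmf.daily_evaluation_output")
(
    eval_df.filter(F.col("metric_name") == "smape")
    .groupBy("model")
    .agg(F.avg("metric_value").alias("avg_smape"), F.max("metric_value").alias("max_smape"))
    .orderBy("avg_smape")
    .show()
)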
+
+# COMMAND ----------
+
+# MAGIC %sql select * from solacc_uc.mmf.daily_scoring_output order by unique_id, model, ds
+
+# COMMAND ----------
+
+# MAGIC %md ### Ensemble Output
+# MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration.
+
+# COMMAND ----------
+
+# MAGIC %sql select * from solacc_uc.mmf.daily_ensemble_output order by unique_id, model, ds
+
+# COMMAND ----------
+
+# MAGIC %md ### Delete Tables
+
+# COMMAND ----------
+
+# MAGIC %sql delete from solacc_uc.mmf.daily_evaluation_output
+
+# COMMAND ----------
+
+# MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output
+
+# COMMAND ----------
+
+# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output
+
+# COMMAND ----------
+
+
diff --git a/demo_local_univariate_external_regressors_daily.py b/demo_local_univariate_external_regressors_daily.py
new file mode 100644
index 0000000..7ea46db
--- /dev/null
+++ b/demo_local_univariate_external_regressors_daily.py
@@ -0,0 +1,144 @@
+# Databricks notebook source
+# MAGIC %pip install -r requirements.txt --quiet
+
+# COMMAND ----------
+
+import logging
+from tqdm.autonotebook import tqdm
+logger = spark._jvm.org.apache.log4j
+logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
+logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
+
+# COMMAND ----------
+
+# Make sure that the catalog, the schema, and the volume exist
+catalog = "solacc_uc" # Name of the catalog we use to manage our assets
+db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)
+volume = "rossmann" # Name of the volume where your Rossmann dataset csv files are stored
+
+_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
+_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
+_ = spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{db}.{volume}")
+
+# COMMAND ----------
+
+# MAGIC %md Download the dataset from [Kaggle](https://www.kaggle.com/competitions/rossmann-store-sales/data) and store the csv files in the volume.
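# One way to land the two Kaggle csv files in the volume created above; the local source paths are
# hypothetical -- adjust them to wherever the downloaded files actually live:
dbutils.fs.cp("file:/tmp/rossmann/train.csv", f"/Volumes/{catalog}/{db}/{volume}/train.csv")
dbutils.fs.cp("file:/tmp/rossmann/test.csv", f"/Volumes/{catalog}/{db}/{volume}/test.csv")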
+ +# COMMAND ---------- + +# Randomly select 100 stores to forecast +import random +random.seed(10) +sample = True +stores = sorted(random.sample(range(0, 1000), 10)) + +train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True) +test = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/test.csv", header=True, inferSchema=True) + +if sample: + train = train.filter(train.Store.isin(stores)) + test = test.filter(test.Store.isin(stores)) + +train.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{catalog}.{db}.rossmann_train") +test.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{catalog}.{db}.rossmann_test") + +# COMMAND ---------- + +# Set the number of shuffle partitions larger than the total number of cores +#sqlContext.setConf("spark.sql.shuffle.partitions", "1000") + +# COMMAND ---------- + +display(spark.sql(f"select * from {catalog}.{db}.rossmann_train where Store=49 order by Date")) +display(spark.sql(f"select * from {catalog}.{db}.rossmann_test where Store=49 order by Date")) + +# COMMAND ---------- + +import pathlib +import pandas as pd +from forecasting_sa import run_forecast + +active_models = [ + "StatsForecastBaselineWindowAverage", + "StatsForecastBaselineSeasonalWindowAverage", + "StatsForecastBaselineNaive", + "StatsForecastBaselineSeasonalNaive", + "StatsForecastAutoArima", + "StatsForecastAutoETS", + "StatsForecastAutoCES", + "StatsForecastAutoTheta", + "StatsForecastTSB", + "StatsForecastADIDA", + "StatsForecastIMAPA", + "StatsForecastCrostonClassic", + "StatsForecastCrostonOptimized", + "StatsForecastCrostonSBA", + "RFableArima", + "RFableETS", + "RFableNNETAR", + "RFableEnsemble", + "RDynamicHarmonicRegression", + "SKTimeTBats", + "SKTimeLgbmDsDt", +] + +run_id = run_forecast( + spark=spark, + train_data=f"{catalog}.{db}.rossmann_train", + scoring_data=f"{catalog}.{db}.rossmann_test", + scoring_output=f"{catalog}.{db}.rossmann_scoring_output", + evaluation_output=f"{catalog}.{db}.rossmann_evaluation_output", + group_id="Store", + date_col="Date", + target="Sales", + freq="D", + dynamic_reals=["DayOfWeek", "Open", "Promo", "SchoolHoliday"], + prediction_length=10, + backtest_months=1, + stride=10, + train_predict_ratio=2, + active_models=active_models, + data_quality_check=True, + resample=False, + ensemble=True, + ensemble_metric="smape", + ensemble_metric_avg=0.3, + ensemble_metric_max=0.5, + ensemble_scoring_output=f"{catalog}.{db}.rossmann_ensemble_output", + experiment_path=f"/Shared/mmf_rossmann", + use_case_name="mmf_rossmann", +) +print(run_id) + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.rossmann_evaluation_output order by Store, model, backtest_window_start_date + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.rossmann_scoring_output order by Store, model + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.rossmann_ensemble_output order by Store + +# COMMAND ---------- + +# MAGIC %md ### Delete Tables + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.rossmann_evaluation_output + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.rossmann_scoring_output + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.rossmann_ensemble_output + +# COMMAND ---------- + + + diff --git a/demo_local_univariate_monthly.py b/demo_local_univariate_monthly.py new file mode 100644 index 0000000..40bc848 --- /dev/null +++ b/demo_local_univariate_monthly.py @@ -0,0 +1,200 @@ +# Databricks notebook source +# 
MAGIC %md +# MAGIC # Many Models Forecasting SA (MMFSA) Demo +# MAGIC This demo highlights how to configure MMF SA to use M4 competition data + +# COMMAND ---------- + +# MAGIC %pip install -r requirements.txt --quiet +dbutils.library.restartPython() +# COMMAND ---------- + +import logging +from tqdm.autonotebook import tqdm +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import pathlib +import pandas as pd +from datasetsforecast.m4 import M4 +from forecasting_sa import run_forecast + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Data preparation steps +# MAGIC We are using `datasetsforecast` package to download M4 data. +# MAGIC +# MAGIC M4 dataset contains a set of time series which we use for testing of MMF SA. +# MAGIC +# MAGIC Below we have developed a number of functions to convert M4 time series to the expected format. + +# COMMAND ---------- + +# Number of time series +n = 100 + +def load_m4(): + y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Monthly") + _ids = [f"M{i}" for i in range(1, n + 1)] + y_df = ( + y_df.groupby("unique_id") + .filter(lambda x: x.unique_id.iloc[0] in _ids) + .groupby("unique_id") + .apply(transform_group) + .reset_index(drop=True) + ) + return y_df + +def transform_group(df): + unique_id = df.unique_id.iloc[0] + _cnt = 60 # df.count()[0] + _start = pd.Timestamp("2018-01-01") + _end = _start + pd.DateOffset(months=_cnt) + date_idx = pd.date_range(start=_start, end=_end, freq="M", name="date") + _df = ( + pd.DataFrame(data=[], index=date_idx) + .reset_index() + .rename(columns={"index": "date"}) + ) + _df["unique_id"] = unique_id + _df["y"] = df[:60].y.values + return _df + +train = spark.createDataFrame(load_m4()) +train.createOrReplaceTempView("mmf_train") + +display(spark.read.table("mmf_train")) + +# COMMAND ---------- + +# MAGIC %md ### Now the dataset looks in the following way: + +# COMMAND ---------- + +# MAGIC %sql select unique_id, count(date) as count from mmf_train group by unique_id order by unique_id + +# COMMAND ---------- + +# MAGIC %sql select count(distinct(unique_id)) from mmf_train + +# COMMAND ---------- + +# MAGIC %md ### Let's configure the list of models we are going to use for training: + +# COMMAND ---------- + +active_models = [ + "StatsForecastBaselineWindowAverage", + "StatsForecastBaselineSeasonalWindowAverage", + "StatsForecastBaselineNaive", + "StatsForecastBaselineSeasonalNaive", + "StatsForecastAutoArima", + "StatsForecastAutoETS", + "StatsForecastAutoCES", + "StatsForecastAutoTheta", + "StatsForecastTSB", + "StatsForecastADIDA", + "StatsForecastIMAPA", + "StatsForecastCrostonClassic", + "StatsForecastCrostonOptimized", + "StatsForecastCrostonSBA", + "RFableArima", + "RFableETS", + "RFableNNETAR", + "RFableEnsemble", + "RDynamicHarmonicRegression", + "SKTimeTBats", + "SKTimeLgbmDsDt", +] + +# COMMAND ---------- + +# MAGIC %md ### Now we can run the forecasting process using `run_forecast` function. + +# COMMAND ---------- + +# Make sure that the catalog and the schema exist +catalog = "solacc_uc" # Name of the catalog we use to manage our assets +db = "mmf" # Name of the schema we use to manage our assets (e.g. 
datasets) + +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +# COMMAND ---------- + +run_forecast( + spark=spark, + train_data="mmf_train", + scoring_data="mmf_train", + scoring_output=f"{catalog}.{db}.monthly_scoring_output", + evaluation_output=f"{catalog}.{db}.monthly_evaluation_output", + group_id="unique_id", + date_col="date", + target="y", + freq="M", + prediction_length=3, + backtest_months=12, + stride=1, + train_predict_ratio=2, + data_quality_check=True, + resample=False, + ensemble=True, + ensemble_metric="smape", + ensemble_metric_avg=0.3, + ensemble_metric_max=0.5, + ensemble_scoring_output=f"{catalog}.{db}.monthly_ensemble_output", + active_models=active_models, + experiment_path=f"/Shared/mmf_experiment_monthly", + use_case_name="mmf", +) + +# COMMAND ---------- + +# MAGIC %md ### Evaluation Output +# MAGIC In the evaluation output table, the evaluation for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast. + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.monthly_evaluation_output order by unique_id, model, backtest_window_start_date + +# COMMAND ---------- + +# MAGIC %md ### Forecast Output +# MAGIC In the Forecast output table, the final forecast for each model and each time series is stored. + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.monthly_scoring_output order by unique_id, model, date + +# COMMAND ---------- + +# MAGIC %md ### Ensemble Output +# MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.monthly_ensemble_output order by unique_id, model, date + +# COMMAND ---------- + +# MAGIC %md ### Delete Tables + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.monthly_evaluation_output + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output + +# COMMAND ---------- + +# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output + +# COMMAND ---------- + + diff --git a/forecasting_sa/Forecaster.py b/forecasting_sa/Forecaster.py index 687a711..2de46d4 100644 --- a/forecasting_sa/Forecaster.py +++ b/forecasting_sa/Forecaster.py @@ -14,6 +14,7 @@ import mlflow from mlflow.exceptions import MlflowException from mlflow.tracking import MlflowClient +from mlflow.models import infer_signature from omegaconf import OmegaConf, DictConfig from omegaconf.basecontainer import BaseContainer from pyspark.sql import SparkSession, DataFrame @@ -26,8 +27,9 @@ TimestampType, BinaryType, ArrayType, + MapType, ) -from pyspark.sql.functions import lit, avg, min, max, col, posexplode, collect_list +from pyspark.sql.functions import lit, avg, min, max, col, posexplode, collect_list, to_date from forecasting_sa.models.abstract_model import ForecastingRegressor from forecasting_sa.models import ModelRegistry from forecasting_sa.data_quality_checks import DataQualityChecks @@ -54,6 +56,7 @@ def __init__( else: raise Exception("No configuration provided!") + self.run_id = str(uuid.uuid4()) self.data_conf = data_conf self.model_registry = ModelRegistry(self.conf) self.spark = spark @@ -77,27 +80,6 @@ def set_mlflow_experiment(self): ) return experiment_id - def split_df_train_val(self, df: pd.DataFrame) -> 
Tuple[pd.DataFrame, pd.DataFrame]: - """Splits df into train and data, based on backtest months and prediction length. - Data before backtest will be train, and data from backtest (at most prediction length days) will be val data.""" - # Train with data before the backtest months in conf - train_df = df[ - df[self.conf["date_col"]] - <= df[self.conf["date_col"]].max() - - pd.DateOffset(months=self.conf["backtest_months"]) - ] - # Validate with data after the backtest months cutoff... - val_true_df = df[ - df[self.conf["date_col"]] - > df[self.conf["date_col"]].max() - - pd.DateOffset(months=self.conf["backtest_months"]) - ] - # But just until prediction_length - # val_true_df = val_true_df[ - # val_true_df[self.conf['date_col']] < val_true_df[self.conf['date_col']].min() + pd.DateOffset( - # days=self.conf['prediction_length'])] - return train_df, val_true_df - def resolve_source(self, key: str) -> DataFrame: if self.data_conf: df_val = self.data_conf.get(key) @@ -108,38 +90,22 @@ def resolve_source(self, key: str) -> DataFrame: else: return self.spark.read.table(self.conf[key]) - def prepare_data(self, model_conf: DictConfig, path: str, scoring=False) \ - -> pd.DataFrame: - df = self.resolve_source(path) - if model_conf.get("data_prep", "none") == "pivot": - df = ( - df.groupby([self.conf["date_col"]]) - .pivot(self.conf["group_id"]) - .sum(self.conf["target"]) - ) - df = df.toPandas() - if not scoring: - df, removed = DataQualityChecks(df, self.conf).run() - if model_conf.get("data_prep", "none") == "none": - df[self.conf["group_id"]] = df[self.conf["group_id"]].astype(str) - return df - def train_eval_score(self, export_metrics=False, scoring=True) -> str: print("Starting train_evaluate_models") - self.run_id = str(uuid.uuid4()) self.train_models() self.evaluate_models() if scoring: - self.run_scoring() + self.score_models() self.ensemble() if export_metrics: self.update_metrics() + print("Finished train_evaluate_models") return self.run_id def ensemble(self): if self.conf.get("ensemble") and self.conf["ensemble_scoring_output"]: metrics_df = ( - self.spark.table(self.conf["metrics_output"]) + self.spark.table(self.conf["evaluation_output"]) .where(col("run_id").eqNullSafe(lit(self.run_id))) .where( col("metric_name").eqNullSafe( @@ -207,11 +173,36 @@ def ensemble(self): .withColumn("use_case", lit(self.conf["use_case_name"])) .withColumn("model", lit("ensemble")) .write.format("delta") - .option("mergeSchema", "true") .mode("append") .saveAsTable(self.conf["ensemble_scoring_output"]) ) + def prepare_data_for_global_model(self, model_conf: DictConfig, path: str) \ + -> pd.DataFrame: + df = self.resolve_source(path) + df = df.toPandas() + df, removed = DataQualityChecks(df, self.conf).run() + if model_conf.get("data_prep", "none") == "none": + df[self.conf["group_id"]] = df[self.conf["group_id"]].astype(str) + return df, removed + + def split_df_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Splits df into train and data, based on backtest months and prediction length. + Data before backtest will be train, and data from backtest (at most prediction length days) will be val data.""" + # Train with data before the backtest months in conf + train_df = df[ + df[self.conf["date_col"]] + <= df[self.conf["date_col"]].max() + - pd.DateOffset(months=self.conf["backtest_months"]) + ] + # Validate with data after the backtest months cutoff... 
+ val_true_df = df[ + df[self.conf["date_col"]] + > df[self.conf["date_col"]].max() + - pd.DateOffset(months=self.conf["backtest_months"]) + ] + return train_df, val_true_df + def train_models(self): """Trains and evaluates all models from the configuration file with the configuration's training data. Then evaluates the current best model with the configuration's training data. @@ -226,15 +217,13 @@ def train_models(self): ): with mlflow.start_run(experiment_id=self.experiment_id) as run: try: + model = self.model_registry.get_model(model_name) # Get training and scoring data - hist_df = self.prepare_data(model_conf, "train_data") + hist_df, removed = self.prepare_data_for_global_model(model_conf, "train_data") train_df, val_train_df = self.split_df_train_val(hist_df) # Train and evaluate new models - results are saved to MLFlow - model = self.model_registry.get_model(model_name) - print("--------------------\nTraining model:") - print(model) - # Trains and evaluates the model - logs results to experiment - self.train_one_model(train_df, val_train_df, model_conf, model) + print(f"Training model: {model}") + self.train_global_model(train_df, val_train_df, model_conf, model) except Exception as err: _logger.error( f"Error has occurred while training model {model_name}: {repr(err)}", @@ -243,18 +232,31 @@ def train_models(self): raise err print("Finished train_models") - def train_one_model( + def train_global_model( self, train_df: pd.DataFrame, val_df: pd.DataFrame, model_conf: DictConfig, model: ForecastingRegressor, ): - print("starting train") - tuned_model, tuned_params = self.tune_model(model_conf, model, train_df, val_df) - model_info = mlflow.sklearn.log_model(tuned_model, "model") + print(f"Started training {model_conf['name']}") + tuned_model, tuned_params = self.tune_global_model(model_conf, model, train_df, val_df) + # TODO fix + signature = infer_signature( + model_input=train_df, + model_output=train_df, + ) + input_example = train_df + model_info = mlflow.sklearn.log_model( + tuned_model, + "model", + registered_model_name=f"{self.conf['model_output']}.{model_conf['name']}", + input_example=input_example, + signature=signature, + pip_requirements=[], + ) + print(f"Model registered: {self.conf['model_output']}.{model_conf['name']}") try: - # TODO Decide if we should flatten in general mlflow.log_params(tuned_params) except MlflowException: # MLflow log_params has a parameter length limit of 500 @@ -263,14 +265,19 @@ def train_one_model( mlflow.log_params( flatten_nested_parameters(OmegaConf.to_object(tuned_params)) ) - self.backtest_and_log_metrics(tuned_model, train_df, val_df, "train") + train_metrics = self.backtest_global_model( + model=tuned_model, + train_df=train_df, + val_df=val_df, + model_uri=model_info.model_uri, + write=True, + ) mlflow.set_tag("action", "train") mlflow.set_tag("candidate", "true") mlflow.set_tag("model_name", model.params["name"]) - print("Finished train") - return model_info + print(f"Finished training {model_conf.get('name')}") - def tune_model( + def tune_global_model( self, model_conf: DictConfig, model: ForecastingRegressor, @@ -278,26 +285,25 @@ def tune_model( val_df: pd.DataFrame, ): def objective(params, train_df_path, val_df_path): - _conf = dict(model_conf) - _conf.update(params) - _model = model.__class__(_conf) - train_df = pd.read_parquet(train_df_path) - val_df = pd.read_parquet(val_df_path) - _model.fit(train_df) + obj_conf = dict(model_conf) + obj_conf.update(params) + _model = model.__class__(obj_conf) + obj_train_df = 
pd.read_parquet(train_df_path) + obj_val_df = pd.read_parquet(val_df_path) + _model.fit(obj_train_df) _metrics = _model.backtest( - pd.concat([train_df, val_df]), - start=train_df[_date_col].max(), + pd.concat([obj_train_df, obj_val_df]), + start=obj_train_df[_date_col].max(), retrain=_tuning_retrain, ) return {"loss": _metrics["metric_value"], "status": STATUS_OK} if ( self.conf.get("tuning_enabled", False) + and model.params.get("tuning", False) and model.supports_tuning() - and model.params.get("tuning", True) ): print(f'Tuning model: {model_conf["name"]}') - # with mlflow.start_run(): spark_trials = None if self.conf["tuning_distributed"]: spark_trials = SparkTrials( @@ -341,38 +347,54 @@ def objective(params, train_df_path, val_df_path): ) # TODO final train should be configurable # check if we should use all the data for final retrain - best_model.fit(train_df.append(val_df)) + best_model.fit(pd.concat([train_df, val_df])) return best_model, best_params else: - print(f'Fitting model: {model_conf["name"]}') - model.fit(train_df.append(val_df)) + model.fit(pd.concat([train_df, val_df])) return model, model_conf - def backtest_and_log_metrics( + def backtest_global_model( self, model: ForecastingRegressor, train_df: pd.DataFrame, val_df: pd.DataFrame, - prefix: str, + model_uri: str, + write: bool = True, ): - metrics_df = ( + res_pdf = ( model.backtest( pd.concat([train_df, val_df]), start=train_df[self.conf["date_col"]].max(), - retrain=self.conf["backtest_retrain"], - ) - .groupby("metric_name") - .mean() - ) - - metrics = { - f"{k}_{prefix}": v - for x in metrics_df.to_dict().values() - for k, v in x.items() - } + retrain=self.conf["backtest_retrain"])) + + res_sdf = self.spark.createDataFrame(res_pdf)\ + .withColumn("backtest_window_start_date", to_date("backtest_window_start_date"))\ + .withColumn("actual", col("actual").cast("array")) + + if write: + if self.conf.get("evaluation_output", None): + ( + res_sdf.withColumn("run_id", lit(self.run_id)) + .withColumn("run_date", lit(self.run_date)) + .withColumn("model", lit(model.params.name)) + .withColumn("use_case", lit(self.conf["use_case_name"])) + .withColumn("model_uri", lit(model_uri)) + .write.mode("append") + .saveAsTable(self.conf.get("evaluation_output")) + ) - mlflow.log_metrics(next(iter(metrics_df.to_dict().values()))) - return metrics + res_df = ( + res_sdf.groupby(["metric_name"]) + .mean("metric_value") + .withColumnRenamed("avg(metric_value)", "metric_value") + .toPandas() + ) + for rec in res_df.values: + metric_name, metric_value = rec + if write: + mlflow.log_metric(metric_name, metric_value) + print(res_df) + return metric_value def evaluate_models(self): print("Starting evaluate_models") @@ -418,15 +440,15 @@ def evaluate_local_model(self, model_conf): src_df.groupby(self.conf["group_id"]) .applyInPandas(evaluate_one_model_fn, schema=output_schema) ) - - if self.conf.get("metrics_output", None) is not None: + if self.conf.get("evaluation_output", None) is not None: ( res_sdf.withColumn("run_id", lit(self.run_id)) .withColumn("run_date", lit(self.run_date)) .withColumn("model", lit(model_conf["name"])) .withColumn("use_case", lit(self.conf["use_case_name"])) + .withColumn("model_uri", lit("")) .write.mode("append") - .saveAsTable(self.conf.get("metrics_output")) + .saveAsTable(self.conf.get("evaluation_output")) ) res_df = ( @@ -460,8 +482,7 @@ def evaluate_one_model( pdf = pdf.fillna(0.1) # Fix here pdf[model.params["target"]] = pdf[model.params["target"]].clip(0.1) - metrics_df = model.backtest(pdf, 
start=split_date, retrain=False) - metrics_df[model.params["group_id"]] = group_id + metrics_df = model.backtest(pdf, start=split_date, group_id=group_id, retrain=False) return metrics_df except Exception as err: _logger.error( @@ -485,29 +506,33 @@ def evaluate_one_model( def evaluate_global_model(self, model_conf): mlflow_client = mlflow.tracking.MlflowClient() with mlflow.start_run(experiment_id=self.experiment_id): - hist_df = self.prepare_data(model_conf, "train_data") + hist_df, removed = self.prepare_data_for_global_model(model_conf, "train_data") train_df, val_df = self.split_df_train_val(hist_df) model_name = model_conf["name"] mlflow.set_tag("model_name", model_conf["name"]) - mlflow_model_name = f"{self.conf['use_case_name']}_{model_name}" + mlflow_model_name = f"{self.conf['model_output']}.{model_name}" try: - deployed_model = mlflow.sklearn.load_model( - f"models:/{mlflow_model_name}/{self.conf['scoring_model_stage']}" + champion = mlflow_client.get_model_version_by_alias(mlflow_model_name, "champion") + champion_version = champion.version + champion_run_id = f"runs:/{champion.run_id}/model" + champion_model = mlflow.sklearn.load_model( + f"models:/{mlflow_model_name}/{champion_version}" ) - deployed_metrics = self.backtest_and_log_metrics( - deployed_model, train_df, val_df, "deployed" + champion_metrics = self.backtest_global_model( + model=champion_model, + train_df=train_df, + val_df=val_df, + model_uri=champion_run_id, + write=False, ) except: - print( - "No deployed model yet available for model: ", - mlflow_model_name, - ) - deployed_model = None + print(f"No deployed model yet available for model: {mlflow_model_name}") + champion_model = None new_runs = mlflow_client.search_runs( experiment_ids=[self.experiment_id], filter_string=f"tags.candidate='true' and tags.model_name='{model_name}'", - order_by=[f"metrics.{self.conf['selection_metric']}"], + order_by=["start_time DESC"], max_results=10, ) if len(new_runs) == 0: @@ -518,39 +543,37 @@ def evaluate_global_model(self, model_conf): new_run = new_runs[0] new_model_uri = f"runs:/{new_run.info.run_uuid}/model" new_model = mlflow.sklearn.load_model(new_model_uri) - new_metrics = self.backtest_and_log_metrics( - new_model, train_df, val_df, "new" + new_metrics = self.backtest_global_model( + model=new_model, + train_df=train_df, + val_df=val_df, + model_uri=new_model_uri, + write=False, ) - if ( - deployed_model is None - or new_metrics["smape_new"] <= deployed_metrics["smape_deployed"] - ): + if (champion_model is None) or (new_metrics <= champion_metrics): model_details = mlflow.register_model( - model_uri=new_model_uri, name=mlflow_model_name - ) + model_uri=new_model_uri, name=mlflow_model_name) # wait_until_ready(model_details.name, model_details.version) # TODO: Add description, version, metadata in general - mlflow_client.transition_model_version_stage( - name=model_details.name, - version=model_details.version, - stage=self.conf["scoring_model_stage"], - ) - print("Model promoted to production:") - print(model_details) + mlflow_client.set_registered_model_alias( + mlflow_model_name, + "champion", + model_details.version) + print(f"Champion alias assigned to the new model") - def run_scoring(self): + def score_models(self): print("starting run_scoring") for model_name in self.model_registry.get_active_model_keys(): model_conf = self.model_registry.get_model_conf(model_name) if model_conf["model_type"] == "global": - self.run_scoring_for_global_model(model_conf) + self.score_global_model(model_conf) elif 
model_conf["model_type"] == "local": - self.run_scoring_for_local_model(model_conf) + self.score_local_model(model_conf) print(f"finished scoring with {model_name}") print("finished run_scoring") - def run_scoring_for_local_model(self, model_conf): + def score_local_model(self, model_conf): src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() # Check if external regressors are provided and framework is statsforecast @@ -576,19 +599,17 @@ def run_scoring_for_local_model(self, model_conf): src_df.groupby(self.conf["group_id"]) .applyInPandas(score_one_model_fn, schema=output_schema) ) - if not isinstance(res_sdf.schema[self.conf["group_id"]].dataType, StringType): res_sdf = res_sdf.withColumn( self.conf["group_id"], col(self.conf["group_id"]).cast(StringType()) ) - ( res_sdf.withColumn("run_id", lit(self.run_id)) .withColumn("run_date", lit(self.run_date)) .withColumn("use_case", lit(self.conf["use_case_name"])) .withColumn("model", lit(model_conf["name"])) + .withColumn("model_uri", lit("")) .write.mode("append") - .option("mergeSchema", "true") .saveAsTable(self.conf["scoring_output"]) ) @@ -617,47 +638,55 @@ def score_one_model( ) return res_df - def run_scoring_for_global_model(self, model_conf): + def score_global_model(self, model_conf): print(f"Running scoring for {model_conf['name']}...") - best_model = self.get_model_for_scoring(model_conf) - score_df = self.prepare_data(model_conf, "scoring_data", scoring=True) - if model_conf["framework"] == "NeuralForecast": - prediction_df = best_model.forecast(score_df) - else: - prediction_df = best_model.predict(score_df) + champion_model, champion_model_uri = self.get_model_for_scoring(model_conf) + score_df, removed = self.prepare_data_for_global_model(model_conf, "scoring_data") + prediction_df, model_fitted = champion_model.forecast(score_df) if prediction_df[self.conf["date_col"]].dtype.type != np.datetime64: prediction_df[self.conf["date_col"]] = prediction_df[ self.conf["date_col"] ].dt.to_timestamp() - - print(f"prediction generated, saving to {self.conf['scoring_output']}") - spark_df = ( - self.spark.createDataFrame(prediction_df) - .withColumn("model", lit(model_conf["name"])) - .withColumn( - self.conf["target"], col(self.conf["target"]).cast(DoubleType()) - ) - ) + sdf = self.spark.createDataFrame(prediction_df)\ + .drop('index')\ + .withColumn(self.conf["target"], col(self.conf["target"]).cast(DoubleType()))\ + .orderBy(self.conf["group_id"], self.conf["date_col"])\ + .groupBy(self.conf["group_id"])\ + .agg( + collect_list(self.conf["date_col"]).alias(self.conf["date_col"]), + collect_list(self.conf["target"]).alias(self.conf["target"])) ( - spark_df.withColumn("run_id", lit(self.run_id)) + sdf.withColumn("model", lit(model_conf["name"])) + .withColumn("run_id", lit(self.run_id)) .withColumn("run_date", lit(self.run_date)) .withColumn("use_case", lit(self.conf["use_case_name"])) - .withColumn("model", lit(model_conf["name"])) + .withColumn("model_pickle", lit(b"")) + .withColumn("model_uri", lit(champion_model_uri)) .write.mode("append") - .option("mergeSchema", "true") .saveAsTable(self.conf["scoring_output"]) ) - print(f"Finished scoring for {model_conf['name']}...") + + def get_latest_model_version(self, mlflow_client, registered_name): + latest_version = 1 + for mv in mlflow_client.search_model_versions(f"name='{registered_name}'"): + version_int = int(mv.version) + if version_int > latest_version: + latest_version = version_int + return latest_version def 
get_model_for_scoring(self, model_conf): + mlflow_client = MlflowClient() if model_conf["trainable"]: - mlflow_model_name = f"{self.conf['use_case_name']}_{model_conf['name']}" - best_model = mlflow.sklearn.load_model( - f"models:/{mlflow_model_name}/{self.conf['scoring_model_stage']}" + mlflow_model_name = f"{self.conf['model_output']}.{model_conf['name']}" + champion = mlflow_client.get_model_version_by_alias(mlflow_model_name, "champion") + champion_version = champion.version + champion_model_uri = f"runs:/{champion.run_id}/model" + champion_model = mlflow.sklearn.load_model( + f"models:/{mlflow_model_name}/{champion_version}" ) - return best_model + return champion_model, champion_model_uri else: - return self.model_registry.get_model(model_conf["name"]) + return self.model_registry.get_model(model_conf["name"]), None def flatten_nested_parameters(d): out = {} diff --git a/forecasting_sa/__init__.py b/forecasting_sa/__init__.py index ec378da..00f4192 100644 --- a/forecasting_sa/__init__.py +++ b/forecasting_sa/__init__.py @@ -29,7 +29,8 @@ def run_forecast( resample: bool = False, scoring_data: Union[str, pd.DataFrame, DataFrame] = None, scoring_output: str = None, - metrics_output: str = None, + evaluation_output: str = None, + model_output: str = None, ensemble: bool = None, ensemble_metric: str = None, ensemble_metric_avg: float = None, @@ -106,8 +107,10 @@ def run_forecast( _conf["train_predict_ratio"] = train_predict_ratio if experiment_path is not None: _conf["experiment_path"] = experiment_path - if metrics_output is not None: - _conf["metrics_output"] = metrics_output + if evaluation_output is not None: + _conf["evaluation_output"] = evaluation_output + if model_output is not None: + _conf["model_output"] = model_output if ensemble is not None: _conf["ensemble"] = ensemble if ensemble_metric is not None: diff --git a/forecasting_sa/models/__init__.py b/forecasting_sa/models/__init__.py index 2d630b7..4d9715c 100644 --- a/forecasting_sa/models/__init__.py +++ b/forecasting_sa/models/__init__.py @@ -1,9 +1,7 @@ import importlib import sys import importlib.resources as pkg_resources - from omegaconf import OmegaConf, DictConfig - from forecasting_sa.models.abstract_model import ForecastingRegressor diff --git a/forecasting_sa/models/abstract_model.py b/forecasting_sa/models/abstract_model.py index 753416e..de64bf1 100644 --- a/forecasting_sa/models/abstract_model.py +++ b/forecasting_sa/models/abstract_model.py @@ -25,16 +25,16 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: return df @abstractmethod - def fit(self, X, y=None): + def fit(self, x, y=None): pass @abstractmethod - def predict(self, X): + def predict(self, x): # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length pass @abstractmethod - def forecast(self, X): + def forecast(self, x): # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length pass @@ -48,6 +48,7 @@ def backtest( self, df: pd.DataFrame, start: pd.Timestamp, + group_id: Union[str, int] = None, stride: int = None, retrain: bool = True, ) -> pd.DataFrame: @@ -75,26 +76,33 @@ def backtest( < np.datetime64(curr_date + self.prediction_length_offset) )] - #if retrain: - # self.fit(_df) - - metrics = self.calculate_metrics(_df, actuals_df) - metrics_and_date = [ - ( - curr_date, - metrics["metric_name"], - metrics["metric_value"], - metrics["forecast"], - metrics["actual"], - metrics["model_pickle"] - ) - ] - results.extend(metrics_and_date) + if retrain: + self.fit(_df) + + 
metrics = self.calculate_metrics(_df, actuals_df, curr_date) + + if isinstance(metrics, dict): + evaluation_results = [ + ( + group_id, + metrics["curr_date"], + metrics["metric_name"], + metrics["metric_value"], + metrics["forecast"], + metrics["actual"], + metrics["model_pickle"], + ) + ] + results.extend(evaluation_results) + elif isinstance(metrics, list): + results.extend(metrics) + curr_date += stride_offset res_df = pd.DataFrame( results, - columns=["backtest_window_start_date", + columns=[self.params["group_id"], + "backtest_window_start_date", "metric_name", "metric_value", "forecast", @@ -104,7 +112,7 @@ def backtest( return res_df def calculate_metrics( - self, hist_df: pd.DataFrame, val_df: pd.DataFrame + self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date ) -> Dict[str, Union[str, float, bytes]]: pred_df, model_fitted = self.predict(hist_df, val_df) smape = mean_absolute_percentage_error( @@ -117,8 +125,10 @@ def calculate_metrics( else: raise Exception(f"Metric {self.params['metric']} not supported!") - return {"metric_name": self.params["metric"], - "metric_value": metric_value, - "forecast": pred_df[self.params["target"]].to_numpy(), - "actual": val_df[self.params["target"]].to_numpy(), - "model_pickle": cloudpickle.dumps(model_fitted)} + return { + "curr_date": curr_date, + "metric_name": self.params["metric"], + "metric_value": metric_value, + "forecast": pred_df[self.params["target"]].to_numpy(), + "actual": val_df[self.params["target"]].to_numpy(), + "model_pickle": cloudpickle.dumps(model_fitted)} diff --git a/forecasting_sa/models/models_conf.yaml b/forecasting_sa/models/models_conf.yaml index bbdcded..d84e747 100644 --- a/forecasting_sa/models/models_conf.yaml +++ b/forecasting_sa/models/models_conf.yaml @@ -252,7 +252,7 @@ models: season_length: 7 NeuralForecastRNN: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcRNN framework: NeuralForecast model_type: global @@ -273,7 +273,7 @@ models: decoder_layers: 2 NeuralForecastLSTM: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcLSTM framework: NeuralForecast model_type: global @@ -293,7 +293,7 @@ models: decoder_layers: 2 NeuralForecastNBEATSx: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcNBEATSx framework: NeuralForecast model_type: global @@ -310,7 +310,7 @@ models: dropout_prob_theta: 0.0 NeuralForecastNHITS: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcNHITS framework: NeuralForecast model_type: global @@ -331,7 +331,7 @@ models: pooling_mode: MaxPool1d NeuralForecastAutoRNN: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcAutoRNN framework: NeuralForecast model_type: global @@ -349,7 +349,7 @@ models: decoder_layers: 2 NeuralForecastAutoLSTM: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcAutoLSTM framework: NeuralForecast model_type: global @@ -367,7 +367,7 @@ models: decoder_layers: 2 NeuralForecastAutoNBEATSx: - module: 
forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcAutoNBEATSx framework: NeuralForecast model_type: global @@ -383,7 +383,7 @@ models: dropout_prob_theta: 0.0 NeuralForecastAutoNHITS: - module: forecasting_sa.models.neuralforecast.NeuralForecastModel + module: forecasting_sa.models.neuralforecast.NeuralForecastPipeline model_class: NeuralFcAutoNHITS framework: NeuralForecast model_type: global diff --git a/forecasting_sa/models/neuralforecast/NeuralForecastModel.py b/forecasting_sa/models/neuralforecast/NeuralForecastModel.py deleted file mode 100644 index 15f216f..0000000 --- a/forecasting_sa/models/neuralforecast/NeuralForecastModel.py +++ /dev/null @@ -1,340 +0,0 @@ -from neuralforecast import NeuralForecast -from neuralforecast.auto import ( - RNN, - LSTM, - NBEATSx, - NHITS, - AutoRNN, - AutoLSTM, - AutoNBEATSx, - AutoNHITS, - -) -from ray import tune -from ray.tune.search.hyperopt import HyperOptSearch -from neuralforecast.losses.pytorch import ( - MAE, MSE, RMSE, MAPE, SMAPE, MASE, -) - -from forecasting_sa.models.neuralforecast.NeuralForecastPipeline import ( - NeuralFcForecaster, -) - - -def get_loss_function(loss): - if loss == "smape": - return SMAPE() - elif loss == "mae": - return MAE() - elif loss == "mse": - return MSE() - elif loss == "rmse": - return RMSE() - elif loss == "mape": - return MAPE() - elif loss == "mase": - return MASE() - else: - raise Exception( - f"Provided loss {loss} not supported!" - ) - - -class NeuralFcRNN(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' - self.devices = -1 if self.params.accelerator == 'gpu' else 1 - self.model = NeuralForecast( - models=[ - RNN( - h=self.params.prediction_length, - input_size=self.params.input_size_factor*self.params.prediction_length, - loss=self.loss, - max_steps=self.params.max_steps, - batch_size=self.params.batch_size, - encoder_n_layers=self.params.encoder_n_layers, - encoder_hidden_size=self.params.encoder_hidden_size, - encoder_activation=self.params.encoder_activation, - context_size=self.params.context_size, - decoder_hidden_size=self.params.decoder_hidden_size, - decoder_layers=self.params.decoder_layers, - learning_rate=self.params.learning_rate, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - accelerator=self.params.accelerator, - devices=self.devices, - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return False - - -class NeuralFcLSTM(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' - self.devices = -1 if self.params.accelerator == 'gpu' else 1 - self.model = NeuralForecast( - models=[ - LSTM( - h=self.params.prediction_length, - input_size=self.params.input_size_factor*self.params.prediction_length, - loss=self.loss, - max_steps=self.params.max_steps, - batch_size=self.params.batch_size, - encoder_n_layers=self.params.encoder_n_layers, - encoder_hidden_size=self.params.encoder_hidden_size, - context_size=self.params.context_size, - 
decoder_hidden_size=self.params.decoder_hidden_size, - decoder_layers=self.params.decoder_layers, - learning_rate=self.params.learning_rate, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - accelerator=self.params.accelerator, - devices=self.devices, - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return False - - -class NeuralFcNBEATSx(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' - self.devices = -1 if self.params.accelerator == 'gpu' else 1 - self.model = NeuralForecast( - models=[ - NBEATSx( - h=self.params.prediction_length, - input_size=self.params.input_size_factor*self.params.prediction_length, - loss=self.loss, - max_steps=self.params.max_steps, - batch_size=self.params.batch_size, - n_harmonics=self.params.n_harmonics, - n_polynomials=self.params.n_polynomials, - dropout_prob_theta=self.params.dropout_prob_theta, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - accelerator=self.params.accelerator, - devices=self.devices, - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return False - - -class NeuralFcNHITS(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' - self.devices = -1 if self.params.accelerator == 'gpu' else 1 - self.model = NeuralForecast( - models=[ - NHITS( - h=self.params.prediction_length, - input_size=self.params.input_size_factor*self.params.prediction_length, - loss=self.loss, - max_steps=self.params.max_steps, - batch_size=self.params.batch_size, - dropout_prob_theta=self.params.dropout_prob_theta, - stack_types=list(self.params.stack_types), - n_blocks=list(self.params.n_blocks), - n_pool_kernel_size=list(self.params.n_pool_kernel_size), - n_freq_downsample=list(self.params.n_freq_downsample), - interpolation_mode=self.params.interpolation_mode, - pooling_mode=self.params.pooling_mode, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - accelerator=self.params.accelerator, - devices=self.devices, - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return False - - -class NeuralFcAutoRNN(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.gpus = -1 if self.params.accelerator == 'gpu' else 0 - self.config = dict( - encoder_n_layers=self.params.encoder_n_layers, - encoder_hidden_size=self.params.encoder_hidden_size, - encoder_activation=self.params.encoder_activation, - context_size=self.params.context_size, - decoder_hidden_size=self.params.decoder_hidden_size, - decoder_layers=self.params.decoder_layers, - max_steps=self.params.max_steps, - stat_exog_list=list(self.params.get("static_features", [])), - 
futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - learning_rate=tune.loguniform(1e-5, 1e-1), - batch_size=tune.choice([16, 32]), - - ) - self.model = NeuralForecast( - models=[ - AutoRNN( - h=int(self.params["prediction_length"]), - loss=self.loss, - config=self.config, - gpus=self.gpus, - search_alg=HyperOptSearch(), - num_samples=int(self.params["num_samples"]), - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return True - - -class NeuralFcAutoLSTM(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.gpus = -1 if self.params.accelerator == 'gpu' else 0 - self.config = dict( - encoder_n_layers=self.params.encoder_n_layers, - encoder_hidden_size=self.params.encoder_hidden_size, - encoder_activation=self.params.encoder_activation, - context_size=self.params.context_size, - decoder_hidden_size=self.params.decoder_hidden_size, - decoder_layers=self.params.decoder_layers, - max_steps=self.params.max_steps, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - learning_rate=tune.loguniform(1e-5, 1e-1), - batch_size=tune.choice([16, 32]), - ) - self.model = NeuralForecast( - models=[ - AutoLSTM( - h=int(self.params["prediction_length"]), - loss=self.loss, - config=self.config, - gpus=self.gpus, - search_alg=HyperOptSearch(), - num_samples=int(self.params["num_samples"]), - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return True - - -class NeuralFcAutoNBEATSx(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.gpus = -1 if self.params.accelerator == 'gpu' else 0 - self.config = dict( - input_size=self.params.input_size_factor * self.params.prediction_length, - n_harmonics=self.params.n_harmonics, - n_polynomials=self.params.n_polynomials, - dropout_prob_theta=self.params.dropout_prob_theta, - max_steps=self.params.max_steps, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - learning_rate=tune.loguniform(1e-5, 1e-1), - batch_size=tune.choice([16, 32]), - ) - self.model = NeuralForecast( - models=[ - AutoNBEATSx( - h=int(self.params["prediction_length"]), - loss=self.loss, - config=self.config, - gpus=self.gpus, - search_alg=HyperOptSearch(), - num_samples=int(self.params["num_samples"]), - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return True - - -class NeuralFcAutoNHITS(NeuralFcForecaster): - def __init__(self, params): - super().__init__(params) - self.params = params - self.loss = get_loss_function(self.params.loss) - self.gpus = -1 if self.params.accelerator == 'gpu' else 0 - self.config = dict( - input_size=self.params.input_size_factor * self.params.prediction_length, - dropout_prob_theta=self.params.dropout_prob_theta, - stack_types=list(self.params.stack_types), - n_blocks=list(self.params.n_blocks), - n_pool_kernel_size=list(self.params.n_pool_kernel_size), - n_freq_downsample=list(self.params.n_freq_downsample), - 
interpolation_mode=self.params.interpolation_mode, - pooling_mode=self.params.pooling_mode, - max_steps=self.params.max_steps, - stat_exog_list=list(self.params.get("static_features", [])), - futr_exog_list=list(self.params.get("dynamic_future", [])), - hist_exog_list=list(self.params.get("dynamic_historical", [])), - scaler_type='robust', - learning_rate=tune.loguniform(1e-5, 1e-1), - batch_size=tune.choice([16, 32]), - ) - self.model = NeuralForecast( - models=[ - AutoNHITS( - h=int(self.params["prediction_length"]), - loss=self.loss, - config=self.config, - gpus=self.gpus, - search_alg=HyperOptSearch(), - num_samples=int(self.params["num_samples"]), - ), - ], - freq=self.params["freq"] - ) - - def supports_tuning(self) -> bool: - return True diff --git a/forecasting_sa/models/neuralforecast/NeuralForecastPipeline.py b/forecasting_sa/models/neuralforecast/NeuralForecastPipeline.py index edfbdfc..a2d939b 100644 --- a/forecasting_sa/models/neuralforecast/NeuralForecastPipeline.py +++ b/forecasting_sa/models/neuralforecast/NeuralForecastPipeline.py @@ -1,9 +1,24 @@ import pandas as pd import numpy as np -from typing import Dict, Any, Union from sktime.performance_metrics.forecasting import mean_absolute_percentage_error from neuralforecast import NeuralForecast from forecasting_sa.models.abstract_model import ForecastingRegressor +from neuralforecast.auto import ( + RNN, + LSTM, + NBEATSx, + NHITS, + AutoRNN, + AutoLSTM, + AutoNBEATSx, + AutoNHITS, + +) +from ray import tune +from ray.tune.search.hyperopt import HyperOptSearch +from neuralforecast.losses.pytorch import ( + MAE, MSE, RMSE, MAPE, SMAPE, MASE, +) class NeuralFcForecaster(ForecastingRegressor): @@ -26,7 +41,6 @@ def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame: raise Exception(f"Dynamic future regressors missing: {e}") else: _df = df[[self.params.group_id, self.params.date_col]] - _df = ( _df.rename( columns={ @@ -89,15 +103,11 @@ def prepare_static_features(self, df: pd.DataFrame) -> pd.DataFrame: else: return None - def fit(self, X, y=None): + def fit(self, x, y=None): if isinstance(self.model, NeuralForecast): - _df = self.prepare_data(X) - _static_df = self.prepare_static_features(X) - self.model.fit( - df=_df, - static_df=_static_df, - ) - return self + _df = self.prepare_data(x) + _static_df = self.prepare_static_features(x) + self.model.fit(df=_df, static_df=_static_df) def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): _df = self.prepare_data(hist_df) @@ -109,16 +119,18 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): static_df=_static_df, futr_df=_dynamic_future ) - first_model = [col for col in forecast_df.columns.to_list() if col != "ds"][0] + target = [col for col in forecast_df.columns.to_list() + if col not in ["unique_id", "ds"]][0] forecast_df = forecast_df.reset_index(drop=False).rename( columns={ "unique_id": self.params.group_id, "ds": self.params.date_col, - first_model: self.params.target, + target: self.params.target, } ) forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) - return forecast_df + + return forecast_df, self.model def forecast(self, df: pd.DataFrame): _df = df[df[self.params.target].notnull()] @@ -146,44 +158,364 @@ def forecast(self, df: pd.DataFrame): static_df=_static_df, futr_df=_dynamic_future ) - first_model = [col for col in forecast_df.columns.to_list() if col != "ds"][0] + target = [col for col in forecast_df.columns.to_list() + if col not in ["unique_id", "ds"]][0] forecast_df = 
forecast_df.reset_index(drop=False).rename( columns={ "unique_id": self.params.group_id, "ds": self.params.date_col, - first_model: self.params.target, + target: self.params.target, } ) forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) - return forecast_df + return forecast_df, self.model def calculate_metrics( - self, hist_df: pd.DataFrame, val_df: pd.DataFrame - ) -> Dict[str, Union[str, float, bytes]]: - pred_df = self.predict(hist_df, val_df) + self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date + ) -> list: + pred_df, model_fitted = self.predict(hist_df, val_df) keys = pred_df[self.params["group_id"]].unique() metrics = [] + if self.params["metric"] == "smape": + metric_name = "smape" + else: + raise Exception(f"Metric {self.params['metric']} not supported!") for key in keys: - forecast = val_df[val_df[self.params["group_id"]] == key][self.params["target"]] - actual = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].\ + actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]] + forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].\ iloc[-self.params["prediction_length"]:] try: - smape = mean_absolute_percentage_error( - actual, - forecast, - symmetric=True, - ) - metrics.append(smape) + if metric_name == "smape": + metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True) + metrics.extend( + [( + key, + curr_date, + metric_name, + metric_value, + actual.to_numpy(), + forecast.to_numpy(), + b'', + )]) except: pass - smape = sum(metrics) / len(metrics) - print("finished calculate_metrics") - if self.params["metric"] == "smape": - metric_value = smape - else: - raise Exception(f"Metric {self.params['metric']} not supported!") + return metrics + + +class NeuralFcRNN(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' + self.devices = -1 if self.params.accelerator == 'gpu' else 1 + self.model = NeuralForecast( + models=[ + RNN( + h=self.params.prediction_length, + input_size=self.params.input_size_factor*self.params.prediction_length, + loss=self.loss, + max_steps=self.params.max_steps, + batch_size=self.params.batch_size, + encoder_n_layers=self.params.encoder_n_layers, + encoder_hidden_size=self.params.encoder_hidden_size, + encoder_activation=self.params.encoder_activation, + context_size=self.params.context_size, + decoder_hidden_size=self.params.decoder_hidden_size, + decoder_layers=self.params.decoder_layers, + learning_rate=self.params.learning_rate, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + accelerator=self.params.accelerator, + devices=self.devices, + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return False + + +class NeuralFcLSTM(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' + self.devices = -1 if self.params.accelerator == 'gpu' else 1 + self.model = NeuralForecast( + models=[ + LSTM( + h=self.params.prediction_length, + input_size=self.params.input_size_factor*self.params.prediction_length, + 
loss=self.loss, + max_steps=self.params.max_steps, + batch_size=self.params.batch_size, + encoder_n_layers=self.params.encoder_n_layers, + encoder_hidden_size=self.params.encoder_hidden_size, + context_size=self.params.context_size, + decoder_hidden_size=self.params.decoder_hidden_size, + decoder_layers=self.params.decoder_layers, + learning_rate=self.params.learning_rate, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + accelerator=self.params.accelerator, + devices=self.devices, + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return False + + +class NeuralFcNBEATSx(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' + self.devices = -1 if self.params.accelerator == 'gpu' else 1 + self.model = NeuralForecast( + models=[ + NBEATSx( + h=self.params.prediction_length, + input_size=self.params.input_size_factor*self.params.prediction_length, + loss=self.loss, + max_steps=self.params.max_steps, + batch_size=self.params.batch_size, + n_harmonics=self.params.n_harmonics, + n_polynomials=self.params.n_polynomials, + dropout_prob_theta=self.params.dropout_prob_theta, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + accelerator=self.params.accelerator, + devices=self.devices, + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return False + + +class NeuralFcNHITS(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.accelerator = 'gpu' if self.params.accelerator == 'gpu' else 'cpu' + self.devices = -1 if self.params.accelerator == 'gpu' else 1 + self.model = NeuralForecast( + models=[ + NHITS( + h=self.params.prediction_length, + input_size=self.params.input_size_factor*self.params.prediction_length, + loss=self.loss, + max_steps=self.params.max_steps, + batch_size=self.params.batch_size, + dropout_prob_theta=self.params.dropout_prob_theta, + stack_types=list(self.params.stack_types), + n_blocks=list(self.params.n_blocks), + n_pool_kernel_size=list(self.params.n_pool_kernel_size), + n_freq_downsample=list(self.params.n_freq_downsample), + interpolation_mode=self.params.interpolation_mode, + pooling_mode=self.params.pooling_mode, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + accelerator=self.params.accelerator, + devices=self.devices, + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return False + + +class NeuralFcAutoRNN(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.gpus = -1 if self.params.accelerator == 'gpu' else 0 + self.config = dict( + encoder_n_layers=self.params.encoder_n_layers, + encoder_hidden_size=self.params.encoder_hidden_size, + encoder_activation=self.params.encoder_activation, + 
context_size=self.params.context_size, + decoder_hidden_size=self.params.decoder_hidden_size, + decoder_layers=self.params.decoder_layers, + max_steps=self.params.max_steps, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + learning_rate=tune.loguniform(1e-5, 1e-1), + batch_size=tune.choice([16, 32]), + + ) + self.model = NeuralForecast( + models=[ + AutoRNN( + h=int(self.params["prediction_length"]), + loss=self.loss, + config=self.config, + gpus=self.gpus, + search_alg=HyperOptSearch(), + num_samples=int(self.params["num_samples"]), + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return True + + +class NeuralFcAutoLSTM(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.gpus = -1 if self.params.accelerator == 'gpu' else 0 + self.config = dict( + encoder_n_layers=self.params.encoder_n_layers, + encoder_hidden_size=self.params.encoder_hidden_size, + encoder_activation=self.params.encoder_activation, + context_size=self.params.context_size, + decoder_hidden_size=self.params.decoder_hidden_size, + decoder_layers=self.params.decoder_layers, + max_steps=self.params.max_steps, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + learning_rate=tune.loguniform(1e-5, 1e-1), + batch_size=tune.choice([16, 32]), + ) + self.model = NeuralForecast( + models=[ + AutoLSTM( + h=int(self.params["prediction_length"]), + loss=self.loss, + config=self.config, + gpus=self.gpus, + search_alg=HyperOptSearch(), + num_samples=int(self.params["num_samples"]), + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return True - return {"metric_name": self.params["metric"], - "metric_value": metric_value, - "forecast": None, - "actual": None} \ No newline at end of file + +class NeuralFcAutoNBEATSx(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.gpus = -1 if self.params.accelerator == 'gpu' else 0 + self.config = dict( + input_size=self.params.input_size_factor * self.params.prediction_length, + n_harmonics=self.params.n_harmonics, + n_polynomials=self.params.n_polynomials, + dropout_prob_theta=self.params.dropout_prob_theta, + max_steps=self.params.max_steps, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + learning_rate=tune.loguniform(1e-5, 1e-1), + batch_size=tune.choice([16, 32]), + ) + self.model = NeuralForecast( + models=[ + AutoNBEATSx( + h=int(self.params["prediction_length"]), + loss=self.loss, + config=self.config, + gpus=self.gpus, + search_alg=HyperOptSearch(), + num_samples=int(self.params["num_samples"]), + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return True + + +class NeuralFcAutoNHITS(NeuralFcForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.loss = get_loss_function(self.params.loss) + self.gpus = -1 if self.params.accelerator == 'gpu' else 0 + 
self.config = dict( + input_size=self.params.input_size_factor * self.params.prediction_length, + dropout_prob_theta=self.params.dropout_prob_theta, + stack_types=list(self.params.stack_types), + n_blocks=list(self.params.n_blocks), + n_pool_kernel_size=list(self.params.n_pool_kernel_size), + n_freq_downsample=list(self.params.n_freq_downsample), + interpolation_mode=self.params.interpolation_mode, + pooling_mode=self.params.pooling_mode, + max_steps=self.params.max_steps, + stat_exog_list=list(self.params.get("static_features", [])), + futr_exog_list=list(self.params.get("dynamic_future", [])), + hist_exog_list=list(self.params.get("dynamic_historical", [])), + scaler_type='robust', + learning_rate=tune.loguniform(1e-5, 1e-1), + batch_size=tune.choice([16, 32]), + ) + self.model = NeuralForecast( + models=[ + AutoNHITS( + h=int(self.params["prediction_length"]), + loss=self.loss, + config=self.config, + gpus=self.gpus, + search_alg=HyperOptSearch(), + num_samples=int(self.params["num_samples"]), + ), + ], + freq=self.params["freq"] + ) + + def supports_tuning(self) -> bool: + return True + + +def get_loss_function(loss): + if loss == "smape": + return SMAPE() + elif loss == "mae": + return MAE() + elif loss == "mse": + return MSE() + elif loss == "rmse": + return RMSE() + elif loss == "mape": + return MAPE() + elif loss == "mase": + return MASE() + else: + raise Exception( + f"Provided loss {loss} not supported!" + ) diff --git a/requirements.txt b/requirements.txt index fef3a1e..0d0a096 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ rpy2==3.5.16 kaleido==0.2.1 Jinja2 omegaconf==2.3.0 -protobuf==3.20.2 statsforecast==1.7.4 neuralforecast==1.7.2 missingno==0.5.2 diff --git a/tests/integration_test.py b/tests/integration_test.py index 53876ec..ef337cc 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -104,7 +104,7 @@ def create_m4_df(): train_data="mmf_train", scoring_data="mmf_train", scoring_output=f"{catalog}.{db}.daily_scoring_output", - metrics_output=f"{catalog}.{db}.daily_metrics_output", + evaluation_output=f"{catalog}.{db}.daily_evaluation_output", group_id="unique_id", date_col="ds", target="y", @@ -127,7 +127,7 @@ def create_m4_df(): # COMMAND ---------- -# MAGIC %sql drop table main.mmf.daily_metrics_output +# MAGIC %sql drop table main.mmf.daily_evaluation_output # COMMAND ----------
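For readers who want to try the new wrappers outside the full pipeline, here is a minimal, illustrative exercise of the NeuralFcRNN class added above. The parameter values are hypothetical placeholders; only the field names mirror what the wrapper reads from its params object, and the import path follows the file touched by this diff.

```python
import numpy as np
import pandas as pd
from omegaconf import OmegaConf

from forecasting_sa.models.neuralforecast.NeuralForecastPipeline import NeuralFcRNN

# Hypothetical configuration: field names follow the attributes the wrapper
# reads (prediction_length, input_size_factor, encoder/decoder sizes, ...).
params = OmegaConf.create({
    "group_id": "unique_id", "date_col": "ds", "target": "y", "freq": "D",
    "prediction_length": 10, "input_size_factor": 2, "loss": "smape",
    "max_steps": 100, "batch_size": 32, "learning_rate": 1e-3,
    "encoder_n_layers": 2, "encoder_hidden_size": 64, "encoder_activation": "tanh",
    "context_size": 10, "decoder_hidden_size": 64, "decoder_layers": 2,
    "metric": "smape", "accelerator": "cpu",
})

# Two short synthetic daily series in the long format prepare_data expects.
dates = pd.date_range("2020-01-01", periods=120, freq="D")
train = pd.concat([
    pd.DataFrame({"unique_id": uid, "ds": dates, "y": np.random.rand(120)})
    for uid in ("D1", "D2")
])

model = NeuralFcRNN(params)
model.fit(train)                             # wraps NeuralForecast.fit
forecast_df, fitted = model.forecast(train)  # note the new (DataFrame, model) return value
```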
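The refactored calculate_metrics now returns one row per series instead of a single aggregated sMAPE. The sketch below only illustrates the tuple order the method emits; the row values and the column names used to collect the rows are invented for illustration and are not taken from the orchestrator code.

```python
import numpy as np
import pandas as pd

# One fabricated row in the shape calculate_metrics now produces:
# (group key, curr_date, metric name, metric value, actuals, forecasts, model bytes).
rows = [(
    "D1",
    pd.Timestamp("2020-04-20"),
    "smape",                      # only "smape" is wired up; other metrics raise
    0.12,
    np.array([10.0, 11.0, 9.0]),  # val_df target values for this series
    np.array([9.5, 11.3, 8.7]),   # last prediction_length predicted values
    b"",                          # placeholder binary payload
)]

# Illustrative column names for collecting the rows before they are written out.
evaluation = pd.DataFrame(rows, columns=[
    "unique_id", "backtest_window_start_date", "metric_name",
    "metric_value", "actual", "forecast", "model_pickle",
])
```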
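The Auto* wrappers hand NeuralForecast a Ray Tune search space rather than fixed hyperparameters. A standalone sketch of the same pattern, with hypothetical values and a deliberately tiny trial budget, looks like this (ray[tune] and hyperopt are assumed to be installed alongside neuralforecast).

```python
from neuralforecast import NeuralForecast
from neuralforecast.auto import AutoRNN
from neuralforecast.losses.pytorch import SMAPE
from ray import tune
from ray.tune.search.hyperopt import HyperOptSearch

# Fixed values and tunables live side by side in one config dict;
# the tune.* entries are sampled once per trial by HyperOpt.
config = dict(
    encoder_hidden_size=64,
    max_steps=100,
    scaler_type="robust",
    learning_rate=tune.loguniform(1e-5, 1e-1),
    batch_size=tune.choice([16, 32]),
)

nf = NeuralForecast(
    models=[
        AutoRNN(
            h=10,                         # forecast horizon
            loss=SMAPE(),
            config=config,                # replaces AutoRNN's default search space
            gpus=0,                       # CPU-only for this sketch
            search_alg=HyperOptSearch(),
            num_samples=4,                # number of tuning trials
        )
    ],
    freq="D",
)
# nf.fit(df=train)  # train: long-format frame with unique_id / ds / y columns
```

Passing a partial config this way replaces the default search space entirely, so any hyperparameter not listed falls back to the underlying RNN defaults rather than being tuned.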