diff --git a/README.md b/README.md
index 422c4af..b3f3c39 100644
--- a/README.md
+++ b/README.md
@@ -93,7 +93,7 @@ run_forecast(
 - ```prediction_length``` is your forecasting horizon in the number of steps.
 - ```backtest_months``` specifies how many previous months you use for backtesting.
 - ```stride``` is the number of steps in which you update your backtesting trial start date when going from one trial to the next.
-- ```metric``` is the metric to log in the evaluation table and MLflow. Supported metrics are mape and smape. Default is smape.
+- ```metric``` is the metric to log in the evaluation table and MLflow. Supported metrics are mae, mse, rmse, mape and smape. Default is smape.
 - ```train_predict_ratio``` specifies the minimum length required for your training dataset with respect to ```prediction_length```. If ```train_predict_ratio```=2, you need to have a training dataset that is at least twice as long as ```prediction_length```.
 - ```data_quality_check``` checks the quality of the input data if set to True (default False). See [data_quality_checks.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/data_quality_checks.py) for the full details of the checks.
 - ```resample``` backfills skipped entries with 0 if set to True. Only relevant when data_quality_check is True. Default is False. If data_quality_check is True and resample is False, the check removes all time series with skipped dates.
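Note: for orientation, the sketch below shows a `run_forecast` call that exercises one of the newly supported metrics. Only the parameters documented above (and in the `mmf_sa/__init__.py` docstring later in this patch) are taken from the source; the table names, column names, model list, and experiment path are illustrative placeholders.

```python
from mmf_sa import run_forecast

run_forecast(
    spark=spark,                          # active Spark session (assumed argument)
    train_data="mmf.m4.m4_daily_train",   # placeholder delta table name
    group_id="unique_id",                 # placeholder column names
    date_col="ds",
    target="y",
    freq="D",
    prediction_length=10,       # forecasting horizon in steps
    backtest_months=1,          # months of history reserved for backtesting
    stride=10,                  # step between consecutive backtest trials
    metric="rmse",              # mae, mse, rmse, mape, or smape (default: smape)
    train_predict_ratio=2,      # training data must be >= 2x prediction_length
    data_quality_check=True,
    resample=False,
    evaluation_output="mmf.m4.daily_evaluation_output",  # placeholder
    scoring_output="mmf.m4.daily_scoring_output",        # placeholder
    active_models=["StatsForecastBaselineNaive"],        # placeholder model list
    experiment_path="/Shared/mmf_experiment",            # placeholder
    use_case_name="m4_daily",                            # placeholder
)
```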
diff --git a/examples/foundation-model-examples/chronos-example.py b/examples/foundation-model-examples/chronos-example.py
index 1be2d0e..5926566 100644
--- a/examples/foundation-model-examples/chronos-example.py
+++ b/examples/foundation-model-examples/chronos-example.py
@@ -24,6 +24,7 @@
 # COMMAND ----------

 # This cell will create tables: {catalog}.{db}.m4_daily_train, {catalog}.{db}.m4_monthly_train, {catalog}.{db}.rossmann_daily_train, {catalog}.{db}.rossmann_daily_test
+
 dbutils.notebook.run("data_preparation", timeout_seconds=0, arguments={"catalog": catalog, "db": db, "n": n})

 # COMMAND ----------
diff --git a/examples/foundation_daily.py b/examples/foundation_daily.py
index be38b21..3217f10 100644
--- a/examples/foundation_daily.py
+++ b/examples/foundation_daily.py
@@ -45,7 +45,7 @@
 # COMMAND ----------

 # Number of time series
-n = 1000
+n = 100


 def create_m4_daily():
@@ -80,9 +80,9 @@ def transform_group(df):

 # COMMAND ----------

-catalog = "mmf" # Name of the catalog we use to manage our assets
-db = "m4" # Name of the schema we use to manage our assets (e.g. datasets)
-user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address
+catalog = "mmf"  # Name of the catalog we use to manage our assets
+db = "m4"  # Name of the schema we use to manage our assets (e.g. datasets)
+user = spark.sql('select current_user() as user').collect()[0]['user']  # User email address

 # COMMAND ----------
diff --git a/examples/foundation_monthly.py b/examples/foundation_monthly.py
index 16c2dd5..e2b64f7 100644
--- a/examples/foundation_monthly.py
+++ b/examples/foundation_monthly.py
@@ -45,7 +45,7 @@
 # COMMAND ----------

 # Number of time series
-n = 1000
+n = 100


 def create_m4_monthly():
diff --git a/examples/global_daily.py b/examples/global_daily.py
index 9b9dd5c..a6140be 100644
--- a/examples/global_daily.py
+++ b/examples/global_daily.py
@@ -46,7 +46,7 @@
 # COMMAND ----------

 # Number of time series
-n = 1000
+n = 100


 def create_m4_daily():
@@ -82,9 +82,9 @@ def transform_group(df):

 # COMMAND ----------

-catalog = "mmf" # Name of the catalog we use to manage our assets
-db = "m4" # Name of the schema we use to manage our assets (e.g. datasets)
-user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address
+catalog = "mmf"  # Name of the catalog we use to manage our assets
+db = "m4"  # Name of the schema we use to manage our assets (e.g. datasets)
+user = spark.sql('select current_user() as user').collect()[0]['user']  # User email address

 # COMMAND ----------
diff --git a/examples/global_external_regressors_daily.py b/examples/global_external_regressors_daily.py
index abab248..5ccd630 100644
--- a/examples/global_external_regressors_daily.py
+++ b/examples/global_external_regressors_daily.py
@@ -66,7 +66,7 @@

 # Number of time series to sample
 sample = True
-size = 1000
+size = 100
 stores = sorted(random.sample(range(0, 1000), size))

 train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True)
diff --git a/examples/global_monthly.py b/examples/global_monthly.py
index dafe06b..1fcfc54 100644
--- a/examples/global_monthly.py
+++ b/examples/global_monthly.py
@@ -45,7 +45,7 @@
 # COMMAND ----------

 # Number of time series
-n = 1000
+n = 100


 def create_m4_monthly():
diff --git a/examples/local_univariate_daily.py b/examples/local_univariate_daily.py
index 29f1da5..3d4990c 100644
--- a/examples/local_univariate_daily.py
+++ b/examples/local_univariate_daily.py
@@ -51,7 +51,7 @@
 # COMMAND ----------

 # Number of time series
-n = 1000
+n = 100


 def create_m4_daily():
diff --git a/examples/local_univariate_external_regressors_daily.py b/examples/local_univariate_external_regressors_daily.py
index 8297d5a..6c18072 100644
--- a/examples/local_univariate_external_regressors_daily.py
+++ b/examples/local_univariate_external_regressors_daily.py
@@ -58,7 +58,7 @@

 # Number of time series to sample
 sample = True
-size = 1000
+size = 100
 stores = sorted(random.sample(range(0, 1000), size))

 train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True)
diff --git a/examples/local_univariate_monthly.py b/examples/local_univariate_monthly.py
index 5e1866a..6538eb3 100644
--- a/examples/local_univariate_monthly.py
+++ b/examples/local_univariate_monthly.py
@@ -52,7 +52,7 @@
 # COMMAND ----------

 # Number of time series
-n = 1000
+n = 100


 def create_m4_monthly():
diff --git a/examples/m5-examples/foundation_daily_m5.py b/examples/m5-examples/foundation_daily_m5.py
index 591d6eb..56a1446 100644
--- a/examples/m5-examples/foundation_daily_m5.py
+++ b/examples/m5-examples/foundation_daily_m5.py
@@ -20,7 +20,7 @@

 catalog = "mmf"  # Name of the catalog we use to manage our assets
 db = "m5"  # Name of the schema we use to manage our assets (e.g. datasets)
-n = 1000  # Number of items: choose from [100, 1000, 10000, 'full']. full is 35k
+n = 100  # Number of items: choose from [100, 1000, 10000, 'full']. full is 35k
 table = f"daily_train_{n}"  # Training table name
 user = spark.sql('select current_user() as user').collect()[0]['user']  # User email
diff --git a/examples/m5-examples/global_daily_m5.py b/examples/m5-examples/global_daily_m5.py
index 8863a2f..9e3c8cd 100644
--- a/examples/m5-examples/global_daily_m5.py
+++ b/examples/m5-examples/global_daily_m5.py
@@ -21,7 +21,7 @@

 catalog = "mmf"  # Name of the catalog we use to manage our assets
 db = "m5"  # Name of the schema we use to manage our assets (e.g. datasets)
-n = 1000  # Number of items: choose from [100, 1000, 10000, 'full']. full is 35k
+n = 100  # Number of items: choose from [100, 1000, 10000, 'full']. full is 35k
 table = f"daily_train_{n}"  # Training table name
 user = spark.sql('select current_user() as user').collect()[0]['user']  # User email
diff --git a/examples/m5-examples/local_univariate_daily_m5.py b/examples/m5-examples/local_univariate_daily_m5.py
index 41b3e47..30fed6c 100644
--- a/examples/m5-examples/local_univariate_daily_m5.py
+++ b/examples/m5-examples/local_univariate_daily_m5.py
@@ -22,7 +22,7 @@

 db = "m5"  # Name of the schema we use to manage our assets (e.g. datasets)
 user = spark.sql('select current_user() as user').collect()[0]['user']  # User email address
-n = 1000  # Number of items: choose from [1000, 10000, 'full']. full is 35k
+n = 100  # Number of items: choose from [100, 1000, 10000, 'full']. full is 35k
 taining_table = f"daily_train_{n}"

 # COMMAND ----------
diff --git a/mmf_sa/__init__.py b/mmf_sa/__init__.py
index 52438ee..348bb4a 100644
--- a/mmf_sa/__init__.py
+++ b/mmf_sa/__init__.py
@@ -56,7 +56,7 @@ def run_forecast(
         prediction_length (int): An integer specifying the prediction length: i.e. forecasting horizon.
         backtest_months (int): An integer specifying the number of backtest months.
         stride (int): An integer specifying the stride length.
-        metric (str): A string specifying the metric to use for evaluation. Supported metrics are mape and smape. Default is smape.
+        metric (str): A string specifying the metric to use for evaluation. Supported metrics are mae, mse, rmse, mape and smape. Default is smape.
         scoring_data (Union[str, pd.DataFrame, DataFrame]): Scoring data as a delta table name (string), a pandas DataFrame, or a Spark DataFrame.
         scoring_output (str): A string specifying the output table name for scoring.
         evaluation_output (str): A string specifying the output table name for evaluation.
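Note: the docstring change above enumerates the accepted values. A small guard in the spirit of the pipelines' `else` branches further down (an illustrative sketch, not code from this patch; `SUPPORTED_METRICS` and `validate_metric` are made-up names):

```python
SUPPORTED_METRICS = {"mae", "mse", "rmse", "mape", "smape"}  # per the docstring above

def validate_metric(metric: str = "smape") -> str:
    # Match the pipelines' behavior: an unknown metric raises instead of silently defaulting.
    if metric not in SUPPORTED_METRICS:
        raise Exception(f"Metric {metric} not supported!")
    return metric
```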
diff --git a/mmf_sa/forecasting_conf.yaml b/mmf_sa/forecasting_conf.yaml
index d594c9c..33f4ee1 100644
--- a/mmf_sa/forecasting_conf.yaml
+++ b/mmf_sa/forecasting_conf.yaml
@@ -1,10 +1,12 @@
-temp_path: /Volumes/solacc_uc/mmf/partitions
+freq: D
+
 metric: smape
-backtest_retrain: false
-accelerator: cpu
+
 resample: false
-train_predict_ratio: 4
-freq: D
+
+train_predict_ratio: 1
+
+accelerator: cpu

 static_features:
 #- State
@@ -132,4 +134,6 @@ models:
   - model:
       name: RFableNNETAR
       model_spec:
-        season_length: 7
\ No newline at end of file
+        season_length: 7
+
+temp_path: /Volumes/mmf/tmp/partitions
\ No newline at end of file
diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py
index 0264e2b..1b20791 100644
--- a/mmf_sa/models/abstract_model.py
+++ b/mmf_sa/models/abstract_model.py
@@ -4,7 +4,11 @@
 import cloudpickle
 from typing import Dict, Union
 from sklearn.base import BaseEstimator, RegressorMixin
-from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+from sktime.performance_metrics.forecasting import (
+    MeanAbsoluteError,
+    MeanSquaredError,
+    MeanAbsolutePercentageError,
+)
 import mlflow

 mlflow.set_registry_uri("databricks-uc")
@@ -135,16 +139,34 @@ def calculate_metrics(
         pred_df, model_fitted = self.predict(hist_df, val_df)
         if self.params["metric"] == "smape":
-            metric_value = mean_absolute_percentage_error(
+            smape = MeanAbsolutePercentageError(symmetric=True)
+            metric_value = smape(
                 val_df[self.params["target"]],
                 pred_df[self.params["target"]],
-                symmetric=True,
             )
         elif self.params["metric"] == "mape":
-            metric_value = mean_absolute_percentage_error(
+            mape = MeanAbsolutePercentageError(symmetric=False)
+            metric_value = mape(
+                val_df[self.params["target"]],
+                pred_df[self.params["target"]],
+            )
+        elif self.params["metric"] == "mae":
+            mae = MeanAbsoluteError()
+            metric_value = mae(
+                val_df[self.params["target"]],
+                pred_df[self.params["target"]],
+            )
+        elif self.params["metric"] == "mse":
+            mse = MeanSquaredError(square_root=False)
+            metric_value = mse(
+                val_df[self.params["target"]],
+                pred_df[self.params["target"]],
+            )
+        elif self.params["metric"] == "rmse":
+            rmse = MeanSquaredError(square_root=True)
+            metric_value = rmse(
                 val_df[self.params["target"]],
                 pred_df[self.params["target"]],
-                symmetric=False,
             )
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
@@ -156,3 +178,4 @@
             "forecast": pred_df[self.params["target"]].to_numpy("float"),
             "actual": val_df[self.params["target"]].to_numpy(),
             "model_pickle": cloudpickle.dumps(model_fitted)}
+
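Note: the sktime metric classes introduced above are callable objects, and their constructor flags map the two old function calls plus the three new metrics onto exactly five configurations. A self-contained sketch with made-up numbers:

```python
import numpy as np
from sktime.performance_metrics.forecasting import (
    MeanAbsoluteError,
    MeanSquaredError,
    MeanAbsolutePercentageError,
)

actual = np.array([10.0, 12.0, 14.0, 16.0])
forecast = np.array([11.0, 11.5, 14.5, 15.0])

mae = MeanAbsoluteError()(actual, forecast)                            # 0.75
mse = MeanSquaredError(square_root=False)(actual, forecast)            # 0.625
rmse = MeanSquaredError(square_root=True)(actual, forecast)            # ~0.79
mape = MeanAbsolutePercentageError(symmetric=False)(actual, forecast)  # plain MAPE
smape = MeanAbsolutePercentageError(symmetric=True)(actual, forecast)  # symmetric MAPE

# rmse is just mse with square_root=True, nothing more:
assert np.isclose(rmse, np.sqrt(mse))
```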
diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py
index 1b79df9..b67db2c 100644
--- a/mmf_sa/models/chronosforecast/ChronosPipeline.py
+++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py
@@ -7,7 +7,11 @@
 import mlflow
 from mlflow.types import Schema, TensorSpec
 from mlflow.models.signature import ModelSignature
-from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+from sktime.performance_metrics.forecasting import (
+    MeanAbsoluteError,
+    MeanSquaredError,
+    MeanAbsolutePercentageError,
+)
 from typing import Iterator
 from pyspark.sql.functions import collect_list, pandas_udf
 from pyspark.sql import DataFrame
@@ -117,6 +121,12 @@ def calculate_metrics(
             metric_name = "smape"
         elif self.params["metric"] == "mape":
             metric_name = "mape"
+        elif self.params["metric"] == "mae":
+            metric_name = "mae"
+        elif self.params["metric"] == "mse":
+            metric_name = "mse"
+        elif self.params["metric"] == "rmse":
+            metric_name = "rmse"
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
         for key in keys:
@@ -124,9 +134,20 @@
             actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()
             forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()[0]
             try:
                 if metric_name == "smape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
+                    smape = MeanAbsolutePercentageError(symmetric=True)
+                    metric_value = smape(actual, forecast)
                 elif metric_name == "mape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=False)
+                    mape = MeanAbsolutePercentageError(symmetric=False)
+                    metric_value = mape(actual, forecast)
+                elif metric_name == "mae":
+                    mae = MeanAbsoluteError()
+                    metric_value = mae(actual, forecast)
+                elif metric_name == "mse":
+                    mse = MeanSquaredError(square_root=False)
+                    metric_value = mse(actual, forecast)
+                elif metric_name == "rmse":
+                    rmse = MeanSquaredError(square_root=True)
+                    metric_value = rmse(actual, forecast)
                 metrics.extend(
                     [(
                         key,
diff --git a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py
index 59860ee..e4f5661 100644
--- a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py
+++ b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py
@@ -7,7 +7,11 @@
 import mlflow
 from mlflow.types import Schema, TensorSpec
 from mlflow.models.signature import ModelSignature
-from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+from sktime.performance_metrics.forecasting import (
+    MeanAbsoluteError,
+    MeanSquaredError,
+    MeanAbsolutePercentageError,
+)
 from typing import Iterator
 from pyspark.sql.functions import collect_list, pandas_udf
 from pyspark.sql import DataFrame
@@ -117,6 +121,12 @@ def calculate_metrics(
             metric_name = "smape"
         elif self.params["metric"] == "mape":
             metric_name = "mape"
+        elif self.params["metric"] == "mae":
+            metric_name = "mae"
+        elif self.params["metric"] == "mse":
+            metric_name = "mse"
+        elif self.params["metric"] == "rmse":
+            metric_name = "rmse"
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
         for key in keys:
@@ -124,9 +134,20 @@
             actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()
             forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()[0]
             try:
                 if metric_name == "smape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
+                    smape = MeanAbsolutePercentageError(symmetric=True)
+                    metric_value = smape(actual, forecast)
                 elif metric_name == "mape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=False)
+                    mape = MeanAbsolutePercentageError(symmetric=False)
+                    metric_value = mape(actual, forecast)
+                elif metric_name == "mae":
+                    mae = MeanAbsoluteError()
+                    metric_value = mae(actual, forecast)
+                elif metric_name == "mse":
+                    mse = MeanSquaredError(square_root=False)
+                    metric_value = mse(actual, forecast)
+                elif metric_name == "rmse":
+                    rmse = MeanSquaredError(square_root=True)
+                    metric_value = rmse(actual, forecast)
                 metrics.extend(
                     [(
                         key,
diff --git a/mmf_sa/models/momentforecast/MomentPipeline.py b/mmf_sa/models/momentforecast/MomentPipeline.py
index 9ec2267..39623f5 100644
--- a/mmf_sa/models/momentforecast/MomentPipeline.py
+++ b/mmf_sa/models/momentforecast/MomentPipeline.py
@@ -7,7 +7,11 @@
 import mlflow
 from mlflow.types import Schema, TensorSpec
 from mlflow.models.signature import ModelSignature
-from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+from sktime.performance_metrics.forecasting import (
+    MeanAbsoluteError,
+    MeanSquaredError,
+    MeanAbsolutePercentageError,
+)
 from typing import Iterator
 from pyspark.sql.functions import collect_list, pandas_udf
 from pyspark.sql import DataFrame
@@ -115,6 +119,12 @@ def calculate_metrics(
             metric_name = "smape"
         elif self.params["metric"] == "mape":
             metric_name = "mape"
+        elif self.params["metric"] == "mae":
+            metric_name = "mae"
+        elif self.params["metric"] == "mse":
+            metric_name = "mse"
+        elif self.params["metric"] == "rmse":
+            metric_name = "rmse"
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
         for key in keys:
@@ -122,9 +132,20 @@
             actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()
             forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()[0]
             try:
                 if metric_name == "smape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
+                    smape = MeanAbsolutePercentageError(symmetric=True)
+                    metric_value = smape(actual, forecast)
                 elif metric_name == "mape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=False)
+                    mape = MeanAbsolutePercentageError(symmetric=False)
+                    metric_value = mape(actual, forecast)
+                elif metric_name == "mae":
+                    mae = MeanAbsoluteError()
+                    metric_value = mae(actual, forecast)
+                elif metric_name == "mse":
+                    mse = MeanSquaredError(square_root=False)
+                    metric_value = mse(actual, forecast)
+                elif metric_name == "rmse":
+                    rmse = MeanSquaredError(square_root=True)
+                    metric_value = rmse(actual, forecast)
                 metrics.extend(
                     [(
                         key,
diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
index 68037d2..de1f027 100644
--- a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
+++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
@@ -1,6 +1,10 @@
 import pandas as pd
 import numpy as np
-from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+from sktime.performance_metrics.forecasting import (
+    MeanAbsoluteError,
+    MeanSquaredError,
+    MeanAbsolutePercentageError,
+)
 from neuralforecast import NeuralForecast
 from mmf_sa.models.abstract_model import ForecastingRegressor
 from neuralforecast.auto import (
@@ -168,6 +172,12 @@
             metric_name = "smape"
         elif self.params["metric"] == "mape":
             metric_name = "mape"
+        elif self.params["metric"] == "mae":
+            metric_name = "mae"
+        elif self.params["metric"] == "mse":
+            metric_name = "mse"
+        elif self.params["metric"] == "rmse":
+            metric_name = "rmse"
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
         for key in keys:
@@ -176,10 +186,20 @@
             actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]]
             forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].\
                 iloc[-self.params["prediction_length"]:]
             try:
                 if metric_name == "smape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
+                    smape = MeanAbsolutePercentageError(symmetric=True)
+                    metric_value = smape(actual, forecast)
                 elif metric_name == "mape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=False)
-
+                    mape = MeanAbsolutePercentageError(symmetric=False)
+                    metric_value = mape(actual, forecast)
+                elif metric_name == "mae":
+                    mae = MeanAbsoluteError()
+                    metric_value = mae(actual, forecast)
+                elif metric_name == "mse":
+                    mse = MeanSquaredError(square_root=False)
+                    metric_value = mse(actual, forecast)
+                elif metric_name == "rmse":
+                    rmse = MeanSquaredError(square_root=True)
+                    metric_value = rmse(actual, forecast)
                 metrics.extend(
                     [(
                         key,
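Note: the same five-way `elif` chain now recurs in the Chronos, Moirai, Moment, NeuralForecast, and TimesFM pipelines. Purely as an illustration of the dispatch these branches implement (not part of this patch; `METRICS` and `compute_metric` are made-up names), a shared lookup table could express it once:

```python
from sktime.performance_metrics.forecasting import (
    MeanAbsoluteError,
    MeanSquaredError,
    MeanAbsolutePercentageError,
)

# Hypothetical shared registry, mirroring the five branches above.
METRICS = {
    "smape": MeanAbsolutePercentageError(symmetric=True),
    "mape": MeanAbsolutePercentageError(symmetric=False),
    "mae": MeanAbsoluteError(),
    "mse": MeanSquaredError(square_root=False),
    "rmse": MeanSquaredError(square_root=True),
}

def compute_metric(metric_name: str, actual, forecast) -> float:
    # Unknown names fail the same way the elif chains do.
    if metric_name not in METRICS:
        raise Exception(f"Metric {metric_name} not supported!")
    return METRICS[metric_name](actual, forecast)
```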
diff --git a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py
index 0385f14..127a496 100644
--- a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py
+++ b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py
@@ -3,7 +3,11 @@
 import sys
 import pandas as pd
 import torch
-from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+from sktime.performance_metrics.forecasting import (
+    MeanAbsoluteError,
+    MeanSquaredError,
+    MeanAbsolutePercentageError,
+)

 from mmf_sa.models.abstract_model import ForecastingRegressor

@@ -70,6 +74,12 @@ def calculate_metrics(
             metric_name = "smape"
         elif self.params["metric"] == "mape":
             metric_name = "mape"
+        elif self.params["metric"] == "mae":
+            metric_name = "mae"
+        elif self.params["metric"] == "mse":
+            metric_name = "mse"
+        elif self.params["metric"] == "rmse":
+            metric_name = "rmse"
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
         for key in keys:
@@ -77,10 +87,20 @@
             actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()
             forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()[0]
             try:
                 if metric_name == "smape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
+                    smape = MeanAbsolutePercentageError(symmetric=True)
+                    metric_value = smape(actual, forecast)
                 elif metric_name == "mape":
-                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=False)
-
+                    mape = MeanAbsolutePercentageError(symmetric=False)
+                    metric_value = mape(actual, forecast)
+                elif metric_name == "mae":
+                    mae = MeanAbsoluteError()
+                    metric_value = mae(actual, forecast)
+                elif metric_name == "mse":
+                    mse = MeanSquaredError(square_root=False)
+                    metric_value = mse(actual, forecast)
+                elif metric_name == "rmse":
+                    rmse = MeanSquaredError(square_root=True)
+                    metric_value = rmse(actual, forecast)
                 metrics.extend(
                     [(
                         key,