Commit e7b3597: bulk update
ryuta-yoshimatsu committed May 15, 2024
1 parent 34ae39b
Showing 10 changed files with 92 additions and 212 deletions.
42 changes: 21 additions & 21 deletions 01_mmf_univariate_daily_demo.py
@@ -77,27 +77,27 @@ def create_m4_df():
 # COMMAND ----------
 
 active_models = [
-    #"StatsForecastBaselineWindowAverage",
-    #"StatsForecastBaselineSeasonalWindowAverage",
-    #"StatsForecastBaselineNaive",
-    #"StatsForecastBaselineSeasonalNaive",
-    #"StatsForecastAutoArima",
-    #"StatsForecastAutoETS",
-    #"StatsForecastAutoCES",
-    #"StatsForecastAutoTheta",
-    #"StatsForecastTSB",
-    #"StatsForecastADIDA",
-    #"StatsForecastIMAPA",
-    #"StatsForecastCrostonClassic",
-    #"StatsForecastCrostonOptimized",
-    #"StatsForecastCrostonSBA",
-    #"RFableArima",
-    #"RFableETS",
-    #"RFableNNETAR",
-    #"RFableEnsemble",
-    #"RDynamicHarmonicRegression",
-    "SKTimeLgbmDsDt",
-    "SKTimeTBats",
+    "StatsForecastBaselineWindowAverage",
+    "StatsForecastBaselineSeasonalWindowAverage",
+    "StatsForecastBaselineNaive",
+    "StatsForecastBaselineSeasonalNaive",
+    "StatsForecastAutoArima",
+    "StatsForecastAutoETS",
+    "StatsForecastAutoCES",
+    "StatsForecastAutoTheta",
+    "StatsForecastTSB",
+    "StatsForecastADIDA",
+    "StatsForecastIMAPA",
+    "StatsForecastCrostonClassic",
+    "StatsForecastCrostonOptimized",
+    "StatsForecastCrostonSBA",
+    "RFableArima",
+    "RFableETS",
+    "RFableNNETAR",
+    "RFableEnsemble",
+    "RDynamicHarmonicRegression",
+    #"SKTimeLgbmDsDt",
+    #"SKTimeTBats",
     #"NeuralForecastRNN",
     #"NeuralForecastLSTM",
     #"NeuralForecastNBEATSx",
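
Note: this change enables the full StatsForecast and R fable model set for the demo and disables the two SKTime models, purely by moving the comment markers. A minimal sketch (not part of the commit; it only regroups the names already listed above) of the same toggle as plain Python lists:

    # Sketch: select active_models by family instead of commenting entries in and out.
    STATSFORECAST_MODELS = [
        "StatsForecastBaselineWindowAverage",
        "StatsForecastBaselineSeasonalWindowAverage",
        "StatsForecastBaselineNaive",
        "StatsForecastBaselineSeasonalNaive",
        "StatsForecastAutoArima",
        "StatsForecastAutoETS",
        "StatsForecastAutoCES",
        "StatsForecastAutoTheta",
        "StatsForecastTSB",
        "StatsForecastADIDA",
        "StatsForecastIMAPA",
        "StatsForecastCrostonClassic",
        "StatsForecastCrostonOptimized",
        "StatsForecastCrostonSBA",
    ]
    R_FABLE_MODELS = [
        "RFableArima",
        "RFableETS",
        "RFableNNETAR",
        "RFableEnsemble",
        "RDynamicHarmonicRegression",
    ]
    # The families this commit enables; SKTime and NeuralForecast stay off.
    active_models = STATSFORECAST_MODELS + R_FABLE_MODELS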
6 changes: 3 additions & 3 deletions 02_mm_rossmann_forecasting.py
@@ -28,7 +28,7 @@
 
 # Randomly select 100 stores to forecast
 import random
-random.seed(100)
+random.seed(10)
 sample = False
 stores = sorted(random.sample(range(0, 1000), 100))
 
@@ -45,7 +45,7 @@
 # COMMAND ----------
 
 # Set the number of shuffle partitions larger than the total number of cores
-sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
+#sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
 
 # COMMAND ----------
 
@@ -107,7 +107,7 @@
     train_predict_ratio=2,
     active_models=active_models,
     data_quality_check=True,
-    resample=False,
+    resample=True,
     ensemble=True,
     ensemble_metric="smape",
     ensemble_metric_avg=0.3,
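
Note: the resample flag set here presumably reaches the data-quality checks (forecasting_sa/data_quality_checks.py below assigns _df = _resampled inside _multiple_checks). Assuming, as that name suggests, that resampling fills calendar gaps in each series, a minimal pandas sketch of the idea; the ds/y column names and the zero fill are illustrative only:

    import pandas as pd

    # Toy daily series with a two-day gap.
    df = pd.DataFrame(
        {"ds": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-05"]),
         "y": [1.0, 2.0, 5.0]}
    ).set_index("ds")

    # Resampling makes the missing dates explicit rows (here zero-filled),
    # so downstream models see a regular daily index.
    resampled = df.resample("D").asfreq(fill_value=0.0)
    print(resampled)  # 2024-01-03 and 2024-01-04 appear with y = 0.0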
34 changes: 13 additions & 21 deletions forecasting_sa/Forecaster.py
@@ -6,8 +6,8 @@
 from datetime import datetime
 import uuid
 import yaml
-from hyperopt import fmin, tpe, SparkTrials, STATUS_OK, rand, atpe
-from typing import Dict, Any, Tuple, List, Union
+from hyperopt import fmin, tpe, SparkTrials, STATUS_OK
+from typing import Dict, Any, Tuple, Union
 import pandas as pd
 import numpy as np
 import cloudpickle
@@ -20,7 +20,6 @@
 from pyspark.sql.types import (
     StructType,
     StructField,
-    IntegerType,
     StringType,
     DateType,
     DoubleType,
@@ -29,10 +28,9 @@
     ArrayType,
 )
 from pyspark.sql.functions import lit, avg, min, max, col, posexplode, collect_list
-from forecasting_sa.models.abstract_model import ForecastingSARegressor
+from forecasting_sa.models.abstract_model import ForecastingRegressor
 from forecasting_sa.models import ModelRegistry
 from forecasting_sa.data_quality_checks import DataQualityChecks
-
 _logger = logging.getLogger(__name__)
 os.environ['NIXTLA_ID_AS_COL'] = '1'
 mlflow.set_registry_uri("databricks-uc")
@@ -121,7 +119,7 @@ def prepare_data(self, model_conf: DictConfig, path: str, scoring=False) \
         )
         df = df.toPandas()
         if not scoring:
-            df = DataQualityChecks(df, self.conf).run()
+            df, removed = DataQualityChecks(df, self.conf).run()
         if model_conf.get("data_prep", "none") == "none":
             df[self.conf["group_id"]] = df[self.conf["group_id"]].astype(str)
         return df
@@ -250,7 +248,7 @@ def train_one_model(
         train_df: pd.DataFrame,
         val_df: pd.DataFrame,
         model_conf: DictConfig,
-        model: ForecastingSARegressor,
+        model: ForecastingRegressor,
     ):
         print("starting train")
         tuned_model, tuned_params = self.tune_model(model_conf, model, train_df, val_df)
@@ -275,7 +273,7 @@ def train_one_model(
     def tune_model(
         self,
         model_conf: DictConfig,
-        model: ForecastingSARegressor,
+        model: ForecastingRegressor,
         train_df: pd.DataFrame,
         val_df: pd.DataFrame,
     ):
@@ -352,7 +350,7 @@ def objective(params, train_df_path, val_df_path):
 
     def backtest_and_log_metrics(
         self,
-        model: ForecastingSARegressor,
+        model: ForecastingRegressor,
         train_df: pd.DataFrame,
         val_df: pd.DataFrame,
         prefix: str,
@@ -396,7 +394,7 @@ def evaluate_models(self):
 
     def evaluate_local_model(self, model_conf):
         src_df = self.resolve_source("train_data")
-        src_df = DataQualityChecks(src_df, self.conf, self.spark).run()
+        src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run()
         output_schema = StructType(
             [
                 StructField(
@@ -450,14 +448,13 @@ def evaluate_local_model(self, model_conf):
 
     @staticmethod
     def evaluate_one_model(
-        pdf: pd.DataFrame, model: ForecastingSARegressor
+        pdf: pd.DataFrame, model: ForecastingRegressor
     ) -> pd.DataFrame:
         pdf[model.params["date_col"]] = pd.to_datetime(pdf[model.params["date_col"]])
         pdf.sort_values(by=model.params["date_col"], inplace=True)
         split_date = pdf[model.params["date_col"]].max() - pd.DateOffset(
             months=model.params["backtest_months"]
         )
-
         group_id = pdf[model.params["group_id"]].iloc[0]
         try:
             pdf = pdf.fillna(0.1)
@@ -555,12 +552,13 @@ def run_scoring(self):
 
     def run_scoring_for_local_model(self, model_conf):
         src_df = self.resolve_source("train_data")
-        src_df = DataQualityChecks(src_df, self.conf, self.spark).run()
+        src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run()
         # Check if external regressors are provided and framework is statsforecast
         # External regressors are supported only with statsforecast and neuralforecast models
         if (self.conf["train_data"] != self.conf["scoring_data"]) & \
                 (model_conf["framework"] == "StatsForecast"):
             score_df = self.resolve_source("scoring_data")
+            score_df = score_df.where(~col(self.conf["group_id"]).isin(removed))
             src_df = src_df.unionByName(score_df, allowMissingColumns=True)
         output_schema = StructType(
             [
@@ -596,17 +594,12 @@
 
     @staticmethod
    def score_one_model(
-        pdf: pd.DataFrame, model: ForecastingSARegressor
+        pdf: pd.DataFrame, model: ForecastingRegressor
     ) -> pd.DataFrame:
         pdf[model.params["date_col"]] = pd.to_datetime(pdf[model.params["date_col"]])
         pdf.sort_values(by=model.params["date_col"], inplace=True)
         group_id = pdf[model.params["group_id"]].iloc[0]
-        if model.params["framework"] == "SKTime":
-            model.fit(pdf)
-            res_df, model_fitted = model.predict(pdf)
-        else:
-            res_df, model_fitted = model.forecast(pdf)
-
+        res_df, model_fitted = model.forecast(pdf)
         try:
             data = [
                 group_id,
@@ -615,7 +608,6 @@
             cloudpickle.dumps(model_fitted)]
         except:
             data = [group_id, None, None, None]
-
         res_df = pd.DataFrame(
             columns=[
                 model.params["group_id"],
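
Note: the second value returned by DataQualityChecks.run() (the list of removed group ids) is used here to drop disqualified series from the scoring data before the union with the training data. A self-contained PySpark sketch of that filter pattern on toy data; the real code filters on self.conf["group_id"]:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    score_df = spark.createDataFrame(
        [("A", 1.0), ("B", 2.0), ("C", 3.0)], ["group_id", "y"]
    )
    removed = ["B"]  # series rejected by the data quality checks

    # Same pattern as the new line in run_scoring_for_local_model.
    score_df = score_df.where(~col("group_id").isin(removed))
    score_df.show()  # rows for groups A and C only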
8 changes: 5 additions & 3 deletions forecasting_sa/data_quality_checks.py
@@ -138,7 +138,7 @@ def _multiple_checks(
             _df = _resampled
         return _df
 
-    def run(self) -> Union[pd.DataFrame, pyspark.sql.DataFrame]:
+    def run(self) -> tuple[Union[pd.DataFrame, pyspark.sql.DataFrame], list]:
         """
         Main method of the job.
         :return:
@@ -148,6 +148,7 @@ def run(self) -> Union[pd.DataFrame, pyspark.sql.DataFrame]:
         self.df.sort_values(by=self.conf["date_col"], inplace=True)
         self._external_regressors_check()
         self._backtest_length_check()
+        removed = []
 
         # If data_quality_check is None (not provided), we don't run the optional checks
         if self.conf.get("data_quality_check", False):
@@ -175,8 +176,9 @@ def run(self) -> Union[pd.DataFrame, pyspark.sql.DataFrame]:
             ]
             before = set(self.df[self.conf['group_id']].unique())
             after = set(clean_df[self.conf['group_id']].unique())
+            removed = sorted(list(before - after))
             print(f"Following {self.conf['group_id']} "
-                  f"have been removed: {sorted(list(before - after))}")
+                  f"have been removed: {removed}")
         else:
             clean_df = self.df
 
@@ -187,4 +189,4 @@ def run(self) -> Union[pd.DataFrame, pyspark.sql.DataFrame]:
         if self.type == "spark":
             clean_df = self.spark.createDataFrame(clean_df)
 
-        return clean_df
+        return clean_df, removed
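
Note: run() now returns a (clean_df, removed) pair, so every caller unpacks two values (see the updated call sites in Forecaster.py above). The removed list is the set difference of group ids before and after cleaning; a toy pandas sketch with an illustrative column name:

    import pandas as pd

    df = pd.DataFrame({"group_id": ["A", "A", "B", "C"], "y": [1, 2, 3, 4]})
    clean_df = df[df["group_id"] != "B"]  # stand-in for the quality filters

    before = set(df["group_id"].unique())
    after = set(clean_df["group_id"].unique())
    removed = sorted(list(before - after))  # ["B"], exactly what run() logs and returns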
4 changes: 2 additions & 2 deletions forecasting_sa/models/__init__.py
@@ -4,7 +4,7 @@
 
 from omegaconf import OmegaConf, DictConfig
 
-from forecasting_sa.models.abstract_model import ForecastingSARegressor
+from forecasting_sa.models.abstract_model import ForecastingRegressor
 
 
 class ModelRegistry:
@@ -49,7 +49,7 @@ def load_models_conf():
 
     def get_model(
         self, model_name: str, override_conf: DictConfig = None
-    ) -> ForecastingSARegressor:
+    ) -> ForecastingRegressor:
         model_conf = self.active_models.get(model_name)
         if override_conf is not None:
             model_conf = OmegaConf.merge(model_conf, override_conf)
76 changes: 18 additions & 58 deletions forecasting_sa/models/abstract_model.py
@@ -1,12 +1,13 @@
 from abc import abstractmethod
 import numpy as np
 import pandas as pd
-from typing import Dict, Any, Union
+import cloudpickle
+from typing import Dict, Union
 from sklearn.base import BaseEstimator, RegressorMixin
 from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
 
 
-class ForecastingSARegressor(BaseEstimator, RegressorMixin):
+class ForecastingRegressor(BaseEstimator, RegressorMixin):
     def __init__(self, params):
         self.params = params
         self.freq = params["freq"].upper()[0]
@@ -20,12 +21,13 @@ def __init__(self, params):
         )
 
-    @abstractmethod
-    def fit(self, X, y=None):
-        pass
-
     def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
         return df
 
+    @abstractmethod
+    def fit(self, X, y=None):
+        pass
+
     @abstractmethod
     def predict(self, X):
         # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length
@@ -39,16 +41,9 @@ def forecast(self, X):
     def supports_tuning(self) -> bool:
         return False
 
-    @abstractmethod
     def search_space(self):
         pass
 
-    @abstractmethod
-    def calculate_metrics(
-        self, hist_df: pd.DataFrame, val_df: pd.DataFrame
-    ) -> Dict[str, Union[str, float, bytes]]:
-        pass
-
     def backtest(
         self,
         df: pd.DataFrame,
@@ -108,57 +103,22 @@ def backtest(
         )
         return res_df
 
-
-class ForecastingSAVerticalizedDataRegressor(ForecastingSARegressor):
+    # Can be removed or an abstract method
     def calculate_metrics(
         self, hist_df: pd.DataFrame, val_df: pd.DataFrame
-    ) -> Dict[str, Union[str, float, bytes, None]]:
-        print("starting calculate_metrics")
-        to_pred_df = val_df.copy()
-        to_pred_df[self.params["target"]] = np.nan
-        to_pred_df = pd.concat([hist_df, to_pred_df]).reset_index(drop=True)
-        pred_df = self.predict(to_pred_df)
-        keys = pred_df[self.params["group_id"]].unique()
-        metrics = []
-        # Compared predicted with val
-        for key in keys:
-            try:
-                smape = mean_absolute_percentage_error(
-                    val_df[val_df[self.params["group_id"]] == key][self.params["target"]],
-                    pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]]
-                    .iloc[-self.params["prediction_length"]:],
-                    symmetric=True,
-                )
-                metrics.append(smape)
-            except:
-                pass
-        smape = sum(metrics) / len(metrics)
-        print("finished calculate_metrics")
+    ) -> Dict[str, Union[str, float, bytes]]:
+        pred_df, model_fitted = self.predict(hist_df, val_df)
+        smape = mean_absolute_percentage_error(
+            val_df[self.params["target"]],
+            pred_df[self.params["target"]],
+            symmetric=True,
+        )
         if self.params["metric"] == "smape":
             metric_value = smape
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
-
         return {"metric_name": self.params["metric"],
                 "metric_value": metric_value,
-                "forecast": None,
-                "actual": None}
-
-
-#class ForecastingSAPivotRegressor(ForecastingSARegressor):
-#    def calculate_metrics(
-#        self, hist_df: pd.DataFrame, val_df: pd.DataFrame
-#    ) -> Dict[str, Union[str, float, bytes, None]]:
-#        print("start calculate_metrics_pivot for model: ", self.params["name"])
-#        pred_df = self.predict(hist_df)
-#        pred_cols = [c for c in pred_df.columns if c not in [self.params["date_col"]]]
-#        smape = mean_absolute_percentage_error(
-#            val_df[pred_cols],
-#            pred_df[pred_cols],
-#            symmetric=True,
-#        )
-#        print("finished calculate_metrics")
-#        return {"metric_name": self.params["metric"],
-#                "metric_value": smape,
-#                "forecast": None,
-#                "actual": None}
+                "forecast": pred_df[self.params["target"]].to_numpy(),
+                "actual": val_df[self.params["target"]].to_numpy(),
+                "model_pickle": cloudpickle.dumps(model_fitted)}
@@ -3,10 +3,10 @@
 from typing import Dict, Any, Union
 from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
 from neuralforecast import NeuralForecast
-from forecasting_sa.models.abstract_model import ForecastingSAVerticalizedDataRegressor
+from forecasting_sa.models.abstract_model import ForecastingRegressor
 
 
-class NeuralFcForecaster(ForecastingSAVerticalizedDataRegressor):
+class NeuralFcForecaster(ForecastingRegressor):
     def __init__(self, params):
         super().__init__(params)
         self.params = params