Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added docstrings and code comments #53

Merged
merged 1 commit into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 143 additions & 12 deletions mmf_sa/Forecaster.py

Large diffs are not rendered by default.

46 changes: 43 additions & 3 deletions mmf_sa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,53 @@ def run_forecast(
dynamic_future: List[str] = None,
dynamic_historical: List[str] = None,
active_models: List[str] = None,
accelerator: str = None,
accelerator: str = "cpu",
backtest_retrain: bool = None,
train_predict_ratio: int = None,
data_quality_check: bool = None,
experiment_path: str = None,
conf: Union[str, Dict[str, Any], OmegaConf] = None,
run_id: str = None,
conf: Union[str, Dict[str, Any], OmegaConf] = None,
) -> str:

"""
Creates a Forecaster object with the provided configuration, then calls
its evaluate_score method to perform the evaluation and forecasting.
Returns the run id.

Parameters:
spark (SparkSession): A SparkSession object.
train_data (Union[str, pd.DataFrame, DataFrame]): Training data as a string of delta table name, pandas DataFrame, or Spark DataFrame.
group_id (str): A string specifying the column name that groups the training data into individual time series.
date_col (str): A string specifying the column name that stores the date variable.
target (str): A string specifying the column name of the target variable.
freq (str): A string specifying the frequency. Currently, "D" for daily and "M" for monthly are supported.
prediction_length (int): An integer specifying the prediction length: i.e. forecasting horizon.
backtest_months (int): An integer specifying the number of backtest months.
stride (int): An integer specifying the stride length.
metric (str): A string specifying the metric to use for evaluation. Default is smape.
resample (bool): A boolean specifying whether to back-fill skipped entries with 0. Default is False.
scoring_data (Union[str, pd.DataFrame, DataFrame]): Scoring data as a string of delta table name, pandas DataFrame, or Spark DataFrame.
scoring_output (str): A string specifying the output table name for scoring.
evaluation_output (str): A string specifying the output table name for evaluation.
model_output (str): A string specifying the output path for the model.
use_case_name (str): A string specifying the use case name.
static_features (List[str]): A list of strings specifying the static features.
dynamic_future (List[str]): A list of strings specifying the dynamic future features.
dynamic_historical (List[str]): A list of strings specifying the dynamic historical features.
active_models (List[str]): A list of strings specifying the active models.
accelerator (str): A string specifying the accelerator to use: cpu or gpu. Default is cpu.
backtest_retrain (bool): A boolean specifying whether to retrain the model during backtesting. Currently, not supported.
train_predict_ratio (int): An integer specifying the train predict ratio.
data_quality_check (bool): A boolean specifying whether to check the data quality.
experiment_path (str): A string specifying the experiment path.
run_id (str): A string specifying the run id. If not provided a random string is generated and assigned to each run.
conf (Union[str, Dict[str, Any], OmegaConf]): A configuration object.

Returns:
    run_id (str): A string specifying the run id of this run.
"""

if isinstance(conf, dict):
_conf = OmegaConf.create(conf)
elif isinstance(conf, str):
Expand Down Expand Up @@ -69,7 +108,6 @@ def run_forecast(
_conf["stride"] = stride
_conf["metric"] = metric
_conf["resample"] = resample

run_evaluation = True
run_scoring = False
if scoring_data is not None and scoring_output is not None:
Expand Down Expand Up @@ -112,7 +150,9 @@ def run_forecast(
spark=spark,
run_id=run_id,
)

run_id = f.evaluate_score(evaluate=run_evaluation, score=run_scoring)

return run_id


Expand Down
52 changes: 28 additions & 24 deletions mmf_sa/data_quality_checks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import functools
from abc import ABC, abstractmethod
from argparse import ArgumentParser
from logging import Logger
from typing import Dict, Any, Union
import yaml
import pathlib
from omegaconf import OmegaConf, DictConfig
from omegaconf import DictConfig
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
import sys


class DataQualityChecks:
Expand All @@ -28,8 +22,9 @@ def __init__(

def _backtest_length_check(self):
"""
Checks if the backtest interval contains at least one prediction length.
Mandatory check irrespective of data_quality_check being True or False.
Checks that the backtest interval (backtest_months) covers at least one prediction_length.
Mandatory check that runs regardless of whether data_quality_check is True or False.
Parameters: self (DataQualityChecks): A DataQualityChecks object.
"""
backtest_days = self.conf["backtest_months"] * 30
prediction_length_days = (
Expand All @@ -42,13 +37,14 @@ def _backtest_length_check(self):

def _external_regressors_check(self):
"""
Checks if the resampling is turned off when an external regressor is given.
Mandatory check irrespective of data_quality_check being True or False.
Checks that resampling is turned off when an exogenous regressor is given.
Mandatory check that runs regardless of whether data_quality_check is True or False.
Parameters: self (DataQualityChecks): A DataQualityChecks object.
"""
if (
self.conf.get("static_categoricals", None)
or self.conf.get("dynamic_categoricals", None)
or self.conf.get("dynamic_reals", None)
self.conf.get("static_features", None)
or self.conf.get("dynamic_future", None)
or self.conf.get("dynamic_historical", None)
):
if self.conf.get("resample"):
raise Exception(
Expand All @@ -60,16 +56,21 @@ def _multiple_checks(
_df: pd.DataFrame, conf: Dict[str, Any], max_date: pd.Timestamp
) -> pd.DataFrame:
"""
Run 3 checks on the subset dataset grouped by group_id.
Optional checks only when data_quality_check is True.
1. Check if any of external regressor provided contains null. If it does, this time series is removed.
2. Check if the training period is longer than the requirement (train_predict_ratio).
3. Check for missing entries. If the time series has a missing entry and the resampling is disabled,
this time series is removed. If the time series has too many missing entries (more than 0.2 of the
Runs 4 checks on the subset dataset grouped by group_id.
These optional checks run only when data_quality_check is True.
1. Checks if any of the external regressors provided contains nulls. If so, this time series is removed.
2. Checks if the training period is longer than the requirement (train_predict_ratio).
3. Checks for missing entries. If the time series has a missing entry and the resampling is disabled,
it is removed. If the time series has too many missing entries (more than 0.2 of the
entire duration), it is removed even when resampling is enabled.
4. If the time series has too many negative entries (more than 0.2 of the entire duration), it is removed.
:return:
pandas DataFrame with time series not meeting the requirement removed.

Parameters:
_df (pd.DataFrame): A pandas DataFrame.
conf (Dict[str, Any]): A dictionary specifying the configuration.
max_date (pd.Timestamp, optional): A pandas Timestamp object.

Returns: _df (pd.DataFrame): A pandas DataFrame with time series not meeting the requirements removed.
"""

group_id = _df[conf["group_id"]].iloc[0]
Expand Down Expand Up @@ -135,8 +136,11 @@ def _multiple_checks(

def run(self) -> tuple[Union[pd.DataFrame, pyspark.sql.DataFrame], list]:
"""
Main method of the job.
:return:
Runs the main method of the job: data quality checks on the input DataFrame.
Parameters: self (DataQualityChecks): A DataQualityChecks object.
Returns:
clean_df (Union[pd.DataFrame, pyspark.sql.DataFrame]): A pandas DataFrame or a PySpark DataFrame.
removed (list): A list of group ids that are removed.
"""
print(f"Running data quality checks...")
self.df[self.conf["date_col"]] = pd.to_datetime(self.df[self.conf["date_col"]])
Expand Down
3 changes: 1 addition & 2 deletions mmf_sa/jobs/forecasting_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import mlflow
from mlflow.tracking import MlflowClient

from mmf_sa.Forecaster import Forecaster
from mmf_sa.common import Job

Expand All @@ -17,7 +16,7 @@ def launch(self):
)

forecaster = Forecaster(self.conf, self.spark, experiment_id)
forecaster.train_eval_score(export_metrics=False, scoring=False)
forecaster.evaluate_score()
self.logger.info("Forecasting Job finished!")


Expand Down
28 changes: 0 additions & 28 deletions mmf_sa/jobs/retraining_evaluation_job.py

This file was deleted.

33 changes: 26 additions & 7 deletions mmf_sa/models/abstract_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ def fit(self, x, y=None):

@abstractmethod
def predict(self, x, y=None):
# TODO Shouldn't X be optional if we have a trainable model and provide a prediction length
pass

@abstractmethod
def forecast(self, x, spark=None):
# TODO Shouldn't X be optional if we have a trainable model and provide a prediction length
pass

def backtest(
Expand All @@ -46,9 +44,20 @@ def backtest(
start: pd.Timestamp,
group_id: Union[str, int] = None,
stride: int = None,
backtest_retrain: bool = False,
# backtest_retrain: bool = False,
spark=None,
) -> pd.DataFrame:
"""
Performs backtesting using the provided pandas DataFrame, start timestamp, group id, stride and SparkSession.
Parameters:
self: The model instance (a subclass of the abstract model) on which backtesting is performed.
df (pd.DataFrame): A pandas DataFrame.
start (pd.Timestamp): A pandas Timestamp object.
group_id (Union[str, int], optional): A string or an integer specifying the group id. Default is None.
stride (int, optional): An integer specifying the stride. Default is None.
spark (SparkSession, optional): A SparkSession object. Default is None.
Returns: res_df (pd.DataFrame): A pandas DataFrame.
"""
if stride is None:
stride = int(self.params.get("stride", 7))
stride_offset = (
Expand All @@ -59,12 +68,12 @@ def backtest(
df = df.copy().sort_values(by=[self.params["date_col"]])
end_date = df[self.params["date_col"]].max()
curr_date = start + self.one_ts_offset
#print("end_date = ", end_date)
# print("end_date = ", end_date)

results = []

while curr_date + self.prediction_length_offset <= end_date + self.one_ts_offset:
#print("start_date = ", curr_date)
# print("start_date = ", curr_date)
_df = df[df[self.params["date_col"]] < np.datetime64(curr_date)]
actuals_df = df[
(df[self.params["date_col"]] >= np.datetime64(curr_date))
Expand All @@ -74,8 +83,8 @@ def backtest(
)]

# backtest_retrain for global models is currently not supported
if backtest_retrain and self.params["model_type"] == "global":
self.fit(_df)
# if backtest_retrain and self.params["model_type"] == "global":
# self.fit(_df)

metrics = self.calculate_metrics(_df, actuals_df, curr_date, spark)

Expand Down Expand Up @@ -113,6 +122,16 @@ def backtest(
def calculate_metrics(
self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date, spark=None
) -> Dict[str, Union[str, float, bytes]]:
"""
Calculates the metrics using the provided historical DataFrame, validation DataFrame, current date, and SparkSession.
Parameters:
self: The model instance (a subclass of the abstract model) whose predictions are evaluated.
hist_df (pd.DataFrame): A pandas DataFrame.
val_df (pd.DataFrame): A pandas DataFrame.
curr_date: A pandas Timestamp object.
spark (SparkSession, optional): A SparkSession object. Default is None.
Returns: metrics (Dict[str, Union[str, float, bytes]]): A dictionary specifying the metrics.
"""
pred_df, model_fitted = self.predict(hist_df, val_df)
smape = mean_absolute_percentage_error(
val_df[self.params["target"]],
Expand Down
2 changes: 1 addition & 1 deletion mmf_sa/models/chronosforecast/ChronosPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def predict(self,
)

# Todo
#forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
# forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
return forecast_df, self.model

def forecast(self, df: pd.DataFrame, spark=None):
Expand Down
21 changes: 10 additions & 11 deletions mmf_sa/models/models_conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,35 +52,34 @@ models:
framework: StatsForecast
model_type: local
model_spec:
season_length:
approximation:
level:
season_length: 7
approximation: false

StatsForecastAutoETS:
module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline
model_class: StatsFcAutoETS
framework: StatsForecast
model_type: local
model_spec:
season_length:
model_type: "ZZZ"
season_length: 7
model: "ZZZ"

StatsForecastAutoCES:
module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline
model_class: StatsFcAutoCES
framework: StatsForecast
model_type: local
model_spec:
season_length: 1
model_type: "Z"
season_length: 7
model: "Z"

StatsForecastAutoTheta:
module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline
model_class: StatsFcAutoTheta
framework: StatsForecast
model_type: local
model_spec:
season_length: 1
season_length: 7
decomposition_type: "multiplicative"

StatsForecastTSB:
Expand Down Expand Up @@ -133,23 +132,23 @@ models:
framework: RFable
model_type: local
model_spec:
season_length:
season_length: 7

RFableETS:
module: mmf_sa.models.r_fable.RFableForecastingPipeline
model_class: RFableModel
framework: RFable
model_type: local
model_spec:
season_length:
season_length: 7

RFableNNETAR:
module: mmf_sa.models.r_fable.RFableForecastingPipeline
model_class: RFableModel
framework: RFable
model_type: local
model_spec:
season_length:
season_length: 7

RFableEnsemble:
module: mmf_sa.models.r_fable.RFableForecastingPipeline
Expand Down
2 changes: 1 addition & 1 deletion mmf_sa/models/moiraiforecast/MoiraiPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def predict(self,
)

# Todo
#forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
# forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
return forecast_df, self.model

def forecast(self, df: pd.DataFrame, spark=None):
Expand Down
2 changes: 1 addition & 1 deletion mmf_sa/models/momentforecast/MomentPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def predict(self,
)

# Todo
#forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
# forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
return forecast_df, self.model

def forecast(self, df: pd.DataFrame, spark=None):
Expand Down
2 changes: 1 addition & 1 deletion mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):

return forecast_df, self.model

def forecast(self, df: pd.DataFrame):
def forecast(self, df: pd.DataFrame, spark=None):
_df = df[df[self.params.target].notnull()]
_df = self.prepare_data(_df)
_last_date = _df["ds"].max()
Expand Down
Loading
Loading