Merge pull request #26 from databricks-industry-solutions/update_202405
bulk update
ryuta-yoshimatsu authored May 16, 2024
2 parents e5a24de + ceb4f5f commit 3137b3f
Showing 29 changed files with 956 additions and 1,203 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-test-aws-pr.yml
@@ -19,7 +19,7 @@ jobs:
           new-cluster-json: >
             {
               "num_workers": 0,
-              "spark_version": "10.4.x-scala2.12",
+              "spark_version": "14.3.x-cpu-ml-scala2.12",
               "node_type_id": "i3.xlarge",
               "aws_attributes": {
                 "availability": "ON_DEMAND"
2 changes: 1 addition & 1 deletion .github/workflows/integration-test-aws-push.yml
@@ -23,7 +23,7 @@ jobs:
           new-cluster-json: >
             {
               "num_workers": 0,
-              "spark_version": "10.4.x-scala2.12",
+              "spark_version": "14.3.x-cpu-ml-scala2.12",
               "node_type_id": "i3.xlarge",
               "aws_attributes": {
                 "availability": "ON_DEMAND"
2 changes: 1 addition & 1 deletion .github/workflows/integration-test-gcp-pr.yml
@@ -19,7 +19,7 @@ jobs:
           new-cluster-json: >
             {
               "num_workers": 0,
-              "spark_version": "10.4.x-scala2.12",
+              "spark_version": "14.3.x-cpu-ml-scala2.12",
               "node_type_id": "n1-highmem-4",
               "gcp_attributes": {
                 "availability": "ON_DEMAND_GCP"
2 changes: 1 addition & 1 deletion .github/workflows/integration-test-gcp-push.yml
@@ -23,7 +23,7 @@ jobs:
           new-cluster-json: >
             {
               "num_workers": 0,
-              "spark_version": "10.4.x-scala2.12",
+              "spark_version": "14.3.x-cpu-ml-scala2.12",
               "node_type_id": "n1-highmem-4",
               "gcp_attributes": {
                 "availability": "ON_DEMAND_GCP"
2 changes: 1 addition & 1 deletion .github/workflows/integration-test-msa-pr.yml
@@ -18,7 +18,7 @@ jobs:
           new-cluster-json: >
             {
               "num_workers": 0,
-              "spark_version": "10.4.x-scala2.12",
+              "spark_version": "14.3.x-cpu-ml-scala2.12",
               "node_type_id": "Standard_DS3_v2",
               "azure_attributes": {
                 "availability": "ON_DEMAND_AZURE"
2 changes: 1 addition & 1 deletion .github/workflows/integration-test-msa-push.yml
@@ -22,7 +22,7 @@ jobs:
           new-cluster-json: >
             {
               "num_workers": 0,
-              "spark_version": "10.4.x-scala2.12",
+              "spark_version": "14.3.x-cpu-ml-scala2.12",
               "node_type_id": "Standard_D3_v2",
               "azure_attributes": {
                 "availability": "ON_DEMAND_AZURE"
2 changes: 2 additions & 0 deletions .gitignore
@@ -131,3 +131,5 @@ dmypy.json
 # Pyre type checker
 .pyre/
 
+
+.databricks
89 changes: 54 additions & 35 deletions 01_mm_forecasting_demo.py → 01_mmf_univariate_daily_demo.py
@@ -5,13 +5,12 @@

# COMMAND ----------

# MAGIC %pip install -r requirements.txt
# MAGIC %pip install datasetsforecast==0.0.7

# MAGIC %pip install -r requirements.txt --quiet
dbutils.library.restartPython()
# COMMAND ----------

import logging

from tqdm.autonotebook import tqdm
logger = spark._jvm.org.apache.log4j
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
@@ -35,7 +34,7 @@

# COMMAND ----------

def _transform_group(df):
def transform_group(df):
unique_id = df.unique_id.iloc[0]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
@@ -53,7 +52,7 @@ def create_m4_df():
y_df.groupby("unique_id")
.filter(lambda x: x.unique_id.iloc[0] in _ids)
.groupby("unique_id")
.apply(_transform_group)
.apply(transform_group)
.reset_index(drop=True)
)
return y_df
@@ -65,12 +64,11 @@
# COMMAND ----------

m4_df = spark.createDataFrame(create_m4_df())
m4_df.createOrReplaceTempView("mmf_sa_train")
display(m4_df)
m4_df.createOrReplaceTempView("mmf_train")

# COMMAND ----------

# MAGIC %sql select * from mmf_sa_train where unique_id in ('D1', 'D2', 'D6', 'D7')
# MAGIC %sql select * from mmf_train where unique_id in ('D1', 'D2', 'D6', 'D7', 'D10')

# COMMAND ----------

@@ -79,35 +77,35 @@ def create_m4_df():
# COMMAND ----------

active_models = [
#"StatsForecastBaselineWindowAverage",
"StatsForecastBaselineWindowAverage",
#"StatsForecastBaselineSeasonalWindowAverage",
#"StatsForecastBaselineNaive",
#"StatsForecastBaselineSeasonalNaive",
"StatsForecastAutoArima",
"StatsForecastAutoETS",
"StatsForecastAutoCES",
"StatsForecastAutoTheta",
#"StatsForecastAutoETS",
#"StatsForecastAutoCES",
#"StatsForecastAutoTheta",
#"StatsForecastTSB",
#"StatsForecastADIDA",
#"StatsForecastIMAPA",
#"StatsForecastCrostonClassic",
#"StatsForecastCrostonOptimized",
#"StatsForecastCrostonSBA",
#"RFableArima",
"RFableArima",
#"RFableETS",
#"RFableNNETAR",
#"RFableEnsemble",
#"RDynamicHarmonicRegression",
#"GluonTSSimpleFeedForward",
#"GluonTSSeasonalNaive",
#"GluonTSNBEATS",
#"GluonTSDeepAR",
#"GluonTSProphet",
#"GluonTSTorchDeepAR",
#"GluonTSTransformer",
#"NeuralForecastMQNHiTS",
"SKTimeTBats",
#"SKTimeLgbmDsDt",
#"SKTimeTBats",
#"NeuralForecastRNN",
#"NeuralForecastLSTM",
#"NeuralForecastNBEATSx",
#"NeuralForecastNHITS",
#"NeuralForecastAutoRNN",
#"NeuralForecastAutoLSTM",
#"NeuralForecastAutoNBEATSx",
#"NeuralForecastAutoNHITS",
]

# COMMAND ----------
@@ -116,13 +114,21 @@ def create_m4_df():

# COMMAND ----------

# Make sure that the catalog and the schema exist
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")

# COMMAND ----------

run_forecast(
spark=spark,
# conf={"temp_path": f"{str(temp_dir)}/temp"},
train_data="mmf_sa_train",
scoring_data="mmf_sa_train",
scoring_output="mmf_sa_forecast_scoring_out",
metrics_output="mmf_sa_metrics",
train_data="mmf_train",
scoring_data="mmf_train",
scoring_output=f"{catalog}.{db}.daily_scoring_output",
metrics_output=f"{catalog}.{db}.daily_metrics_output",
group_id="unique_id",
date_col="ds",
target="y",
@@ -131,15 +137,16 @@ def create_m4_df():
backtest_months=1,
stride=10,
train_predict_ratio=2,
#data_quality_check=False,
data_quality_check=True,
resample=False,
ensemble=True,
ensemble_metric="smape",
ensemble_metric_avg=0.3,
ensemble_metric_max=0.5,
ensemble_scoring_output="mmf_sa_forecast_ensemble_out",
ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output",
active_models=active_models,
experiment_path=f"/Shared/fsa_cicd_pr_experiment",
use_case_name="fsa",
experiment_path=f"/Shared/mmf_experiment",
use_case_name="mmf",
)

# COMMAND ----------
@@ -149,7 +156,7 @@ def create_m4_df():

# COMMAND ----------

# MAGIC %sql select * from mmf_sa_metrics
# MAGIC %sql select * from solacc_uc.mmf.daily_metrics_output order by unique_id, model, backtest_window_start_date

# COMMAND ----------

@@ -158,7 +165,7 @@ def create_m4_df():

# COMMAND ----------

# MAGIC %sql select * from mmf_sa_forecast_scoring_out
# MAGIC %sql select * from solacc_uc.mmf.daily_scoring_output order by unique_id, model, ds

# COMMAND ----------

@@ -167,7 +174,19 @@ def create_m4_df():

# COMMAND ----------

# MAGIC %sql select * from mmf_sa_forecast_ensemble_out
# MAGIC %sql select * from solacc_uc.mmf.daily_ensemble_output order by unique_id, model, ds

# COMMAND ----------

# MAGIC %sql delete from solacc_uc.mmf.daily_metrics_output

# COMMAND ----------

# MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output

# COMMAND ----------

# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output

# COMMAND ----------
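Beyond the %sql cells above, the Unity Catalog output tables written by this updated demo can also be pulled into pandas for downstream analysis. A minimal sketch, assuming the notebook has been run and the solacc_uc.mmf tables exist; the table and column names follow the queries above, everything else is illustrative:

    # Illustrative only: read the ensemble forecast for one series into pandas.
    # Assumes a Databricks notebook context where `spark` and `display` are defined
    # and solacc_uc.mmf.daily_ensemble_output was produced by run_forecast above.
    ensemble_pdf = (
        spark.table("solacc_uc.mmf.daily_ensemble_output")
        .where("unique_id = 'D1'")  # one of the M4 daily series used in this demo
        .orderBy("ds")
        .toPandas()
    )
    display(ensemble_pdf)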
