
Commit

minor updates
ryuta-yoshimatsu committed Jun 24, 2024
1 parent 56d18a3 commit f1933d7
Showing 3 changed files with 86 additions and 65 deletions.
README.md (4 changes: 2 additions & 2 deletions)
@@ -226,9 +226,9 @@ MMF is fully integrated with MLflow and so once the training kicks off, the expe

We encourage you to read through the [examples/foundation_daily.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py) notebook to better understand how foundation models can be applied to your time series using MMF. An example notebook for monthly forecasting can be found in [examples/foundation_monthly.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_monthly.py).

-#### Using Foundation Models on Databricks
+#### Using Time Series Foundation Models on Databricks

-If you want to try out open source foundation models on Databricks without MMF, you can find example notebooks in [examples/foundation-model-examples](https://github.com/databricks-industry-solutions/many-model-forecasting/tree/main/examples/foundation-model-examples). These notebooks will show you how you can load, distribute the inference, fine-tune, register, deploy a model and generate online forecasts on it. We have notebooks for Chronos, Moirai, Moment, and TimesFM.
+If you want to try out time series foundation models on Databricks without MMF, you can find example notebooks in [examples/foundation-model-examples](https://github.com/databricks-industry-solutions/many-model-forecasting/tree/main/examples/foundation-model-examples). These notebooks show you how to load a model, distribute its inference, fine-tune it, register it, deploy it, and generate online forecasts with it. We have notebooks for [TimeGPT](https://docs.nixtla.io/), [Chronos](https://github.com/amazon-science/chronos-forecasting), [Moirai](https://github.com/SalesforceAIResearch/uni2ts), [Moment](https://github.com/moment-timeseries-foundation-model/moment), and [TimesFM](https://github.com/google-research/timesfm).

## Project support
Please note that the code in this project is provided for your exploration only and is not formally supported by Databricks with Service Level Agreements (SLAs). It is provided AS-IS, and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects. The source in this project is provided subject to the Databricks License. All included or referenced third-party libraries are subject to the licenses set forth below.
examples/foundation-model-examples/timegpt/01_timegpt_load_inference.py (50 additions & 24 deletions)
@@ -10,17 +10,18 @@
# MAGIC Here are the prerequisites:
# MAGIC 1. If you don’t have an Azure subscription, get one here: https://azure.microsoft.com/en-us/pricing/purchase-options/pay-as-you-go
# MAGIC 2. Create an Azure AI Studio hub and project. Supported regions are: East US 2, Sweden Central, North Central US, East US, West US, West US3, South Central US. Make sure you pick one of these as the Azure region for the hub.
-# MAGIC Next, you need to create a deployment to obtain the inference API and key:
+# MAGIC Next, you need to create a deployment to obtain the inference API and key.
# MAGIC
# MAGIC 3. Open the TimeGEN-1 model card in the model catalog: https://aka.ms/aistudio/landing/nixtlatimegen1
# MAGIC 4. Click on Deploy and select the Pay-as-you-go option.
-# MAGIC 5.ubscribe to the Marketplace offer and deploy. You can also review the API pricing at this step.
+# MAGIC 5. Subscribe to the Marketplace offer and deploy. You can also review the API pricing at this step.
# MAGIC 6. You should land on the deployment page that shows you the API key and URL in less than a minute.
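Once the deployment page shows them, the URL and key are everything the client needs. A minimal connectivity sketch, assuming a deployed TimeGEN-1 endpoint; both values below are placeholders for your own deployment:

```python
# Minimal connectivity check for a TimeGEN-1 / TimeGPT deployment.
# Both values are placeholders; use the URL and key from your deployment page.
from nixtla import NixtlaClient

nixtla_client = NixtlaClient(
    base_url="https://<your-deployment>.inference.ai.azure.com",
    api_key="<your-api-key>",
)
# validate_api_key() returns True when the endpoint accepts the key.
print(nixtla_client.validate_api_key())
```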

# COMMAND ----------

# DBTITLE 1,Import Libraries
# MAGIC %pip install nixtla --quiet
+# MAGIC %pip install --upgrade mlflow --quiet
# MAGIC dbutils.library.restartPython()

# COMMAND ----------
@@ -30,23 +31,29 @@

# COMMAND ----------

-# import time
+key_name = f'api_key'
+scope_name = f'time-gpt'

+# COMMAND ----------

-# from databricks.sdk import WorkspaceClient
+# MAGIC %md
+# MAGIC If this is your first time running the notebook and your credential is not yet stored as a Databricks secret, uncomment and run the following cell.

-# w = WorkspaceClient()
+# COMMAND ----------

-key_name = f'api_key'
-scope_name = f'time-gpt'
+#import time
+#from databricks.sdk import WorkspaceClient

-# # w.secrets.create_scope(scope=scope_name)
-# # w.secrets.put_secret(scope=scope_name, key=key_name, string_value=f'<input api key here>')
+#w = WorkspaceClient()

+# put the key in secret
+#w.secrets.create_scope(scope=scope_name)
+#w.secrets.put_secret(scope=scope_name, key=key_name, string_value=f'<input api key here>')

-# # cleanup
-# # w.secrets.delete_secret(scope=scope_name, key=key_name)
-# # w.secrets.delete_secret(scope=scope_name, key=key_name)
-# # w.secrets.delete_scope(scope=scope_name)
+# cleanup
+#w.secrets.delete_secret(scope=scope_name, key=key_name)
+## w.secrets.delete_secret(scope=scope_name, key=key_name)
+## w.secrets.delete_scope(scope=scope_name)
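Once stored, the key is read back later in the notebook through the Databricks secrets utility instead of being hard-coded; the retrieval pattern is simply:

```python
# Read the API key back from the secret scope created above; the key
# never appears in plain text in the notebook.
api_key = dbutils.secrets.get(scope=scope_name, key=key_name)
```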

# COMMAND ----------

@@ -56,16 +63,16 @@

# COMMAND ----------

catalog = "mlops_pj" # Name of the catalog we use to manage our assets
db = "timegpt" # Name of the schema we use to manage our assets (e.g. datasets)
catalog = "mmf" # Name of the catalog we use to manage our assets
db = "m4" # Name of the schema we use to manage our assets (e.g. datasets)
n = 10 # Number of time series to sample

# COMMAND ----------

# This cell will create tables:
# 1. {catalog}.{db}.m4_daily_train,
# 2. {catalog}.{db}.m4_monthly_train
-# dbutils.notebook.run("../data_preparation", timeout_seconds=0, arguments={"catalog": catalog, "db": db, "n": n})
+dbutils.notebook.run("../data_preparation", timeout_seconds=0, arguments={"catalog": catalog, "db": db, "n": n})
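A quick sanity check after the preparation run, built only from the table and column names used elsewhere in this notebook:

```python
# Sanity check: the data preparation notebook should have created the
# daily training table with `n` sampled series (here, 10).
train = spark.table(f"{catalog}.{db}.m4_daily_train")
print(train.select("unique_id").distinct().count())  # expect n
```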

# COMMAND ----------

@@ -90,6 +97,8 @@
from typing import Iterator,Tuple
from pyspark.sql.functions import pandas_udf


+## function to select single time series from the prepared dataset
def get_single_time_series(unique_id):
    pdf = df.filter(df.unique_id == unique_id).toPandas()
    pdf = {
@@ -98,6 +107,7 @@ def get_single_time_series(unique_id):
    }
    return pd.DataFrame(pdf)
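As a usage example, later cells call this helper with the M4 daily series 'D4':

```python
# 'D4' is one of the sampled M4 daily series used later in this notebook.
pdf = get_single_time_series('D4')
pdf  # a single-row frame whose 'ds' and 'y' cells hold the full arrays
```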


def create_forecast_udf(model_url, api_key,prediction_length=12):

    @pandas_udf('struct<timestamp:array<string>,forecast:array<double>>')
@@ -132,13 +142,29 @@ def forecast_udf(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd
# COMMAND ----------

# DBTITLE 1,Forecasting with TimeGEN on Azure AI
model_url = "https://TimeGEN-1-pj-serverless.westus3.inference.ai.azure.com"
model_url = "https://TimeGEN-1-pj-serverless.eastus2.inference.ai.azure.com" # Your model url
prediction_length = 12 # Time horizon for forecasting
api_key = dbutils.secrets.get(scope =scope_name,key = key_name)
api_key = dbutils.secrets.get(scope=scope_name, key=key_name) # Get credential from secrets
freq = "D" # Frequency of the time series

# COMMAND ----------

+# Create Pandas UDF
+forecast_udf = create_forecast_udf(
+    model_url=model_url,
+    api_key=api_key,
+)
+
+# Apply Pandas UDF to the dataframe
+forecasts = df.select(
+    df.unique_id,
+    forecast_udf("ds", "y").alias("forecast"),
+).select("unique_id", "forecast.timestamp", "forecast.forecast")
+
+display(forecasts)
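If you want to keep these forecasts for later evaluation, one option is to write the flattened result back to the same catalog and schema. A sketch; the output table name is illustrative, not one this notebook creates:

```python
# Persist the flattened forecasts to Unity Catalog for downstream use.
# The table name is a hypothetical choice.
(
    forecasts
    .write
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{db}.m4_daily_timegpt_forecasts")
)
```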

# COMMAND ----------

# MAGIC %md
# MAGIC ## Register Model

@@ -148,7 +174,7 @@ def predict(self, context, input_data, params=None):
import torch
import numpy as np
from mlflow.models.signature import ModelSignature
-from mlflow.types import DataType, Schema, TensorSpec ,ColSpec, ParamSpec,ParamSchema
+from mlflow.types import DataType, Schema, TensorSpec, ColSpec, ParamSpec, ParamSchema
mlflow.set_registry_uri("databricks-uc")
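The log_model call further down is truncated in this view, but the imports above show that a signature with inference parameters is being assembled. A hedged sketch of what such a signature can look like for this wrapper, using the imports above; the notebook's actual schema may differ:

```python
# Hypothetical signature sketch, consistent with the imports above and with
# predict(pdf, params={'h': ...}) as it is called later in the notebook.
input_schema = Schema([
    ColSpec(DataType.string, "ds"),  # assumption: array-valued cells serialized as strings
    ColSpec(DataType.string, "y"),
])
params_schema = ParamSchema([ParamSpec("h", DataType.long, 20)])
signature = ModelSignature(inputs=input_schema, params=params_schema)
```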


@@ -189,7 +215,7 @@ def predict(self, context, input_data, params=None):
"timestamp" : list(pdf['ds'][0]),
"value" : list(pdf['y'][0])
}
-pdf =get_single_time_series('D4')
+pdf=get_single_time_series('D4')

with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
@@ -214,7 +240,7 @@ def predict(self, context, input_data, params=None):
mlflow_client = MlflowClient()

def get_latest_model_version(mlflow_client, registered_model_name):
-    latest_version = 4
+    latest_version = 1
    for mv in mlflow_client.search_model_versions(f"name='{registered_model_name}'"):
        version_int = int(mv.version)
        if version_int > latest_version:
@@ -234,9 +260,9 @@ def get_latest_model_version(mlflow_client, registered_model_name):

# COMMAND ----------

-#Get Random Data Point
-pdf =get_single_time_series('D4')
-loaded_model.predict(pdf,params = {'h' :20})
+# Get Random Data Point
+pdf = get_single_time_series('D4')
+loaded_model.predict(pdf, params = {'h': 20})

# COMMAND ----------

examples/foundation-model-examples/timegpt/02_timegpt_fine_tune.py (73 changes: 34 additions & 39 deletions)
@@ -1,7 +1,6 @@
# Databricks notebook source
# MAGIC %md
-# MAGIC This is an example notebook that shows how to use Foundational Model Time-Series [TimeGPT](https://docs.nixtla.io/) models on Databricks and fine-tune the model on the fly.
-# MAGIC The notebook loads the model, distributes the inference, registers and deploys the model
+# MAGIC This is an example notebook that shows how to use the foundation time series model [TimeGPT](https://docs.nixtla.io/) on Databricks and fine-tune it on the fly. The notebook loads the model, distributes the fine-tuning and inference, and registers and deploys the model.

# COMMAND ----------

@@ -14,13 +13,14 @@
# MAGIC
# MAGIC 3. Open the TimeGEN-1 model card in the model catalog: https://aka.ms/aistudio/landing/nixtlatimegen1
# MAGIC 4. Click on Deploy and select the Pay-as-you-go option.
-# MAGIC 5.ubscribe to the Marketplace offer and deploy. You can also review the API pricing at this step.
+# MAGIC 5. Subscribe to the Marketplace offer and deploy. You can also review the API pricing at this step.
# MAGIC 6. You should land on the deployment page that shows you the API key and URL in less than a minute.

# COMMAND ----------

# DBTITLE 1,Import Libraries
# MAGIC %pip install nixtla --quiet
+# MAGIC %pip install --upgrade mlflow --quiet
# MAGIC dbutils.library.restartPython()

# COMMAND ----------
@@ -30,23 +30,29 @@

# COMMAND ----------

-import time
+key_name = f'api_key'
+scope_name = f'time-gpt'

-from databricks.sdk import WorkspaceClient
+# COMMAND ----------

-w = WorkspaceClient()
+# MAGIC %md
+# MAGIC If this is your first time running the notebook and your credential is not yet stored as a Databricks secret, uncomment and run the following cell.

-key_name = f'api_key'
-scope_name = f'time-gpt'
+# COMMAND ----------

-# # w.secrets.create_scope(scope=scope_name)
-# w.secrets.put_secret(scope=scope_name, key=key_name, string_value=<enter API key>)
+#import time
+#from databricks.sdk import WorkspaceClient

+#w = WorkspaceClient()

-# # cleanup
-# w.secrets.delete_secret(scope=scope_name, key=key_name)
-# w.secrets.delete_secret(scope=scope_name, key=key_name)
-# # w.secrets.delete_scope(scope=scope_name)
+# put the key in secret
+#w.secrets.create_scope(scope=scope_name)
+#w.secrets.put_secret(scope=scope_name, key=key_name, string_value=f'<input api key here>')

+# cleanup
+#w.secrets.delete_secret(scope=scope_name, key=key_name)
+## w.secrets.delete_secret(scope=scope_name, key=key_name)
+## w.secrets.delete_scope(scope=scope_name)

# COMMAND ----------

@@ -56,16 +62,16 @@

# COMMAND ----------

catalog = "mlops_pj" # Name of the catalog we use to manage our assets
db = "timegpt" # Name of the schema we use to manage our assets (e.g. datasets)
catalog = "mmf" # Name of the catalog we use to manage our assets
db = "m4" # Name of the schema we use to manage our assets (e.g. datasets)
n = 10 # Number of time series to sample

# COMMAND ----------

# This cell will create tables:
# 1. {catalog}.{db}.m4_daily_train,
# 2. {catalog}.{db}.m4_monthly_train
-# dbutils.notebook.run("../data_preparation", timeout_seconds=0, arguments={"catalog": catalog, "db": db, "n": n})
+dbutils.notebook.run("../data_preparation", timeout_seconds=0, arguments={"catalog": catalog, "db": db, "n": n})

# COMMAND ----------

@@ -74,7 +80,7 @@
# Make sure that the data exists
df = spark.table(f'{catalog}.{db}.m4_daily_train')
df = df.groupBy('unique_id').agg(collect_list('ds').alias('ds'), collect_list('y').alias('y'))
-df = df.filter(size(df.ds)>=300)
+df = df.filter(size(df.ds) >= 300)
display(df)

# COMMAND ----------
@@ -92,8 +98,7 @@
from pyspark.sql.functions import pandas_udf



-def create_forecast_udf(model_url, api_key,prediction_length=12,ft_steps= 10):
+def create_forecast_udf(model_url, api_key, prediction_length=12, ft_steps=10):

    @pandas_udf('struct<timestamp:array<string>,forecast:array<double>>')
    def forecast_udf(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.DataFrame]:
@@ -108,11 +113,12 @@ def forecast_udf(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd
api_key=api_key)

        ## inference
-        for timeseries,past_values in iterator:
+        for timeseries, past_values in iterator:
            median = []
-            for ts, y in zip(timeseries,past_values):
+            for ts, y in zip(timeseries, past_values):
                tdf = pd.DataFrame({"timestamp":ts,
                                    "value" :y})
+
                pred = model.forecast(
                    df=tdf,
                    h=prediction_length,
@@ -129,32 +135,21 @@ def forecast_udf(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd

model_url = "https://TimeGEN-1-pj-serverless.eastus2.inference.ai.azure.com"
prediction_length = 10 # Time horizon for forecasting
-ft_steps = 10 # Number of forecast to generate. We will take median as our final forecast.
+ft_steps = 10 # Number of training iterations to perform for fine-tuning
api_key = dbutils.secrets.get(scope =scope_name,key = key_name)
freq = "D" # Frequency of the time series

# COMMAND ----------

-# from nixtla import NixtlaClient
-# nixtla_client = NixtlaClient(
-#     base_url=model_url,
-#     api_key=dbutils.secrets.get(scope =scope_name,key = key_name),
-# )

-# COMMAND ----------

-# get_horizon_timestamps = create_get_horizon_timestamps(freq=freq, prediction_length=prediction_length)

forecast_udf = create_forecast_udf(
    model_url=model_url,
-    api_key=api_key
+    api_key=api_key,
)

forecasts = df.select(
-    df.unique_id,
-    # get_horizon_timestamps(df.ds).alias("ds"),
-    forecast_udf("ds","y").alias("forecast"),
-).select("unique_id","forecast.timestamp","forecast.forecast" )
+    df.unique_id,
+    forecast_udf("ds", "y").alias("forecast"),
+).select("unique_id", "forecast.timestamp", "forecast.forecast")

display(forecasts)

@@ -257,7 +252,7 @@ def get_latest_model_version(mlflow_client, registered_model_name):

# COMMAND ----------

-#Test the endpoint before deployment
+# Test the endpoint before deployment
loaded_model.predict(pdf,params = {'h' :20})

# COMMAND ----------
