Airflow Integration DAG
This guide details the setup, components, and execution flow of the SME Amortization, SME Asset, and SME Deal Details Directed Acyclic Graphs (DAGs). These DAGs are used to process, transform, and manage datasets associated with SME financial records. Each DAG is built with Apache Airflow to automate workflow management, leveraging Google Cloud Dataproc and Google Cloud Storage for data processing and storage, respectively.
The SME Amortization, SME Asset, and SME Deal Details DAGs provide structured data pipelines for transforming raw SME data into refined datasets across various stages (Profile, Bronze, Silver). These stages ensure high-quality, structured, and query-ready data, primarily managed in Google Cloud. Key transformations include profiling data, managing dependencies, handling clean-up, and optimizing for storage.
The DAGs use predefined variables to configure Google Cloud resources like project IDs, storage buckets, clusters, and Dataproc settings. Sample configuration:
PROJECT_ID = "your_project_id"
REGION = "your_region"
CODE_BUCKET = "your_code_bucket_name"
RAW_BUCKET = "your_raw_bucket_name"
DATA_BUCKET = "your_data_bucket_name"
PHS_CLUSTER = "your_cluster_name"
METASTORE_CLUSTER = "your_metastore_cluster_name"
The DAGs execute PySpark jobs on Google Dataproc clusters. Dependencies like Delta Lake JAR files are stored in Google Cloud Storage for ease of access:
PYTHON_FILE_LOCATION = f"gs://{CODE_BUCKET}/dist/sme_main.py"
SPARK_DELTA_JAR_FILE = f"gs://{CODE_BUCKET}/dependencies/delta-core_2.13-2.1.0.jar"
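The environment configuration shown next also references fully qualified resource paths for the Spark History Server cluster and the Metastore service. A minimal sketch of how these could be derived from the variables above (the exact resource-name formats are assumptions based on standard Dataproc and Dataproc Metastore naming):
PHS_CLUSTER_PATH = f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{PHS_CLUSTER}"
METASTORE_SERVICE_LOCATION = (
    f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_CLUSTER}"
)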
The environment configuration defines the execution environment, specifying settings for Dataproc clusters, Google Cloud Metastore, and Spark History Server integration:
ENVIRONMENT_CONFIG = {
    "execution_config": {"subnetwork_uri": "default"},
    "peripherals_config": {
        "metastore_service": METASTORE_SERVICE_LOCATION,
        "spark_history_server_config": {
            "dataproc_cluster": PHS_CLUSTER_PATH,
        },
    },
}
The default arguments ensure the DAGs follow a consistent configuration for retries, start dates, and project and region settings.
from airflow.utils.dates import days_ago

default_args = {
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
    "retries": 0,
}
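These defaults are passed to each DAG definition. A minimal sketch of how one of the DAGs might be declared with them (the dag_id and schedule shown here are illustrative, not taken from the repository):
from airflow import DAG

with DAG(
    dag_id="sme_amortisation",   # illustrative name
    default_args=default_args,
    schedule_interval=None,      # assumed: triggered manually or by an external process
    catchup=False,
) as dag:
    ...  # TaskGroups and Dataproc tasks are defined inside this context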
Each DAG (sme_bond_info, sme_collaterals, and sme_deal_details) includes similar stages:
The tasks are organized into TaskGroups to streamline the DAG and enhance readability. For instance, the SME Amortization DAG has a group dedicated to each stage of data processing.
with TaskGroup(group_id=f"{dl_code}_amortisation") as tg:
    profile_task = DataprocCreateBatchOperator(
        task_id=f"profile_{dl_code}",
        batch={...},
    )
    bronze_task = DataprocCreateBatchOperator(
        task_id=f"bronze_{dl_code}",
        batch={...},
    )
    profile_task >> bronze_task
The DAGs rely on the DataprocCreateBatchOperator for processing data with PySpark. Here, profile_task and bronze_task illustrate the profiling and data transformation steps:
profile_task = DataprocCreateBatchOperator(
    task_id=f"profile_{dl_code}",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_DELTA_JAR_FILE],
            "args": [
                f"--project={PROJECT_ID}",
                f"--raw-bucketname={RAW_BUCKET}",
                f"--data-bucketname={DATA_BUCKET}",
                f"--source-prefix=esmasecreg_data/downloaded-data/SME/{dl_code}",
                "--file-key=Amortization",
            ],
        }
    },
)
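The clean-up task shown next targets each batch by ID, which suggests the create operator also passes an explicit batch_id and attaches the shared environment configuration inside the batch payload. A hedged sketch of those two extra pieces (the ID convention is inferred from the delete task below):
profile_task = DataprocCreateBatchOperator(
    task_id=f"profile_{dl_code}",
    batch_id=f"{dl_code.lower()}-amortisation-profile",   # referenced again by the delete task
    batch={
        "pyspark_batch": {...},                            # as shown above
        "environment_config": ENVIRONMENT_CONFIG,          # Metastore and Spark History Server settings
    },
)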
The DataprocDeleteBatchOperator ensures each batch job’s resources are cleared after processing, preventing unnecessary data storage or resource use:
delete_profile = DataprocDeleteBatchOperator(
    task_id=f"delete_profile_{dl_code}",
    project_id=PROJECT_ID,
    region=REGION,
    batch_id=f"{dl_code.lower()}-amortisation-profile",
)
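How the delete task is wired is not shown above; one straightforward arrangement, assumed here, is to run it once the Bronze step has consumed the profile output:
# Illustrative ordering within the task group.
profile_task >> bronze_task >> delete_profile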
DAGs implement error handling by logging and halting on issues such as missing parameters:
if ingestion_date is None:
    logging.error("No ingestion date set. DAG stopped!!")
    sys.exit(1)
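This check assumes an ingestion_date value has been resolved earlier in the DAG file. A minimal sketch of one way it might be supplied, here via an Airflow Variable (the variable name is illustrative):
from airflow.models import Variable

# Illustrative: resolve the ingestion date from an Airflow Variable at DAG-parse time.
# The check above then halts the DAG when no value has been set.
ingestion_date = Variable.get("sme_ingestion_date", default_var=None)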
- Prefix Retrieval: The DAGs retrieve prefixes of raw data files in Google Cloud Storage using get_raw_prefixes.
- Task Creation: For each raw dataset, tasks are created to run the Profile and Bronze stages (see the sketch after this list).
- Task Dependencies: Defined using >>, ensuring that stages follow the proper order.
- Clean-Up: Batch jobs are deleted post-processing, preventing resource bloat.
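Putting these steps together, the task-creation loop might look like the following sketch (it assumes get_raw_prefixes returns one entry per deal code and that the deal code is the last path segment of each prefix):
# Illustrative assembly of the execution flow described above.
raw_prefixes = get_raw_prefixes()  # one GCS prefix per deal code (assumed)

for rp in raw_prefixes:
    dl_code = rp.split("/")[-1]  # assumed: the deal code is the last path segment

    with TaskGroup(group_id=f"{dl_code}_amortisation") as tg:
        profile_task = DataprocCreateBatchOperator(
            task_id=f"profile_{dl_code}", batch={...}
        )
        bronze_task = DataprocCreateBatchOperator(
            task_id=f"bronze_{dl_code}", batch={...}
        )
        delete_profile = DataprocDeleteBatchOperator(
            task_id=f"delete_profile_{dl_code}",
            project_id=PROJECT_ID,
            region=REGION,
            batch_id=f"{dl_code.lower()}-amortisation-profile",
        )

        # Profile feeds Bronze; the profile batch is cleaned up afterwards.
        profile_task >> bronze_task >> delete_profile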
The sme_deal_details DAG specifically handles SME deal data transformations with Bronze and Silver stages, followed by clean-up of intermediate data in each stage. It also includes XCom Cleanup tasks to remove unused data and optimize Airflow's storage.
- Data Retrieval: Retrieves raw prefixes for processing each deal code.
- Processing Stages: Includes Bronze (raw data to initial transformation) and Silver (further refinement) layers.
- Cleanup: Deletes temporary data after each DAG run, controlled by on-success and on-failure triggers (a sketch of this trigger behaviour follows below).
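The on-success and on-failure behaviour mentioned above is typically expressed with Airflow trigger rules. A hedged sketch of an XCom clean-up task that runs regardless of upstream outcome, assuming Airflow 2.2+ where XCom rows are keyed by run_id (the callable and task names are illustrative, not taken from the repository):
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.utils.session import create_session
from airflow.utils.trigger_rule import TriggerRule

def _cleanup_xcoms(**context):
    # Illustrative: drop this run's XCom rows to keep Airflow's metadata database lean.
    with create_session() as session:
        session.query(XCom).filter(
            XCom.dag_id == context["dag"].dag_id,
            XCom.run_id == context["run_id"],
        ).delete(synchronize_session=False)

xcom_cleanup = PythonOperator(
    task_id="xcom_cleanup",
    python_callable=_cleanup_xcoms,
    trigger_rule=TriggerRule.ALL_DONE,  # run whether the upstream stages succeed or fail
)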
The Profile Stage processes and prepares raw data for further transformations. This is typically the first stage after retrieving data prefixes.
profile_task = DataprocCreateBatchOperator(
    task_id=f"profile_{dl_code}",
    batch={...},
)
The Bronze Stage stores cleaned data from the Profile stage in the target storage bucket.
bronze_task = DataprocCreateBatchOperator(
    task_id=f"bronze_{dl_code}",
    batch={...},
)
If configured, the Silver Stage takes data from the Bronze layer, performs further transformations, and stores higher-quality data. Although Silver Stage processing is currently commented out, it can be implemented as required.
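If the Silver Stage were enabled, it could follow the same pattern as the Profile and Bronze tasks; a hedged sketch of such a task and its ordering (the batch payload is elided and would point at the Bronze output):
silver_task = DataprocCreateBatchOperator(
    task_id=f"silver_{dl_code}",
    batch={...},  # PySpark batch analogous to the Bronze stage
)

# Bronze output feeds the Silver refinement step.
bronze_task >> silver_task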