
Airflow Integration DAG

Data Processing DAGs Overview

This guide details the setup, components, and execution flow of the SME Amortization, SME Asset, and SME Deal Details Directed Acyclic Graphs (DAGs). These DAGs are used to process, transform, and manage datasets associated with SME financial records. Each DAG is built with Apache Airflow to automate workflow management, leveraging Google Cloud Dataproc and Google Cloud Storage for data processing and storage, respectively.

Purpose of the DAGs

The SME Amortization, SME Asset, and SME Deal Details DAGs provide structured data pipelines for transforming raw SME data into refined datasets across various stages (Profile, Bronze, Silver). These stages ensure high-quality, structured, and query-ready data, primarily managed in Google Cloud. Key transformations include profiling data, managing dependencies, handling clean-up, and optimizing for storage.

Key Components

1. Variables and Constants

The DAGs use predefined variables to configure Google Cloud resources like project IDs, storage buckets, clusters, and Dataproc settings. Sample configuration:

PROJECT_ID = "your_project_id"
REGION = "your_region"
CODE_BUCKET = "your_code_bucket_name"
RAW_BUCKET = "your_raw_bucket_name"
DATA_BUCKET = "your_data_bucket_name"
PHS_CLUSTER = "your_cluster_name"
METASTORE_CLUSTER = "your_metastore_cluster_name"
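
The environment configuration below also references METASTORE_SERVICE_LOCATION and PHS_CLUSTER_PATH. These are typically derived from the constants above using the standard Google Cloud resource-name formats; the following derivation is an illustrative sketch rather than the exact code used by the DAGs:

METASTORE_SERVICE_LOCATION = (
    f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_CLUSTER}"
)
PHS_CLUSTER_PATH = f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{PHS_CLUSTER}"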

2. Python and JAR Files

The DAGs execute PySpark jobs on Google Dataproc clusters. Dependencies like Delta Lake JAR files are stored in Google Cloud Storage for ease of access:

PYTHON_FILE_LOCATION = f"gs://{CODE_BUCKET}/dist/sme_main.py"
SPARK_DELTA_JAR_FILE = f"gs://{CODE_BUCKET}/dependencies/delta-core_2.13-2.1.0.jar"

3. Environment Configuration

Defines the execution environment, specifying settings for Dataproc clusters, Google Cloud Metastore, and Spark History Server integration:

ENVIRONMENT_CONFIG = {
    "execution_config": {"subnetwork_uri": "default"},
    "peripherals_config": {
        "metastore_service": METASTORE_SERVICE_LOCATION,
        "spark_history_server_config": {
            "dataproc_cluster": PHS_CLUSTER_PATH,
        },
    },
}
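
In the batch definitions below, this dictionary is presumably passed as the batch's environment_config field alongside the pyspark_batch payload; a minimal sketch:

batch = {
    "pyspark_batch": {...},                     # job payload, shown in the following sections
    "environment_config": ENVIRONMENT_CONFIG,   # cluster, metastore, and Spark History Server settings
}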

4. Default Arguments

Ensures DAGs follow a consistent configuration for retries, start dates, and project and region settings.

from airflow.utils.dates import days_ago  # provides the relative start date used below

default_args = {
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
    "retries": 0,
}

DAG Breakdown

Each DAG (sme_bond_info, sme_collaterals, and sme_deal_details) includes similar stages:

1. Task Group Structure

The tasks are organized into TaskGroups to streamline the DAG and improve readability. For instance, the SME Amortization DAG creates one group per deal code (dl_code) containing that deal's processing tasks.

with TaskGroup(group_id=f"{dl_code}_amortisation") as tg:
    # Profile stage: validates and profiles the raw files for this deal code.
    profile_task = DataprocCreateBatchOperator(
        task_id=f"profile_{dl_code}",
        batch={...}
    )
    # Bronze stage: persists the cleaned output of the Profile stage.
    bronze_task = DataprocCreateBatchOperator(
        task_id=f"bronze_{dl_code}",
        batch={...}
    )
    # Bronze only runs after profiling succeeds.
    profile_task >> bronze_task
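
These groups sit inside a DAG definition that supplies the default arguments shown earlier. A minimal sketch of that wrapper, where the dag_id and schedule are assumptions for illustration:

from airflow import DAG
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="sme_amortisation",   # assumed name
    default_args=default_args,
    schedule_interval=None,      # assumed: triggered on demand
    catchup=False,
) as dag:
    with TaskGroup(group_id=f"{dl_code}_amortisation") as tg:
        ...  # profile and bronze tasks as shown above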

2. Dataproc Batch Operations

The DAGs rely on the DataprocCreateBatchOperator for processing data with PySpark. Here, profile_task and bronze_task are examples of profiling and data transformation:

profile_task = DataprocCreateBatchOperator(
    task_id=f"profile_{dl_code}",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_DELTA_JAR_FILE],
            "args": [
                f"--project={PROJECT_ID}",
                f"--raw-bucketname={RAW_BUCKET}",
                f"--data-bucketname={DATA_BUCKET}",
                f"--source-prefix=esmasecreg_data/downloaded-data/SME/{dl_code}",
                "--file-key=Amortization",
            ],
        }
    }
)
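
DataprocCreateBatchOperator also accepts a batch_id; for the cleanup task in the next section to find and delete the batch, the create task is presumably given a matching identifier. A hedged sketch of the corresponding Bronze task, where the batch_id naming and the --stage flag are assumptions:

bronze_task = DataprocCreateBatchOperator(
    task_id=f"bronze_{dl_code}",
    batch_id=f"{dl_code.lower()}-amortisation-bronze",  # assumed naming convention
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_DELTA_JAR_FILE],
            "args": [
                f"--project={PROJECT_ID}",
                f"--raw-bucketname={RAW_BUCKET}",
                f"--data-bucketname={DATA_BUCKET}",
                f"--source-prefix=esmasecreg_data/downloaded-data/SME/{dl_code}",
                "--file-key=Amortization",
                "--stage=bronze",  # assumed flag; the real job may select the stage differently
            ],
        },
        "environment_config": ENVIRONMENT_CONFIG,
    },
)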

3. Cleanup Tasks

The DataprocDeleteBatchOperator ensures each batch job’s resources are cleared after processing, preventing unnecessary data storage or resource use:

delete_profile = DataprocDeleteBatchOperator(
    task_id=f"delete_profile_{dl_code}",
    project_id=PROJECT_ID,
    region=REGION,
    batch_id=f"{dl_code.lower()}-amortisation-profile",
)

4. Error Handling and Logging

DAGs implement error handling by logging and halting on issues such as missing parameters:

import logging
import sys

if ingestion_date is None:
    logging.error("No ingestion date set. DAG stopped!!")
    sys.exit(1)
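
How ingestion_date is populated is not shown here; one common pattern is to read it from an Airflow Variable, which is assumed purely for illustration:

from airflow.models import Variable

# Assumed source of the parameter; the real DAG may read it from dag_run.conf instead.
ingestion_date = Variable.get("ingestion_date", default_var=None)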

Execution Flow

  1. Prefix Retrieval: The DAGs retrieve prefixes of raw data files in Google Cloud Storage using get_raw_prefixes.
  2. Task Creation: For each raw dataset, tasks are created to run the Profile and Bronze stages (see the sketch after this list).
  3. Task Dependencies: Defined using >>, ensuring that stages follow the proper order.
  4. Clean-Up: Batch jobs are deleted post-processing, preventing resource bloat.
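
A condensed sketch of this flow, where get_raw_prefixes is the helper mentioned above; its signature and the path handling shown here are assumptions:

# Hypothetical wiring; helper signature and argument values are illustrative only.
for rp in get_raw_prefixes():
    dl_code = rp.split("/")[-1]  # assumed: the deal code is the last path segment
    with TaskGroup(group_id=f"{dl_code}_amortisation") as tg:
        profile_task = DataprocCreateBatchOperator(task_id=f"profile_{dl_code}", batch={...})
        bronze_task = DataprocCreateBatchOperator(task_id=f"bronze_{dl_code}", batch={...})
        delete_profile = DataprocDeleteBatchOperator(
            task_id=f"delete_profile_{dl_code}",
            project_id=PROJECT_ID,
            region=REGION,
            batch_id=f"{dl_code.lower()}-amortisation-profile",
        )
        profile_task >> bronze_task >> delete_profile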

SME Deal Details DAG

The sme_deal_details DAG specifically handles SME deal data transformations with Bronze and Silver stages, followed by clean-up of intermediate data in each stage. It also includes XCom cleanup tasks that remove data no longer needed, keeping Airflow's metadata database lean.

  • Data Retrieval: Retrieves raw prefixes for processing each deal code.
  • Processing Stages: Includes Bronze (raw data to initial transformation) and Silver (further refinement) layers.
  • Cleanup: Deletes temporary data after each DAG run, triggered on both success and failure (see the sketch below).
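
A hedged sketch of such an XCom cleanup task, assuming Airflow 2.x and a PythonOperator that deletes this DAG's XCom rows; the trigger rule makes it run whether upstream tasks succeed or fail. The actual implementation may differ:

from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.utils.session import provide_session
from airflow.utils.trigger_rule import TriggerRule

@provide_session
def _cleanup_xcom(session=None, **context):
    # Remove XCom rows written by this DAG to keep the metadata database small.
    session.query(XCom).filter(XCom.dag_id == context["dag"].dag_id).delete()

cleanup_xcom = PythonOperator(
    task_id="cleanup_xcom",
    python_callable=_cleanup_xcom,
    trigger_rule=TriggerRule.ALL_DONE,  # run on success and on failure alike
)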

Stages in Detail

1. Profile Stage

The Profile Stage processes and prepares raw data for further transformations. This is typically the first stage after retrieving data prefixes.

profile_task = DataprocCreateBatchOperator(
    task_id=f"profile_{dl_code}",
    batch={...}
)

2. Bronze Stage

The Bronze Stage stores cleaned data from the Profile stage in the target storage bucket.

bronze_task = DataprocCreateBatchOperator(
    task_id=f"bronze_{dl_code}",
    batch={...}
)

3. Silver Stage

If configured, the Silver Stage takes data from the Bronze layer, performs further transformations, and stores higher-quality data. Although Silver Stage processing is currently commented out, it can be implemented as required.
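
If enabled, it would presumably mirror the earlier operators and sit downstream of the Bronze task; a brief sketch with assumed arguments:

silver_task = DataprocCreateBatchOperator(
    task_id=f"silver_{dl_code}",
    batch={...},  # same pyspark_batch layout, presumably with a Silver-specific file key or stage flag
)
bronze_task >> silver_task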