Skip to content

Commit

Permalink
Merge pull request #113 from MITLibraries/HRQB-27-runtime-assertions
Browse files Browse the repository at this point in the history
HRQB 27 - establish integrity checks pattern and Employees check
  • Loading branch information
ghukill authored Jul 23, 2024
2 parents 18bc79c + cd0a068 commit c15c221
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 21 deletions.
75 changes: 62 additions & 13 deletions hrqb/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import logging
import os
from abc import abstractmethod
from collections.abc import Iterator
from collections.abc import Callable, Iterator
from typing import Literal

import luigi # type: ignore[import-untyped]
import numpy as np
import pandas as pd
import sentry_sdk

from hrqb.base import PandasPickleTarget, QuickbaseTableTarget
from hrqb.config import Config
from hrqb.exceptions import IntegrityCheckError
from hrqb.utils.data_warehouse import DWClient
from hrqb.utils.quickbase import QBClient

Expand Down Expand Up @@ -105,6 +107,32 @@ def named_inputs(self) -> dict[str, luigi.Target]:
for task, target in list(zip(self.deps(), self.input(), strict=True))
}

@classmethod
def integrity_check(cls, func: Callable) -> Callable:
"""Decorator used to register integrity check methods from task classes."""
if not hasattr(cls, "_integrity_checks"):
cls._integrity_checks = set()
cls._integrity_checks.add(func.__name__)
return func

def run_integrity_checks( # type: ignore[no-untyped-def]
self,
*args, # noqa: ANN002
**kwargs, # noqa: ANN003
) -> None:
"""Run all registered integrity check methods."""
for check_name in getattr(self, "_integrity_checks", []):
if check_func := getattr(self, check_name, None):
try:
message = f"Running integrity check: {self.name}.{check_name}"
logger.info(message)
check_func(*args, **kwargs)
except Exception as exc:
message = f"Task '{self.name}' failed integrity check: '{exc}'"
integrity_exception = IntegrityCheckError(message)
sentry_sdk.capture_exception(integrity_exception)
raise integrity_exception from exc


@HRQBTask.event_handler(luigi.Event.SUCCESS)
def task_success_handler(task: HRQBTask) -> None:
Expand Down Expand Up @@ -133,8 +161,13 @@ def get_dataframe(self) -> pd.DataFrame:
"""

def run(self) -> None:
"""Write dataframe prepared by self.get_dataframe as Task Target output."""
self.target.write(self.get_dataframe())
"""Write dataframe prepared by self.get_dataframe as Task Target output.
PandasPickleTasks pass the target data that will be written to integrity checks.
"""
output_df = self.get_dataframe()
self.run_integrity_checks(output_df)
self.target.write(output_df)


class SQLQueryExtractTask(PandasPickleTask):
Expand Down Expand Up @@ -223,6 +256,17 @@ def get_records(self) -> list[dict]:
records_df = self.named_inputs[self.input_task_to_load].read()
else:
records_df = self.single_input_dataframe

if (
self.merge_field
and len(records_df[records_df.duplicated(self.merge_field)]) > 0
):
message = (
f"Merge field '{self.merge_field}' found to have duplicate "
f"values for task '{self.name}'"
)
raise ValueError(message)

records_df = self._normalize_records_for_upsert(records_df)
return records_df.to_dict(orient="records")

Expand All @@ -234,6 +278,17 @@ def _normalize_records_for_upsert(self, records_df: pd.DataFrame) -> pd.DataFram
"""
return records_df.replace({np.nan: None})

def upsert_records(self, records: list[dict]) -> dict:
"""Perform Quickbase upsert given a list of record dictionaries."""
qbclient = QBClient()
table_id = qbclient.get_table_id(self.table_name)
upsert_payload = qbclient.prepare_upsert_payload(
table_id,
records,
merge_field=self.merge_field,
)
return qbclient.upsert_records(upsert_payload)

def run(self) -> None:
"""Retrieve data from parent Task and upsert to Quickbase table.
Expand All @@ -244,18 +299,12 @@ def run(self) -> None:
Partial successes are possible for Quickbase upserts. This method will log
warnings when detected in API response, but task will be considered complete and
ultimately successful.
QuickbaseUpsertTasks pass upsert results to integrity checks.
"""
records = self.get_records()

qbclient = QBClient()
table_id = qbclient.get_table_id(self.table_name)
upsert_payload = qbclient.prepare_upsert_payload(
table_id,
records,
merge_field=self.merge_field,
)
results = qbclient.upsert_records(upsert_payload)

results = self.upsert_records(records)
self.run_integrity_checks(results)
self.target.write(results)


Expand Down
10 changes: 5 additions & 5 deletions hrqb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ def _remove_sensitive_scope_variables(event: Event) -> Event:
object separately.
"""
new_event = copy.deepcopy(event)
if "exception" in new_event:
for exc_type in new_event["exception"]["values"]:
for stacktrace in exc_type.get("stacktrace", {}).get("frames", []):
for item in ["vars", "pre_context", "post_context"]:
stacktrace.pop(item, None)
for captured_exception in new_event.get("exception", {}).get("values", []):
for frame in captured_exception.get("stacktrace", {}).get("frames", []):
for item in ["vars", "pre_context", "post_context"]:
if item in frame:
frame.pop(item, None)
return new_event
4 changes: 4 additions & 0 deletions hrqb/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@

class QBFieldNotFoundError(ValueError):
pass


class IntegrityCheckError(Exception):
pass
8 changes: 8 additions & 0 deletions hrqb/tasks/employees.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pandas as pd

from hrqb.base.task import (
HRQBTask,
PandasPickleTask,
QuickbaseUpsertTask,
SQLQueryExtractTask,
Expand Down Expand Up @@ -79,6 +80,13 @@ def get_dataframe(self) -> pd.DataFrame:
}
return employees_df[fields.keys()].rename(columns=fields)

@HRQBTask.integrity_check
def check_unique_mit_ids(self, output_df: pd.DataFrame) -> None:
duplicate_mit_ids = len(output_df[output_df.duplicated("MIT ID", keep=False)])
if duplicate_mit_ids > 0:
message = f"Found {duplicate_mit_ids} duplicate 'MIT ID' column values"
raise ValueError(message)


class LoadEmployees(QuickbaseUpsertTask):
table_name = luigi.Parameter("Employees")
Expand Down
71 changes: 69 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ruff: noqa: N802, N803, DTZ001
# ruff: noqa: N802, N803, DTZ001, PLR2004, TRY003, EM101, SLF001

import datetime
import json
Expand All @@ -14,7 +14,7 @@
from pandas import Timestamp

from hrqb.base import HRQBTask, QuickbaseTableTarget
from hrqb.base.task import PandasPickleTarget, QuickbaseUpsertTask
from hrqb.base.task import PandasPickleTarget, PandasPickleTask, QuickbaseUpsertTask
from hrqb.tasks.pipelines import UpdateLibHRData
from hrqb.utils.data_warehouse import DWClient
from hrqb.utils.quickbase import QBClient
Expand Down Expand Up @@ -927,3 +927,70 @@ def sensitive_scope_variable():
"note": "I am a dictionary with sensitive information",
"secret": "very-secret-abc123",
}


@pytest.fixture
def pandas_task_with_integrity_checks():
class PandasWithChecks(PandasPickleTask):
stage = luigi.Parameter("Transform")

def get_dataframe(self) -> pd.DataFrame:
return pd.DataFrame(
[0, 1, 2, 3, 4],
columns=["number"],
)

@HRQBTask.integrity_check
def expecting_five_items(self, output_df: pd.DataFrame):
if len(output_df) != 5:
raise ValueError("Expecting a dataframe with five rows")

@HRQBTask.integrity_check
def expecting_letter_column(self, output_df: pd.DataFrame):
if "letter" not in output_df.columns:
raise ValueError("Expecting a 'letter' column in dataframe")

return PandasWithChecks(pipeline="Checks")


@pytest.fixture
def upsert_task_with_integrity_checks(mocked_qb_api_upsert):
class UpsertWithChecks(QuickbaseUpsertTask):
stage = luigi.Parameter("Load")

def get_records(self) -> pd.DataFrame:
return None

def upsert_records(self, _records):
return mocked_qb_api_upsert

@HRQBTask.integrity_check
def expecting_three_processed_records(self, upsert_results: dict):
if upsert_results["metadata"]["totalNumberOfRecordsProcessed"] != 3:
raise ValueError("Expecting three processed records")

@HRQBTask.integrity_check
def expecting_zero_updated_records(self, upsert_results: dict):
if len(upsert_results["metadata"]["updatedRecordIds"]) != 0:
raise ValueError("Expecting zero updated records")

return UpsertWithChecks(pipeline="Checks")


@pytest.fixture
def upsert_task_with_duplicate_merge_field_values(mocked_qb_api_upsert):
class UpsertWithDuplicates(QuickbaseUpsertTask):
stage = luigi.Parameter("Load")

@property
def merge_field(self) -> str | None:
return "Key"

@property
def single_input_dataframe(self) -> pd.DataFrame:
return pd.DataFrame(
["abc123", "def456", "abc123"],
columns=["Key"],
)

return UpsertWithDuplicates(pipeline="Checks")
13 changes: 13 additions & 0 deletions tests/tasks/test_employees.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import pandas as pd
import pytest


def test_extract_dw_employees_load_sql_query(task_extract_dw_employees):
assert task_extract_dw_employees.sql_file == "hrqb/tasks/sql/employees.sql"
assert task_extract_dw_employees.sql_query is not None
Expand All @@ -22,3 +26,12 @@ def test_transform_employees_normalize_state_names(

def test_task_load_employee_appointments_explicit_properties(task_load_employees):
assert task_load_employees.merge_field == "MIT ID"


def test_transform_employees_integrity_check_duplicate_mit_ids(task_transform_employees):
test_df = pd.DataFrame(
["55555", "55555", "12345", "67890"],
columns=["MIT ID"],
)
with pytest.raises(ValueError, match="Found 2 duplicate 'MIT ID' column values"):
task_transform_employees.check_unique_mit_ids(test_df)
84 changes: 83 additions & 1 deletion tests/test_base_task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ruff: noqa: PLR2004, PD901
# ruff: noqa: PLR2004, PD901, D205, D212, SLF001

import os
from unittest import mock
Expand All @@ -15,6 +15,7 @@
SQLQueryExtractTask,
)
from hrqb.config import Config
from hrqb.exceptions import IntegrityCheckError
from hrqb.utils.data_warehouse import DWClient
from hrqb.utils.luigi import run_pipeline
from tests.fixtures.tasks.extract import ExtractAnimalNames
Expand Down Expand Up @@ -331,3 +332,84 @@ def test_base_pipeline_task_aggregate_upsert_results_upsert_with_errors_noted(
},
"qb_upsert_errors": True,
}


def test_base_task_integrity_check_decorator_adds_check_names_to_registered_checks(
pandas_task_with_integrity_checks,
):
assert pandas_task_with_integrity_checks._integrity_checks.issuperset(
{
"expecting_five_items",
"expecting_letter_column",
}
)


def test_base_task_run_integrity_checks_invokes_present_and_skips_absent_checks(
pandas_task_with_integrity_checks,
):
"""
This test simulates another task registering an integrity check that this task does
not have. The task's defined integrity checks are invoked, while this other one is
quietly skipped.
"""
pandas_task_with_integrity_checks._integrity_checks.add("i_do_not_have")

check_one = mock.MagicMock()
check_two = mock.MagicMock()
pandas_task_with_integrity_checks.expecting_five_items = check_one
pandas_task_with_integrity_checks.expecting_letter_column = check_two

output_df = pandas_task_with_integrity_checks.get_dataframe()
pandas_task_with_integrity_checks.run_integrity_checks(output_df)

check_one.assert_called()
check_two.assert_called()


def test_quickbase_upsert_task_integrity_checks_get_upsert_results(
upsert_task_with_integrity_checks, mocked_qb_api_upsert
):
check_one = mock.MagicMock()
check_two = mock.MagicMock()
upsert_task_with_integrity_checks.expecting_three_processed_records = check_one
upsert_task_with_integrity_checks.expecting_zero_updated_records = check_two

upsert_task_with_integrity_checks.run()

check_arg = check_one.call_args[0][0]
assert isinstance(check_arg, dict)
assert check_arg == mocked_qb_api_upsert


def test_failed_integrity_checks_raise_custom_exception(
pandas_task_with_integrity_checks, upsert_task_with_integrity_checks
):
# pandas task
with pytest.raises(
IntegrityCheckError, match="Expecting a 'letter' column in dataframe"
):
pandas_task_with_integrity_checks.run()

# upsert task
with pytest.raises(IntegrityCheckError, match="Expecting zero updated records"):
upsert_task_with_integrity_checks.run()


def test_task_without_integrity_checks_run_without_error(
task_extract_animal_names, mocker
):
spy_run_checks = mocker.spy(task_extract_animal_names, "run_integrity_checks")
task_extract_animal_names.run()
spy_run_checks.assert_called()


def test_upsert_task_duplicate_merge_field_values_raises_error(
upsert_task_with_duplicate_merge_field_values,
):
with pytest.raises(
ValueError,
match="Merge field 'Key' found to have duplicate values for task "
"'UpsertWithDuplicates'",
):
upsert_task_with_duplicate_merge_field_values.get_records()

0 comments on commit c15c221

Please sign in to comment.