Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HRQB 29 - Improve Quickbase upsert reporting #100

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 49 additions & 42 deletions hrqb/base/task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
"""hrqb.base.task"""

import json
import logging
import os
from abc import abstractmethod
from collections import defaultdict
from collections.abc import Iterator
from typing import Literal

Expand All @@ -19,6 +17,9 @@

logger = logging.getLogger(__name__)

successful_tasks = []
successful_upsert_tasks = []


class HRQBTask(luigi.Task):
"""Base Task class for all HRQB Tasks."""
Expand All @@ -27,6 +28,10 @@ class HRQBTask(luigi.Task):
stage: Literal["Extract", "Transform", "Load"] = luigi.Parameter()
table_name = luigi.OptionalStrParameter(default=None)

@property
def name(self) -> str:
    """Return the class name of this Task instance."""
    task_class = self.__class__
    return task_class.__name__

@property
def path(self) -> str:
"""Dynamically generate path for HRQBTask Targets.
Expand All @@ -44,9 +49,7 @@ def path(self) -> str:
likely) contain a single underscore.
"""
filename = (
"__".join( # noqa: FLY002
[self.pipeline, self.stage, self.__class__.__name__]
)
"__".join([self.pipeline, self.stage, self.name]) # noqa: FLY002
+ self.filename_extension
)
return os.path.join(Config().targets_directory(), filename)
Expand Down Expand Up @@ -98,11 +101,16 @@ def named_inputs(self) -> dict[str, luigi.Target]:
access a specific parent Task's output.
"""
return {
task.__class__.__name__: target
task.name: target
for task, target in list(zip(self.deps(), self.input(), strict=True))
}


@HRQBTask.event_handler(luigi.Event.SUCCESS)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of the Luigi Events/Callbacks. They allow registering on a particular task class, and then subscribing to particular events, in this case a successful run.

# Callback for the luigi.Event.SUCCESS handler registered on HRQBTask: records the
# task that triggered the event in the module-level successful_tasks list.
def task_success_handler(task: HRQBTask) -> None:
successful_tasks.append(task)


class PandasPickleTask(HRQBTask):
"""Base Task class for Tasks that write a pickled pandas DataFrame."""

Expand Down Expand Up @@ -205,6 +213,13 @@ def target(self) -> QuickbaseTableTarget:
table_name=self.table_name,
)

@property
def parse_upsert_counts(self) -> dict | None:
"""Parse results of upsert via QBClient method, if target data exists from run."""
if self.target.exists():
return QBClient.parse_upsert_results(self.target.read())
return None

def get_records(self) -> list[dict]:
"""Get Records data that will be upserted to Quickbase.

Expand Down Expand Up @@ -248,39 +263,12 @@ def run(self) -> None:
)
results = qbclient.upsert_records(upsert_payload)

self.parse_and_log_upsert_results(results)
self.parse_and_log_upsert_errors(results)

self.target.write(results)

def parse_and_log_upsert_results(self, api_response: dict) -> None:
"""Parse Quickbase upsert response and log counts of records modified."""
record_counts = QBClient.parse_upsert_results(api_response)
if not record_counts:
return # pragma: nocover
for key in ["created", "updated", "unchanged"]:
record_counts[key] = len(record_counts[key])
message = f"Upsert results: {record_counts}"
logger.info(message)

def parse_and_log_upsert_errors(self, api_response: dict) -> None:
"""Parse Quickbase upsert response and log any errors.

Errors are returned for each record upserted, for each field with issues. This is
an unnecessary level of grain for logging, so the field error types are counted
across all records and logged. This gives a high level overview of fields that
are failing, and how often, where further debugging would involve looking at the
response directly in the task output.
"""
if api_errors := api_response.get("metadata", {}).get("lineErrors"):
api_error_counts: dict[str, int] = defaultdict(int)
for errors in api_errors.values():
for error in errors:
api_error_counts[error] += 1
message = "Quickbase API call completed but had errors: " + json.dumps(
api_error_counts
)
logger.warning(message)

# Luigi SUCCESS-event callback registered on QuickbaseUpsertTask: appends the task to
# the module-level successful_upsert_tasks list, which
# HRQBPipelineTask.aggregate_upsert_results iterates over when building its summary.
@QuickbaseUpsertTask.event_handler(luigi.Event.SUCCESS)
def upsert_task_success_handler(task: QuickbaseUpsertTask) -> None:
successful_upsert_tasks.append(task)


class HRQBPipelineTask(luigi.WrapperTask):
Expand All @@ -303,9 +291,13 @@ def requires(self):

parent_pipeline_name = luigi.OptionalStrParameter(default=None, significant=False)

@property
def name(self) -> str:
    """Return the class name of this pipeline Task instance."""
    pipeline_class = self.__class__
    return pipeline_class.__name__

@property
def pipeline_name(self) -> str:
output = self.__class__.__name__
output = self.name
if self.parent_pipeline_name:
output = f"{self.parent_pipeline_name}__{output}"
return output
Expand All @@ -322,15 +314,16 @@ def init_task_from_class_path(
return task_class(**pipeline_parameters or {})

def pipeline_tasks_iter(
self, task: luigi.Task | None = None, level: int = 0
self,
task: HRQBTask | None = None,
level: int = 0,
) -> Iterator[tuple[int, luigi.Task]]:
"""Yield all Tasks that are part of the dependency chain for this Task.

This method begins with the Pipeline Task itself, then recursively discovers and
yields parent Tasks as they are required.
"""
if task is None:
task = self
task = task or self
yield level, task
for parent_task in task.requires():
yield from self.pipeline_tasks_iter(task=parent_task, level=level + 1)
Expand All @@ -352,7 +345,7 @@ def get_task(self, task_class_or_name: str | type) -> luigi.Task | None:
else task_class_or_name
)
for _, task in self.pipeline_tasks_iter():
if task.__class__.__name__ == task_class_name:
if task.name == task_class_name:
return task
return None

Expand All @@ -374,3 +367,17 @@ def remove_pipeline_targets(self) -> None:
target.remove()
message = f"Target {target} successfully removed"
logger.debug(message)

def aggregate_upsert_results(self) -> dict | None:
    """Aggregate upsert results for Load tasks from pipeline run.

    Iterates over the module-level ``successful_upsert_tasks`` list and collects,
    per task, the counts exposed by the task's ``parse_upsert_counts`` property
    (None when the task's target does not exist or its data has no metadata).

    Returns:
        None when no upsert task has been recorded as successful; otherwise a dict
        with two keys:
            - "tasks": mapping of task name to its parsed upsert counts (or None)
            - "qb_upsert_errors": True if any task's parsed counts include errors
    """
    if not successful_upsert_tasks:
        return None

    task_results: dict[str, dict | None] = {}
    has_upsert_errors = False
    for task in successful_upsert_tasks:
        # reuse the task's own property instead of duplicating the
        # "target exists -> read -> parse" logic here
        result = task.parse_upsert_counts
        if result and result.get("errors") is not None:
            has_upsert_errors = True
        task_results[task.name] = result

    return {"tasks": task_results, "qb_upsert_errors": has_upsert_errors}
17 changes: 12 additions & 5 deletions hrqb/utils/luigi.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""hrqb.utils.luigi"""

import json
import logging

import luigi # type: ignore[import-untyped]
from luigi.execution_summary import LuigiRunResult # type: ignore[import-untyped]

from hrqb.base.task import HRQBPipelineTask
from hrqb.config import Config

logger = logging.getLogger(__name__)


def run_task(task: luigi.Task) -> LuigiRunResult:
"""Function to run any luigi Task type via luigi runner."""
Expand All @@ -17,11 +22,13 @@ def run_task(task: luigi.Task) -> LuigiRunResult:
)


def run_pipeline(pipeline_task: HRQBPipelineTask) -> LuigiRunResult:
    """Run a HRQBPipelineTask and log aggregated Quickbase upsert results.

    Args:
        pipeline_task: the pipeline task to run via ``run_task``.

    Returns:
        The LuigiRunResult returned by ``run_task``.

    Raises:
        TypeError: if ``pipeline_task`` is not a HRQBPipelineTask instance.
    """
    if not isinstance(pipeline_task, HRQBPipelineTask):
        # Derive the name from the type itself: an object that fails this check is
        # not guaranteed to have a ``name`` property, and reading it would raise
        # AttributeError instead of the intended TypeError.
        class_name = type(pipeline_task).__name__
        message = f"{class_name} is not a HRQBPipelineTask type task"
        raise TypeError(message)

    results = run_task(pipeline_task)

    # only log when at least one upsert task was recorded for aggregation
    if upsert_results := pipeline_task.aggregate_upsert_results():
        message = f"Upsert results: {json.dumps(upsert_results)}"
        logger.info(message)
    return results
22 changes: 17 additions & 5 deletions hrqb/utils/quickbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import logging
from collections import defaultdict
from collections.abc import Callable, Iterator

import pandas as pd
Expand Down Expand Up @@ -142,17 +143,28 @@ def upsert_records(self, upsert_payload: dict) -> dict:

@staticmethod
def parse_upsert_results(api_response: dict) -> dict | None:
"""Parse Record IDs and counts from API response from upsert."""
"""Parse counts of modified records and errors from upsert."""
metadata = api_response.get("metadata")
if not metadata:
return None
return {

results = {
"processed": metadata.get("totalNumberOfRecordsProcessed", 0),
"created": metadata.get("createdRecordIds", []),
"updated": metadata.get("updatedRecordIds", []),
"unchanged": metadata.get("unchangedRecordIds", []),
"created": len(metadata.get("createdRecordIds", [])),
"updated": len(metadata.get("updatedRecordIds", [])),
"unchanged": len(metadata.get("unchangedRecordIds", [])),
"errors": None,
}

if api_errors := metadata.get("lineErrors"):
api_error_counts: dict[str, int] = defaultdict(int)
for errors in api_errors.values():
for error in errors:
api_error_counts[error] += 1
results["errors"] = api_error_counts

return results

def prepare_upsert_payload(
self,
table_id: str,
Expand Down
7 changes: 5 additions & 2 deletions tests/fixtures/tasks/load.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

import luigi

from hrqb.base import QuickbaseUpsertTask
Expand All @@ -20,9 +22,10 @@ def requires(self):
return [PrepareAnimals(pipeline=self.pipeline)]

def run(self):
"""Override default method to print data instead of upsert to Quickbase."""
"""Override default method to print input data and simulate successful upsert."""
print(self.single_input_dataframe) # noqa: T201
self.target.write({"note": "data printed to console"})
with open("tests/fixtures/qb_api_responses/upsert.json") as f:
self.target.write(json.load(f))


class LoadTaskMultipleRequired(QuickbaseUpsertTask):
Expand Down
85 changes: 83 additions & 2 deletions tests/test_base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
)
from hrqb.config import Config
from hrqb.utils.data_warehouse import DWClient
from hrqb.utils.luigi import run_pipeline
from tests.fixtures.tasks.extract import ExtractAnimalNames
from tests.fixtures.tasks.load import LoadTaskMultipleRequired


def test_base_task_name(task_extract_animal_names):
assert task_extract_animal_names.name == "ExtractAnimalNames"


def test_base_task_required_parameter_pipeline(pipeline_name):
with pytest.raises(
luigi.parameter.MissingParameterException,
Expand Down Expand Up @@ -134,7 +139,15 @@ def test_quickbase_task_run_upsert_and_json_receipt_output_target_success(
task_transform_animals_target, task_load_animals
):
"""Mocks upsert to Quickbase, asserting mocked response is written as Target data"""
mocked_qb_upsert_receipt = {"message": "upserted to Animals QB Table"}
mocked_qb_upsert_receipt = {
"data": [],
"metadata": {
"createdRecordIds": [11, 12],
"totalNumberOfRecordsProcessed": 2,
"unchangedRecordIds": [],
"updatedRecordIds": [],
},
}

with mock.patch("hrqb.base.task.QBClient", autospec=True) as mock_qbclient_class:
mock_qbclient = mock_qbclient_class()
Expand Down Expand Up @@ -172,7 +185,7 @@ def test_quickbase_task_run_upsert_and_json_receipt_output_target_api_errors_log

task_load_animals.run()

assert "Quickbase API call completed but had errors" in caplog.text
assert "errors" in task_load_animals.parse_upsert_counts


def test_base_pipeline_name(task_pipeline_animals):
Expand Down Expand Up @@ -275,3 +288,71 @@ def test_quickbase_task_input_task_to_load_property_used(
assert task.input_task_to_load == "ExtractAnimalColors"
input_dict = task.get_records()
assert pd.DataFrame(input_dict).equals(task_extract_animal_colors_target.read())


def test_base_pipeline_task_aggregate_upsert_results_one_success_returns_dict(
    task_pipeline_animals_debug,
):
    """After a pipeline run, aggregated results hold the load task's parsed counts."""
    expected_load_counts = {
        "processed": 3,
        "created": 2,
        "updated": 1,
        "unchanged": 0,
        "errors": None,
    }

    run_pipeline(task_pipeline_animals_debug)
    aggregated = task_pipeline_animals_debug.aggregate_upsert_results()

    assert aggregated == {
        "tasks": {"LoadAnimalsDebug": expected_load_counts},
        "qb_upsert_errors": False,
    }


def test_base_pipeline_task_aggregate_upsert_results_failed_load_returns_none_value(
    task_pipeline_animals_debug,
):
    """With the load task's run() raising, its aggregated entry is None.

    NOTE(review): the load task still being listed presumably relies on the
    module-level successful_upsert_tasks list holding an entry from an earlier
    run -- confirm this test does not depend on test ordering.
    """
    load_task = task_pipeline_animals_debug.get_task("LoadAnimalsDebug")
    upsert_failure = Exception("UPSERT FAILED!")

    # patch run() of LoadAnimalsDebug so that it throws an exception
    with mock.patch.object(load_task, "run", side_effect=upsert_failure):
        run_pipeline(task_pipeline_animals_debug)

    aggregated = task_pipeline_animals_debug.aggregate_upsert_results()
    assert aggregated == {
        "qb_upsert_errors": False,
        "tasks": {"LoadAnimalsDebug": None},
    }


def test_base_pipeline_task_aggregate_upsert_results_upsert_with_errors_noted(
    task_pipeline_animals_debug,
):
    """Line errors in the load task's target are counted and flagged in the results."""
    run_pipeline(task_pipeline_animals_debug)

    # rewrite the load task's stored response so that it contains upsert errors
    load_task = task_pipeline_animals_debug.get_task("LoadAnimalsDebug")
    upsert_response = load_task.target.read()
    upsert_response["metadata"]["lineErrors"] = {
        "2": ['Incompatible value for field with ID "6".'],
        "4": ['Incompatible value for field with ID "6".'],
        "5": ["Weird Error", "Another Weird Error"],
    }
    load_task.target.write(upsert_response)

    expected_error_counts = {
        'Incompatible value for field with ID "6".': 2,
        "Weird Error": 1,
        "Another Weird Error": 1,
    }
    expected_load_counts = {
        "processed": 3,
        "created": 2,
        "updated": 1,
        "unchanged": 0,
        "errors": expected_error_counts,
    }

    aggregated = task_pipeline_animals_debug.aggregate_upsert_results()
    assert aggregated == {
        "tasks": {"LoadAnimalsDebug": expected_load_counts},
        "qb_upsert_errors": True,
    }
Loading
Loading