Merge pull request #30 from MITLibraries/HRQB-18-connection-tests
Update main from HRQB-17 and HRQB-18
ghukill authored May 28, 2024
2 parents 2fbe3f0 + c5001bc commit dd711e8
Showing 8 changed files with 251 additions and 44 deletions.
40 changes: 24 additions & 16 deletions README.md
@@ -67,56 +67,64 @@ Options:
Usage: -c pipeline [OPTIONS] COMMAND [ARGS]...
Options:
-p, --pipeline TEXT Pipeline Task class name imported from
-p, --pipeline TEXT Pipeline Task class name to be imported from
configured pipeline module, e.g. 'MyPipeline'
[required]
-pm, --pipeline-module TEXT Module where Pipeline Task class is defined.
Default: 'hrqb.tasks.pipelines'.
--pipeline-parameters TEXT Comma separated list luigi Parameters to pass
to HRQBPipelineTask, e.g.
--pipeline-parameters TEXT Comma separated list of luigi Parameters to
pass to HRQBPipelineTask, e.g.
'Param1=foo,Param2=bar'.
-t, --task TEXT Select a target task for pipeline sub-commands
(e.g. remove-data, run, etc.)
-h, --help Show this message and exit.
Commands:
remove-data
run
status
remove-data Remove target data from pipeline tasks.
run Run a pipeline.
status Get status of a pipeline's tasks.
```
<br>


### `pipeline status`
```text
Usage: status [OPTIONS]
Usage: -c pipeline status [OPTIONS]
Return the status of a Pipeline.
Get status of a pipeline's tasks.
Options:
--help Show this message and exit.
-h, --help Show this message and exit.
```
<br>


### `pipeline remove-data`
```text
Usage: remove-data [OPTIONS]
Usage: -c pipeline remove-data [OPTIONS]
Remove target data from pipeline tasks.
Remove all Task output data (Targets) for a pipeline.
If argument --task is passed to parent 'pipeline' command, only this task
will have its target data removed.
Options:
--help Show this message and exit.
-h, --help Show this message and exit.
```
<br>


### `pipeline run`
```text
Usage: run [OPTIONS]
Usage: -c pipeline run [OPTIONS]
Run a pipeline.
Run a Pipeline.
If argument --task is passed to parent 'pipeline' command, only this task,
and the tasks it requires, will run.
Options:
--remove-data Pass to automatically removed Task artifacts after run.
--help Show this message and exit.
--cleanup Remove target data for all tasks in pipeline after run.
-h, --help Show this message and exit.
```

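For orientation, here is a minimal sketch (not part of this commit) of how the `pipeline status` command documented above could be driven from Python with click's `CliRunner`. The pipeline name `MyPipeline` mirrors the example from the help text, the module path is a placeholder, and any environment configuration the CLI expects is assumed to be in place.

```python
from click.testing import CliRunner

from hrqb.cli import main

runner = CliRunner()
result = runner.invoke(
    main,
    [
        "pipeline",
        "--pipeline", "MyPipeline",                   # example name from the help text
        "--pipeline-module", "my_project.pipelines",  # placeholder module path
        "status",
    ],
)
# CLI output (the status command logs an ASCII view of the pipeline's tasks)
print(result.output)
```
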
38 changes: 34 additions & 4 deletions hrqb/base/task.py
@@ -1,8 +1,10 @@
"""hrqb.base.task"""

import json
import logging
import os
from abc import abstractmethod
from collections import defaultdict
from collections.abc import Iterator
from typing import Literal

@@ -212,13 +214,29 @@ def run(self) -> None:
)
results = qbclient.upsert_records(upsert_payload)

# log warning, but consider task complete if some errors present in API response
if api_errors := results.get("metadata", {}).get("lineErrors"):
message = f"Quickbase API call completed but had errors: {api_errors}"
logger.warning(message)
self.parse_and_log_upsert_errors(results)

self.target.write(results)

def parse_and_log_upsert_errors(self, api_response: dict) -> None:
"""Parse Quickbase upsert response and log any errors.
Errors are returned for each record upserted, for each field with issues. This is
an unnecessary level of grain for logging, so the field error types are counted
across all records and logged. This gives a high level overview of fields that
are failing, and how often, where further debugging would involve looking at the
response directly in the task output.
"""
if api_errors := api_response.get("metadata", {}).get("lineErrors"):
api_error_counts: dict[str, int] = defaultdict(int)
for errors in api_errors.values():
for error in errors:
api_error_counts[error] += 1
message = "Quickbase API call completed but had errors: " + json.dumps(
api_error_counts
)
logger.warning(message)


class HRQBPipelineTask(luigi.WrapperTask):
"""Base class for Pipeline Tasks.
@@ -281,6 +299,18 @@ def pipeline_targets_iter(self) -> Iterator[tuple[int, luigi.Target]]:
if hasattr(task, "target"):
yield level, task.target

def get_task(self, task_class_or_name: str | type) -> luigi.Task | None:
"""Get an instantiated child Task, by name, from this parent Pipeline task."""
task_class_name = (
task_class_or_name.__name__
if isinstance(task_class_or_name, type)
else task_class_or_name
)
for _, task in self.pipeline_tasks_iter():
if task.__class__.__name__ == task_class_name:
return task
return None

def pipeline_as_ascii(self) -> str:
"""Return an ASCII representation of this Pipeline Task."""
output = ""
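
The docstring above describes collapsing per-record Quickbase `lineErrors` into counts per error type before logging a single warning. A standalone sketch of that aggregation, using a hypothetical response shaped like the `metadata.lineErrors` structure the method reads (record keys and error strings are invented for illustration):

```python
import json
from collections import defaultdict

# Hypothetical upsert response: metadata.lineErrors maps a record's line number
# to the list of error strings returned for that record.
api_response = {
    "metadata": {
        "lineErrors": {
            "2": ['Incompatible value for field with ID "6".'],
            "5": ['Incompatible value for field with ID "6".', "Missing required value."],
        }
    }
}

# Count occurrences of each error string across all records, mirroring
# parse_and_log_upsert_errors, which then logs the counts as one warning.
api_error_counts: dict[str, int] = defaultdict(int)
for errors in api_response["metadata"]["lineErrors"].values():
    for error in errors:
        api_error_counts[error] += 1

print(json.dumps(api_error_counts))
# {"Incompatible value for field with ID \"6\".": 2, "Missing required value.": 1}
```
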
61 changes: 54 additions & 7 deletions hrqb/cli.py
@@ -6,9 +6,9 @@

from hrqb.base.task import HRQBPipelineTask
from hrqb.config import Config, configure_logger, configure_sentry
from hrqb.tasks.pipelines import run_pipeline
from hrqb.utils import click_argument_to_dict
from hrqb.utils.data_warehouse import DWClient
from hrqb.utils.luigi import run_pipeline, run_task
from hrqb.utils.quickbase import QBClient

logger = logging.getLogger(__name__)
@@ -95,57 +95,104 @@ def test_connections(ctx: click.Context) -> None:
help="Comma separated list of luigi Parameters to pass to HRQBPipelineTask, "
"e.g. 'Param1=foo,Param2=bar'.",
)
@click.option(
"-t",
"--task",
"target_task",
type=str,
required=False,
help="Select a target task for pipeline sub-commands (e.g. remove-data, run, etc.)",
)
@click.pass_context
def pipeline(
ctx: click.Context,
pipeline: str,
pipeline_module: str,
pipeline_parameters: dict,
target_task: str,
) -> None:
# load pipeline task
pipeline_task = HRQBPipelineTask.init_task_from_class_path(
pipeline,
task_class_module=pipeline_module,
pipeline_parameters=pipeline_parameters,
)
message = f"Successfully loaded pipeline: '{pipeline_module}.{pipeline}'"
ctx.obj["PIPELINE_TASK"] = pipeline_task
message = f"Successfully loaded pipeline: '{pipeline_module}.{pipeline}'"
logger.debug(message)

# load target pipeline task if present
pipeline_target_task = None
if target_task:
pipeline_target_task = pipeline_task.get_task(target_task)
if not pipeline_target_task:
message = f"Could not find target task: {target_task}"
logger.error(message)
ctx.exit(0)
message = f"Successfully loaded target task: {pipeline_target_task}"
logger.info(message)
ctx.obj["PIPELINE_TARGET_TASK"] = pipeline_target_task


main.add_command(pipeline)


@pipeline.command()
@click.pass_context
def status(ctx: click.Context) -> None:
"""Get status of a pipeline's tasks."""
pipeline_task = ctx.obj["PIPELINE_TASK"]
logger.info(pipeline_task.pipeline_as_ascii())


@pipeline.command()
@click.pass_context
def remove_data(ctx: click.Context) -> None:
"""Remove target data from pipeline tasks.
If argument --task is passed to parent 'pipeline' command, only this task will have
its target data removed.
"""
pipeline_task = ctx.obj["PIPELINE_TASK"]
logger.warning("Removing all Pipeline Tasks Targets (data).")
logger.info(pipeline_task.remove_pipeline_targets())
logger.info("Successfully removed pipeline artifacts.")
pipeline_target_task = ctx.obj["PIPELINE_TARGET_TASK"]

if pipeline_target_task:
pipeline_target_task.target.remove()
message = f"Target {pipeline_target_task.target} successfully removed"
logger.debug(message)
else:
pipeline_task.remove_pipeline_targets()

logger.info("Successfully removed target data(s).")


@pipeline.command()
@click.option(
"--cleanup",
is_flag=True,
help="Pass to automatically removed Task artifacts after run.",
help="Remove target data for all tasks in pipeline after run.",
)
@click.pass_context
def run(
ctx: click.Context,
cleanup: bool, # noqa: FBT001
) -> None:
"""Run a pipeline.
If argument --task is passed to parent 'pipeline' command, only this task, and the
tasks it requires, will run.
"""
pipeline_task = ctx.obj["PIPELINE_TASK"]
run_results = run_pipeline(pipeline_task)
pipeline_target_task = ctx.obj["PIPELINE_TARGET_TASK"]

if pipeline_target_task:
run_results = run_task(pipeline_target_task)
else:
run_results = run_pipeline(pipeline_task)

message = f"Pipeline run result: {run_results.status.name}"
logger.info(message)
logger.info(pipeline_task.pipeline_as_ascii())

if cleanup:
ctx.invoke(remove_data)
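
A companion sketch (also not part of this commit) combining the new `-t/--task` option with the renamed `--cleanup` flag. `LoadEmployees` is a hypothetical task name and the pipeline/module values are the same placeholders as above; per the code above, `--cleanup` invokes the remove-data command after the run, so with `--task` set only that task's target data would be removed.

```python
from click.testing import CliRunner

from hrqb.cli import main

runner = CliRunner()
result = runner.invoke(
    main,
    [
        "pipeline",
        "--pipeline", "MyPipeline",                   # placeholder pipeline class
        "--pipeline-module", "my_project.pipelines",  # placeholder module path
        "--task", "LoadEmployees",                    # hypothetical target task
        "run",
        "--cleanup",                                  # invoke remove-data after the run
    ],
)
print(result.output)
```
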
15 changes: 1 addition & 14 deletions hrqb/tasks/pipelines.py
@@ -1,16 +1,3 @@
"""hrqb.tasks.pipelines"""

import luigi # type: ignore[import-untyped]
from luigi.execution_summary import LuigiRunResult # type: ignore[import-untyped]

from hrqb.config import Config


def run_pipeline(pipeline_task: luigi.WrapperTask) -> LuigiRunResult:
"""Function to run a HRQBPipelineTask via luigi runner."""
return luigi.build(
[pipeline_task],
local_scheduler=True,
detailed_summary=True,
workers=Config().LUIGI_NUM_WORKERS or 1,
)
# WIP: HRQBPipelineTasks will be defined here
27 changes: 27 additions & 0 deletions hrqb/utils/luigi.py
@@ -0,0 +1,27 @@
"""hrqb.utils.luigi"""

import luigi # type: ignore[import-untyped]
from luigi.execution_summary import LuigiRunResult # type: ignore[import-untyped]

from hrqb.base.task import HRQBPipelineTask
from hrqb.config import Config


def run_task(task: luigi.Task) -> LuigiRunResult:
"""Function to run any luigi Task type via luigi runner."""
return luigi.build(
[task],
local_scheduler=True,
detailed_summary=True,
workers=Config().LUIGI_NUM_WORKERS or 1,
)


def run_pipeline(pipeline_task: luigi.WrapperTask) -> LuigiRunResult:
"""Function to run a HRQBPipelineTask."""
if not isinstance(pipeline_task, HRQBPipelineTask):
message = (
f"{pipeline_task.__class__.__name__} is not a HRQBPipelineTask type task"
)
raise TypeError(message)
return run_task(pipeline_task)
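
A small sketch of how the two new helpers differ. `SayHello` is a throwaway task defined only for illustration, and the example assumes `Config()` tolerates an unset `LUIGI_NUM_WORKERS` (the helpers fall back to a single worker).

```python
import luigi  # type: ignore[import-untyped]

from hrqb.utils.luigi import run_pipeline, run_task


class SayHello(luigi.Task):
    """Throwaway task with no outputs, defined only to illustrate the helpers."""

    def run(self) -> None:
        print("hello")


# run_task accepts any luigi Task and returns a LuigiRunResult.
result = run_task(SayHello())
print(result.status.name)

# run_pipeline is stricter: anything that is not an HRQBPipelineTask is rejected.
try:
    run_pipeline(SayHello())
except TypeError as error:
    print(error)  # "SayHello is not a HRQBPipelineTask type task"
```
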
16 changes: 16 additions & 0 deletions tests/test_base_task.py
@@ -10,11 +10,13 @@
from hrqb.base import (
HRQBTask,
PandasPickleTarget,
PandasPickleTask,
QuickbaseTableTarget,
SQLQueryExtractTask,
)
from hrqb.config import Config
from hrqb.utils.data_warehouse import DWClient
from tests.fixtures.tasks.extract import ExtractAnimalNames


def test_base_task_required_parameter_pipeline(pipeline_name):
@@ -249,3 +251,17 @@ def test_base_sql_task_run_writes_pickled_dataframe(task_sql_extract_animal_names
task_sql_extract_animal_names.target.read()
)
assert task_sql_extract_animal_names.complete()


def test_base_pipeline_task_get_task_str_success(task_pipeline_animals):
task = task_pipeline_animals.get_task("ExtractAnimalNames")
assert isinstance(task, PandasPickleTask)


def test_base_pipeline_task_get_task_class_success(task_pipeline_animals):
task = task_pipeline_animals.get_task(ExtractAnimalNames)
assert isinstance(task, PandasPickleTask)


def test_base_pipeline_task_get_task_not_found_return_none(task_pipeline_animals):
assert not task_pipeline_animals.get_task("BadTask")
