Skip to content

Commit

Permalink
Merge pull request #7 from MITLibraries/HRQB-10-scaffold-task-and-tar…
Browse files Browse the repository at this point in the history
…get-classes

Hrqb 10 scaffold task and target classes
  • Loading branch information
ghukill authored May 3, 2024
2 parents 03b6db3 + c4216ba commit 623bc68
Show file tree
Hide file tree
Showing 23 changed files with 700 additions and 105 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
SHELL=/bin/bash
DATETIME:=$(shell date -u +%Y%m%dT%H%M%SZ)
export LUIGI_CONFIG_PATH=hrqb/luigi.cfg

help: # preview Makefile commands
@awk 'BEGIN { FS = ":.*#"; print "Usage: make <target>\n\nTargets:" } \
Expand Down Expand Up @@ -44,8 +45,7 @@ safety: # check for security vulnerabilities and verify Pipfile.lock is up-to-da
pipenv check
pipenv verify

lint-apply: # apply changes with 'black' and resolve 'fixable errors' with 'ruff'
black-apply ruff-apply
lint-apply: black-apply ruff-apply # apply changes with 'black' and resolve 'fixable errors' with 'ruff'

black-apply: # apply changes with 'black'
pipenv run black .
Expand Down
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ click = "*"
sentry-sdk = "*"
oracledb = "*"
luigi = "*"
pandas = "*"
pandas-stubs = "*"

[dev-packages]
black = "*"
Expand Down
276 changes: 192 additions & 84 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ See additional diagrams and documentation in the [docs](docs) folder:
```shell
SENTRY_DSN=# If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
WORKSPACE=# Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
LUIGI_CONFIG_PATH=hrqb/luigi.cfg # this env var must be set, pointing to config file in hrqb folder
```

### Optional
Expand Down
17 changes: 17 additions & 0 deletions hrqb/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from hrqb.base.target import HRQBLocalTarget, PandasPickleTarget, QuickbaseTableTarget
from hrqb.base.task import (
HRQBPipelineTask,
HRQBTask,
PandasPickleTask,
QuickbaseUpsertTask,
)

__all__ = [
"HRQBLocalTarget",
"PandasPickleTarget",
"QuickbaseTableTarget",
"HRQBPipelineTask",
"HRQBTask",
"PandasPickleTask",
"QuickbaseUpsertTask",
]
40 changes: 40 additions & 0 deletions hrqb/base/target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""hrqb.base.target"""

import json

import luigi # type: ignore[import-untyped]
import pandas as pd
from luigi.format import MixedUnicodeBytes # type: ignore[import-untyped]

PandasObject = pd.DataFrame | pd.Series


class HRQBLocalTarget(luigi.LocalTarget):
"""Target is local file with path and table name init."""

def __init__(self, path: str, table_name: str) -> None:
super().__init__(path, format=MixedUnicodeBytes)
self.path = path
self.table_name = table_name


class PandasPickleTarget(HRQBLocalTarget):
"""Target is Pandas object (DataFrame or Series) pickled to disk."""

def read(self) -> PandasObject:
return pd.read_pickle(self.path)

def write(self, panda_object: PandasObject) -> None:
panda_object.to_pickle(self.path)


class QuickbaseTableTarget(HRQBLocalTarget):
"""Target is upsert to Quickbase table."""

def read(self) -> dict:
with open(self.path) as f:
return json.load(f)

def write(self, data: dict, indent: bool = True) -> int: # noqa: FBT001, FBT002
with open(self.path, "w") as f:
return f.write(json.dumps(data, indent=indent))
70 changes: 70 additions & 0 deletions hrqb/base/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""hrqb.base.task"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base import PandasPickleTarget, QuickbaseTableTarget
from hrqb.utils import today_date


class HRQBTask(luigi.Task):
"""Base Task class for all HRQB Tasks."""

path = luigi.Parameter()
table_name = luigi.Parameter()

@property
def single_input(self) -> PandasPickleTarget | QuickbaseTableTarget:
input_count = len(self.input())
if input_count != 1:
message = f"Expected a single input to this Task but found: {input_count}"
raise ValueError(message)
return self.input()[0]

@property
def input_pandas_dataframe(self) -> pd.DataFrame:
input_object = self.single_input
data_object = input_object.read()
if not isinstance(data_object, pd.DataFrame):
message = f"Expected pandas Dataframe but got: {type(data_object)}"
raise TypeError(message)
return data_object

@property
def input_pandas_series(self) -> pd.Series:
input_object = self.single_input
data_object = input_object.read()
if not isinstance(data_object, pd.Series):
message = f"Expected pandas Series but got: {type(data_object)}"
raise TypeError(message)
return data_object


class PandasPickleTask(HRQBTask):
"""Base Task class for Tasks that write pickled pandas objects."""

def target(self) -> PandasPickleTarget:
return PandasPickleTarget(
path=self.path,
table_name=self.table_name,
)

def output(self) -> PandasPickleTarget:
return self.target()


class QuickbaseUpsertTask(HRQBTask):
"""Base Task class for Tasks that upsert data to Quickbase tables."""

def target(self) -> QuickbaseTableTarget:
return QuickbaseTableTarget(
path=self.path,
table_name=self.table_name,
)

def output(self) -> QuickbaseTableTarget:
return self.target()


class HRQBPipelineTask(luigi.WrapperTask):
date = luigi.DateParameter(default=today_date())
29 changes: 21 additions & 8 deletions hrqb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,38 @@

import click

from hrqb.config import configure_logger, configure_sentry
from hrqb.config import Config, configure_logger, configure_sentry

logger = logging.getLogger(__name__)

CONFIG = Config()

@click.command()

@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.option(
"-v", "--verbose", is_flag=True, help="Pass to log at debug level instead of info"
"-v",
"--verbose",
is_flag=True,
help="Pass to log at debug level instead of info.",
)
def main(*, verbose: bool) -> None:
start_time = perf_counter()
@click.pass_context
def main(ctx: click.Context, verbose: bool) -> None: # noqa: FBT001
ctx.ensure_object(dict)
ctx.obj["START_TIME"] = perf_counter()
root_logger = logging.getLogger()
logger.info(configure_logger(root_logger, verbose=verbose))
logger.info(configure_sentry())
CONFIG.check_required_env_vars()
logger.info("Running process")

# Do things here!

elapsed_time = perf_counter() - start_time
@main.command()
@click.pass_context
def ping(ctx: click.Context) -> None:
logger.debug("pong")
logger.info(
"Total time to complete process: %s", str(timedelta(seconds=elapsed_time))
"Total elapsed: %s",
str(
timedelta(seconds=perf_counter() - ctx.obj["START_TIME"]),
),
)
24 changes: 24 additions & 0 deletions hrqb/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
import logging
import os
from typing import Any

import sentry_sdk


class Config:
REQUIRED_ENV_VARS = (
"WORKSPACE",
"SENTRY_DSN",
"LUIGI_CONFIG_PATH",
)
OPTIONAL_ENV_VARS = ("DYLD_LIBRARY_PATH",)

def check_required_env_vars(self) -> None:
"""Method to raise exception if required env vars not set."""
missing_vars = [var for var in self.REQUIRED_ENV_VARS if not os.getenv(var)]
if missing_vars:
message = f"Missing required environment variables: {', '.join(missing_vars)}"
raise OSError(message)

def __getattr__(self, name: str) -> Any: # noqa: ANN401
"""Provide dot notation access to configurations and env vars on this class."""
if name in self.REQUIRED_ENV_VARS or name in self.OPTIONAL_ENV_VARS:
return os.getenv(name)
message = f"'{name}' not a valid configuration variable"
raise AttributeError(message)


def configure_logger(logger: logging.Logger, *, verbose: bool) -> str:
if verbose:
logging.basicConfig(
Expand Down
3 changes: 3 additions & 0 deletions hrqb/luigi.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[core]
autoload_range=true
parallel_scheduling=false
15 changes: 15 additions & 0 deletions hrqb/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""hrqb.pipelines"""

import luigi # type: ignore[import-untyped]
from luigi.execution_summary import LuigiRunResult # type: ignore[import-untyped]


def run_pipeline(pipeline_task: luigi.WrapperTask) -> LuigiRunResult:
return luigi.build(
[pipeline_task],
local_scheduler=True,
detailed_summary=True,
)


# TODO: add wrapper Pipeline tasks in this file # noqa: TD002, TD003, FIX002
Empty file added hrqb/tasks/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions hrqb/tasks/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""hrqb.tasks.extract"""

# TODO: add extract tasks in this file # noqa: TD002, TD003, FIX002
3 changes: 3 additions & 0 deletions hrqb/tasks/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""hrqb.tasks.load"""

# TODO: add load tasks in this file # noqa: TD002, TD003, FIX002
3 changes: 3 additions & 0 deletions hrqb/tasks/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""hrqb.tasks.transform"""

# TODO: add transform tasks in this file # noqa: TD002, TD003, FIX002
7 changes: 7 additions & 0 deletions hrqb/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""hrqb.utils"""

import datetime


def today_date() -> datetime.date:
return datetime.datetime.now(tz=datetime.UTC).date()
Empty file added hrqb/utils/db.py
Empty file.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ignore = [
"PLR0912",
"PLR0913",
"PLR0915",
"S301",
"S320",
"S321",
]
Expand Down
Loading

0 comments on commit 623bc68

Please sign in to comment.