HRQB 21 - Employees table #31

Merged 8 commits on May 30, 2024.
Showing changes from 6 commits.

2 changes: 2 additions & 0 deletions Pipfile
@@ -14,6 +14,7 @@ attrs = "*"
requests = "*"
types-requests = "*"
sqlalchemy = "*"
us = "*"

[dev-packages]
black = "*"
@@ -26,6 +27,7 @@ ruff = "*"
requests-mock = "*"
freezegun = "*"
sqlalchemy-stubs = "*"
boto3 = "*"

[requires]
python_version = "3.11"
349 changes: 236 additions & 113 deletions Pipfile.lock

Large diffs are not rendered by default.

23 changes: 12 additions & 11 deletions README.md
@@ -67,17 +67,18 @@ Options:
Usage: -c pipeline [OPTIONS] COMMAND [ARGS]...

Options:
-  -p, --pipeline TEXT             Pipeline Task class name to be imported from
-                                  configured pipeline module, e.g. 'MyPipeline'
-                                  [required]
-  -pm, --pipeline-module TEXT     Module where Pipeline Task class is defined.
-                                  Default: 'hrqb.tasks.pipelines'.
-  --pipeline-parameters TEXT      Comma separated list of luigi Parameters to
-                                  pass to HRQBPipelineTask, e.g.
-                                  'Param1=foo,Param2=bar'.
-  -t, --task TEXT                 Select a target task for pipeline sub-commands
-                                  (e.g. remove-data, run, etc.)
-  -h, --help                      Show this message and exit.
+  -p, --pipeline TEXT             Pipeline Task class name to be imported from
+                                  configured pipeline module, e.g.
+                                  'MyPipeline'  [required]
+  -pm, --pipeline-module TEXT     Module where Pipeline Task class is defined.
+                                  Default: 'hrqb.tasks.pipelines'.
+  -pp, --pipeline-parameters TEXT
+                                  Comma separated list of luigi Parameters to
+                                  pass to HRQBPipelineTask, e.g.
+                                  'Param1=foo,Param2=bar'.
+  -t, --task TEXT                 Select a target task for pipeline sub-
+                                  commands (e.g. remove-data, run, etc.)
+  -h, --help                      Show this message and exit.

Commands:
remove-data Remove target data from pipeline tasks.
40 changes: 38 additions & 2 deletions hrqb/base/task.py
@@ -9,6 +9,7 @@
from typing import Literal

import luigi # type: ignore[import-untyped]
import numpy as np
import pandas as pd

from hrqb.base import PandasPickleTarget, QuickbaseTableTarget
@@ -177,6 +178,15 @@ class QuickbaseUpsertTask(HRQBTask):
def filename_extension(self) -> str:
return ".json"

@property
def merge_field(self) -> str | None:
"""Field to merge records on during upsert.

Optional property that extending task classes can define to include a merge field.
Defaults to None.
"""
return None

@property
def target(self) -> QuickbaseTableTarget:
return QuickbaseTableTarget(
@@ -190,7 +200,17 @@ def get_records(self) -> list[dict]:
This method may be overridden if necessary if a load Task requires more complex
behavior than a straight conversion of the parent's DataFrame to a dictionary.
"""
- return self.single_input_dataframe.to_dict(orient="records")
+ records_df = self.single_input_dataframe
+ records_df = self._normalize_records_for_upsert(records_df)
+ return records_df.to_dict(orient="records")

def _normalize_records_for_upsert(self, records_df: pd.DataFrame) -> pd.DataFrame:
"""Normalize dataframe before upsert to Quickbase.

If and when data type edge cases arise, as Quickbase is fairly particular about
formats, this is a good place to apply additional, centralized normalization.
"""
return records_df.replace({np.nan: None})
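
Note on the normalization above: one likely motivation for swapping NaN for None is that NaN has no valid JSON representation, while None serializes to null. A minimal stdlib sketch of the same idea on plain records follows. The helper name `nan_to_none` and the sample record are hypothetical; the PR itself performs the replacement on the DataFrame.

```python
import json
import math


def nan_to_none(records: list[dict]) -> list[dict]:
    """Return copies of the records with float NaN values replaced by None."""
    return [
        {
            key: None if isinstance(value, float) and math.isnan(value) else value
            for key, value in record.items()
        }
        for record in records
    ]


# Hypothetical record with a missing date represented as NaN
records = [{"MIT ID": "123", "End Date": float("nan")}]
cleaned = nan_to_none(records)

# NaN is emitted as the bare token NaN, which is not valid JSON;
# None is emitted as null.
print(json.dumps(records))
print(json.dumps(cleaned))
```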

def run(self) -> None:
"""Retrieve data from parent Task and upsert to Quickbase table.
@@ -210,14 +230,30 @@ def run(self) -> None:
upsert_payload = qbclient.prepare_upsert_payload(
table_id,
records,
- merge_field=None,
+ merge_field=self.merge_field,
)
results = qbclient.upsert_records(upsert_payload)

self.parse_and_log_upsert_results(results)
self.parse_and_log_upsert_errors(results)

self.target.write(results)

def parse_and_log_upsert_results(self, api_response: dict) -> None:
"""Parse Quickbase upsert response and log counts of records modified."""
metadata = api_response.get("metadata")
if not metadata:
return
processed = metadata.get("totalNumberOfRecordsProcessed", 0)
created = len(metadata.get("createdRecordIds", []))
updated = len(metadata.get("updatedRecordIds", []))
unchanged = len(metadata.get("unchangedRecordIds", []))
message = (
f"Record processed: {processed}, created: {created}, "
f"modified: {updated}, unchanged: {unchanged}"
)
logger.info(message)
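
To illustrate the parsing logic above, here is a small standalone sketch that builds the same summary string from a response dictionary. The function name `summarize_upsert` and the sample response are made up for illustration; only the metadata keys are taken from the diff, and a real Quickbase response may contain more fields.

```python
from __future__ import annotations


def summarize_upsert(api_response: dict) -> str | None:
    """Build a one-line summary of record counts, or None without metadata."""
    metadata = api_response.get("metadata")
    if not metadata:
        return None
    processed = metadata.get("totalNumberOfRecordsProcessed", 0)
    # The ID lists may be absent, so default each one to an empty list
    created = len(metadata.get("createdRecordIds", []))
    updated = len(metadata.get("updatedRecordIds", []))
    unchanged = len(metadata.get("unchangedRecordIds", []))
    return (
        f"Records processed: {processed}, created: {created}, "
        f"modified: {updated}, unchanged: {unchanged}"
    )


# Hypothetical response shape, for illustration only
sample = {
    "metadata": {
        "totalNumberOfRecordsProcessed": 3,
        "createdRecordIds": [11],
        "updatedRecordIds": [12],
        "unchangedRecordIds": [13],
    }
}
print(summarize_upsert(sample))
print(summarize_upsert({}))
```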

def parse_and_log_upsert_errors(self, api_response: dict) -> None:
"""Parse Quickbase upsert response and log any errors.

6 changes: 2 additions & 4 deletions hrqb/cli.py
@@ -72,7 +72,7 @@ def test_connections(ctx: click.Context) -> None:
)


- @click.group()
+ @main.group()
@click.option(
"-p",
"--pipeline",
@@ -90,6 +90,7 @@ def test_connections(ctx: click.Context) -> None:
default="hrqb.tasks.pipelines",
)
@click.option(
"-pp",
"--pipeline-parameters",
callback=click_argument_to_dict,
help="Comma separated list of luigi Parameters to pass to HRQBPipelineTask, "
@@ -134,9 +135,6 @@ def pipeline(
ctx.obj["PIPELINE_TARGET_TASK"] = pipeline_target_task


- main.add_command(pipeline)


@pipeline.command()
@click.pass_context
def status(ctx: click.Context) -> None:
87 changes: 87 additions & 0 deletions hrqb/tasks/employees.py
@@ -0,0 +1,87 @@
"""hrqb.tasks.employees"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import (
PandasPickleTask,
QuickbaseUpsertTask,
SQLQueryExtractTask,
)
from hrqb.utils import normalize_dataframe_dates, state_abbreviation_to_name


class ExtractDWEmployees(SQLQueryExtractTask):
pipeline = luigi.Parameter()
stage = luigi.Parameter("Extract")

@property
def sql_file(self) -> str:
return "hrqb/tasks/sql/employees.sql"


class TransformEmployees(PandasPickleTask):
stage = luigi.Parameter("Transform")

def requires(self) -> list[luigi.Task]: # pragma: nocover
return [ExtractDWEmployees(pipeline=self.pipeline)]

def get_dataframe(self) -> pd.DataFrame:
employees_df = self.single_input_dataframe

employees_df = normalize_dataframe_dates(
employees_df,
[
"date_of_birth",
"mit_hire_date",
"mit_lib_hire_date",
"appointment_end_date",
"i9_form_expiration_date",
],
)

employees_df["home_addr_state"] = employees_df["home_addr_state"].apply(
state_abbreviation_to_name
)

fields = {
"mit_id": "MIT ID",
"first_name": "First Name",
"last_name": "Last Name",
"preferred_name": "Preferred Name",
"date_of_birth": "Date of Birth",
"mit_hire_date": "Original Hire Date at MIT",
"mit_lib_hire_date": "Original Hire Date at MIT Libraries",
"appointment_end_date": "End Date",
"home_addr_street1": "Street 1",
"home_addr_street2": "Street 2",
"home_addr_city": "City",
"home_addr_state": "State/Region",
"home_addr_zip": "Postal Code",
"home_addr_country": "Country",
"mit_email_address": "MIT Email Address",
"office_address": "Work Address Office",
"office_phone": "Work Phone",
"home_phone": "Cell Phone Number",
"emergency_contact_name": "Emergency Contact Name",
"emergency_contact_relation": "Emergency Contact Relationship",
"emergency_contact_email": "Emergency Contact Email",
"emergency_home_phone": "Emergency Contact Home Phone",
"emergency_work_phone": "Emergency Contact Work Phone",
"emergency_cell_phone": "Emergency Contact Cell Phone",
"highest_degree_type": "Highest Degree Type",
"highest_degree_year": "Highest Degree Year",
"residency_status": "Residency Status",
"yrs_of_mit_serv": "MIT Years of Service",
"yrs_of_prof_expr": "Years of Professional Experience",
"i9_form_expiration_date": "I9 Expiration Date",
}
return employees_df[fields.keys()].rename(columns=fields)


class LoadEmployees(QuickbaseUpsertTask):
table_name = luigi.Parameter("Employees")
stage = luigi.Parameter("Load")

def requires(self) -> list[luigi.Task]: # pragma: nocover
return [TransformEmployees(pipeline=self.pipeline)]
3 changes: 0 additions & 3 deletions hrqb/tasks/extract.py

This file was deleted.

3 changes: 0 additions & 3 deletions hrqb/tasks/load.py

This file was deleted.

15 changes: 14 additions & 1 deletion hrqb/tasks/pipelines.py
@@ -1,3 +1,16 @@
"""hrqb.tasks.pipelines"""

- # WIP: HRQBPipelineTasks will be defined here
from collections.abc import Iterator

import luigi # type: ignore[import-untyped]

from hrqb.base.task import HRQBPipelineTask


class FullUpdate(HRQBPipelineTask):
"""Pipeline to perform a full update of all Quickbase tables."""

def requires(self) -> Iterator[luigi.Task]: # pragma: no cover
from hrqb.tasks.employees import LoadEmployees
Comment from the PR author:
Imports are performed inside the requires() method to avoid importing them at the top level. This keeps imports from this file limited to the HRQBPipelineTask tasks themselves.


yield LoadEmployees(pipeline=self.pipeline_name)
Comment from the PR author:
As more table ETL tasks are added, each table's final load task will be added here. I estimate this will be 10-12 tasks long by the end.
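
The deferred-import pattern described in the comments above can be sketched in isolation. This is only an illustration under stated assumptions: `FullUpdateSketch` is a hypothetical stand-in for the pipeline class, and the stdlib module `colorsys` stands in for `hrqb.tasks.employees`.

```python
import sys
from collections.abc import Iterator


class FullUpdateSketch:
    """Hypothetical stand-in for a pipeline task; not the project's class."""

    def requires(self) -> Iterator[str]:
        # Deferred import: this statement runs only when the generator is
        # iterated, so importing the module that defines this class does not
        # pull in the task modules.
        import colorsys  # stand-in for `from hrqb.tasks.employees import ...`

        yield colorsys.__name__


pipeline = FullUpdateSketch()
tasks = pipeline.requires()  # generator created; the body has not run yet
first = next(tasks)          # the body runs here, and so does the import
print(first, "colorsys" in sys.modules)
```

Because `requires()` is a generator, even calling it does not trigger the import; the import happens on the first iteration.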

41 changes: 41 additions & 0 deletions hrqb/tasks/sql/employees.sql
@@ -0,0 +1,41 @@
/*
Query for Employee data.

CHANGELOG
- 2024-05-13 Query created and added
*/

select
e.MIT_ID,
e.FIRST_NAME,
e.LAST_NAME,
e.NAME_KNOWN_BY as PREFERRED_NAME,
e.DATE_OF_BIRTH,
e.ORIGINAL_HIRE_DATE AS MIT_HIRE_DATE,
e.CURRENT_EMPLOYMENT_DATE as MIT_LIB_HIRE_DATE,
e.APPOINTMENT_END_DATE,
e.HOME_ADDR_STREET1,
e.HOME_ADDR_STREET2,
e.HOME_ADDR_CITY,
e.HOME_ADDR_STATE,
e.HOME_ADDR_ZIP,
e.HOME_ADDR_COUNTRY,
e.EMAIL_ADDRESS as MIT_EMAIL_ADDRESS,
e.OFFICE_ADDRESS,
e.OFFICE_PHONE,
e.HOME_PHONE,
e.EMERGENCY_CONTACT_NAME,
e.EMERGENCY_CONTACT_RELATION,
e.EMERGENCY_CONTACT_EMAIL,
e.EMERGENCY_HOME_PHONE,
e.EMERGENCY_WORK_PHONE,
e.EMERGENCY_CELL_PHONE,
e.HIGHEST_DEGREE_TYPE,
e.HIGHEST_DEGREE_YEAR,
e.RESIDENCY_STATUS,
ROUND(MONTHS_BETWEEN(SYSDATE, e.ORIGINAL_HIRE_DATE) / 12, 2) AS YRS_OF_MIT_SERV,
e.YRS_OF_SERVICE as YRS_OF_PROF_EXPR,
e.I9_FORM_EXPIRATION_DATE,
e.RESIDENCY_STATUS
from HR_PERSON_EMPLOYEE e
order by LAST_NAME
3 changes: 0 additions & 3 deletions hrqb/tasks/transform.py

This file was deleted.

37 changes: 37 additions & 0 deletions hrqb/utils/__init__.py
@@ -3,15 +3,52 @@
import datetime

import click
import pandas as pd
import us # type: ignore[import-untyped]
from dateutil.parser import parse as date_parser # type: ignore[import-untyped]


def today_date() -> datetime.date:
return datetime.datetime.now(tz=datetime.UTC).date()


def normalize_date(date: str | datetime.datetime) -> str | None:
if date is None or date == "" or pd.isna(date):

Comment from the PR author:
Nice catch! Will remove that redundant is None clause.

return None
if isinstance(date, str):
date = date_parser(date)
if isinstance(date, datetime.datetime):
return date.strftime("%Y-%m-%d")
return None

Reviewer comment:
Hmm, when viewing this code locally, Pylance indicates "Code is unreachable" for the final return None. Should there be some error handling for dates that cannot be parsed?

Perhaps rewrite to:

import logging

from dateutil.parser import ParserError

def normalize_date(date: str | datetime.datetime) -> str | None:
    if date is None or date == "" or pd.isna(date):
        return None
    if isinstance(date, str):
        try:
            date = date_parser(date)
        except ParserError:
            # An unparsable string stays a str and falls through to return None
            logging.warning("Unable to parse date from '%s'", date)
    if isinstance(date, datetime.datetime):
        return date.strftime("%Y-%m-%d")
    return None

Comment from the PR author:
This utility function has been a headache, and @ehanson8 commented on it too. I think this is a good change. You (and Pylance) are right: if the string is not parsable, date_parser raises an exception, so we'd never hit that return None (and if it did parse, we'd return in the next block). Updating!
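
The behavior this thread settles on can be checked with a dependency-free sketch. Note the assumptions: this version swaps dateutil's parser for the stdlib `datetime.datetime.fromisoformat`, so it only accepts ISO-style strings and catches `ValueError` instead of `ParserError`. It also drops the `pd.isna` check, and the name `normalize_iso_date` is hypothetical.

```python
from __future__ import annotations

import datetime
import logging

logger = logging.getLogger(__name__)


def normalize_iso_date(date: str | datetime.datetime | None) -> str | None:
    """Return a YYYY-MM-DD string, or None for empty or unparsable input."""
    if date is None or date == "":
        return None
    if isinstance(date, str):
        try:
            date = datetime.datetime.fromisoformat(date)
        except ValueError:
            # Without this handler the exception would propagate and the
            # final `return None` below could never be reached for strings.
            logger.warning("Unable to parse date from '%s'", date)
            return None
    if isinstance(date, datetime.datetime):
        return date.strftime("%Y-%m-%d")
    return None


print(normalize_iso_date("2024-05-13"))
print(normalize_iso_date("not a date"))
```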



def normalize_dataframe_dates(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
df[columns] = df[columns].map(normalize_date)
return df


def click_argument_to_dict(
_ctx: click.Context, _parameter: click.Parameter, value: str
) -> dict:
if value is None:
return {}
return dict(pair.split("=") for pair in value.split(","))
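
A standalone sketch of the parsing done by click_argument_to_dict above, without the click callback arguments. The function name `parse_pipeline_parameters` is hypothetical, and the sketch makes one deliberate change that is not in the PR: it splits on the first "=" only, so a value that itself contains "=" does not raise an error.

```python
from __future__ import annotations


def parse_pipeline_parameters(value: str | None) -> dict[str, str]:
    """Parse 'Param1=foo,Param2=bar' into {'Param1': 'foo', 'Param2': 'bar'}."""
    if value is None:
        return {}
    # maxsplit=1 keeps any further "=" characters inside the value
    return dict(pair.split("=", 1) for pair in value.split(","))


print(parse_pipeline_parameters("Param1=foo,Param2=bar"))
print(parse_pipeline_parameters(None))
```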


def convert_oracle_bools_to_qb_bools(
dataframe: pd.DataFrame,
columns: list[str],
) -> pd.DataFrame:
"""Convert Oracle bools [Y,N] to Quickbase bools [Yes,No] for a subset of columns."""
dataframe[columns] = dataframe[columns].replace({"Y": "Yes", "N": "No"})
return dataframe


def state_abbreviation_to_name(abbreviation: str | None) -> str | None:
try:
state = getattr(us.states, abbreviation or "None")
except AttributeError:
state = None
if state:
return state.name
return None
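
The lookup above relies on getattr with a fallback attribute name. A minimal sketch of that technique follows, using a hypothetical `states` namespace as a stand-in for `us.states` so that it runs without the `us` package; the two entries are sample data only.

```python
from __future__ import annotations

from types import SimpleNamespace

# Hypothetical stand-in for `us.states`, which exposes states as attributes
states = SimpleNamespace(
    MA=SimpleNamespace(name="Massachusetts"),
    NY=SimpleNamespace(name="New York"),
)


def abbreviation_to_name(abbreviation: str | None) -> str | None:
    """Map a state abbreviation to its full name, or None if unknown."""
    try:
        # getattr requires a string, so None or "" is replaced by the
        # attribute name "None", which does not exist on the namespace
        state = getattr(states, abbreviation or "None")
    except AttributeError:
        return None
    return state.name


print(abbreviation_to_name("MA"))
print(abbreviation_to_name(None))
```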