Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HRQB 22 - Employee Appointments #39

Merged
merged 9 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 142 additions & 150 deletions Pipfile.lock

Large diffs are not rendered by default.

35 changes: 22 additions & 13 deletions hrqb/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def single_input_dataframe(self) -> pd.DataFrame:
return target.read() # type: ignore[return-value]

@property
def named_inputs(self) -> dict[str, PandasPickleTarget | QuickbaseTableTarget]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why'd the type hint change? 🤔

Copy link
Collaborator Author

@ghukill ghukill Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question. This was over-engineering early on, where I had assumed that QuickbaseUpsertTasks might be used as inputs into other tasks. But that seems unlikely now.

If we make this more generic, then we can avoid some linting and typing hiccups where it could be a dictionary from a QuickbaseTableTarget, when we know 99% of the time it's a dataframe from PandasPickleTarget. But even that isn't worth the linting/typing headaches, as we often get the task target in a way that loses some of that thread. Seemed pretty clear that just relaxing this type was the best option.

def named_inputs(self) -> dict[str, luigi.Target]:
"""Dictionary of parent Tasks and their Targets.

This is useful when a Task has multiple parent Tasks, to easily and precisely
Expand Down Expand Up @@ -187,6 +187,17 @@ def merge_field(self) -> str | None:
"""
return None

@property
def input_task_to_load(self) -> str | None:
"""Task name of parent, required, input task to get data from and upsert.

If a QuickbaseUpsertTask load task has multiple required tasks, we cannot use the
convenience method 'single_input_dataframe' to get the dataframe to load. This
property can be set to explicitly define which single parent task to retrieve data
from for upsert.
"""
return None

@property
def target(self) -> QuickbaseTableTarget:
return QuickbaseTableTarget(
Expand All @@ -200,7 +211,10 @@ def get_records(self) -> list[dict]:
This method may be overridden if necessary if a load Task requires more complex
behavior than a straight conversion of the parent's DataFrame to a dictionary.
"""
records_df = self.single_input_dataframe
if self.input_task_to_load:
records_df = self.named_inputs[self.input_task_to_load].read()
else:
records_df = self.single_input_dataframe
records_df = self._normalize_records_for_upsert(records_df)
return records_df.to_dict(orient="records")

Expand Down Expand Up @@ -241,17 +255,12 @@ def run(self) -> None:

def parse_and_log_upsert_results(self, api_response: dict) -> None:
    """Parse Quickbase upsert response and log counts of records modified.

    Parsing of the raw response is delegated to QBClient.parse_upsert_results;
    nothing is logged when that call returns a falsy result.

    Args:
        api_response: response dictionary from the Quickbase upsert request.
    """
    record_counts = QBClient.parse_upsert_results(api_response)
    if not record_counts:
        return  # pragma: nocover
    # log how many records fall in each category rather than the collections
    for key in ["created", "updated", "unchanged"]:
        record_counts[key] = len(record_counts[key])
    message = f"Upsert results: {record_counts}"
    logger.info(message)

def parse_and_log_upsert_errors(self, api_response: dict) -> None:
Expand Down
148 changes: 148 additions & 0 deletions hrqb/tasks/employee_appointments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""hrqb.tasks.employee_appointments"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask
from hrqb.utils import normalize_dataframe_dates
from hrqb.utils.quickbase import QBClient


class ExtractDWEmployeeAppointments(SQLQueryExtractTask):
    """Extract employee appointment data from the Data Warehouse via a SQL query."""

    stage = luigi.Parameter("Extract")

    @property
    def sql_file(self) -> str:
        """Path of the SQL file containing the employee appointments query."""
        sql_path = "hrqb/tasks/sql/employee_appointments.sql"
        return sql_path


class ExtractQBLibHREmployeeAppointments(PandasPickleTask):
    """Extract Library HR provided employee appointment data from Quickbase."""

    stage = luigi.Parameter("Extract")

    def get_dataframe(self) -> pd.DataFrame:
        """Return the 'LibHR Employee Appointments' Quickbase table as a dataframe."""
        client = QBClient()
        table_id = client.get_table_id("LibHR Employee Appointments")
        return client.get_table_as_df(table_id)


class ExtractQBDepartments(PandasPickleTask):
    """Extract Department data from Quickbase, for merging with Library HR data."""

    stage = luigi.Parameter("Extract")

    def get_dataframe(self) -> pd.DataFrame:
        """Return the 'Departments' Quickbase table as a dataframe."""
        client = QBClient()
        table_id = client.get_table_id("Departments")
        return client.get_table_as_df(table_id)


class TransformEmployeeAppointments(PandasPickleTask):
    """Combine Data Warehouse and Library HR data for Employee Appointments QB table."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare the three extract tasks whose dataframes are combined."""
        return [
            ExtractDWEmployeeAppointments(pipeline=self.pipeline),
            ExtractQBLibHREmployeeAppointments(pipeline=self.pipeline),
            ExtractQBDepartments(pipeline=self.pipeline),
        ]

    def get_dataframe(self) -> pd.DataFrame:
        """Build the Employee Appointments dataframe from the three extract tasks.

        Data Warehouse appointments are left-joined to Library HR appointment data
        on position id and MIT id, then left-joined to Departments to pick up the
        Directorate.  The appointment date columns are passed through
        normalize_dataframe_dates, and the final columns are selected and renamed
        per the 'fields' mapping.
        """
        dw_emp_appts_df = self.named_inputs["ExtractDWEmployeeAppointments"].read()
        libhr_df = self.named_inputs["ExtractQBLibHREmployeeAppointments"].read()
        depts_df = self.named_inputs["ExtractQBDepartments"].read()

        # normalize position id to string and pad zeros; missing values are left
        # untouched because int() raises on NaN/None and would abort the transform
        libhr_df["Position ID"] = libhr_df["Position ID"].apply(
            lambda x: x if pd.isna(x) else str(int(x)).zfill(8)
        )

        # merge data warehouse data with libhr data to create new employee
        # appointments df
        emp_appts_df = dw_emp_appts_df.merge(
            libhr_df[
                [
                    "Related Employee MIT ID",
                    "Position ID",
                    "Related Supervisor MIT ID",
                    "HC ID",
                    "Related Department ID",
                    "Cost Object",
                ]
            ],
            how="left",
            left_on=["position_id", "mit_id"],
            right_on=["Position ID", "Related Employee MIT ID"],
        )

        # merge on departments to get directorates
        emp_appts_df = emp_appts_df.merge(
            depts_df[["Record ID#", "Directorate"]],
            how="left",
            left_on="Related Department ID",
            right_on="Record ID#",
        )

        emp_appts_df = normalize_dataframe_dates(
            emp_appts_df,
            [
                "appt_begin_date",
                "appt_end_date",
            ],
        )

        # source column name -> output column name
        fields = {
            "hr_appt_key": "HR Appointment Key",
            "mit_id": "MIT ID",
            "HC ID": "HC ID",
            "employee_type": "Related Employee Type",
            "appt_begin_date": "Begin Date",
            "appt_end_date": "End Date",
            "Directorate": "Related Directorate",
            "Related Department ID": "Related Department ID",
            "Related Supervisor MIT ID": "Supervisor",
            "job_title_long": "Related Job Title",
            "position_title_long": "Related Position Title",
            "job_family": "Job Family",
            "job_subfamily": "Job Subfamily",
            "job_track": "Job Track",
            "position_id": "Position ID",
            "Cost Object": "Cost Object",
            "exempt": "Exempt / NE",
            "union_name": "Union Name",
            "term_or_perm": "Term or Permanent",
            "benefits_group_type": "Benefits Group Type",
        }

        # select with an explicit list of column labels rather than a dict view
        return emp_appts_df[list(fields)].rename(columns=fields)


class LoadEmployeeAppointments(QuickbaseUpsertTask):
    """Upsert transformed employee appointment data to 'Employee Appointments' table."""

    stage = luigi.Parameter("Load")
    # declared as a luigi.Parameter to match the other Load tasks in this project
    table_name = luigi.Parameter("Employee Appointments")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare parent tasks: three other Load tasks plus the transform task.

        The Load task modules import from this module, so they are imported here
        inside the method rather than at module level to avoid a circular import.
        """
        from hrqb.tasks.employee_types import LoadEmployeeTypes
        from hrqb.tasks.job_titles import LoadJobTitles
        from hrqb.tasks.position_titles import LoadPositionTitles

        return [
            LoadEmployeeTypes(pipeline=self.pipeline),
            LoadJobTitles(pipeline=self.pipeline),
            LoadPositionTitles(pipeline=self.pipeline),
            TransformEmployeeAppointments(pipeline=self.pipeline),
        ]

    @property
    def merge_field(self) -> str | None:
        """Use 'HR Appointment Key' as the merge field for this upsert task."""
        return "HR Appointment Key"

    @property
    def input_task_to_load(self) -> str:
        """Upsert data from parent task 'TransformEmployeeAppointments'."""
        return "TransformEmployeeAppointments"
Comment on lines +146 to +148
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of using the optional property input_task_to_load. Setting it explicitly declares which parent task, of the multiple ones defined in self.requires(), to use as the input data for upserting.

34 changes: 34 additions & 0 deletions hrqb/tasks/employee_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""hrqb.tasks.employee_types"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.tasks.employee_appointments import (
ExtractDWEmployeeAppointments,
)


class TransformEmployeeTypes(PandasPickleTask):
    """Get unique employee types from employee appointment data."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare the Data Warehouse appointments extract as the only parent task."""
        parent_task = ExtractDWEmployeeAppointments(pipeline=self.pipeline)
        return [parent_task]

    def get_dataframe(self) -> pd.DataFrame:
        """Return de-duplicated 'employee_type' values, renamed to 'Employee Type'."""
        fields = {"employee_type": "Employee Type"}
        selected_df = self.single_input_dataframe[fields.keys()]
        unique_df = selected_df.drop_duplicates()
        return unique_df.rename(columns=fields)


class LoadEmployeeTypes(QuickbaseUpsertTask):
    """Upsert task for the 'Employee Types' table, fed by TransformEmployeeTypes."""

    table_name = luigi.Parameter("Employee Types")
    stage = luigi.Parameter("Load")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare TransformEmployeeTypes as the only parent task."""
        parent_task = TransformEmployeeTypes(pipeline=self.pipeline)
        return [parent_task]
4 changes: 4 additions & 0 deletions hrqb/tasks/employees.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,9 @@ class LoadEmployees(QuickbaseUpsertTask):
table_name = luigi.Parameter("Employees")
stage = luigi.Parameter("Load")

@property
def merge_field(self) -> str | None:
return "MIT ID"

def requires(self) -> list[luigi.Task]:  # pragma: nocover
    """Declare TransformEmployees as the only parent task."""
    parent_task = TransformEmployees(pipeline=self.pipeline)
    return [parent_task]
34 changes: 34 additions & 0 deletions hrqb/tasks/job_titles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""hrqb.tasks.job_titles"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.tasks.employee_appointments import (
ExtractDWEmployeeAppointments,
)


class TransformUniqueJobTitles(PandasPickleTask):
    """Get unique job titles, with pay grade, from employee appointment data."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare the Data Warehouse appointments extract as the only parent task."""
        parent_task = ExtractDWEmployeeAppointments(pipeline=self.pipeline)
        return [parent_task]

    def get_dataframe(self) -> pd.DataFrame:
        """Return de-duplicated job title / pay grade rows with renamed columns."""
        fields = {"job_title_long": "Job Title", "pay_grade": "Pay Grade"}
        selected_df = self.single_input_dataframe[fields.keys()]
        unique_df = selected_df.drop_duplicates()
        return unique_df.rename(columns=fields)


class LoadJobTitles(QuickbaseUpsertTask):
    """Upsert task for the 'Job Titles' table, fed by TransformUniqueJobTitles."""

    table_name = luigi.Parameter("Job Titles")
    stage = luigi.Parameter("Load")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare TransformUniqueJobTitles as the only parent task."""
        parent_task = TransformUniqueJobTitles(pipeline=self.pipeline)
        return [parent_task]
2 changes: 2 additions & 0 deletions hrqb/tasks/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ class FullUpdate(HRQBPipelineTask):
"""Pipeline to perform a full update of all Quickbase tables."""

def requires(self) -> Iterator[luigi.Task]:  # pragma: no cover
    """Yield the Load tasks that make up a full update, employees first."""
    # imported inside the method, as in the original block
    from hrqb.tasks.employee_appointments import LoadEmployeeAppointments
    from hrqb.tasks.employees import LoadEmployees

    # generator expression keeps instantiation lazy, one task per iteration
    yield from (
        task_class(pipeline=self.pipeline_name)
        for task_class in (LoadEmployees, LoadEmployeeAppointments)
    )


class UpdateLibHRData(HRQBPipelineTask):
Expand Down
34 changes: 34 additions & 0 deletions hrqb/tasks/position_titles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""hrqb.tasks.position_titles"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.tasks.employee_appointments import (
ExtractDWEmployeeAppointments,
)


class TransformUniquePositionTitles(PandasPickleTask):
    """Get unique position titles from employee appointment data."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare the Data Warehouse appointments extract as the only parent task."""
        parent_task = ExtractDWEmployeeAppointments(pipeline=self.pipeline)
        return [parent_task]

    def get_dataframe(self) -> pd.DataFrame:
        """Return de-duplicated 'position_title_long' values as 'Position Title'."""
        fields = {"position_title_long": "Position Title"}
        selected_df = self.single_input_dataframe[fields.keys()]
        unique_df = selected_df.drop_duplicates()
        return unique_df.rename(columns=fields)


class LoadPositionTitles(QuickbaseUpsertTask):
    """Upsert task for 'Position Titles' table, fed by TransformUniquePositionTitles."""

    table_name = luigi.Parameter("Position Titles")
    stage = luigi.Parameter("Load")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Declare TransformUniquePositionTitles as the only parent task."""
        parent_task = TransformUniquePositionTitles(pipeline=self.pipeline)
        return [parent_task]
Loading
Loading