-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HRQB 22 - Employee Appointments #39
Changes from 6 commits
4f5c59e
bac33e5
c15239f
d59bc74
000a714
3879be3
b9d634a
bcfcb96
ab70eb8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
"""hrqb.tasks.employee_appointments""" | ||
|
||
import luigi # type: ignore[import-untyped] | ||
import pandas as pd | ||
|
||
from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask | ||
from hrqb.utils import normalize_dataframe_dates | ||
from hrqb.utils.quickbase import QBClient | ||
|
||
|
||
class ExtractDWEmployeeAppointments(SQLQueryExtractTask):
    """Extract employee appointment rows from the Data Warehouse via a SQL file."""

    stage = luigi.Parameter(default="Extract")

    @property
    def sql_file(self) -> str:
        """Return the path string of the SQL query file used by this task."""
        query_path = "hrqb/tasks/sql/employee_appointments.sql"
        return query_path
|
||
|
||
class ExtractQBLibHREmployeeAppointments(PandasPickleTask):
    """Extract the Library HR employee appointment data kept in Quickbase."""

    stage = luigi.Parameter(default="Extract")

    def get_dataframe(self) -> pd.DataFrame:
        """Return the 'LibHR Employee Appointments' Quickbase table as a dataframe."""
        client = QBClient()
        # Resolve the table name to its Quickbase table id, then fetch the table.
        table_id = client.get_table_id("LibHR Employee Appointments")
        return client.get_table_as_df(table_id)
|
||
|
||
class ExtractQBDepartments(PandasPickleTask):
    """Extract the Quickbase 'Departments' table, later merged with Library HR data."""

    stage = luigi.Parameter(default="Extract")

    def get_dataframe(self) -> pd.DataFrame:
        """Return the 'Departments' Quickbase table as a dataframe."""
        client = QBClient()
        # Resolve the table name to its Quickbase table id, then fetch the table.
        table_id = client.get_table_id("Departments")
        return client.get_table_as_df(table_id)
|
||
|
||
class TransformEmployeeAppointments(PandasPickleTask):
    """Combine Data Warehouse and Library HR data for Employee Appointments QB table."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the three extract tasks whose outputs are merged by this task."""
        return [
            ExtractDWEmployeeAppointments(pipeline=self.pipeline),
            ExtractQBLibHREmployeeAppointments(pipeline=self.pipeline),
            ExtractQBDepartments(pipeline=self.pipeline),
        ]

    def get_dataframe(self) -> pd.DataFrame:
        """Build the dataframe of employee appointments to load into Quickbase.

        Steps:
            1. Read the Data Warehouse, Library HR, and Departments inputs.
            2. Normalize the Library HR "Position ID" to a zero-padded string.
            3. Left-merge Library HR columns onto the Data Warehouse rows by
               position id and MIT id.
            4. Left-merge Departments to pick up the "Directorate" column.
            5. Normalize the appointment begin/end date columns.
            6. Select and rename columns to the Quickbase field names.

        Returns:
            Dataframe whose columns are the values of the ``fields`` mapping below.
        """
        dw_emp_appts_df = self.named_inputs["ExtractDWEmployeeAppointments"].read()
        libhr_df = self.named_inputs["ExtractQBLibHREmployeeAppointments"].read()
        depts_df = self.named_inputs["ExtractQBDepartments"].read()

        # normalize position id to string and pad zeros; a missing value is left
        # as-is because int(NaN) raises and such a row has no position to match
        libhr_df["Position ID"] = libhr_df["Position ID"].apply(
            lambda x: x if pd.isna(x) else str(int(x)).zfill(8)
        )

        # merge data warehouse data with libhr data to create new employee appointments df
        emp_appts_df = dw_emp_appts_df.merge(
            libhr_df[
                [
                    "Related Employee MIT ID",
                    "Position ID",
                    "Related Supervisor MIT ID",
                    "HC ID",
                    "Related Department ID",
                    "Cost Object",
                ]
            ],
            how="left",
            left_on=["position_id", "mit_id"],
            right_on=["Position ID", "Related Employee MIT ID"],
        )

        # merge on departments to get directorates
        emp_appts_df = emp_appts_df.merge(
            depts_df[["Record ID#", "Directorate"]],
            how="left",
            left_on="Related Department ID",
            right_on="Record ID#",
        )

        emp_appts_df = normalize_dataframe_dates(
            emp_appts_df,
            [
                "appt_begin_date",
                "appt_end_date",
            ],
        )

        # source column name -> Quickbase field name; also fixes the output column
        # order and drops merge helper columns (e.g. "Record ID#", "Position ID")
        fields = {
            "hr_appt_key": "HR Appointment Key",
            "mit_id": "MIT ID",
            "HC ID": "HC ID",
            "employee_type": "Related Employee Type",
            "appt_begin_date": "Begin Date",
            "appt_end_date": "End Date",
            "Directorate": "Related Directorate",
            "Related Department ID": "Related Department ID",
            "Related Supervisor MIT ID": "Supervisor",
            "job_title_long": "Related Job Title",
            "position_title_long": "Related Position Title",
            "job_family": "Job Family",
            "job_subfamily": "Job Subfamily",
            "job_track": "Job Track",
            "position_id": "Position ID",
            "Cost Object": "Cost Object",
            "exempt": "Exempt / NE",
            "union_name": "Union Name",
            "term_or_perm": "Term or Permanent",
            "benefits_group_type": "Benefits Group Type",
        }

        return emp_appts_df[list(fields)].rename(columns=fields)
|
||
|
||
class LoadEmployeeAppointments(QuickbaseUpsertTask):
    """Load task for the 'Employee Appointments' Quickbase table."""

    stage = luigi.Parameter(default="Load")
    table_name = "Employee Appointments"

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the upstream load tasks plus the transform task for this table."""
        # Imported locally: these modules themselves import from
        # hrqb.tasks.employee_appointments, so module-level imports would be circular.
        from hrqb.tasks.employee_types import LoadEmployeeTypes
        from hrqb.tasks.job_titles import LoadJobTitles
        from hrqb.tasks.position_titles import LoadPositionTitles

        upstream_tasks = [
            LoadEmployeeTypes(pipeline=self.pipeline),
            LoadJobTitles(pipeline=self.pipeline),
            LoadPositionTitles(pipeline=self.pipeline),
            TransformEmployeeAppointments(pipeline=self.pipeline),
        ]
        return upstream_tasks

    @property
    def merge_field(self) -> str | None:
        """Return the Quickbase field name to merge on.

        NOTE(review): presumably consumed by QuickbaseUpsertTask as the upsert
        key -- confirm against the base class.
        """
        return "HR Appointment Key"

    @property
    def input_task_to_load(self) -> str:
        """Name the parent task whose data is upserted: 'TransformEmployeeAppointments'."""
        return "TransformEmployeeAppointments"
Comment on lines
+146
to
+148
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is an example of using the optional property |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
"""hrqb.tasks.employee_types""" | ||
|
||
import luigi # type: ignore[import-untyped] | ||
import pandas as pd | ||
|
||
from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask | ||
from hrqb.tasks.employee_appointments import ( | ||
ExtractDWEmployeeAppointments, | ||
) | ||
|
||
|
||
class TransformEmployeeTypes(PandasPickleTask):
    """Get unique employee types from employee appointment data."""

    stage = luigi.Parameter(default="Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the Data Warehouse employee appointments extract task."""
        return [ExtractDWEmployeeAppointments(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Return the de-duplicated employee type column, renamed for Quickbase."""
        # source column name -> Quickbase field name
        column_map = {"employee_type": "Employee Type"}
        source_df = self.single_input_dataframe
        unique_df = source_df[list(column_map)].drop_duplicates()
        return unique_df.rename(columns=column_map)
|
||
|
||
class LoadEmployeeTypes(QuickbaseUpsertTask):
    """Load task for the 'Employee Types' Quickbase table."""

    stage = luigi.Parameter(default="Load")
    table_name = luigi.Parameter(default="Employee Types")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the transform task that produces the employee types data."""
        transform_task = TransformEmployeeTypes(pipeline=self.pipeline)
        return [transform_task]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
"""hrqb.tasks.job_titles""" | ||
|
||
import luigi # type: ignore[import-untyped] | ||
import pandas as pd | ||
|
||
from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask | ||
from hrqb.tasks.employee_appointments import ( | ||
ExtractDWEmployeeAppointments, | ||
) | ||
|
||
|
||
class TransformUniqueJobTitles(PandasPickleTask):
    """Derive the unique job title / pay grade pairs from employee appointment data."""

    stage = luigi.Parameter(default="Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the Data Warehouse employee appointments extract task."""
        return [ExtractDWEmployeeAppointments(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Return de-duplicated job title and pay grade columns, renamed for Quickbase."""
        # source column name -> Quickbase field name
        column_map = {"job_title_long": "Job Title", "pay_grade": "Pay Grade"}
        source_df = self.single_input_dataframe
        unique_df = source_df[list(column_map)].drop_duplicates()
        return unique_df.rename(columns=column_map)
|
||
|
||
class LoadJobTitles(QuickbaseUpsertTask):
    """Load task for the 'Job Titles' Quickbase table."""

    stage = luigi.Parameter(default="Load")
    table_name = luigi.Parameter(default="Job Titles")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the transform task that produces the unique job titles data."""
        transform_task = TransformUniqueJobTitles(pipeline=self.pipeline)
        return [transform_task]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
"""hrqb.tasks.position_titles""" | ||
|
||
import luigi # type: ignore[import-untyped] | ||
import pandas as pd | ||
|
||
from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask | ||
from hrqb.tasks.employee_appointments import ( | ||
ExtractDWEmployeeAppointments, | ||
) | ||
|
||
|
||
class TransformUniquePositionTitles(PandasPickleTask):
    """Derive the unique position titles from employee appointment data."""

    stage = luigi.Parameter(default="Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the Data Warehouse employee appointments extract task."""
        return [ExtractDWEmployeeAppointments(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Return the de-duplicated position title column, renamed for Quickbase."""
        # source column name -> Quickbase field name
        column_map = {"position_title_long": "Position Title"}
        source_df = self.single_input_dataframe
        unique_df = source_df[list(column_map)].drop_duplicates()
        return unique_df.rename(columns=column_map)
|
||
|
||
class LoadPositionTitles(QuickbaseUpsertTask):
    """Load task for the 'Position Titles' Quickbase table."""

    stage = luigi.Parameter(default="Load")
    table_name = luigi.Parameter(default="Position Titles")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Return the transform task that produces the unique position titles data."""
        transform_task = TransformUniquePositionTitles(pipeline=self.pipeline)
        return [transform_task]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, why'd the type hint change? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great question. This was over-engineering early on, where I had assumed that
QuickbaseUpsertTasks
might be used as inputs into other tasks. But that seems unlikely now. If we make this more generic, then we can avoid some linting and typing hiccups where it could be a dictionary from a
QuickbaseTableTarget
, when we know 99% of the time it's a dataframe from PandasPickleTarget
. But even that isn't worth the linting/typing headaches, as we often get the task target in a way that loses some of that thread. Seemed pretty clear that just relaxing this type was the best option.