diff --git a/hrqb/tasks/employee_appointments.py b/hrqb/tasks/employee_appointments.py index 4f72f2a..2555284 100644 --- a/hrqb/tasks/employee_appointments.py +++ b/hrqb/tasks/employee_appointments.py @@ -4,7 +4,7 @@ import pandas as pd from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask -from hrqb.utils import normalize_dataframe_dates +from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates from hrqb.utils.quickbase import QBClient @@ -95,6 +95,19 @@ def get_dataframe(self) -> pd.DataFrame: ], ) + # mint a unique, deterministic value for the merge "Key" field + emp_appts_df["key"] = emp_appts_df.apply( + lambda row: md5_hash_from_values( + [ + row.mit_id, + row.position_id, + row.appt_begin_date, + row.appt_end_date, + ] + ), + axis=1, + ) + fields = { "hr_appt_key": "HR Appointment Key", "mit_id": "MIT ID", @@ -116,6 +129,7 @@ def get_dataframe(self) -> pd.DataFrame: "union_name": "Union Name", "term_or_perm": "Term or Permanent", "benefits_group_type": "Benefits Group Type", + "key": "Key", } return emp_appts_df[fields.keys()].rename(columns=fields) @@ -140,9 +154,8 @@ def requires(self) -> list[luigi.Task]: # pragma: nocover @property def merge_field(self) -> str | None: - return "HR Appointment Key" + return "Key" @property def input_task_to_load(self) -> str: - """Upsert data from parent task 'TransformEmployeeAppointments'.""" return "TransformEmployeeAppointments" diff --git a/hrqb/tasks/employee_leave.py b/hrqb/tasks/employee_leave.py index c19b563..a00b2f9 100644 --- a/hrqb/tasks/employee_leave.py +++ b/hrqb/tasks/employee_leave.py @@ -2,7 +2,6 @@ # ruff: noqa: S324 -import hashlib import luigi # type: ignore[import-untyped] import pandas as pd @@ -12,7 +11,11 @@ QuickbaseUpsertTask, SQLQueryExtractTask, ) -from hrqb.utils import convert_oracle_bools_to_qb_bools, normalize_dataframe_dates +from hrqb.utils import ( + convert_oracle_bools_to_qb_bools, + md5_hash_from_values, + normalize_dataframe_dates, +) class ExtractDWEmployeeLeave(SQLQueryExtractTask): @@ -63,13 +66,15 @@ def get_dataframe(self) -> pd.DataFrame: leaves_df, columns=["paid_leave", "accrue_seniority"] ) - # mint a unique, deterministic value for the "Key" field in Employee Leave table + # mint a unique, deterministic value for the merge "Key" field leaves_df["key"] = leaves_df.apply( - lambda row: self._create_unique_key_from_leave_data( - row.mit_id, - row.absence_date, - row.absence_type, - row.actual_absence_hours, + lambda row: md5_hash_from_values( + [ + row.mit_id, + row.absence_date, + row.absence_type, + str(row.actual_absence_hours), + ] ), axis=1, ) @@ -87,37 +92,6 @@ def get_dataframe(self) -> pd.DataFrame: } return leaves_df[fields.keys()].rename(columns=fields) - @staticmethod - def _create_unique_key_from_leave_data( - mit_id: str, - absence_date: str, - absence_type: str, - absence_hours: str | float, - ) -> str: - """Create unique, deterministic MD5 hash based on select leave attributes. - - Most tables in Quickbase have a complimentary unique value in the data warehouse - to use as a merge field: Employees have MIT ID, Employee Appointments have "HR Job - Key", Salary adjustments have "HR TXN (Transaction) Key", etc. However, absence - data in the data warehouse does not have such a column or value that can be used - to unambiguously and deterministically identify an absence row. - - For this reason, an MD5 hash is created from attributes from the leave data that, - in combination, is unique: MIT ID and Absence date, type, and duration. By - including this MD5 hash in Quickbase as a field, we can use this in future data - loads as a merge field to avoid duplicating what are effectively the same absence - record. - """ - data_string = "|".join( - [ - mit_id, - absence_date, - absence_type, - str(absence_hours), - ] - ).encode() - return hashlib.md5(data_string).hexdigest() - class LoadEmployeeLeave(QuickbaseUpsertTask): diff --git a/hrqb/tasks/employee_salary_history.py b/hrqb/tasks/employee_salary_history.py index a346dc7..f439cba 100644 --- a/hrqb/tasks/employee_salary_history.py +++ b/hrqb/tasks/employee_salary_history.py @@ -4,7 +4,7 @@ import pandas as pd from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask -from hrqb.utils import normalize_dataframe_dates +from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates class ExtractDWEmployeeSalaryHistory(SQLQueryExtractTask): @@ -33,8 +33,19 @@ def get_dataframe(self) -> pd.DataFrame: qb_emp_appts_df = self.named_inputs["ExtractQBEmployeeAppointments"].read() # merge with employee appointment data for QB appointment record identifier - qb_emp_appts_df = qb_emp_appts_df[["Record ID#", "HR Appointment Key"]].rename( - columns={"Record ID#": "related_employee_appointment_id"} + qb_emp_appts_df = qb_emp_appts_df[ + [ + "Record ID#", + "HR Appointment Key", + "Begin Date", + "End Date", + ] + ].rename( + columns={ + "Record ID#": "related_employee_appointment_id", + "Begin Date": "appointment_begin_date", + "End Date": "appointment_end_date", + } ) salary_df = dw_salary_df.merge( qb_emp_appts_df, @@ -52,6 +63,21 @@ def get_dataframe(self) -> pd.DataFrame: salary_df["original_effort"] = salary_df["original_effort"] / 100.0 salary_df["temp_effort"] = salary_df["temp_effort"] / 100.0 + # mint a unique, deterministic value for the merge "Key" field + salary_df["key"] = salary_df.apply( + lambda row: md5_hash_from_values( + [ + row.mit_id, + row.position_id, + row.appointment_begin_date, + row.appointment_end_date, + row.start_date, + row.end_date, + ] + ), + axis=1, + ) + fields = { "hr_appt_tx_key": "HR Appointment Transaction Key", "mit_id": "MIT ID", @@ -67,6 +93,7 @@ def get_dataframe(self) -> pd.DataFrame: "temp_change_base_amount": "Temp Base Salary", "temp_change_hourly_rate": "Temp Hourly", "temp_effort": "Temp Effort %", + "key": "Key", } return salary_df[fields.keys()].rename(columns=fields) @@ -85,7 +112,7 @@ def requires(self) -> list[luigi.Task]: # pragma: nocover @property def merge_field(self) -> str | None: - return "HR Appointment Transaction Key" + return "Key" @property def input_task_to_load(self) -> str | None: diff --git a/hrqb/utils/__init__.py b/hrqb/utils/__init__.py index 678709d..88b70c7 100644 --- a/hrqb/utils/__init__.py +++ b/hrqb/utils/__init__.py @@ -1,6 +1,7 @@ """hrqb.utils""" import datetime +import hashlib import logging import click @@ -60,3 +61,15 @@ def us_state_abbreviation_to_name(abbreviation: str | None) -> str | None: if state: return state.name return None + + +def md5_hash_from_values(values: list[str]) -> str: + """Create an MD5 hash from an ordered list of values. + + This function is used by some Transform tasks to mint a deterministic, unique value + that can be used as merge fields in Quickbase during upserts. + + NOTE: the order of values in the provided list will affect the MD5 hash created. + """ + data_string = "|".join(values).encode() + return hashlib.md5(data_string).hexdigest() # noqa: S324 diff --git a/tests/conftest.py b/tests/conftest.py index 98c4f2c..71204e2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -729,7 +729,7 @@ def task_extract_dw_employee_salary_history_complete(all_tasks_pipeline_name): "mit_id": "123456789", "job_id": "123456789", "position_id": "987654321", - "start_date": Timestamp("2010-01-01 00:00:00"), + "start_date": Timestamp("2010-07-01 00:00:00"), "end_date": datetime.datetime(2011, 12, 1, 0, 0), "hr_personnel_action_type_key": "CS01", "hr_personnel_action": "Annual Salary Review", @@ -760,6 +760,9 @@ def task_shared_extract_qb_employee_appointments_complete(all_tasks_pipeline_nam { "Record ID#": 12000, "HR Appointment Key": 123.0, + "Position ID": "987654321", + "Begin Date": "2010-01-01", + "End Date": "2011-12-01", } ] ) diff --git a/tests/tasks/test_employee_appointments.py b/tests/tasks/test_employee_appointments.py index 0a769ac..587af70 100644 --- a/tests/tasks/test_employee_appointments.py +++ b/tests/tasks/test_employee_appointments.py @@ -1,5 +1,7 @@ # ruff: noqa: PLR2004, PD901 +from hrqb.utils import md5_hash_from_values + def test_extract_dw_employees_load_sql_query( task_extract_dw_employee_appointment_complete, @@ -78,14 +80,29 @@ def test_task_transform_employee_appointments_returns_required_load_fields( "Union Name", "Term or Permanent", "Benefits Group Type", + "Key", } == set(df.columns) def test_task_load_employee_appointments_explicit_properties( task_load_employee_appointments, ): - assert task_load_employee_appointments.merge_field == "HR Appointment Key" + assert task_load_employee_appointments.merge_field == "Key" assert ( task_load_employee_appointments.input_task_to_load == "TransformEmployeeAppointments" ) + + +def test_task_transform_employee_appointments_key_expected_from_row_data( + task_transform_employee_appointments_complete, +): + row = task_transform_employee_appointments_complete.get_dataframe().iloc[0] + assert row["Key"] == md5_hash_from_values( + [ + row["MIT ID"], + row["Position ID"], + row["Begin Date"], + row["End Date"], + ] + ) diff --git a/tests/tasks/test_employee_leave.py b/tests/tasks/test_employee_leave.py index b522761..688ef95 100644 --- a/tests/tasks/test_employee_leave.py +++ b/tests/tasks/test_employee_leave.py @@ -1,5 +1,7 @@ # ruff: noqa: PLR2004, PD901, SLF001 +from hrqb.utils import md5_hash_from_values + def test_extract_dw_employee_leave_load_sql_query( task_extract_dw_employee_leave_complete, @@ -26,34 +28,17 @@ def test_task_transform_employee_leave_oracle_bools_converted( assert row["Accrue Seniority"] == "Yes" -def test_task_transform_employee_leave_create_key_from_md5_of_leave_data( - task_transform_employee_leave_complete, -): - assert ( - task_transform_employee_leave_complete._create_unique_key_from_leave_data( - "123456789", "2010-07-01", "Vacation", "8.0" - ) - == "ee9af1a7908735241aeb5c228e2e00fa" - ) - assert ( - task_transform_employee_leave_complete._create_unique_key_from_leave_data( - "123456789", "2010-07-01", "Vacation", "None" - ) - == "767d2672e2e6b38a7c65f0ea2f799067" - ) - - def test_task_transform_employee_leave_key_expected_from_row_data( task_transform_employee_leave_complete, ): row = task_transform_employee_leave_complete.get_dataframe().iloc[0] - assert row[ - "Key" - ] == task_transform_employee_leave_complete._create_unique_key_from_leave_data( - row["MIT ID"], - row["Leave Date"], - row["Related Leave Type"], - str(row["Duration Hours"]), + assert row["Key"] == md5_hash_from_values( + [ + row["MIT ID"], + row["Leave Date"], + row["Related Leave Type"], + str(row["Duration Hours"]), + ] ) diff --git a/tests/tasks/test_employee_salary_history.py b/tests/tasks/test_employee_salary_history.py index fa908f7..90e1d16 100644 --- a/tests/tasks/test_employee_salary_history.py +++ b/tests/tasks/test_employee_salary_history.py @@ -1,5 +1,7 @@ # ruff: noqa: PLR2004, PD901 +from hrqb.utils import md5_hash_from_values + def test_extract_dw_employee_salary_history_load_sql_query( task_extract_dw_employee_salary_history_complete, @@ -29,11 +31,32 @@ def test_task_transform_employee_salary_history_efforts_are_percents( def test_task_load_employee_salary_history_explicit_properties( task_load_employee_salary_history_complete, ): - assert ( - task_load_employee_salary_history_complete.merge_field - == "HR Appointment Transaction Key" - ) + assert task_load_employee_salary_history_complete.merge_field == "Key" assert ( task_load_employee_salary_history_complete.input_task_to_load == "TransformEmployeeSalaryHistory" ) + + +def test_task_transform_employee_salary_history_key_expected_from_input_data( + task_transform_employee_salary_history_complete, + task_shared_extract_qb_employee_appointments_complete, +): + # NOTE: for this test, need to manually get part of the employee appointment data + # that this transform is using when generating the unique MD5 Key field value + qb_emp_appt_row = ( + task_shared_extract_qb_employee_appointments_complete.target.read().iloc[0] + ) + emp_salary_row = task_transform_employee_salary_history_complete.get_dataframe().iloc[ + 0 + ] + assert emp_salary_row["Key"] == md5_hash_from_values( + [ + emp_salary_row["MIT ID"], + qb_emp_appt_row["Position ID"], + qb_emp_appt_row["Begin Date"], + qb_emp_appt_row["End Date"], + emp_salary_row["Start Date"], + emp_salary_row["End Date"], + ] + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index a8663bd..4f23418 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,11 +4,13 @@ import numpy as np import pandas as pd +import pytest from freezegun import freeze_time from hrqb.utils import ( click_argument_to_dict, convert_oracle_bools_to_qb_bools, + md5_hash_from_values, normalize_dataframe_dates, normalize_date, today_date, @@ -113,3 +115,26 @@ def test_convert_oracle_bools_to_qb_bools_success(): ] ) ) + + +def test_md5_hash_from_values_gives_expected_results(): + assert md5_hash_from_values(["a", "b", "c"]) == "2e077b3ec5932ac3cf914ebdf242b4ee" + + +def test_md5_hash_from_values_order_of_strings_different_results(): + assert md5_hash_from_values(["a", "b"]) != md5_hash_from_values(["b", "a"]) + + +def test_md5_hash_from_values_raise_error_for_non_string_value(): + with pytest.raises(TypeError, match="NoneType found"): + md5_hash_from_values(["a", "b", None]) + with pytest.raises(TypeError, match="int found"): + md5_hash_from_values(["a", "b", 42]) + with pytest.raises(TypeError, match="datetime found"): + md5_hash_from_values( + [ + "a", + "b", + datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + ] + )