From 4929e5231656e162e690e64994afbf07230cb6c4 Mon Sep 17 00:00:00 2001 From: Graham Hukill Date: Fri, 14 Jun 2024 10:14:50 -0400 Subject: [PATCH] Mint MD5 hash values for Key field Why these changes are being introduced: It was determined that columns used from the data warehouse, expected to be enduringly the same and unique, where changing across warehouse runs. These values were used for Quickbase merge fields, but this would not work if the values changed. A value was needed that uniquely identifies the row for a handful of tables, across warehouse runs over time. How this addresses that need: * Updates multiple tables to mint an MD5 value from an ordered list of field values that are known to uniquely identify a row Side effects of this change: * None Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/HRQB-36 --- hrqb/tasks/employee_appointments.py | 19 ++++++-- hrqb/tasks/employee_leave.py | 52 ++++++--------------- hrqb/tasks/employee_salary_history.py | 35 ++++++++++++-- hrqb/utils/__init__.py | 13 ++++++ tests/conftest.py | 5 +- tests/tasks/test_employee_appointments.py | 19 +++++++- tests/tasks/test_employee_leave.py | 33 ++++--------- tests/tasks/test_employee_salary_history.py | 31 ++++++++++-- tests/test_utils.py | 25 ++++++++++ 9 files changed, 156 insertions(+), 76 deletions(-) diff --git a/hrqb/tasks/employee_appointments.py b/hrqb/tasks/employee_appointments.py index 4f72f2a..2555284 100644 --- a/hrqb/tasks/employee_appointments.py +++ b/hrqb/tasks/employee_appointments.py @@ -4,7 +4,7 @@ import pandas as pd from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask -from hrqb.utils import normalize_dataframe_dates +from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates from hrqb.utils.quickbase import QBClient @@ -95,6 +95,19 @@ def get_dataframe(self) -> pd.DataFrame: ], ) + # mint a unique, deterministic value for the merge "Key" field + emp_appts_df["key"] = emp_appts_df.apply( + lambda row: md5_hash_from_values( + [ + row.mit_id, + row.position_id, + row.appt_begin_date, + row.appt_end_date, + ] + ), + axis=1, + ) + fields = { "hr_appt_key": "HR Appointment Key", "mit_id": "MIT ID", @@ -116,6 +129,7 @@ def get_dataframe(self) -> pd.DataFrame: "union_name": "Union Name", "term_or_perm": "Term or Permanent", "benefits_group_type": "Benefits Group Type", + "key": "Key", } return emp_appts_df[fields.keys()].rename(columns=fields) @@ -140,9 +154,8 @@ def requires(self) -> list[luigi.Task]: # pragma: nocover @property def merge_field(self) -> str | None: - return "HR Appointment Key" + return "Key" @property def input_task_to_load(self) -> str: - """Upsert data from parent task 'TransformEmployeeAppointments'.""" return "TransformEmployeeAppointments" diff --git a/hrqb/tasks/employee_leave.py b/hrqb/tasks/employee_leave.py index c19b563..a00b2f9 100644 --- a/hrqb/tasks/employee_leave.py +++ b/hrqb/tasks/employee_leave.py @@ -2,7 +2,6 @@ # ruff: noqa: S324 -import hashlib import luigi # type: ignore[import-untyped] import pandas as pd @@ -12,7 +11,11 @@ QuickbaseUpsertTask, SQLQueryExtractTask, ) -from hrqb.utils import convert_oracle_bools_to_qb_bools, normalize_dataframe_dates +from hrqb.utils import ( + convert_oracle_bools_to_qb_bools, + md5_hash_from_values, + normalize_dataframe_dates, +) class ExtractDWEmployeeLeave(SQLQueryExtractTask): @@ -63,13 +66,15 @@ def get_dataframe(self) -> pd.DataFrame: leaves_df, columns=["paid_leave", "accrue_seniority"] ) - # mint a unique, deterministic value for the "Key" field in Employee Leave table + # mint a unique, deterministic value for the merge "Key" field leaves_df["key"] = leaves_df.apply( - lambda row: self._create_unique_key_from_leave_data( - row.mit_id, - row.absence_date, - row.absence_type, - row.actual_absence_hours, + lambda row: md5_hash_from_values( + [ + row.mit_id, + row.absence_date, + row.absence_type, + str(row.actual_absence_hours), + ] ), axis=1, ) @@ -87,37 +92,6 @@ def get_dataframe(self) -> pd.DataFrame: } return leaves_df[fields.keys()].rename(columns=fields) - @staticmethod - def _create_unique_key_from_leave_data( - mit_id: str, - absence_date: str, - absence_type: str, - absence_hours: str | float, - ) -> str: - """Create unique, deterministic MD5 hash based on select leave attributes. - - Most tables in Quickbase have a complimentary unique value in the data warehouse - to use as a merge field: Employees have MIT ID, Employee Appointments have "HR Job - Key", Salary adjustments have "HR TXN (Transaction) Key", etc. However, absence - data in the data warehouse does not have such a column or value that can be used - to unambiguously and deterministically identify an absence row. - - For this reason, an MD5 hash is created from attributes from the leave data that, - in combination, is unique: MIT ID and Absence date, type, and duration. By - including this MD5 hash in Quickbase as a field, we can use this in future data - loads as a merge field to avoid duplicating what are effectively the same absence - record. - """ - data_string = "|".join( - [ - mit_id, - absence_date, - absence_type, - str(absence_hours), - ] - ).encode() - return hashlib.md5(data_string).hexdigest() - class LoadEmployeeLeave(QuickbaseUpsertTask): diff --git a/hrqb/tasks/employee_salary_history.py b/hrqb/tasks/employee_salary_history.py index a346dc7..f439cba 100644 --- a/hrqb/tasks/employee_salary_history.py +++ b/hrqb/tasks/employee_salary_history.py @@ -4,7 +4,7 @@ import pandas as pd from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask -from hrqb.utils import normalize_dataframe_dates +from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates class ExtractDWEmployeeSalaryHistory(SQLQueryExtractTask): @@ -33,8 +33,19 @@ def get_dataframe(self) -> pd.DataFrame: qb_emp_appts_df = self.named_inputs["ExtractQBEmployeeAppointments"].read() # merge with employee appointment data for QB appointment record identifier - qb_emp_appts_df = qb_emp_appts_df[["Record ID#", "HR Appointment Key"]].rename( - columns={"Record ID#": "related_employee_appointment_id"} + qb_emp_appts_df = qb_emp_appts_df[ + [ + "Record ID#", + "HR Appointment Key", + "Begin Date", + "End Date", + ] + ].rename( + columns={ + "Record ID#": "related_employee_appointment_id", + "Begin Date": "appointment_begin_date", + "End Date": "appointment_end_date", + } ) salary_df = dw_salary_df.merge( qb_emp_appts_df, @@ -52,6 +63,21 @@ def get_dataframe(self) -> pd.DataFrame: salary_df["original_effort"] = salary_df["original_effort"] / 100.0 salary_df["temp_effort"] = salary_df["temp_effort"] / 100.0 + # mint a unique, deterministic value for the merge "Key" field + salary_df["key"] = salary_df.apply( + lambda row: md5_hash_from_values( + [ + row.mit_id, + row.position_id, + row.appointment_begin_date, + row.appointment_end_date, + row.start_date, + row.end_date, + ] + ), + axis=1, + ) + fields = { "hr_appt_tx_key": "HR Appointment Transaction Key", "mit_id": "MIT ID", @@ -67,6 +93,7 @@ def get_dataframe(self) -> pd.DataFrame: "temp_change_base_amount": "Temp Base Salary", "temp_change_hourly_rate": "Temp Hourly", "temp_effort": "Temp Effort %", + "key": "Key", } return salary_df[fields.keys()].rename(columns=fields) @@ -85,7 +112,7 @@ def requires(self) -> list[luigi.Task]: # pragma: nocover @property def merge_field(self) -> str | None: - return "HR Appointment Transaction Key" + return "Key" @property def input_task_to_load(self) -> str | None: diff --git a/hrqb/utils/__init__.py b/hrqb/utils/__init__.py index 678709d..88b70c7 100644 --- a/hrqb/utils/__init__.py +++ b/hrqb/utils/__init__.py @@ -1,6 +1,7 @@ """hrqb.utils""" import datetime +import hashlib import logging import click @@ -60,3 +61,15 @@ def us_state_abbreviation_to_name(abbreviation: str | None) -> str | None: if state: return state.name return None + + +def md5_hash_from_values(values: list[str]) -> str: + """Create an MD5 hash from an ordered list of values. + + This function is used by some Transform tasks to mint a deterministic, unique value + that can be used as merge fields in Quickbase during upserts. + + NOTE: the order of values in the provided list will affect the MD5 hash created. + """ + data_string = "|".join(values).encode() + return hashlib.md5(data_string).hexdigest() # noqa: S324 diff --git a/tests/conftest.py b/tests/conftest.py index 98c4f2c..71204e2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -729,7 +729,7 @@ def task_extract_dw_employee_salary_history_complete(all_tasks_pipeline_name): "mit_id": "123456789", "job_id": "123456789", "position_id": "987654321", - "start_date": Timestamp("2010-01-01 00:00:00"), + "start_date": Timestamp("2010-07-01 00:00:00"), "end_date": datetime.datetime(2011, 12, 1, 0, 0), "hr_personnel_action_type_key": "CS01", "hr_personnel_action": "Annual Salary Review", @@ -760,6 +760,9 @@ def task_shared_extract_qb_employee_appointments_complete(all_tasks_pipeline_nam { "Record ID#": 12000, "HR Appointment Key": 123.0, + "Position ID": "987654321", + "Begin Date": "2010-01-01", + "End Date": "2011-12-01", } ] ) diff --git a/tests/tasks/test_employee_appointments.py b/tests/tasks/test_employee_appointments.py index 0a769ac..587af70 100644 --- a/tests/tasks/test_employee_appointments.py +++ b/tests/tasks/test_employee_appointments.py @@ -1,5 +1,7 @@ # ruff: noqa: PLR2004, PD901 +from hrqb.utils import md5_hash_from_values + def test_extract_dw_employees_load_sql_query( task_extract_dw_employee_appointment_complete, @@ -78,14 +80,29 @@ def test_task_transform_employee_appointments_returns_required_load_fields( "Union Name", "Term or Permanent", "Benefits Group Type", + "Key", } == set(df.columns) def test_task_load_employee_appointments_explicit_properties( task_load_employee_appointments, ): - assert task_load_employee_appointments.merge_field == "HR Appointment Key" + assert task_load_employee_appointments.merge_field == "Key" assert ( task_load_employee_appointments.input_task_to_load == "TransformEmployeeAppointments" ) + + +def test_task_transform_employee_appointments_key_expected_from_row_data( + task_transform_employee_appointments_complete, +): + row = task_transform_employee_appointments_complete.get_dataframe().iloc[0] + assert row["Key"] == md5_hash_from_values( + [ + row["MIT ID"], + row["Position ID"], + row["Begin Date"], + row["End Date"], + ] + ) diff --git a/tests/tasks/test_employee_leave.py b/tests/tasks/test_employee_leave.py index b522761..688ef95 100644 --- a/tests/tasks/test_employee_leave.py +++ b/tests/tasks/test_employee_leave.py @@ -1,5 +1,7 @@ # ruff: noqa: PLR2004, PD901, SLF001 +from hrqb.utils import md5_hash_from_values + def test_extract_dw_employee_leave_load_sql_query( task_extract_dw_employee_leave_complete, @@ -26,34 +28,17 @@ def test_task_transform_employee_leave_oracle_bools_converted( assert row["Accrue Seniority"] == "Yes" -def test_task_transform_employee_leave_create_key_from_md5_of_leave_data( - task_transform_employee_leave_complete, -): - assert ( - task_transform_employee_leave_complete._create_unique_key_from_leave_data( - "123456789", "2010-07-01", "Vacation", "8.0" - ) - == "ee9af1a7908735241aeb5c228e2e00fa" - ) - assert ( - task_transform_employee_leave_complete._create_unique_key_from_leave_data( - "123456789", "2010-07-01", "Vacation", "None" - ) - == "767d2672e2e6b38a7c65f0ea2f799067" - ) - - def test_task_transform_employee_leave_key_expected_from_row_data( task_transform_employee_leave_complete, ): row = task_transform_employee_leave_complete.get_dataframe().iloc[0] - assert row[ - "Key" - ] == task_transform_employee_leave_complete._create_unique_key_from_leave_data( - row["MIT ID"], - row["Leave Date"], - row["Related Leave Type"], - str(row["Duration Hours"]), + assert row["Key"] == md5_hash_from_values( + [ + row["MIT ID"], + row["Leave Date"], + row["Related Leave Type"], + str(row["Duration Hours"]), + ] ) diff --git a/tests/tasks/test_employee_salary_history.py b/tests/tasks/test_employee_salary_history.py index fa908f7..90e1d16 100644 --- a/tests/tasks/test_employee_salary_history.py +++ b/tests/tasks/test_employee_salary_history.py @@ -1,5 +1,7 @@ # ruff: noqa: PLR2004, PD901 +from hrqb.utils import md5_hash_from_values + def test_extract_dw_employee_salary_history_load_sql_query( task_extract_dw_employee_salary_history_complete, @@ -29,11 +31,32 @@ def test_task_transform_employee_salary_history_efforts_are_percents( def test_task_load_employee_salary_history_explicit_properties( task_load_employee_salary_history_complete, ): - assert ( - task_load_employee_salary_history_complete.merge_field - == "HR Appointment Transaction Key" - ) + assert task_load_employee_salary_history_complete.merge_field == "Key" assert ( task_load_employee_salary_history_complete.input_task_to_load == "TransformEmployeeSalaryHistory" ) + + +def test_task_transform_employee_salary_history_key_expected_from_input_data( + task_transform_employee_salary_history_complete, + task_shared_extract_qb_employee_appointments_complete, +): + # NOTE: for this test, need to manually get part of the employee appointment data + # that this transform is using when generating the unique MD5 Key field value + qb_emp_appt_row = ( + task_shared_extract_qb_employee_appointments_complete.target.read().iloc[0] + ) + emp_salary_row = task_transform_employee_salary_history_complete.get_dataframe().iloc[ + 0 + ] + assert emp_salary_row["Key"] == md5_hash_from_values( + [ + emp_salary_row["MIT ID"], + qb_emp_appt_row["Position ID"], + qb_emp_appt_row["Begin Date"], + qb_emp_appt_row["End Date"], + emp_salary_row["Start Date"], + emp_salary_row["End Date"], + ] + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index a8663bd..4f23418 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,11 +4,13 @@ import numpy as np import pandas as pd +import pytest from freezegun import freeze_time from hrqb.utils import ( click_argument_to_dict, convert_oracle_bools_to_qb_bools, + md5_hash_from_values, normalize_dataframe_dates, normalize_date, today_date, @@ -113,3 +115,26 @@ def test_convert_oracle_bools_to_qb_bools_success(): ] ) ) + + +def test_md5_hash_from_values_gives_expected_results(): + assert md5_hash_from_values(["a", "b", "c"]) == "2e077b3ec5932ac3cf914ebdf242b4ee" + + +def test_md5_hash_from_values_order_of_strings_different_results(): + assert md5_hash_from_values(["a", "b"]) != md5_hash_from_values(["b", "a"]) + + +def test_md5_hash_from_values_raise_error_for_non_string_value(): + with pytest.raises(TypeError, match="NoneType found"): + md5_hash_from_values(["a", "b", None]) + with pytest.raises(TypeError, match="int found"): + md5_hash_from_values(["a", "b", 42]) + with pytest.raises(TypeError, match="datetime found"): + md5_hash_from_values( + [ + "a", + "b", + datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + ] + )