Skip to content

Commit

Permalink
Mint MD5 hash values for Key field
Browse files Browse the repository at this point in the history
Why these changes are being introduced:

It was determined that columns used from the data warehouse, expected to be
enduringly the same and unique, where changing across warehouse runs.  These
values were used for Quickbase merge fields, but this would not work if the
values changed.

A value was needed that uniquely identifies the row for a handful of tables,
across warehouse runs over time.

How this addresses that need:
* Updates multiple tables to mint an MD5 value from an ordered list of
field values that are known to uniquely identify a row

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/HRQB-36
  • Loading branch information
ghukill committed Jun 14, 2024
1 parent d315d77 commit 4929e52
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 76 deletions.
19 changes: 16 additions & 3 deletions hrqb/tasks/employee_appointments.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask
from hrqb.utils import normalize_dataframe_dates
from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates
from hrqb.utils.quickbase import QBClient


Expand Down Expand Up @@ -95,6 +95,19 @@ def get_dataframe(self) -> pd.DataFrame:
],
)

# mint a unique, deterministic value for the merge "Key" field
emp_appts_df["key"] = emp_appts_df.apply(
lambda row: md5_hash_from_values(
[
row.mit_id,
row.position_id,
row.appt_begin_date,
row.appt_end_date,
]
),
axis=1,
)

fields = {
"hr_appt_key": "HR Appointment Key",
"mit_id": "MIT ID",
Expand All @@ -116,6 +129,7 @@ def get_dataframe(self) -> pd.DataFrame:
"union_name": "Union Name",
"term_or_perm": "Term or Permanent",
"benefits_group_type": "Benefits Group Type",
"key": "Key",
}

return emp_appts_df[fields.keys()].rename(columns=fields)
Expand All @@ -140,9 +154,8 @@ def requires(self) -> list[luigi.Task]: # pragma: nocover

@property
def merge_field(self) -> str | None:
return "HR Appointment Key"
return "Key"

@property
def input_task_to_load(self) -> str:
"""Upsert data from parent task 'TransformEmployeeAppointments'."""
return "TransformEmployeeAppointments"
52 changes: 13 additions & 39 deletions hrqb/tasks/employee_leave.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

# ruff: noqa: S324

import hashlib

import luigi # type: ignore[import-untyped]
import pandas as pd
Expand All @@ -12,7 +11,11 @@
QuickbaseUpsertTask,
SQLQueryExtractTask,
)
from hrqb.utils import convert_oracle_bools_to_qb_bools, normalize_dataframe_dates
from hrqb.utils import (
convert_oracle_bools_to_qb_bools,
md5_hash_from_values,
normalize_dataframe_dates,
)


class ExtractDWEmployeeLeave(SQLQueryExtractTask):
Expand Down Expand Up @@ -63,13 +66,15 @@ def get_dataframe(self) -> pd.DataFrame:
leaves_df, columns=["paid_leave", "accrue_seniority"]
)

# mint a unique, deterministic value for the "Key" field in Employee Leave table
# mint a unique, deterministic value for the merge "Key" field
leaves_df["key"] = leaves_df.apply(
lambda row: self._create_unique_key_from_leave_data(
row.mit_id,
row.absence_date,
row.absence_type,
row.actual_absence_hours,
lambda row: md5_hash_from_values(
[
row.mit_id,
row.absence_date,
row.absence_type,
str(row.actual_absence_hours),
]
),
axis=1,
)
Expand All @@ -87,37 +92,6 @@ def get_dataframe(self) -> pd.DataFrame:
}
return leaves_df[fields.keys()].rename(columns=fields)

@staticmethod
def _create_unique_key_from_leave_data(
mit_id: str,
absence_date: str,
absence_type: str,
absence_hours: str | float,
) -> str:
"""Create unique, deterministic MD5 hash based on select leave attributes.
Most tables in Quickbase have a complimentary unique value in the data warehouse
to use as a merge field: Employees have MIT ID, Employee Appointments have "HR Job
Key", Salary adjustments have "HR TXN (Transaction) Key", etc. However, absence
data in the data warehouse does not have such a column or value that can be used
to unambiguously and deterministically identify an absence row.
For this reason, an MD5 hash is created from attributes from the leave data that,
in combination, is unique: MIT ID and Absence date, type, and duration. By
including this MD5 hash in Quickbase as a field, we can use this in future data
loads as a merge field to avoid duplicating what are effectively the same absence
record.
"""
data_string = "|".join(
[
mit_id,
absence_date,
absence_type,
str(absence_hours),
]
).encode()
return hashlib.md5(data_string).hexdigest()


class LoadEmployeeLeave(QuickbaseUpsertTask):

Expand Down
35 changes: 31 additions & 4 deletions hrqb/tasks/employee_salary_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask
from hrqb.utils import normalize_dataframe_dates
from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates


class ExtractDWEmployeeSalaryHistory(SQLQueryExtractTask):
Expand Down Expand Up @@ -33,8 +33,19 @@ def get_dataframe(self) -> pd.DataFrame:
qb_emp_appts_df = self.named_inputs["ExtractQBEmployeeAppointments"].read()

# merge with employee appointment data for QB appointment record identifier
qb_emp_appts_df = qb_emp_appts_df[["Record ID#", "HR Appointment Key"]].rename(
columns={"Record ID#": "related_employee_appointment_id"}
qb_emp_appts_df = qb_emp_appts_df[
[
"Record ID#",
"HR Appointment Key",
"Begin Date",
"End Date",
]
].rename(
columns={
"Record ID#": "related_employee_appointment_id",
"Begin Date": "appointment_begin_date",
"End Date": "appointment_end_date",
}
)
salary_df = dw_salary_df.merge(
qb_emp_appts_df,
Expand All @@ -52,6 +63,21 @@ def get_dataframe(self) -> pd.DataFrame:
salary_df["original_effort"] = salary_df["original_effort"] / 100.0
salary_df["temp_effort"] = salary_df["temp_effort"] / 100.0

# mint a unique, deterministic value for the merge "Key" field
salary_df["key"] = salary_df.apply(
lambda row: md5_hash_from_values(
[
row.mit_id,
row.position_id,
row.appointment_begin_date,
row.appointment_end_date,
row.start_date,
row.end_date,
]
),
axis=1,
)

fields = {
"hr_appt_tx_key": "HR Appointment Transaction Key",
"mit_id": "MIT ID",
Expand All @@ -67,6 +93,7 @@ def get_dataframe(self) -> pd.DataFrame:
"temp_change_base_amount": "Temp Base Salary",
"temp_change_hourly_rate": "Temp Hourly",
"temp_effort": "Temp Effort %",
"key": "Key",
}
return salary_df[fields.keys()].rename(columns=fields)

Expand All @@ -85,7 +112,7 @@ def requires(self) -> list[luigi.Task]: # pragma: nocover

@property
def merge_field(self) -> str | None:
return "HR Appointment Transaction Key"
return "Key"

@property
def input_task_to_load(self) -> str | None:
Expand Down
13 changes: 13 additions & 0 deletions hrqb/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""hrqb.utils"""

import datetime
import hashlib
import logging

import click
Expand Down Expand Up @@ -60,3 +61,15 @@ def us_state_abbreviation_to_name(abbreviation: str | None) -> str | None:
if state:
return state.name
return None


def md5_hash_from_values(values: list[str]) -> str:
"""Create an MD5 hash from an ordered list of values.
This function is used by some Transform tasks to mint a deterministic, unique value
that can be used as merge fields in Quickbase during upserts.
NOTE: the order of values in the provided list will affect the MD5 hash created.
"""
data_string = "|".join(values).encode()
return hashlib.md5(data_string).hexdigest() # noqa: S324
5 changes: 4 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ def task_extract_dw_employee_salary_history_complete(all_tasks_pipeline_name):
"mit_id": "123456789",
"job_id": "123456789",
"position_id": "987654321",
"start_date": Timestamp("2010-01-01 00:00:00"),
"start_date": Timestamp("2010-07-01 00:00:00"),
"end_date": datetime.datetime(2011, 12, 1, 0, 0),
"hr_personnel_action_type_key": "CS01",
"hr_personnel_action": "Annual Salary Review",
Expand Down Expand Up @@ -760,6 +760,9 @@ def task_shared_extract_qb_employee_appointments_complete(all_tasks_pipeline_nam
{
"Record ID#": 12000,
"HR Appointment Key": 123.0,
"Position ID": "987654321",
"Begin Date": "2010-01-01",
"End Date": "2011-12-01",
}
]
)
Expand Down
19 changes: 18 additions & 1 deletion tests/tasks/test_employee_appointments.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# ruff: noqa: PLR2004, PD901

from hrqb.utils import md5_hash_from_values


def test_extract_dw_employees_load_sql_query(
task_extract_dw_employee_appointment_complete,
Expand Down Expand Up @@ -78,14 +80,29 @@ def test_task_transform_employee_appointments_returns_required_load_fields(
"Union Name",
"Term or Permanent",
"Benefits Group Type",
"Key",
} == set(df.columns)


def test_task_load_employee_appointments_explicit_properties(
task_load_employee_appointments,
):
assert task_load_employee_appointments.merge_field == "HR Appointment Key"
assert task_load_employee_appointments.merge_field == "Key"
assert (
task_load_employee_appointments.input_task_to_load
== "TransformEmployeeAppointments"
)


def test_task_transform_employee_appointments_key_expected_from_row_data(
task_transform_employee_appointments_complete,
):
row = task_transform_employee_appointments_complete.get_dataframe().iloc[0]
assert row["Key"] == md5_hash_from_values(
[
row["MIT ID"],
row["Position ID"],
row["Begin Date"],
row["End Date"],
]
)
33 changes: 9 additions & 24 deletions tests/tasks/test_employee_leave.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# ruff: noqa: PLR2004, PD901, SLF001

from hrqb.utils import md5_hash_from_values


def test_extract_dw_employee_leave_load_sql_query(
task_extract_dw_employee_leave_complete,
Expand All @@ -26,34 +28,17 @@ def test_task_transform_employee_leave_oracle_bools_converted(
assert row["Accrue Seniority"] == "Yes"


def test_task_transform_employee_leave_create_key_from_md5_of_leave_data(
task_transform_employee_leave_complete,
):
assert (
task_transform_employee_leave_complete._create_unique_key_from_leave_data(
"123456789", "2010-07-01", "Vacation", "8.0"
)
== "ee9af1a7908735241aeb5c228e2e00fa"
)
assert (
task_transform_employee_leave_complete._create_unique_key_from_leave_data(
"123456789", "2010-07-01", "Vacation", "None"
)
== "767d2672e2e6b38a7c65f0ea2f799067"
)


def test_task_transform_employee_leave_key_expected_from_row_data(
task_transform_employee_leave_complete,
):
row = task_transform_employee_leave_complete.get_dataframe().iloc[0]
assert row[
"Key"
] == task_transform_employee_leave_complete._create_unique_key_from_leave_data(
row["MIT ID"],
row["Leave Date"],
row["Related Leave Type"],
str(row["Duration Hours"]),
assert row["Key"] == md5_hash_from_values(
[
row["MIT ID"],
row["Leave Date"],
row["Related Leave Type"],
str(row["Duration Hours"]),
]
)


Expand Down
31 changes: 27 additions & 4 deletions tests/tasks/test_employee_salary_history.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# ruff: noqa: PLR2004, PD901

from hrqb.utils import md5_hash_from_values


def test_extract_dw_employee_salary_history_load_sql_query(
task_extract_dw_employee_salary_history_complete,
Expand Down Expand Up @@ -29,11 +31,32 @@ def test_task_transform_employee_salary_history_efforts_are_percents(
def test_task_load_employee_salary_history_explicit_properties(
task_load_employee_salary_history_complete,
):
assert (
task_load_employee_salary_history_complete.merge_field
== "HR Appointment Transaction Key"
)
assert task_load_employee_salary_history_complete.merge_field == "Key"
assert (
task_load_employee_salary_history_complete.input_task_to_load
== "TransformEmployeeSalaryHistory"
)


def test_task_transform_employee_salary_history_key_expected_from_input_data(
task_transform_employee_salary_history_complete,
task_shared_extract_qb_employee_appointments_complete,
):
# NOTE: for this test, need to manually get part of the employee appointment data
# that this transform is using when generating the unique MD5 Key field value
qb_emp_appt_row = (
task_shared_extract_qb_employee_appointments_complete.target.read().iloc[0]
)
emp_salary_row = task_transform_employee_salary_history_complete.get_dataframe().iloc[
0
]
assert emp_salary_row["Key"] == md5_hash_from_values(
[
emp_salary_row["MIT ID"],
qb_emp_appt_row["Position ID"],
qb_emp_appt_row["Begin Date"],
qb_emp_appt_row["End Date"],
emp_salary_row["Start Date"],
emp_salary_row["End Date"],
]
)
Loading

0 comments on commit 4929e52

Please sign in to comment.