HRQB 40 - Fix merges without volatile 'HR Appointment Key' #115

Merged
merged 2 commits into from
Jul 25, 2024
28 changes: 21 additions & 7 deletions hrqb/tasks/employee_appointments.py
@@ -97,13 +97,11 @@ def get_dataframe(self) -> pd.DataFrame:

# mint a unique, deterministic value for the merge "Key" field
emp_appts_df["key"] = emp_appts_df.apply(
lambda row: md5_hash_from_values(
[
row.mit_id,
row.position_id,
row.appt_begin_date,
row.appt_end_date,
]
lambda row: self.generate_merge_key(
row.mit_id,
row.position_id,
row.appt_begin_date,
row.appt_end_date,
),
axis=1,
)
@@ -133,6 +131,22 @@ def get_dataframe(self) -> pd.DataFrame:

return emp_appts_df[fields.keys()].rename(columns=fields)

@staticmethod
def generate_merge_key(
mit_id: str,
position_id: str,
appt_begin_date: str,
appt_end_date: str,
) -> str:
return md5_hash_from_values(
[
mit_id,
position_id,
appt_begin_date,
appt_end_date,
]
Comment on lines +135 to +147

I like that this function gives us more information on which fields (and their data types) are selected as merge keys!

)
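
For readers following the diff, the extracted helper and the way another task reuses it can be sketched in isolation. This is a minimal sketch, assuming `md5_hash_from_values` joins the stringified values and hashes them; the real helper in `hrqb.utils` is not shown in this PR, so `md5_hash_from_values_sketch`, the `|` separator, and the `...Sketch` class names are hypothetical, and the hashes produced here will not necessarily match real "Key" values.

```python
import hashlib


def md5_hash_from_values_sketch(values: list) -> str:
    """Hypothetical stand-in for hrqb.utils.md5_hash_from_values."""
    # Assumption: values are stringified and joined with a separator before hashing.
    joined = "|".join(str(value) for value in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


class TransformEmployeeAppointmentsSketch:
    @staticmethod
    def generate_merge_key(
        mit_id: str,
        position_id: str,
        appt_begin_date: str,
        appt_end_date: str,
    ) -> str:
        # The signature documents exactly which fields make up the merge key.
        return md5_hash_from_values_sketch(
            [mit_id, position_id, appt_begin_date, appt_end_date]
        )


class TransformEmployeeLeaveSketch:
    def appointment_key_for(self, row: dict) -> str:
        # Another task recreates the appointment key without copying the logic.
        return TransformEmployeeAppointmentsSketch.generate_merge_key(
            row["mit_id"],
            row["position_id"],
            row["appt_begin_date"],
            row["appt_end_date"],
        )


row = {
    "mit_id": "123456789",
    "position_id": "987654321",
    "appt_begin_date": "2019-01-01",
    "appt_end_date": "2022-12-01",
}
own_key = TransformEmployeeAppointmentsSketch.generate_merge_key(**row)
other_key = TransformEmployeeLeaveSketch().appointment_key_for(row)
```

Because the key depends only on the four values passed in, both tasks arrive at the same string as long as they supply the values in the same form.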


class LoadEmployeeAppointments(QuickbaseUpsertTask):

39 changes: 32 additions & 7 deletions hrqb/tasks/employee_leave.py
@@ -7,10 +7,12 @@
import pandas as pd

from hrqb.base.task import (
HRQBTask,
PandasPickleTask,
QuickbaseUpsertTask,
SQLQueryExtractTask,
)
from hrqb.tasks.employee_appointments import TransformEmployeeAppointments
from hrqb.utils import (
convert_oracle_bools_to_qb_bools,
md5_hash_from_values,
@@ -44,17 +46,28 @@ def get_dataframe(self) -> pd.DataFrame:
dw_leaves_df = self.named_inputs["ExtractDWEmployeeLeave"].read()
qb_emp_appts_df = self.named_inputs["ExtractQBEmployeeAppointments"].read()

qb_emp_appts_df = qb_emp_appts_df[["HR Appointment Key", "Record ID#"]].rename(
# join Employee Appointments from QB to get QB Record ID
dw_leaves_df = normalize_dataframe_dates(
dw_leaves_df,
["appt_begin_date", "appt_end_date", "absence_date"],
)
dw_leaves_df["emp_appt_merge_key"] = dw_leaves_df.apply(
lambda row: TransformEmployeeAppointments.generate_merge_key(

[Non-blocking] Hmm, is there a reason why generate_merge_key needs to be a static method of the TransformEmployeeAppointments class as opposed to a standalone util method? 🤔 Later in the module, a key is created for leaves_df--should TransformEmployeeLeave get its own static method generate_merge_key? 🤔

Collaborator Author

@ghukill ghukill Jul 25, 2024

The reasoning here is kind of twofold.

The logic for generating a merge field key on TransformEmployeeAppointments is needed by other tasks, but it's specific to how that task does it, so keeping it associated with that task felt like it made sense.

Related: for the other tasks that generate a merge key, no other task needs to know how they do it, at least not yet. If that changes, then I think a natural refactor would be to move that logic into an externally available @staticmethod on the task in question.

If this pattern continues where tasks need to be able to recreate the merge key for other tasks, it might be worth abstracting that out to the base tasks, and then each task just identifies what row data is used for it. But I don't think we're quite there yet. This could be the end of this need.... or not!
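
If it ever comes to that, the base-task abstraction described above might look something like the following. This is only a sketch of one possible shape, not code from hrqb: `MergeKeyTaskSketch`, `merge_key_fields`, and the leave field list are hypothetical names chosen for illustration, and the hashing is a stand-in for `md5_hash_from_values`.

```python
import hashlib


class MergeKeyTaskSketch:
    """Hypothetical base class: subclasses only declare which row fields form the key."""

    merge_key_fields: tuple = ()

    @classmethod
    def generate_merge_key(cls, row: dict) -> str:
        # Assumption: stringify and join the declared fields, then hash them.
        values = [str(row[field]) for field in cls.merge_key_fields]
        return hashlib.md5("|".join(values).encode("utf-8")).hexdigest()


class EmployeeAppointmentsSketch(MergeKeyTaskSketch):
    merge_key_fields = ("mit_id", "position_id", "appt_begin_date", "appt_end_date")


class EmployeeLeaveSketch(MergeKeyTaskSketch):
    # Illustrative field list only; the real leave key fields are not shown here.
    merge_key_fields = ("mit_id", "absence_date", "absence_type")


leave_row = {
    "mit_id": "123456789",
    "position_id": "987654321",
    "appt_begin_date": "2019-01-01",
    "appt_end_date": "2022-12-01",
    "absence_date": "2019-07-01",
    "absence_type": "Vacation",
}

# Any task can recreate another task's key by calling that task's classmethod.
appointment_key = EmployeeAppointmentsSketch.generate_merge_key(leave_row)
leave_key = EmployeeLeaveSketch.generate_merge_key(leave_row)
```

The trade-off is the one raised in the reply: this only pays off once several tasks need each other's keys, and with a single consumer the static method on `TransformEmployeeAppointments` is simpler.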

row.mit_id,
row.position_id,
row.appt_begin_date,
row.appt_end_date,
),
axis=1,
)
qb_emp_appts_df = qb_emp_appts_df[["Key", "Record ID#"]].rename(
columns={
"HR Appointment Key": "hr_appt_key",
"Key": "emp_appt_merge_key",
"Record ID#": "related_employee_appointment_id",
}
)
leaves_df = dw_leaves_df.merge(qb_emp_appts_df, how="left", on="hr_appt_key")

leaves_df = normalize_dataframe_dates(
leaves_df,
["appt_begin_date", "appt_end_date", "absence_date"],
leaves_df = dw_leaves_df.merge(
qb_emp_appts_df, how="left", on="emp_appt_merge_key"
)

# WIP TODO: determine what data points from combination employee leave and
@@ -92,6 +105,18 @@ def get_dataframe(self) -> pd.DataFrame:
}
return leaves_df[fields.keys()].rename(columns=fields)

@HRQBTask.integrity_check
def all_rows_have_employee_appointments(self, output_df: pd.DataFrame) -> None:
missing_appointment_count = len(
output_df[output_df["Related Employee Appointment"].isna()]
)
if missing_appointment_count > 0:
message = (
f"{missing_appointment_count} rows are missing an Employee "
f"Appointment for task '{self.name}'"
)
raise ValueError(message)
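
The flow this file now follows (normalize dates, mint the appointment merge key, left-merge against Quickbase appointments, then fail if any row has no match) can be sketched end to end with plain pandas. This is a sketch under stated assumptions: `merge_key_sketch` and `to_iso_date_sketch` are hypothetical stand-ins for `generate_merge_key` and `normalize_dataframe_dates`, whose real implementations are not shown in this PR, and the check is written as a plain function rather than with the `@HRQBTask.integrity_check` decorator.

```python
import datetime
import hashlib

import pandas as pd


def merge_key_sketch(*values: object) -> str:
    """Hypothetical stand-in for TransformEmployeeAppointments.generate_merge_key."""
    joined = "|".join(str(value) for value in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def to_iso_date_sketch(value: object) -> str:
    """Hypothetical normalizer: render any date-like value as YYYY-MM-DD."""
    return pd.Timestamp(value).strftime("%Y-%m-%d")


def attach_appointment_ids(
    dw_df: pd.DataFrame, qb_appts_df: pd.DataFrame
) -> pd.DataFrame:
    dw_df = dw_df.copy()
    # Normalize first so both sides hash the same string form of each date.
    for column in ("appt_begin_date", "appt_end_date"):
        dw_df[column] = dw_df[column].map(to_iso_date_sketch)
    dw_df["emp_appt_merge_key"] = dw_df.apply(
        lambda row: merge_key_sketch(
            row.mit_id, row.position_id, row.appt_begin_date, row.appt_end_date
        ),
        axis=1,
    )
    # A left merge keeps every source row; unmatched rows get NaN for the QB id.
    return dw_df.merge(qb_appts_df, how="left", on="emp_appt_merge_key")


def check_all_rows_have_appointments(output_df: pd.DataFrame) -> None:
    missing = int(output_df["related_employee_appointment_id"].isna().sum())
    if missing > 0:
        raise ValueError(f"{missing} rows are missing an Employee Appointment")


qb_appts_df = pd.DataFrame(
    [
        {
            "emp_appt_merge_key": merge_key_sketch(
                "123456789", "987654321", "2019-01-01", "2022-12-01"
            ),
            "related_employee_appointment_id": 12000,
        }
    ]
)
dw_leaves_df = pd.DataFrame(
    [
        {
            "mit_id": "123456789",
            "position_id": "987654321",
            "appt_begin_date": pd.Timestamp("2019-01-01 00:00:00"),
            "appt_end_date": datetime.datetime(2022, 12, 1, 0, 0),
        },
        {
            "mit_id": "000000000",
            "position_id": "111111111",
            "appt_begin_date": pd.Timestamp("2020-01-01 00:00:00"),
            "appt_end_date": datetime.datetime(2021, 1, 1, 0, 0),
        },
    ]
)
leaves_df = attach_appointment_ids(dw_leaves_df, qb_appts_df)

try:
    check_all_rows_have_appointments(leaves_df)
except ValueError as exc:
    print(exc)
```

In this made-up data the first row matches the appointment and the second does not, so the check raises. That is the situation the integrity check is meant to surface instead of silently loading rows with an empty "Related Employee Appointment".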


class LoadEmployeeLeave(QuickbaseUpsertTask):

47 changes: 35 additions & 12 deletions hrqb/tasks/employee_salary_history.py
@@ -3,7 +3,13 @@
import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask
from hrqb.base.task import (
HRQBTask,
PandasPickleTask,
QuickbaseUpsertTask,
SQLQueryExtractTask,
)
from hrqb.tasks.employee_appointments import TransformEmployeeAppointments
from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates


@@ -32,17 +38,22 @@ def get_dataframe(self) -> pd.DataFrame:
dw_salary_df = self.named_inputs["ExtractDWEmployeeSalaryHistory"].read()

# merge with employee appointment data for QB appointment record identifier
dw_salary_df = normalize_dataframe_dates(
dw_salary_df,
["appt_begin_date", "appt_end_date", "start_date", "end_date"],
)
dw_salary_df["emp_appt_merge_key"] = dw_salary_df.apply(
lambda row: TransformEmployeeAppointments.generate_merge_key(
row.mit_id,
row.position_id,
row.appt_begin_date,
row.appt_end_date,
),
axis=1,
)
qb_emp_appts_df = self._get_employee_appointments()
salary_df = dw_salary_df.merge(
qb_emp_appts_df,
how="left",
left_on="hr_appt_key",
right_on="HR Appointment Key",
)

salary_df = normalize_dataframe_dates(
salary_df,
["start_date", "end_date"],
qb_emp_appts_df, how="left", on="emp_appt_merge_key"
)

# convert efforts to percentages
@@ -70,7 +81,6 @@ def get_dataframe(self) -> pd.DataFrame:
)

fields = {
"hr_appt_tx_key": "HR Appointment Transaction Key",
"mit_id": "MIT ID",
"related_employee_appointment_id": "Related Employee Appointment",
"hr_personnel_action": "Related Salary Change Type",
@@ -94,13 +104,14 @@ def _get_employee_appointments(self) -> pd.DataFrame:
return qb_emp_appts_df[
[
"Record ID#",
"HR Appointment Key",
"Key",
"Begin Date",
"End Date",
]
].rename(
columns={
"Record ID#": "related_employee_appointment_id",
"Key": "emp_appt_merge_key",
"Begin Date": "appointment_begin_date",
"End Date": "appointment_end_date",
}
@@ -140,6 +151,18 @@ def _set_base_salary_change_percent(self, salary_df: pd.DataFrame) -> pd.DataFra
)
return new_salary_df

@HRQBTask.integrity_check
def all_rows_have_employee_appointments(self, output_df: pd.DataFrame) -> None:
missing_appointment_count = len(
output_df[output_df["Related Employee Appointment"].isna()]
)
if missing_appointment_count > 0:
message = (
f"{missing_appointment_count} rows are missing an Employee "
f"Appointment for task '{self.name}'"
)
raise ValueError(message)


class LoadEmployeeSalaryHistory(QuickbaseUpsertTask):
table_name = luigi.Parameter("Employee Salary History")
6 changes: 5 additions & 1 deletion hrqb/tasks/sql/employee_leave.sql
@@ -3,12 +3,14 @@ Query for Employee Leaves.

CHANGELOG
- 2024-05-13 Query created and added
- 2024-07-23 Added HR_POSITION.POSITION ID to select
*/

select distinct
appt.MIT_ID,
appt.APPT_BEGIN_DATE,
appt.APPT_END_DATE,
p.POSITION_ID,
appt.HR_APPT_KEY,
abse.ABSENCE_DATE,
at.ABSENCE_TYPE,
@@ -21,7 +23,9 @@ select distinct
end as PAID_LEAVE
from HR_ABSENCE_DETAIL abse
inner join HR_APPOINTMENT_DETAIL appt on abse.MIT_ID = appt.MIT_ID
inner join HR_ABSENCE_TYPE at on at.HR_ABSENCE_TYPE_KEY = abse.HR_ABSENCE_TYPE_KEY where (
inner join HR_ABSENCE_TYPE at on at.HR_ABSENCE_TYPE_KEY = abse.HR_ABSENCE_TYPE_KEY
left join HR_POSITION p on p.HR_POSITION_KEY = appt.HR_POSITION_KEY
where (
appt.APPT_END_DATE >= TO_DATE('2019-01-01', 'YYYY-MM-DD')
and abse.ABSENCE_DATE between appt.APPT_BEGIN_DATE and appt.APPT_END_DATE
)
4 changes: 4 additions & 0 deletions hrqb/tasks/sql/employee_salary_history.sql
@@ -8,6 +8,7 @@ Notes:
CHANGELOG
- 2024-05-31 Query created and added
- 2024-06-03 Date limit to appointments ending after 2019-01-01
- 2024-07-24 Added appointment begin/end date to help match appointments
*/

select distinct
@@ -16,6 +17,8 @@ select distinct
a.MIT_ID,
j.JOB_ID,
p.POSITION_ID,
appt.APPT_BEGIN_DATE,
appt.APPT_END_DATE,
a.APPT_TX_BEGIN_DATE as START_DATE,
a.APPT_TX_END_DATE as END_DATE,
at.HR_PERSONNEL_ACTION_TYPE_KEY,
@@ -37,6 +40,7 @@ select distinct
else 'N'
end as SPECIAL_ONETIME_PAY
from HR_APPT_TX_DETAIL a
inner join HR_APPOINTMENT_DETAIL appt on appt.HR_APPT_KEY = a.HR_APPT_KEY
left join HR_PERSONNEL_ACTION_TYPE at on at.HR_PERSONNEL_ACTION_TYPE_KEY = a.HR_PERSONNEL_ACTION_TYPE_KEY
left join HR_JOB j on j.HR_JOB_KEY = a.HR_JOB_KEY
left join HR_POSITION p on p.HR_POSITION_KEY = a.HR_POSITION_KEY
15 changes: 9 additions & 6 deletions tests/conftest.py
@@ -749,8 +749,10 @@ def task_extract_dw_employee_salary_history_complete(all_tasks_pipeline_name):
"mit_id": "123456789",
"job_id": "123456789",
"position_id": "987654321",
"start_date": Timestamp("2010-07-01 00:00:00"),
"end_date": datetime.datetime(2011, 12, 1, 0, 0),
"appt_begin_date": Timestamp("2019-01-01 00:00:00"),
"appt_end_date": datetime.datetime(2022, 12, 1, 0, 0),
"start_date": Timestamp("2019-07-01 00:00:00"),
"end_date": datetime.datetime(2022, 12, 1, 0, 0),
"hr_personnel_action_type_key": "CS01",
"hr_personnel_action": "Annual Salary Review",
"hr_action_reason": "Review Increase",
@@ -779,14 +781,14 @@ def task_shared_extract_qb_employee_appointments_complete(all_tasks_pipeline_nam
[
{
"Record ID#": 12000,
"HR Appointment Key": 123.0,
"Position ID": "987654321",
"Begin Date": "2019-01-01",
"End Date": "2022-12-01",
"MIT ID": "123456789",
"Related Employee Type": "Admin Staff",
"Union Name": "Le Union",
"Exempt / NE": "E",
"Key": "868ce513323c5391ae8afaa0ceb70c69",
}
]
)
@@ -837,10 +839,11 @@ def task_extract_dw_employee_leave_complete(all_tasks_pipeline_name):
[
{
"mit_id": "123456789",
"appt_begin_date": Timestamp("2010-01-01 00:00:00"),
"appt_end_date": datetime.datetime(2011, 12, 1, 0, 0),
"appt_begin_date": Timestamp("2019-01-01 00:00:00"),
"appt_end_date": datetime.datetime(2022, 12, 1, 0, 0),
"position_id": "987654321",
"hr_appt_key": 123,
"absence_date": Timestamp("2010-07-01 00:00:00"),
"absence_date": Timestamp("2019-07-01 00:00:00"),
Comment on lines -840 to +846
Collaborator Author

This update, while not strictly required here, was just to better align mocked data in fixtures.

"absence_type": "Vacation",
"absence_type_code": "VACA",
"actual_absence_hours": 8.0,
2 changes: 1 addition & 1 deletion tests/tasks/test_shared.py
@@ -6,7 +6,7 @@ def test_task_extract_qb_employee_appointments_complete_has_required_fields(
):
required_columns = [
"Record ID#",
"HR Appointment Key",
"Key",
]
assert set(required_columns).issubset(
set(task_shared_extract_qb_employee_appointments_complete.target.read().columns)