Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HRQB 35 - Generated Performance Review records from Employee Appointments #67

Merged
merged 3 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions hrqb/tasks/performance_reviews.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""hrqb.tasks.performance_reviews"""

import datetime

import luigi # type: ignore[import-untyped]
import pandas as pd
from dateutil.relativedelta import relativedelta # type: ignore[import-untyped]

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.utils import (
convert_dataframe_columns_to_dates,
md5_hash_from_values,
normalize_dataframe_dates,
today_date,
)


class TransformPerformanceReviews(PandasPickleTask):
    """Generate Performance Review records from Employee Appointment records."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        from hrqb.tasks.shared import ExtractQBEmployeeAppointments

        return [ExtractQBEmployeeAppointments(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Build dataframe of performance reviews from employee appointments.

        Each appointment yields a 3 month review, a 6 month review, and zero or
        more annual reviews.  Date columns are normalized and a deterministic
        merge "Key" is minted for the Quickbase upsert.
        """
        emp_appts_df = self._get_employee_appointments()

        # loop through all appointments and create dataframe of performance reviews
        reviews: list[dict] = []
        for _, emp_appt_row in emp_appts_df.iterrows():
            reviews.append(self._get_three_month_review(emp_appt_row))
            reviews.append(self._get_six_month_review(emp_appt_row))
            reviews.extend(self._get_annual_reviews(emp_appt_row))
        perf_revs_df = pd.DataFrame(reviews)

        perf_revs_df = normalize_dataframe_dates(
            perf_revs_df,
            [
                "review_date",
                "period_start_date",
                "period_end_date",
            ],
        )

        # mint a unique, deterministic value for the merge "Key" field
        perf_revs_df["key"] = perf_revs_df.apply(
            lambda row: md5_hash_from_values(
                [
                    row.mit_id,
                    row.review_type,
                    row.review_year,
                ]
            ),
            axis=1,
        )

        fields = {
            "mit_id": "MIT ID",
            "employee_appointment_id": "Related Employee Appointment",
            "review_type": "Review Type",
            "period_start_date": "Period Covered Start Date",
            "period_end_date": "Period Covered End Date",
            "review_date": "Date of Review",
            "review_year": "Related Year",
            "key": "Key",
        }
        # list(...) keeps column selection robust across pandas versions
        return perf_revs_df[list(fields)].rename(columns=fields)

    def _get_employee_appointments(self) -> pd.DataFrame:
        """Get employee appointments from Quickbase, with date columns parsed."""
        emp_appts_df = self.named_inputs["ExtractQBEmployeeAppointments"].read()
        emp_appt_fields = {
            "MIT ID": "mit_id",
            "Record ID#": "employee_appointment_id",
            "Begin Date": "appointment_begin_date",
            "End Date": "appointment_end_date",
            "Related Employee Type": "employee_type",
            "Union Name": "union_name",
            "Exempt / NE": "exempt",
        }
        emp_appts_df = emp_appts_df.rename(columns=emp_appt_fields)[
            emp_appt_fields.values()
        ]
        return convert_dataframe_columns_to_dates(
            emp_appts_df, ["appointment_begin_date", "appointment_end_date"]
        )

    def _get_fixed_interval_review(
        self, emp_appt_row: pd.Series, months: int, review_type: str
    ) -> dict:
        """Build a review occurring a fixed number of months after appointment begin.

        Shared by the 3 and 6 month review builders, which differ only in the
        month offset and review type label.
        """
        review_date = emp_appt_row.appointment_begin_date + relativedelta(months=months)
        return {
            "mit_id": emp_appt_row.mit_id,
            "employee_appointment_id": emp_appt_row.employee_appointment_id,
            "review_type": review_type,
            "review_date": review_date,
            "period_start_date": emp_appt_row.appointment_begin_date,
            "period_end_date": review_date,
            "review_year": str(review_date.year),
        }

    def _get_three_month_review(self, emp_appt_row: pd.Series) -> dict:
        return self._get_fixed_interval_review(emp_appt_row, 3, "3 Month Review")

    def _get_six_month_review(self, emp_appt_row: pd.Series) -> dict:
        return self._get_fixed_interval_review(emp_appt_row, 6, "6 Month Review")

    def _get_annual_reviews(self, emp_appt_row: pd.Series) -> list[dict]:
        """Get annual performance reviews for an appointment.

        This method begins with the appointment start year, with a minimum of 2019, then
        adds performance reviews through current year + 1.

        If an annual performance review would fall inside of a 3 or 6 month review, it is
        not included.

        NOTE: as of 6/17/2024, HR is in the process of re-evaluating annual review
        timeframes. The cadence and review dates set below are placeholders until
        that is finalized.
        """
        start_year = max(emp_appt_row.appointment_begin_date.year, 2019)
        end_year = today_date().year + 2

        # NOTE(review): 'Exempt / NE' appears to be a string field (e.g. "E" / "NE");
        # if so, any non-empty value is truthy and review_month is always 7.
        # Confirm the intended exempt check (e.g. exempt == "E") against upstream data.
        review_month = 7 if emp_appt_row.exempt else 8

        # loop-invariant: the 6 month review date does not change per year
        six_month_review_date = self._get_six_month_review(emp_appt_row)["review_date"]

        reviews = []
        for year in range(start_year, end_year):
            review_end_date = datetime.datetime(
                year, review_month, 1, tzinfo=datetime.UTC
            )
            review_start_date = review_end_date - relativedelta(years=1)

            # if annual review is less than 6 month review, skip
            if review_end_date <= six_month_review_date:
                continue

            reviews.append(
                {
                    "mit_id": emp_appt_row.mit_id,
                    "employee_appointment_id": emp_appt_row.employee_appointment_id,
                    "review_type": "Annual",
                    "period_start_date": review_start_date,
                    "period_end_date": review_end_date,
                    "review_date": review_end_date,
                    "review_year": str(year),
                }
            )
        return reviews


class LoadPerformanceReviews(QuickbaseUpsertTask):
    """Upsert generated performance review records into Quickbase."""

    table_name = luigi.Parameter("Performance Reviews")
    stage = luigi.Parameter("Load")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        from hrqb.tasks.years import LoadYears

        upstream_tasks = [
            LoadYears(pipeline=self.pipeline),
            TransformPerformanceReviews(pipeline=self.pipeline),
        ]
        return upstream_tasks

    @property
    def merge_field(self) -> str | None:
        # deterministic md5 "Key" minted by TransformPerformanceReviews
        return "Key"

    @property
    def input_task_to_load(self) -> str | None:
        return "TransformPerformanceReviews"
2 changes: 2 additions & 0 deletions hrqb/tasks/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ def requires(self) -> Iterator[luigi.Task]: # pragma: no cover
from hrqb.tasks.employee_leave import LoadEmployeeLeave
from hrqb.tasks.employee_salary_history import LoadEmployeeSalaryHistory
from hrqb.tasks.employees import LoadEmployees
from hrqb.tasks.performance_reviews import LoadPerformanceReviews

yield LoadEmployees(pipeline=self.pipeline_name)
yield LoadEmployeeAppointments(pipeline=self.pipeline_name)
yield LoadEmployeeSalaryHistory(pipeline=self.pipeline_name)
yield LoadEmployeeLeave(pipeline=self.pipeline_name)
yield LoadPerformanceReviews(pipeline=self.pipeline_name)


class UpdateLibHRData(HRQBPipelineTask):
Expand Down
43 changes: 43 additions & 0 deletions hrqb/tasks/years.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""hrqb.tasks.years"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.utils import today_date


class TransformYears(PandasPickleTask):
    """Build the set of distinct years referenced by performance reviews."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        from hrqb.tasks.performance_reviews import TransformPerformanceReviews

        return [TransformPerformanceReviews(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Return one row per distinct year, flagging the current year as active."""
        reviews_df = self.single_input_dataframe.rename(
            columns={"Related Year": "year"}
        )

        years_df = reviews_df.drop_duplicates("year").copy()
        year_ints = years_df["year"].astype(int)
        years_df["active"] = year_ints == today_date().year
        years_df["year"] = year_ints.astype(str)

        column_map = {
            "year": "Year",
            "active": "Active Year",
        }
        return years_df[column_map.keys()].rename(columns=column_map)


class LoadYears(QuickbaseUpsertTask):
    """Upsert records into the Quickbase lookup table "Years".

    Quickbase has a table dedicated to just years; this Load task ensures that
    every year referenced by the Performance Reviews ETL tasks has a value in
    that lookup table.
    """

    table_name = luigi.Parameter("Years")
    stage = luigi.Parameter("Load")

    @property
    def merge_field(self) -> str | None:
        return "Year"  # pragma: nocover

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        return [TransformYears(pipeline=self.pipeline)]
25 changes: 25 additions & 0 deletions hrqb/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@ def normalize_date(date: str | datetime.datetime) -> str | None:
return None


def convert_dataframe_columns_to_dates(
df: pd.DataFrame, columns: list[str]
) -> pd.DataFrame:
"""Convert select columns from a dataframe to datetime objects.

This more manual approach avoids a pandas error with pd.to_datetime() when the date
exceeds 2262-04-11. Normally this would not be a problem, but employee appointments
that are ongoing receive a datetime of 2999-12-31. See: https://pandas.pydata.org/
pandas-docs/stable/user_guide/timeseries.html#timestamp-limitations.
"""

def convert_to_date(
value: str | datetime.datetime | pd.Timestamp,
) -> datetime.datetime | pd.Timestamp | None:
if isinstance(value, str):
return date_parser(value).replace(tzinfo=datetime.UTC)
if isinstance(value, datetime.datetime | pd.Timestamp):
return value.replace(tzinfo=datetime.UTC)
return None

for column in columns:
df[column] = df[column].apply(lambda x: convert_to_date(x))
return df


def normalize_dataframe_dates(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """Normalize the named date columns of *df* in place via normalize_date."""
    for column in columns:
        df[column] = df[column].map(normalize_date)
    return df
Expand Down
26 changes: 26 additions & 0 deletions hrqb/utils/quickbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,29 @@ def get_table_as_df(
records,
columns=table_fields_df.label,
)

def delete_records(self, table_id: str, where_clause: str) -> dict:
    """Delete records from a table matching a where clause.

    https://developer.quickbase.com/operation/deleteRecords
    """
    payload = {
        "from": table_id,
        "where": where_clause,
    }
    return self.make_request(requests.delete, "records", cache=False, json=payload)

def delete_all_table_records(self, table_id: str) -> dict:
    """Delete all records from a table (truncate).

    Quickbase's suggested truncation approach: look up the table's key field id
    (e.g. 'Record ID#') and delete every record where that field is greater
    than zero, which matches all records.
    """
    key_field_id = self.get_table(table_id)["keyFieldId"]
    where_clause = "{%s.GT.0}" % key_field_id
    return self.delete_records(table_id, where_clause)
57 changes: 57 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,25 @@ def mocked_qb_api_getTable(qbclient, global_requests_mock, mocked_table_id):
return api_response


@pytest.fixture
def mocked_delete_payload(mocked_table_id):
    """Expected JSON body for a deleteRecords call against the mocked table."""
    payload = {"from": mocked_table_id, "where": "{3.GT.0}"}
    return payload


@pytest.fixture
def mocked_qb_api_delete_records(qbclient, mocked_delete_payload, global_requests_mock):
    """Register a mocked DELETE /records endpoint returning the canned response."""
    records_url = f"{qbclient.api_base}/records"
    with open("tests/fixtures/qb_api_responses/deleteRecords.json") as response_file:
        canned_response = json.load(response_file)
    global_requests_mock.register_uri(
        "DELETE",
        records_url,
        additional_matcher=lambda req: req.json() == mocked_delete_payload,
        json=canned_response,
    )
    return canned_response


@pytest.fixture
def qbclient_with_mocked_table_fields(qbclient, mocked_query_table_fields):
with mock.patch.object(type(qbclient), "get_table_fields") as mocked_table_fields:
Expand Down Expand Up @@ -763,6 +782,10 @@ def task_shared_extract_qb_employee_appointments_complete(all_tasks_pipeline_nam
"Position ID": "987654321",
"Begin Date": "2010-01-01",
"End Date": "2011-12-01",
"MIT ID": "123456789",
"Related Employee Type": "Admin Staff",
"Union Name": "Le Union",
"Exempt / NE": "E",
}
]
)
Expand Down Expand Up @@ -861,3 +884,37 @@ def task_transform_employee_leave_types_complete(
task = TransformEmployeeLeaveTypes(pipeline=all_tasks_pipeline_name)
task.run()
return task


@pytest.fixture
def task_transform_performance_reviews_complete(
    all_tasks_pipeline_name,
    task_shared_extract_qb_employee_appointments_complete,
):
    """Run TransformPerformanceReviews and return the completed task."""
    from hrqb.tasks.performance_reviews import TransformPerformanceReviews

    transform_task = TransformPerformanceReviews(pipeline=all_tasks_pipeline_name)
    transform_task.run()
    return transform_task


@pytest.fixture
def task_load_performance_reviews_complete(
    all_tasks_pipeline_name,
    task_transform_performance_reviews_complete,
):
    """Return a LoadPerformanceReviews task whose upstream transform has run."""
    from hrqb.tasks.performance_reviews import LoadPerformanceReviews

    return LoadPerformanceReviews(pipeline=all_tasks_pipeline_name)


@pytest.fixture
def task_transform_years_complete(
    all_tasks_pipeline_name,
    task_transform_performance_reviews_complete,
):
    """Run TransformYears and return the completed task."""
    from hrqb.tasks.years import TransformYears

    years_task = TransformYears(pipeline=all_tasks_pipeline_name)
    years_task.run()
    return years_task
3 changes: 3 additions & 0 deletions tests/fixtures/qb_api_responses/deleteRecords.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"numberDeleted": 1
}
8 changes: 8 additions & 0 deletions tests/tasks/test_lookup_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ def test_task_transform_employee_leave_types_required_fields(
assert {"Leave Type", "Paid Leave", "Accrue Seniority"} == set(
task_transform_employee_leave_types_complete.get_dataframe().columns
)


def test_task_transform_years_required_fields(
    task_transform_years_complete,
):
    """Transformed years dataframe exposes exactly the Quickbase 'Years' columns."""
    columns = set(task_transform_years_complete.get_dataframe().columns)
    assert columns == {"Year", "Active Year"}
Loading
Loading