Skip to content

Commit

Permalink
Merge pull request #67 from MITLibraries/HRQB-35-generate-performance…
Browse files Browse the repository at this point in the history
…-reviews

HRQB 35 - Generated Performance Review records from Employee Appointments
  • Loading branch information
ghukill authored Jun 18, 2024
2 parents cd654e9 + b264bc7 commit 9320bf7
Show file tree
Hide file tree
Showing 11 changed files with 551 additions and 0 deletions.
178 changes: 178 additions & 0 deletions hrqb/tasks/performance_reviews.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""hrqb.tasks.performance_reviews"""

import datetime

import luigi # type: ignore[import-untyped]
import pandas as pd
from dateutil.relativedelta import relativedelta # type: ignore[import-untyped]

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.utils import (
convert_dataframe_columns_to_dates,
md5_hash_from_values,
normalize_dataframe_dates,
today_date,
)


class TransformPerformanceReviews(PandasPickleTask):
    """Generate performance review records from Quickbase employee appointments.

    For every employee appointment this task creates a 3 month review, a 6 month
    review, and a series of annual reviews, then renames the columns to the
    Quickbase field labels used when loading.
    """

    stage = luigi.Parameter("Transform")

    # Annual reviews are never generated for years before this one.
    _MIN_ANNUAL_REVIEW_YEAR = 2019
    # Placeholder review months (see NOTE in _get_annual_reviews).
    _EXEMPT_REVIEW_MONTH = 7
    _NON_EXEMPT_REVIEW_MONTH = 8
    # String flags that explicitly mark an appointment as non-exempt.
    # NOTE(review): the source field is labelled "Exempt / NE"; "NE" is presumed to
    # be the stored non-exempt value -- confirm against live Quickbase data.
    _NON_EXEMPT_LABELS = frozenset({"NE", "NON-EXEMPT", "NON EXEMPT", "NONEXEMPT"})

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        from hrqb.tasks.shared import ExtractQBEmployeeAppointments

        return [ExtractQBEmployeeAppointments(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Build dataframe of performance reviews from employee appointments.

        Returns:
            Dataframe with one row per generated review, limited to and renamed as
            the Quickbase field labels listed in ``fields`` below.
        """
        emp_appts_df = self._get_employee_appointments()

        # loop through all appointments and create dataframe of performance reviews
        reviews: list[dict] = []
        for _, emp_appt_row in emp_appts_df.iterrows():
            reviews.append(self._get_three_month_review(emp_appt_row))
            reviews.append(self._get_six_month_review(emp_appt_row))
            reviews.extend(self._get_annual_reviews(emp_appt_row))
        perf_revs_df = pd.DataFrame(reviews)

        perf_revs_df = normalize_dataframe_dates(
            perf_revs_df,
            [
                "review_date",
                "period_start_date",
                "period_end_date",
            ],
        )

        # mint a unique, deterministic value for the merge "Key" field
        perf_revs_df["key"] = perf_revs_df.apply(
            lambda row: md5_hash_from_values(
                [
                    row.mit_id,
                    row.review_type,
                    row.review_year,
                ]
            ),
            axis=1,
        )

        fields = {
            "mit_id": "MIT ID",
            "employee_appointment_id": "Related Employee Appointment",
            "review_type": "Review Type",
            "period_start_date": "Period Covered Start Date",
            "period_end_date": "Period Covered End Date",
            "review_date": "Date of Review",
            "review_year": "Related Year",
            "key": "Key",
        }
        return perf_revs_df[list(fields)].rename(columns=fields)

    def _get_employee_appointments(self) -> pd.DataFrame:
        """Get employee appointments from Quickbase.

        Only the columns needed to derive reviews are kept, renamed to snake_case,
        and the begin/end date columns are converted to datetime objects.
        """
        emp_appts_df = self.named_inputs["ExtractQBEmployeeAppointments"].read()
        emp_appt_fields = {
            "MIT ID": "mit_id",
            "Record ID#": "employee_appointment_id",
            "Begin Date": "appointment_begin_date",
            "End Date": "appointment_end_date",
            "Related Employee Type": "employee_type",
            "Union Name": "union_name",
            "Exempt / NE": "exempt",
        }
        emp_appts_df = emp_appts_df.rename(columns=emp_appt_fields)[
            list(emp_appt_fields.values())
        ]
        return convert_dataframe_columns_to_dates(
            emp_appts_df, ["appointment_begin_date", "appointment_end_date"]
        )

    def _get_n_month_review(
        self, emp_appt_row: pd.Series, months: int, review_type: str
    ) -> dict:
        """Build a review that falls ``months`` months after the appointment begins.

        The covered period runs from the appointment begin date to the review date,
        and the review year is the calendar year of the review date.
        """
        review_date = emp_appt_row.appointment_begin_date + relativedelta(months=months)
        return {
            "mit_id": emp_appt_row.mit_id,
            "employee_appointment_id": emp_appt_row.employee_appointment_id,
            "review_type": review_type,
            "review_date": review_date,
            "period_start_date": emp_appt_row.appointment_begin_date,
            "period_end_date": review_date,
            "review_year": str(review_date.year),
        }

    def _get_three_month_review(self, emp_appt_row: pd.Series) -> dict:
        """Build the "3 Month Review" record for an appointment."""
        return self._get_n_month_review(emp_appt_row, 3, "3 Month Review")

    def _get_six_month_review(self, emp_appt_row: pd.Series) -> dict:
        """Build the "6 Month Review" record for an appointment."""
        return self._get_n_month_review(emp_appt_row, 6, "6 Month Review")

    @classmethod
    def _is_exempt(cls, value: object) -> bool:
        """Interpret the appointment's "Exempt / NE" value as a boolean.

        A plain truthiness test treats every non-empty string as exempt, including
        an explicit non-exempt flag such as "NE".  Strings that explicitly say
        non-exempt are therefore mapped to False; every other value keeps its
        ordinary truthiness so existing behavior is otherwise unchanged.
        """
        if isinstance(value, str) and value.strip().upper() in cls._NON_EXEMPT_LABELS:
            return False
        return bool(value)

    def _get_annual_reviews(self, emp_appt_row: pd.Series) -> list[dict]:
        """Get annual performance reviews for an appointment.

        This method begins with the appointment start year, with a minimum of 2019,
        then adds performance reviews through current year + 1.

        If an annual performance review would fall on or before the 6 month review
        (and therefore also the 3 month review), it is not included.

        NOTE: as of 6/17/2024, HR is in the process of re-evaluating annual review
        timeframes.  The cadence and review dates set below are placeholders until
        that is finalized.
        """
        start_year = max(
            emp_appt_row.appointment_begin_date.year, self._MIN_ANNUAL_REVIEW_YEAR
        )
        # range() excludes its end value, so "+ 2" yields reviews through next year
        end_year = today_date().year + 2

        review_month = (
            self._EXEMPT_REVIEW_MONTH
            if self._is_exempt(emp_appt_row.exempt)
            else self._NON_EXEMPT_REVIEW_MONTH
        )

        # the 6 month review date depends only on the appointment, so compute it
        # once rather than on every pass through the loop
        six_month_review_date = self._get_six_month_review(emp_appt_row)["review_date"]

        reviews = []
        for year in range(start_year, end_year):
            review_end_date = datetime.datetime(
                year, review_month, 1, tzinfo=datetime.UTC
            )

            # if annual review is not after the 6 month review, skip
            if review_end_date <= six_month_review_date:
                continue

            reviews.append(
                {
                    "mit_id": emp_appt_row.mit_id,
                    "employee_appointment_id": emp_appt_row.employee_appointment_id,
                    "review_type": "Annual",
                    "period_start_date": review_end_date - relativedelta(years=1),
                    "period_end_date": review_end_date,
                    "review_date": review_end_date,
                    "review_year": str(year),
                }
            )
        return reviews


class LoadPerformanceReviews(QuickbaseUpsertTask):
    """Upsert task targeting the Quickbase table named "Performance Reviews"."""

    table_name = luigi.Parameter("Performance Reviews")
    stage = luigi.Parameter("Load")

    @property
    def input_task_to_load(self) -> str | None:
        """Name of the required task to load from.

        Two tasks are required, so the one supplying the data is named explicitly
        (presumably consumed by the base upsert task -- confirm there).
        """
        return "TransformPerformanceReviews"

    @property
    def merge_field(self) -> str | None:
        """Merge on the "Key" column produced by the transform task."""
        return "Key"

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Require the Years load as well as the performance review transform."""
        from hrqb.tasks.years import LoadYears

        upstream_tasks: list[luigi.Task] = [
            LoadYears(pipeline=self.pipeline),
            TransformPerformanceReviews(pipeline=self.pipeline),
        ]
        return upstream_tasks
2 changes: 2 additions & 0 deletions hrqb/tasks/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ def requires(self) -> Iterator[luigi.Task]: # pragma: no cover
from hrqb.tasks.employee_leave import LoadEmployeeLeave
from hrqb.tasks.employee_salary_history import LoadEmployeeSalaryHistory
from hrqb.tasks.employees import LoadEmployees
from hrqb.tasks.performance_reviews import LoadPerformanceReviews

yield LoadEmployees(pipeline=self.pipeline_name)
yield LoadEmployeeAppointments(pipeline=self.pipeline_name)
yield LoadEmployeeSalaryHistory(pipeline=self.pipeline_name)
yield LoadEmployeeLeave(pipeline=self.pipeline_name)
yield LoadPerformanceReviews(pipeline=self.pipeline_name)


class UpdateLibHRData(HRQBPipelineTask):
Expand Down
43 changes: 43 additions & 0 deletions hrqb/tasks/years.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""hrqb.tasks.years"""

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.utils import today_date


class TransformYears(PandasPickleTask):
    """Derive the distinct set of years referenced by performance reviews."""

    stage = luigi.Parameter("Transform")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        from hrqb.tasks.performance_reviews import TransformPerformanceReviews

        return [TransformPerformanceReviews(pipeline=self.pipeline)]

    def get_dataframe(self) -> pd.DataFrame:
        """Build one row per distinct "Related Year" found in the input dataframe.

        Returns:
            Dataframe with a string "Year" column and a boolean "Active Year"
            column that is True only for the year matching today's date.
        """
        reviews_df = self.single_input_dataframe.rename(
            columns={"Related Year": "year"}
        )

        # copy so the column assignments below do not touch the input dataframe
        unique_years_df = reviews_df.drop_duplicates("year").copy()

        # compare as integers, but store the year itself as a string
        year_as_int = unique_years_df["year"].astype(int)
        unique_years_df["active"] = year_as_int == today_date().year
        unique_years_df["year"] = year_as_int.astype(str)

        output_fields = {
            "year": "Year",
            "active": "Active Year",
        }
        return unique_years_df[list(output_fields)].rename(columns=output_fields)


class LoadYears(QuickbaseUpsertTask):
    """Upsert task targeting the Quickbase table named "Years"."""

    table_name = luigi.Parameter("Years")
    stage = luigi.Parameter("Load")

    def requires(self) -> list[luigi.Task]:  # pragma: nocover
        """Require the transform task that produces the years dataframe."""
        transform_task = TransformYears(pipeline=self.pipeline)
        return [transform_task]

    @property
    def merge_field(self) -> str | None:
        """Merge on the "Year" column."""
        return "Year"  # pragma: nocover
25 changes: 25 additions & 0 deletions hrqb/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@ def normalize_date(date: str | datetime.datetime) -> str | None:
return None


def convert_dataframe_columns_to_dates(
df: pd.DataFrame, columns: list[str]
) -> pd.DataFrame:
"""Convert select columns from a dataframe to datetime objects.
This more manual approach avoids a pandas error with pd.to_datetime() when the date
exceeds 2262-04-11. Normally this would not be a problem, but employee appointments
that are ongoing receive a datetime of 2999-12-31. See: https://pandas.pydata.org/
pandas-docs/stable/user_guide/timeseries.html#timestamp-limitations.
"""

def convert_to_date(
value: str | datetime.datetime | pd.Timestamp,
) -> datetime.datetime | pd.Timestamp | None:
if isinstance(value, str):
return date_parser(value).replace(tzinfo=datetime.UTC)
if isinstance(value, datetime.datetime | pd.Timestamp):
return value.replace(tzinfo=datetime.UTC)
return None

for column in columns:
df[column] = df[column].apply(lambda x: convert_to_date(x))
return df


def normalize_dataframe_dates(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """Apply normalize_date() to every value in the given columns, in place.

    Args:
        df: dataframe to modify; the same object is returned.
        columns: names of the columns whose values are normalized.
    """
    normalized_columns = df[columns].map(normalize_date)
    df[columns] = normalized_columns
    return df
Expand Down
26 changes: 26 additions & 0 deletions hrqb/utils/quickbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,29 @@ def get_table_as_df(
records,
columns=table_fields_df.label,
)

def delete_records(self, table_id: str, where_clause: str) -> dict:
    """Delete records from a table that match a where clause.

    https://developer.quickbase.com/operation/deleteRecords

    Args:
        table_id: sent as the "from" value of the request body.
        where_clause: sent as the "where" value of the request body.

    Returns:
        Whatever make_request() returns for the DELETE "records" call.
    """
    request_body = {
        "from": table_id,
        "where": where_clause,
    }
    # cache=False is passed through so this destructive call is not served from
    # a cache (presumably make_request caches by default -- confirm there)
    return self.make_request(
        requests.delete,
        "records",
        cache=False,
        json=request_body,
    )

def delete_all_table_records(self, table_id: str) -> dict:
    """Delete all records from a table.

    The table definition is retrieved to read its "keyFieldId", then a delete is
    issued for every record whose key field value is greater than 0 (this is the
    suggested method for truncating a QB table).

    NOTE(review): this assumes the key field is numeric, e.g. the default
    "Record ID#" field -- confirm for tables with a custom key field.
    """
    table_definition = self.get_table(table_id)
    key_field_id = table_definition["keyFieldId"]
    # Quickbase query syntax: {<field id>.<operator>.<value>}
    match_all_records = "{" + f"{key_field_id}.GT.0" + "}"
    return self.delete_records(table_id, match_all_records)
57 changes: 57 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,25 @@ def mocked_qb_api_getTable(qbclient, global_requests_mock, mocked_table_id):
return api_response


@pytest.fixture
def mocked_delete_payload(mocked_table_id):
    """Request body expected for a delete of all records where field 3 is > 0."""
    where_all_records = "{3.GT.0}"
    return {"from": mocked_table_id, "where": where_all_records}


@pytest.fixture
def mocked_qb_api_delete_records(qbclient, mocked_delete_payload, global_requests_mock):
    """Mock the DELETE "records" endpoint and return the canned JSON response.

    The mock only matches requests whose JSON body equals mocked_delete_payload.
    """
    records_url = f"{qbclient.api_base}/records"

    def body_is_expected_payload(request) -> bool:
        return request.json() == mocked_delete_payload

    with open("tests/fixtures/qb_api_responses/deleteRecords.json") as fixture_file:
        canned_response = json.load(fixture_file)

    global_requests_mock.register_uri(
        "DELETE",
        records_url,
        additional_matcher=body_is_expected_payload,
        json=canned_response,
    )
    return canned_response


@pytest.fixture
def qbclient_with_mocked_table_fields(qbclient, mocked_query_table_fields):
with mock.patch.object(type(qbclient), "get_table_fields") as mocked_table_fields:
Expand Down Expand Up @@ -763,6 +782,10 @@ def task_shared_extract_qb_employee_appointments_complete(all_tasks_pipeline_nam
"Position ID": "987654321",
"Begin Date": "2010-01-01",
"End Date": "2011-12-01",
"MIT ID": "123456789",
"Related Employee Type": "Admin Staff",
"Union Name": "Le Union",
"Exempt / NE": "E",
}
]
)
Expand Down Expand Up @@ -861,3 +884,37 @@ def task_transform_employee_leave_types_complete(
task = TransformEmployeeLeaveTypes(pipeline=all_tasks_pipeline_name)
task.run()
return task


@pytest.fixture
def task_transform_performance_reviews_complete(
    all_tasks_pipeline_name,
    task_shared_extract_qb_employee_appointments_complete,
):
    """Return a TransformPerformanceReviews task after calling its run()."""
    from hrqb.tasks.performance_reviews import TransformPerformanceReviews

    transform_task = TransformPerformanceReviews(pipeline=all_tasks_pipeline_name)
    transform_task.run()
    return transform_task


@pytest.fixture
def task_load_performance_reviews_complete(
    all_tasks_pipeline_name,
    task_transform_performance_reviews_complete,
):
    """Return a LoadPerformanceReviews task; its run() is not called here."""
    from hrqb.tasks.performance_reviews import LoadPerformanceReviews

    load_task = LoadPerformanceReviews(pipeline=all_tasks_pipeline_name)
    return load_task


@pytest.fixture
def task_transform_years_complete(
    all_tasks_pipeline_name,
    task_transform_performance_reviews_complete,
):
    """Return a TransformYears task after calling its run()."""
    from hrqb.tasks.years import TransformYears

    years_task = TransformYears(pipeline=all_tasks_pipeline_name)
    years_task.run()
    return years_task
3 changes: 3 additions & 0 deletions tests/fixtures/qb_api_responses/deleteRecords.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"numberDeleted": 1
}
8 changes: 8 additions & 0 deletions tests/tasks/test_lookup_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ def test_task_transform_employee_leave_types_required_fields(
assert {"Leave Type", "Paid Leave", "Accrue Seniority"} == set(
task_transform_employee_leave_types_complete.get_dataframe().columns
)


def test_task_transform_years_required_fields(
    task_transform_years_complete,
):
    """The years transform yields exactly the "Year" and "Active Year" columns."""
    produced_columns = set(task_transform_years_complete.get_dataframe().columns)
    expected_columns = {"Year", "Active Year"}
    assert expected_columns == produced_columns
Loading

0 comments on commit 9320bf7

Please sign in to comment.