Skip to content

Commit

Permalink
Establish Quickbase API Client
Browse files Browse the repository at this point in the history
Why these changes are being introduced:

One of the primary functions of this app is to load data into Quickbase via its
API.  The API has some awkward edges, like the need to upsert data with Field IDs
vs Field names, or the need to use Table IDs instead of names.

Therefore, to upsert data, you might need 2-3 API calls in advance just to map
Table and Field names to their IDs.

How this addresses that need:
* Creates new QBClient class
* QBClient has method for making API calls with authorization
* QBClient caches API calls when the call signature is identical
* QBClient has convenience methods for common API calls like getting
Table or Field information
* QBClient has some methods to map data
* QBClient will be the workhorse of most Load Tasks that get built

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/HRQB-12
  • Loading branch information
ghukill committed May 2, 2024
1 parent c4216ba commit 06acc1a
Show file tree
Hide file tree
Showing 13 changed files with 776 additions and 65 deletions.
4 changes: 4 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ oracledb = "*"
luigi = "*"
pandas = "*"
pandas-stubs = "*"
attrs = "*"
requests = "*"
types-requests = "*"

[dev-packages]
black = "*"
Expand All @@ -19,6 +22,7 @@ mypy = "*"
pre-commit = "*"
pytest = "*"
ruff = "*"
requests-mock = "*"

[requires]
python_version = "3.11"
Expand Down
269 changes: 205 additions & 64 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ See additional diagrams and documentation in the [docs](docs) folder:
SENTRY_DSN=# If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
WORKSPACE=# Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
LUIGI_CONFIG_PATH=hrqb/luigi.cfg # this env var must be set, pointing to config file in hrqb folder
QUICKBASE_API_TOKEN=# Quickbase API token
QUICKBASE_APP_ID=# Quickbase App ID
```

### Optional
Expand Down
29 changes: 29 additions & 0 deletions hrqb/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from hrqb.base import PandasPickleTarget, QuickbaseTableTarget
from hrqb.utils import today_date
from hrqb.utils.quickbase import QBClient


class HRQBTask(luigi.Task):
Expand Down Expand Up @@ -65,6 +66,34 @@ def target(self) -> QuickbaseTableTarget:
def output(self) -> QuickbaseTableTarget:
return self.target()

def get_records(self) -> list[dict]:
    """Build the list of record dictionaries that will be upserted to Quickbase.

    By default this is a direct conversion of the parent Task's DataFrame, one
    dictionary per row. A load Task that needs more complex behavior may override
    this method.
    """
    parent_df = self.input_pandas_dataframe
    return parent_df.to_dict(orient="records")

def run(self) -> None:
    """Upsert the parent Task's records into this Task's Quickbase table.

    Load Tasks are uniform enough that this method lives on the base class:
    everything it needs is available on the Task itself, namely the records from
    ``get_records()`` and the Quickbase table name.
    """
    records_to_load = self.get_records()

    client = QBClient()
    # the API addresses tables by ID, so resolve the name before building the payload
    payload = client.prepare_upsert_payload(
        client.get_table_id(self.table_name),
        records_to_load,
        merge_field=None,
    )
    api_results = client.upsert_records(payload)

    self.target().write(api_results)


class HRQBPipelineTask(luigi.WrapperTask):
date = luigi.DateParameter(default=today_date())
2 changes: 2 additions & 0 deletions hrqb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class Config:
"WORKSPACE",
"SENTRY_DSN",
"LUIGI_CONFIG_PATH",
"QUICKBASE_API_TOKEN",
"QUICKBASE_APP_ID",
)
OPTIONAL_ENV_VARS = ("DYLD_LIBRARY_PATH",)

Expand Down
5 changes: 5 additions & 0 deletions hrqb/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""hrqb.exceptions"""


class QBFieldNotFoundError(ValueError):
    """Raised when a Field name cannot be found for a Quickbase Table."""
144 changes: 144 additions & 0 deletions hrqb/utils/quickbase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""utils.quickbase"""

import json
import logging
from collections.abc import Callable

import pandas as pd
import requests
from attrs import define, field

from hrqb.config import Config
from hrqb.exceptions import QBFieldNotFoundError

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type of the ``requests`` function (e.g. ``requests.get``) passed to QBClient.make_request.
RequestsMethod = Callable[..., requests.Response]


@define
class QBClient:
    """Client for the Quickbase REST API.

    Sends authorized requests, optionally caches the parsed JSON responses per
    request signature, and provides helpers for resolving Table and Field names to
    the IDs that the upsert endpoint requires.
    """

    api_base: str = field(default="https://api.quickbase.com/v1")
    # when True, requests with an identical signature are served from ``_cache``
    cache_results: bool = field(default=True)
    # maps (HTTP method name, path, serialized kwargs) to the parsed JSON response
    _cache: dict = field(factory=dict, repr=False)

    @property
    def request_headers(self) -> dict:
        """Headers sent with every request: user-token auth and realm hostname."""
        return {
            "Authorization": f"QB-USER-TOKEN {Config().QUICKBASE_API_TOKEN}",
            "QB-Realm-Hostname": "mit.quickbase.com",
        }

    @property
    def app_id(self) -> str:
        """Quickbase App ID, read from configuration."""
        return Config().QUICKBASE_APP_ID

    def make_request(
        self, requests_method: RequestsMethod, path: str, **kwargs: object
    ) -> dict:
        """Make an API request to Quickbase API.

        This method caches request responses, such that data from informational
        requests may be reused in later operations.

        Args:
            requests_method: the ``requests`` function to call, e.g. ``requests.get``.
            path: API path relative to ``api_base``; a leading "/" is ignored.
            **kwargs: passed through to ``requests_method`` (e.g. ``json=...``).

        Returns:
            The parsed JSON body of the response.

        Raises:
            requests.HTTPError: if the API responds with a 4xx or 5xx status.
        """
        request_hash = None
        if self.cache_results:
            # The HTTP method is part of the signature: a GET and a POST (or DELETE)
            # to the same path with the same kwargs are different requests and must
            # not share a cache entry.
            method_name = getattr(requests_method, "__name__", repr(requests_method))
            request_hash = (method_name, path, json.dumps(kwargs, sort_keys=True))
            if request_hash in self._cache:
                message = f"Using cached result for path: {path}"
                logger.debug(message)
                return self._cache[request_hash]

        # make API call
        response = requests_method(
            f"{self.api_base}/{path.removeprefix('/')}",
            headers=self.request_headers,
            **kwargs,
        )
        # fail loudly on error statuses so an error body is never parsed, returned,
        # or cached as though it were a successful result
        response.raise_for_status()
        results = response.json()

        if request_hash is not None:
            self._cache[request_hash] = results

        return results

    def get_app_info(self) -> dict:
        """Retrieve information about the QB app.

        https://developer.quickbase.com/operation/getApp
        """
        return self.make_request(requests.get, f"apps/{self.app_id}")

    def get_tables(self) -> pd.DataFrame:
        """Get all QB Tables as a Dataframe.

        https://developer.quickbase.com/operation/getAppTables
        """
        tables = self.make_request(requests.get, f"tables?appId={self.app_id}")
        return pd.DataFrame(tables)

    def get_table_id(self, name: str) -> str:
        """Get Table ID from Dataframe of Tables.

        The first Table whose name matches is used.

        Raises:
            IndexError: if no Table has the given name.
        """
        tables_df = self.get_tables()
        return tables_df[tables_df.name == name].iloc[0].id

    def get_table_fields(self, table_id: str) -> pd.DataFrame:
        """Get all QB Table Fields as a Dataframe.

        https://developer.quickbase.com/operation/getFields
        """
        fields = self.make_request(requests.get, f"fields?tableId={table_id}")
        return pd.DataFrame(fields)

    def get_table_fields_name_to_id(self, table_id: str) -> dict:
        """Get Field name-to-id map for a Table.

        This method is particularly helpful for upserting data via the QB API, where
        Field IDs are required instead of Field names.
        """
        fields_df = self.get_table_fields(table_id)
        if fields_df.empty:
            return {}
        # pair the two columns directly rather than materializing a Series per row
        return dict(zip(fields_df["label"].tolist(), fields_df["id"].tolist()))

    def upsert_records(self, upsert_payload: dict) -> dict:
        """Upsert Records into a Table.

        https://developer.quickbase.com/operation/upsert
        """
        return self.make_request(requests.post, "records", json=upsert_payload)

    def prepare_upsert_payload(
        self,
        table_id: str,
        records: list[dict],
        merge_field: str | None = None,
    ) -> dict:
        """Prepare an API payload for upsert.

        https://developer.quickbase.com/operation/upsert

        This method expects a list of dictionaries, one dictionary per record, with a
        {Field Name:Value} structure. This method will first retrieve a mapping of
        Field name-to-ID, then remap the data to a {Field ID:Value} structure.
        Then, return a dictionary payload suitable for the QB upsert API call.

        Args:
            table_id: ID of the Table to upsert into.
            records: one {Field Name: Value} dictionary per record.
            merge_field: optional Field name; when given, its Field ID is sent as
                ``mergeFieldId``.

        Raises:
            QBFieldNotFoundError: if a record uses a Field name not found for the Table.
            KeyError: if ``merge_field`` is not a Field name found for the Table.
        """
        field_map = self.get_table_fields_name_to_id(table_id)

        mapped_records = []
        for record in records:
            mapped_record = {}
            for field_name, field_value in record.items():
                field_id = field_map.get(field_name)
                if not field_id:
                    message = (
                        f"Field name '{field_name}' not found for Table ID '{table_id}'"
                    )
                    raise QBFieldNotFoundError(message)
                # records are keyed by Field ID (as a string) with a wrapped value
                mapped_record[str(field_id)] = {"value": field_value}
            mapped_records.append(mapped_record)

        upsert_payload = {
            "to": table_id,
            "data": mapped_records,
            "fieldsToReturn": list(field_map.values()),
        }
        if merge_field:
            upsert_payload["mergeFieldId"] = field_map[merge_field]

        return upsert_payload
110 changes: 109 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# ruff: noqa: N802, N803

import json

import pandas as pd
import pytest
import requests_mock
from click.testing import CliRunner

from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask
from hrqb.base import QuickbaseTableTarget
from hrqb.base.task import PandasPickleTarget, PandasPickleTask, QuickbaseUpsertTask
from hrqb.utils.quickbase import QBClient


@pytest.fixture(autouse=True)
def _test_env(monkeypatch):
    """Set the environment variables the app's Config expects, for every test."""
    test_env_vars = {
        "SENTRY_DSN": "None",
        "WORKSPACE": "test",
        "LUIGI_CONFIG_PATH": "hrqb/luigi.cfg",
        "QUICKBASE_API_TOKEN": "qb-api-acb123",
        "QUICKBASE_APP_ID": "qb-app-def456",
    }
    for env_name, env_value in test_env_vars.items():
        monkeypatch.setenv(env_name, env_value)


@pytest.fixture
Expand Down Expand Up @@ -118,3 +127,102 @@ def requires(self):
return [complete_first_pandas_series_task]

return SecondTask(path=f"{tmpdir}/bar.pickle", table_name="bar")


@pytest.fixture
def qbclient():
    """Provide a QBClient built with its default settings."""
    client = QBClient()
    return client


@pytest.fixture(scope="session", autouse=True)
def global_requests_mock():
    """Keep a single requests_mock Mocker active for the whole test session."""
    with requests_mock.Mocker() as session_mocker:
        yield session_mocker


@pytest.fixture
def mocked_qb_api_getApp(qbclient, global_requests_mock):
    """Mock the getApp GET endpoint with a fixture file and return its JSON body."""
    app_url = f"{qbclient.api_base}/apps/{qbclient.app_id}"
    with open("tests/fixtures/qb_api_responses/getApp.json") as fixture_file:
        response_body = json.loads(fixture_file.read())
    global_requests_mock.get(app_url, json=response_body)
    return response_body


@pytest.fixture
def mocked_qb_api_getAppTables(qbclient, global_requests_mock):
    """Mock the getAppTables GET endpoint with a fixture file and return its JSON body."""
    tables_url = f"{qbclient.api_base}/tables?appId={qbclient.app_id}"
    with open("tests/fixtures/qb_api_responses/getAppTables.json") as fixture_file:
        response_body = json.loads(fixture_file.read())
    global_requests_mock.get(tables_url, json=response_body)
    return response_body


@pytest.fixture
def mocked_table_id():
    """Table ID used by the mocked Quickbase API fixtures."""
    table_id = "bpqe82s1"
    return table_id


@pytest.fixture
def mocked_table_name():
    """Table name used by the mocked Quickbase fixtures."""
    table_name = "Example Table #0"
    return table_name


@pytest.fixture
def mocked_qb_api_getFields(qbclient, mocked_table_id, global_requests_mock):
    """Mock the getFields GET endpoint for the mocked table and return its JSON body."""
    fields_url = f"{qbclient.api_base}/fields?tableId={mocked_table_id}"
    with open("tests/fixtures/qb_api_responses/getFields.json") as fixture_file:
        response_body = json.loads(fixture_file.read())
    global_requests_mock.get(fields_url, json=response_body)
    return response_body


@pytest.fixture
def mocked_upsert_data():
    """Three example records, keyed by Field name, for upsert tests."""
    example_rows = (("Green", 42), ("Red", 101), ("Blue", 999))
    return [
        {"Field1": color, "Numeric Field": number} for color, number in example_rows
    ]


@pytest.fixture
def mocked_upsert_payload(
    qbclient, mocked_table_id, mocked_upsert_data, mocked_qb_api_getFields
):
    """Upsert payload the client builds from the mocked records, with no merge field.

    Requesting ``mocked_qb_api_getFields`` ensures the getFields endpoint is mocked
    before the client looks up the Field name-to-ID map.
    """
    payload = qbclient.prepare_upsert_payload(
        table_id=mocked_table_id,
        records=mocked_upsert_data,
        merge_field=None,
    )
    return payload


@pytest.fixture
def mocked_qb_api_upsert(
    qbclient, mocked_table_id, mocked_upsert_payload, global_requests_mock
):
    """Mock the upsert POST endpoint and return the JSON body it responds with.

    The mock only matches requests whose JSON body equals the mocked upsert payload.
    """
    records_url = f"{qbclient.api_base}/records"
    with open("tests/fixtures/qb_api_responses/upsert.json") as fixture_file:
        response_body = json.loads(fixture_file.read())

    def payload_matches(request):
        return request.json() == mocked_upsert_payload

    global_requests_mock.register_uri(
        "POST",
        records_url,
        additional_matcher=payload_matches,
        json=response_body,
    )
    return response_body


@pytest.fixture
def mocked_transform_pandas_target(tmpdir, mocked_table_name, mocked_upsert_data):
    """PandasPickleTarget in the temp dir, already written with the mocked records."""
    pickle_path = f"{tmpdir}/transform__example_table_0.pickle"
    pickle_target = PandasPickleTarget(path=pickle_path, table_name=mocked_table_name)
    upsert_df = pd.DataFrame(mocked_upsert_data)
    pickle_target.write(upsert_df)
    return pickle_target


@pytest.fixture
def quickbase_load_task_with_parent_data(mocked_transform_pandas_target):
    """Return a QuickbaseUpsertTask subclass whose single input is the mocked target.

    The class itself is returned, not an instance.
    """
    parent_target = mocked_transform_pandas_target

    class LoadTaskWithData(QuickbaseUpsertTask):
        @property
        def single_input(self) -> PandasPickleTarget | QuickbaseTableTarget:
            return parent_target

    return LoadTaskWithData
19 changes: 19 additions & 0 deletions tests/fixtures/qb_api_responses/getApp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"created": "2020-03-27T18:34:12Z",
"dateFormat": "MM-DD-YYYY",
"description": "My testing app",
"hasEveryoneOnTheInternet": true,
"id": "bpqe82s1",
"name": "Testing App",
"securityProperties": {
"allowClone": false,
"allowExport": false,
"enableAppTokens": false,
"hideFromPublic": false,
"mustBeRealmApproved": true,
"useIPFilter": true
},
"timeZone": "(UTC-08:00) Pacific Time (US & Canada)",
"updated": "2020-04-03T19:12:20Z",
"dataClassification": "Confidential"
}
Loading

0 comments on commit 06acc1a

Please sign in to comment.