Skip to content

Commit

Permalink
Add decorator syntax for defining subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ruscoder committed Nov 4, 2024
1 parent 2d0b52b commit 0eb2ba4
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 72 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
__pycache__/
dist/
.history/
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- `setup_tbs` as the public interface
- underscore keys in the subscription definitions
- non-managed subscriptions by default
- new way of subscription definition
- Support for non-managed subscriptions
- Support for optional token auth for webhook
- Support for both `id-only`/`full-resource`
Expand Down
106 changes: 77 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# fhir-tbs-py

Topic-based subscription extension for python aiohttp web applications
Topic-based subscription extension for python aiohttp web applications.

**Features**:
- Automatically created webhook aiohttp handlers based on definitions
- Unified R4B/R5 API for automatic registration (managed subscriptions)
- Optional managed subscriptions
- Optional authentication using X-Api-Key
- Optional authentication using `X-Api-Key`
- `id-only`/`full-resource` support

## Install
Expand All @@ -14,23 +15,63 @@ Install `fhir-tbs[r4b]` or `fhir-tbs[r5]` using poetry/pipenv/pip.

## Usage

1. Define subscriptions using `fhir_tbs.SubscriptionDefinition` type.
2. Instantiate R4BTBS/R5TBS class with predefined subscriptions.
- `tbs = R4BTBS(subscriptions=subscriptions)`
- `subscription_defaults` can be optionally passed to define defaults for managed subscriptions
- `payload_content` - `id-only`/`full-resource` (default is `id-only`)
- `timeout` - default is 60
- `heartbeat_period` - default is 20
3. Invoke `setup_tbs(app, tbs, webhook_path_prefix="webhook")` on app initialization passing needed parameters:
- The package supports managed and non-managed subscriptions through `manage_subscriptions` flag (default is False).
Managed subscriptions requires `app_url` and `get_fhir_client` args to be set.
- Also in the future the package will be responsible for handling delivery errors, in that case
`handle_delivery_errors` should be set to `True` and it also requires `app_url` and `get_fhir_client` args to be set.
- Specify `webhook_token` for required auth token that should be passed via `X-Api-Key` header
1. Instantiate R4BTBS/R5TBS class with optionally passing predefined subscriptions using `subscriptions` arg and `subscription_defaults` with default subscription parameters (e.g. `payload_content`, `timeout` or `heartbeat_period`):
```python
tbs = R4BTBS(subscription_defaults={"payload_content": "full-resource"})
```
2. Define subscriptions using decorator `tbs.define`:
```python
@tbs.define(
topic="https://example.com/SubscriptionTopic/new-appointment-event",
filter_by=[
{
"resource_type": "Appointment",
"filter_parameter": "status",
"value": "booked"
}
],
webhook_id="new-appointment"
)
async def new_appointment_handler(
app: web.Application,
reference: str,
_included_resources: list[r4b.AnyResource],
_timestamp: str | None,
) -> None:
logging.info("New appointment %s", reference)
```
3. Invoke `setup_tbs` on app initialization passing needed parameters (see specification below):
```python
setup_tbs(app, tbs, webhook_path_prefix="webhook")
```

### Specification

TBD
**fhir_tbs.r4b.R4BTBS**/**fhir_tbs.r5.R5TBS**
- subscriptions (*list[fhir_tbs.SubscriptionDefinitionWithHandler]*, optional) - predefined list of subscriptions.
- subscription_defaults (optional) - default parameters for all subscription definitions.
- payload_content (*str*, optional): `id-only`/`full-resource` (default is `id-only`)
- timeout (*int*, optional): default is `60`
- heartbeat_period (*int*, optional): default is `20`


**tbs_instance.define**
- topic (*str*): URL of SubscriptionTopic to subscribe.
- webhook_id (optional): Optional webhook id that will be part of webhook URL.
- filter_by (*list[FilterBy]*, optional): Optional list of filters applied to topic.
- payload_content (*str*, optional): `id-only`/`full-resource` (default is `id-only`)
- timeout (*int*, optional): default is `60`
- heartbeat_period (*int*, optional): default is `20`

**setup_tbs**
- app (*web.Application*): aiohttp application.
- tbs (*R4BTBS*/*R5TBS*): TBS class instance.
- webhook_path_prefix (*str*): Prefix for the generated aiohttp routes.
- webhook_token (*str*, optional): The authorization token that is checked in X-Api-Token header.
- manage_subscriptions (*bool*, optional): The flag that indicates whether subscription registration/population should be enabled.
- handle_delivery_errors (*bool*, optional): WIP The flag that indicated whether subscription delivery errors (e.g. broken connection or missing events) should be handled.
- app_url (*str*, optional): Application url that is used when `manage_subscriptions`/`handle_delivery_errors` are set.
- get_fhir_client (*Callable[[web.Application], AsyncFHIRClient]*, optional): Getter for web.Application that returns AsyncFHIRClient that further used when `manage_subscriptions`/`handle_delivery_errors` are set.

### Examples

Expand All @@ -43,7 +84,7 @@ import logging

from fhirpy import AsyncFHIRClient
import fhirpy_types_r4b as r4b
from fhir_tbs import SubscriptionDefinition
from fhir_tbs import SubscriptionDefinitionWithHandler
from fhir_tbs.r4b import R4BTBS
from aiohttp import web

Expand All @@ -69,18 +110,26 @@ async def new_appointment_sub(
logging.info("New appointment %s", appointment.model_dump())


subscriptions: list[SubscriptionDefinition[r4b.AnyResource]] = [
{
"topic": "https://example.com/SubscriptionTopic/new-appointment-event",
"handler": new_appointment_sub,
"filter_by": [
{"resource_type": "Appointment", "filter_parameter": "status", "value": "booked"}
],
},
]

tbs = R4BTBS()

tbs = R4BTBS(subscriptions=subscriptions)
@tbs.define(
topic="https://example.com/SubscriptionTopic/new-appointment-event",
filter_by=[
{
"resource_type": "Appointment",
"filter_parameter": "status",
"value": "booked"
}
],
webhook_id="new-appointment"
)
async def new_appointment_handler(
app: web.Application,
reference: str,
_included_resources: list[r4b.AnyResource],
_timestamp: str | None,
) -> None:
logging.info("New appointment %s", reference)


def create_app() -> web.Application:
Expand Down Expand Up @@ -150,6 +199,5 @@ def create_app() -> web.Application:
app[fhir_client_key] = AsyncFHIRClient(...)

setup_external_tbs(app)

```

4 changes: 2 additions & 2 deletions fhir_tbs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .implementation import setup_tbs
from .types import FilterBy, SubscriptionDefinition
from .types import FilterBy, SubscriptionDefinitionWithHandler

__title__ = "fhir-tbs-py"
__version__ = "1.0.0a0"
Expand All @@ -12,7 +12,7 @@


__all__ = [
"SubscriptionDefinition",
"SubscriptionDefinitionWithHandler",
"FilterBy",
"setup_tbs",
]
52 changes: 46 additions & 6 deletions fhir_tbs/implementation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import abstractmethod
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Generic, Self
from typing import Generic, Self, Unpack, cast

from aiohttp import web
from fhirpy import AsyncFHIRClient
Expand All @@ -10,6 +10,7 @@
SubscriptionCommonDefinition,
SubscriptionDefinition,
SubscriptionDefinitionPrepared,
SubscriptionDefinitionWithHandler,
SubscriptionEvent,
SubscriptionHandler,
SubscriptionInfo,
Expand Down Expand Up @@ -59,18 +60,54 @@ def ctx(app: web.Application) -> AsyncGenerator[None, None]:


class AbstractTBS(Generic[SubscriptionType, AnyResourceType]):
subscriptions: list[SubscriptionDefinition[AnyResourceType]]
subscriptions: list[SubscriptionDefinitionWithHandler[AnyResourceType]]
subscription_defaults: SubscriptionCommonDefinition | None

def __init__(
self: Self,
*,
subscriptions: list[SubscriptionDefinition[AnyResourceType]] | None = None,
subscriptions: list[SubscriptionDefinitionWithHandler[AnyResourceType]] | None = None,
subscription_defaults: SubscriptionCommonDefinition | None = None,
) -> None:
"""Initialize TBS with initial subscriptions and default settings.
Args:
subscriptions: List of initial subscriptions.
subscription_defaults: Default settings for all subscriptions.
"""
self.subscriptions = subscriptions or []
self.subscription_defaults = subscription_defaults

def define(
self: Self, *, webhook_id: str | None = None, **kwargs: Unpack[SubscriptionDefinition]
) -> Callable[[SubscriptionHandler[AnyResourceType]], SubscriptionHandler[AnyResourceType]]:
"""Define subscription handler.
Args:
topic: URL of SubscriptionTopic to subscribe.
webhook_id: Optional webhook id that will be part of webhook URL.
filter_by: Optional list of filters applied to topic.
**kwargs: Additional subscription definition parameters.
"""

def wrapper(
handler: SubscriptionHandler[AnyResourceType],
) -> SubscriptionHandler[AnyResourceType]:
self.subscriptions.append(
cast(
SubscriptionDefinitionWithHandler,
{
**kwargs,
"handler": handler,
**({"webhook_id": webhook_id} if webhook_id else {}),
},
)
)

return handler

return wrapper

async def _ctx_factory( # noqa: PLR0913
self: Self,
app: web.Application,
Expand All @@ -84,7 +121,7 @@ async def _ctx_factory( # noqa: PLR0913
) -> AsyncGenerator[None, None]:
subscription_defaults = self.subscription_defaults or {}
for subscription in self.subscriptions:
subscription_prepared: SubscriptionDefinitionPrepared[AnyResourceType] = {
subscription_prepared: SubscriptionDefinitionPrepared = {
"payload_content": subscription.get(
"payload_content",
subscription_defaults.get("payload_content", "id-only"),
Expand All @@ -102,8 +139,11 @@ async def _ctx_factory( # noqa: PLR0913
webhook_id = subscription.get("webhook_id")
if not webhook_id:
if not manage_subscriptions:
raise TypeError("`webhook_id` should be set for non-managed subscriptions")
raise TypeError(
"`webhook_id` should be set for non-managed subscriptions in the definition"
)
webhook_id = f"{handler.__module__}.{handler.__name__}"

webhook_path_parts = [webhook_path_prefix.strip("/"), webhook_id]
webhook_path = "/".join(webhook_path_parts)

Expand Down Expand Up @@ -205,7 +245,7 @@ def build_subscription(
webhook_id: str,
webhook_url: str,
webhook_token: str | None,
subscription: SubscriptionDefinitionPrepared[AnyResourceType],
subscription: SubscriptionDefinitionPrepared,
) -> SubscriptionType: ...

@classmethod
Expand Down
28 changes: 15 additions & 13 deletions fhir_tbs/r4b.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from .implementation import AbstractTBS
from .types import (
FilterBy,
SubscriptionDefinitionPrepared,
SubscriptionEvent,
SubscriptionInfo,
Expand Down Expand Up @@ -99,7 +100,7 @@ def build_subscription(
webhook_id: str,
webhook_url: str,
webhook_token: str | None,
subscription: SubscriptionDefinitionPrepared[r4b.AnyResource],
subscription: SubscriptionDefinitionPrepared,
) -> r4b.Subscription:
return r4b.Subscription(
meta=r4b.Meta(
Expand Down Expand Up @@ -139,23 +140,27 @@ def build_subscription(
],
),
criteria=subscription["topic"],
criteria__ext=r4b.Element(
extension=[
r4b.Extension(
url="http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
valueString=_build_filter_criteria(subscription),
)
],
criteria__ext=(
r4b.Element(
extension=[
r4b.Extension(
url="http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria",
valueString=_build_filter_criteria(subscription["filter_by"]),
)
],
)
if "filter_by" in subscription
else None
),
)


def _build_filter_criteria(subscription: SubscriptionDefinitionPrepared[r4b.AnyResource]) -> str:
def _build_filter_criteria(filter_by: list[FilterBy]) -> str:
params: dict[str, Any] = {}

resource_type = None

for f in subscription["filter_by"]:
for f in filter_by:
if not resource_type:
resource_type = f["resource_type"]
elif resource_type != f["resource_type"]:
Expand All @@ -170,9 +175,6 @@ def _build_filter_criteria(subscription: SubscriptionDefinitionPrepared[r4b.AnyR

params[param_name] = param_value

if not resource_type:
raise TypeError("At least one filterBy is required for AidboxSubscription")

return f"{resource_type}?{encode_params(params)}"


Expand Down
34 changes: 20 additions & 14 deletions fhir_tbs/r5.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def build_subscription(
webhook_id: str,
webhook_url: str,
webhook_token: str | None,
subscription: SubscriptionDefinitionPrepared[r5.AnyResource],
subscription: SubscriptionDefinitionPrepared,
) -> r5.Subscription:
return r5.Subscription(
status="requested",
Expand All @@ -112,19 +112,25 @@ def build_subscription(
heartbeatPeriod=subscription["heartbeat_period"],
timeout=subscription["timeout"],
endpoint=webhook_url,
parameter=[r5.SubscriptionParameter(name="X-Api-Key", value=webhook_token)]
if webhook_token
else [],
filterBy=[
r5.SubscriptionFilterBy(
resourceType=filter_by["resource_type"],
filterParameter=filter_by["filter_parameter"],
comparator=filter_by.get("comparator"),
modifier=filter_by.get("modifier"),
value=filter_by["value"],
)
for filter_by in subscription["filter_by"]
],
parameter=(
[r5.SubscriptionParameter(name="X-Api-Key", value=webhook_token)]
if webhook_token
else []
),
filterBy=(
[
r5.SubscriptionFilterBy(
resourceType=filter_by["resource_type"],
filterParameter=filter_by["filter_parameter"],
comparator=filter_by.get("comparator"),
modifier=filter_by.get("modifier"),
value=filter_by["value"],
)
for filter_by in subscription["filter_by"]
]
if "filter_by" in subscription
else None
),
)


Expand Down
Loading

0 comments on commit 0eb2ba4

Please sign in to comment.