Skip to content

Commit

Permalink
First implementation for analytics paging (see davidhuser#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvanliefland committed May 9, 2021
1 parent 8890a2f commit 2aefa39
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 39 deletions.
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Test/dev image: installs dependencies with pipenv and runs the test suite.
FROM python:3

# Upgrade pip and install pipenv in a single layer.  The original lone
# `RUN apt-get update` installed nothing and only baked a stale apt cache
# into the image, so it is dropped.
RUN pip install --upgrade pip && pip install pipenv

# WORKDIR creates /code if it does not already exist; no separate mkdir needed.
WORKDIR /code

# NOTE(review): only the Pipfile is copied (no Pipfile.lock), so dependency
# resolution is not fully reproducible — confirm whether a lock file should
# be committed and copied as well.
COPY Pipfile .
RUN pipenv install --dev

# Default to the "tests" pipenv script; override CMD to run something else.
ENTRYPOINT ["pipenv", "run"]
CMD ["tests"]
61 changes: 22 additions & 39 deletions dhis2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import codecs
from contextlib import closing
from itertools import chain
from typing import Union, Optional, Generator, List, Any, Iterator

from urllib.parse import urlparse, urlunparse
Expand All @@ -18,6 +17,7 @@
from csv import DictReader

from .exceptions import ClientException, RequestException
from .pager import CollectionPager, PagerException, AnalyticsPager, Pager
from .utils import load_json, partition_payload, search_auth_file, version_to_int


Expand Down Expand Up @@ -384,45 +384,28 @@ def get_paged(
:param merge: If true, return a list containing all pages instead of one page. Defaults to False.
:return: generator OR a normal DHIS2 response dict, e.g. {"organisationUnits": [...]}
"""
try:
if not isinstance(page_size, (str, int)) or int(page_size) < 1:
raise ValueError
except ValueError:
raise ClientException("page_size must be > 1")

params = {} if not params else params
if "paging" in params:
raise ClientException(
"Can't set paging manually in `params` when using `get_paged`"
)
params["pageSize"] = page_size # type: ignore
params["page"] = 1 # type: ignore
params["totalPages"] = True # type: ignore

collection = endpoint.split("/")[
0
] # only use e.g. events when submitting events/query as endpoint

def page_generator() -> Generator[dict, dict, None]:
"""Yield pages"""
page = self.get(endpoint=endpoint, file_type="json", params=params).json()
page_count = page["pager"]["pageCount"]
yield page

while page["pager"]["page"] < page_count:
params["page"] += 1 # type: ignore
page = self.get(
endpoint=endpoint, file_type="json", params=params
).json()
yield page

if not merge:
return page_generator()
else:
data = []
for p in page_generator():
data.append(p[collection])
return {collection: list(chain.from_iterable(data))}
try:
pager: Pager
if endpoint == "analytics":

This comment has been minimized.

Copy link
@davidhuser

davidhuser May 10, 2021

is it true for all kinds of analytics, or just analytics/events/query?

This comment has been minimized.

Copy link
@pvanliefland

pvanliefland May 10, 2021

Author Owner

I still need to handle those. Ideally, I should gather a bunch of sample analytics queries and the associated responses and test all of them. Analytics queries look much more diverse than standard collection queries.

This comment has been minimized.

Copy link
@davidhuser

davidhuser May 18, 2021

agreed - I would scan the developer docs for any analytics to see what the available endpoints are and how they are structured in their responses. Unfortunately there's no /api/resources for analytics.

pager = AnalyticsPager(
get=self.get,
endpoint=endpoint,
params=params,
page_size=page_size,
merge=merge,
)
else:
pager = CollectionPager(
get=self.get,
endpoint=endpoint,
params=params,
page_size=page_size,
merge=merge,
)
return pager.page()
except PagerException as e:
raise ClientException(str(e))

def get_sqlview(
self,
Expand Down
111 changes: 111 additions & 0 deletions dhis2/pager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from itertools import chain
from typing import Union, List, Generator, Callable


class PagerException(Exception):

This comment has been minimized.

Copy link
@davidhuser

davidhuser May 10, 2021

Could the module specific exception be handled by, or inherited from, the classes present in exceptions.py ?

This comment has been minimized.

Copy link
@pvanliefland

pvanliefland May 10, 2021

Author Owner

TBH I created this exception class to avoid a circular import issue but I just figured out how to fix it.

"""Paging exceptions."""


class Pager:
    """Base pager class.

    Wraps a ``get`` callable (typically ``Api.get``) and drives DHIS2
    server-side paging by injecting ``pageSize``/``page``/``totalPages``
    into the request parameters.  Subclasses implement ``page_generator()``
    and ``merge()`` for the response layout of their specific endpoint.
    """

    def __init__(
        self,
        *,
        get: Callable,
        endpoint: str,
        params: Union[dict, List[tuple], None] = None,
        page_size: Union[int, str] = 50,
        merge: bool = False,
    ):
        """
        :param get: callable performing the HTTP GET, e.g. ``Api.get``
        :param endpoint: API endpoint, e.g. ``organisationUnits``
        :param params: query parameters; must not set paging manually
        :param page_size: number of items per page, at least 1
        :param merge: if True, ``page()`` returns a single merged dict
            instead of a generator of pages
        :raises PagerException: on an invalid page_size or manual paging
        """
        try:
            if not isinstance(page_size, (str, int)) or int(page_size) < 1:
                raise ValueError
        except ValueError:
            # The check accepts any int >= 1 (or a string parseable as one);
            # the previous message ("> 1") contradicted that.
            raise PagerException("page_size must be >= 1")

        params = {} if not params else params
        # NOTE(review): this membership test and the item assignments below
        # assume `params` is a dict; a List[tuple], as the annotation allows,
        # would fail on assignment — confirm the intended parameter forms.
        if "paging" in params:
            raise PagerException(
                "Can't set paging manually in `params` when using `get_paged`"
            )
        params["pageSize"] = page_size  # type: ignore
        params["page"] = 1  # type: ignore
        params["totalPages"] = True  # type: ignore

        self._get = get
        self._endpoint = endpoint
        self._params = params
        self._merge = merge

    def page_generator(self) -> Generator[dict, None, None]:
        """Yield response pages one by one; implemented by subclasses."""

        raise NotImplementedError("Each Pager class should implement page_generator()")

    def merge(self):
        """Merge all pages yielded by page_generator() into one dict; implemented by subclasses."""

        raise NotImplementedError("Each Pager class should implement merge()")

    def page(self) -> Union[Generator[dict, None, None], dict]:
        """Return the paginated results, taking the merge option into account."""

        if not self._merge:
            return self.page_generator()
        else:
            return self.merge()


class CollectionPager(Pager):
    """Pager class for regular DHIS2 collections (data elements, indicators, etc...)"""

    def page_generator(self) -> Generator[dict, dict, None]:
        """Yield each page of the collection, requesting pages sequentially."""
        current = self._get(
            endpoint=self._endpoint, file_type="json", params=self._params
        ).json()
        total_pages = current["pager"]["pageCount"]

        yield current

        # Request subsequent pages until the server-reported count is reached.
        while current["pager"]["page"] < total_pages:
            self._params["page"] += 1  # type: ignore
            current = self._get(
                endpoint=self._endpoint, file_type="json", params=self._params
            ).json()
            yield current

    def merge(self):
        """Concatenate the collection items from every page into a single dict."""
        # e.g. use "events" as the key when the endpoint is "events/query"
        key, _, _ = self._endpoint.partition("/")
        merged = []
        for current in self.page_generator():
            merged.extend(current[key])
        return {key: merged}


class AnalyticsPager(Pager):
    """Pager class for the analytics endpoint.

    Analytics responses nest their pager under ``metaData`` (rather than a
    top-level ``pager`` key) and carry the result data in a flat ``rows``
    list, so paging and merging differ from regular collections.
    """

    def page_generator(self) -> Generator[dict, dict, None]:
        # First request; the total page count is available because
        # Pager.__init__ set totalPages=True in the parameters.
        page = self._get(
            endpoint=self._endpoint, file_type="json", params=self._params
        ).json()
        page_count = page["metaData"]["pager"]["pageCount"]

        yield page

        # Request subsequent pages until the reported page count is reached.
        while page["metaData"]["pager"]["page"] < page_count:
            self._params["page"] += 1  # type: ignore
            page = self._get(
                endpoint=self._endpoint, file_type="json", params=self._params
            ).json()
            yield page

    def merge(self):
        # NOTE(review): only "rows" are merged; "headers", "metaData" and the
        # width/height fields of the individual pages are dropped — confirm
        # whether callers need them in the merged result.
        data = []
        for p in self.page_generator():
            data.append(p["rows"])

        return {"rows": list(chain.from_iterable(data))}
7 changes: 7 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Development compose file: builds the test image from ./Dockerfile and
# bind-mounts the repository so local code changes are picked up live.
version: "3.8"

services:
  dhis2:
    build: .
    volumes:
      # Mount the project root over /code (the image's WORKDIR).
      - .:/code
125 changes: 125 additions & 0 deletions tests/test_paging.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
from urllib.parse import urlencode

import pytest
import responses
Expand Down Expand Up @@ -202,3 +203,127 @@ def test_paging_with_params(api):
with pytest.raises(exceptions.ClientException):
params = {"paging": False}
api.get_paged("organisationUnits", params=params)


@responses.activate
def test_paging_analytics(api):
dx = "eTDtyyaSA7f;FbKK4ofIv5R"
pe = "2016Q1;2016Q2"
ou = "ImspTQPwCqd"
r_base = {
"headers": [
{"name": "dx", "column": "Data", "meta": True, "type": "java.lang.String"},
{
"name": "pe",
"column": "Period",
"meta": True,
"type": "java.lang.String",
},
{
"name": "value",
"column": "Value",
"meta": False,
"type": "java.lang.Double",
},
],
"height": 2,
"metaData": {
"pe": ["2016Q1", "2016Q2"],
"ou": ["ImspTQPwCqd"],
"names": {
"2016Q1": "Jan to Mar 2016",
"2016Q2": "Apr to Jun 2016",
"FbKK4ofIv5R": "Measles Coverage <1 y",
"ImspTQPwCqd": "Sierra Leone",
"eTDtyyaSA7f": "Fully Immunized Coverage",
},
"pager": {
"total": 4,
"pageSize": 2,
"pageCount": 2
},
},
"width": 3,
}

# first page
responses.add(
responses.GET,
"{}/analytics.json?{}&page={}&pageSize=2&totalPages=True".format(
API_URL,
urlencode(
[
("dimension", "dx:{}".format(dx)),
("dimension", "pe:{}".format(pe)),
("filter", "ou:{}".format(ou)),
],
),
1,
),
match_querystring=True,
json={
**r_base,
"metaData": {
**r_base["metaData"],
"pager": {
**r_base["metaData"]["pager"],
"page": 1
}
},
"rows": [
["eTDtyyaSA7f", "2016Q2", "81.1"],
["eTDtyyaSA7f", "2016Q1", "74.7"],
],

},
status=200,
)

# Second page
responses.add(
responses.GET,
"{}/analytics.json?{}&page={}&pageSize=2&totalPages=True".format(
API_URL,
urlencode(
[
("dimension", "dx:{}".format(dx)),
("dimension", "pe:{}".format(pe)),
("filter", "ou:{}".format(ou)),
],
),
2,
),
match_querystring=True,
json={
**r_base,
"metaData": {
**r_base["metaData"],
"pager": {
**r_base["metaData"]["pager"],
"page": 2
}
},
"rows": [
["FbKK4ofIv5R", "2016Q2", "88.9"],
["FbKK4ofIv5R", "2016Q1", "84.0"],
],
},
status=200,
)

data = api.get_paged(
"analytics",
params={
"dimension": [
"dx:{}".format(dx),
"pe:{}".format(pe),
],
"filter": [
"ou:{}".format(ou),
]
},
merge=True,
page_size=2,
)
assert len(data["rows"]) == 4

This comment has been minimized.

Copy link
@davidhuser

davidhuser May 10, 2021

Would be good to verify the non-paged, or static, parts of the response(s) such as metaData

This comment has been minimized.

Copy link
@pvanliefland

pvanliefland May 10, 2021

Author Owner

Indeed. And I think I might have to merge metaData as well, I'm not sure that every page contains the full meta data.

assert len(responses.calls) == 2

0 comments on commit 2aefa39

Please sign in to comment.