From 2aefa393db10d3806293df3fef10e1b6e09117e7 Mon Sep 17 00:00:00 2001 From: pvanliefland Date: Sun, 9 May 2021 21:30:18 +0200 Subject: [PATCH] First implementation for analytics paging (see #21) --- Dockerfile | 14 +++++ dhis2/api.py | 61 ++++++++------------- dhis2/pager.py | 111 ++++++++++++++++++++++++++++++++++++++ docker-compose.yaml | 7 +++ tests/test_paging.py | 125 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 279 insertions(+), 39 deletions(-) create mode 100644 Dockerfile create mode 100644 dhis2/pager.py create mode 100644 docker-compose.yaml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e2e57f9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3 + +RUN apt-get update +RUN pip install --upgrade pip +RUN pip install pipenv + +RUN mkdir /code +WORKDIR /code + +COPY Pipfile . +RUN pipenv install --dev + +ENTRYPOINT ["pipenv", "run"] +CMD ["tests"] \ No newline at end of file diff --git a/dhis2/api.py b/dhis2/api.py index e452fff..71a9f31 100644 --- a/dhis2/api.py +++ b/dhis2/api.py @@ -9,7 +9,6 @@ import codecs from contextlib import closing -from itertools import chain from typing import Union, Optional, Generator, List, Any, Iterator from urllib.parse import urlparse, urlunparse @@ -18,6 +17,7 @@ from csv import DictReader from .exceptions import ClientException, RequestException +from .pager import CollectionPager, PagerException, AnalyticsPager, Pager from .utils import load_json, partition_payload, search_auth_file, version_to_int @@ -384,45 +384,28 @@ def get_paged( :param merge: If true, return a list containing all pages instead of one page. Defaults to False. :return: generator OR a normal DHIS2 response dict, e.g. {"organisationUnits": [...]} """ - try: - if not isinstance(page_size, (str, int)) or int(page_size) < 1: - raise ValueError - except ValueError: - raise ClientException("page_size must be > 1") - - params = {} if not params else params - if "paging" in params: - raise ClientException( - "Can't set paging manually in `params` when using `get_paged`" - ) - params["pageSize"] = page_size # type: ignore - params["page"] = 1 # type: ignore - params["totalPages"] = True # type: ignore - collection = endpoint.split("/")[ - 0 - ] # only use e.g. events when submitting events/query as endpoint - - def page_generator() -> Generator[dict, dict, None]: - """Yield pages""" - page = self.get(endpoint=endpoint, file_type="json", params=params).json() - page_count = page["pager"]["pageCount"] - yield page - - while page["pager"]["page"] < page_count: - params["page"] += 1 # type: ignore - page = self.get( - endpoint=endpoint, file_type="json", params=params - ).json() - yield page - - if not merge: - return page_generator() - else: - data = [] - for p in page_generator(): - data.append(p[collection]) - return {collection: list(chain.from_iterable(data))} + try: + pager: Pager + if endpoint == "analytics": + pager = AnalyticsPager( + get=self.get, + endpoint=endpoint, + params=params, + page_size=page_size, + merge=merge, + ) + else: + pager = CollectionPager( + get=self.get, + endpoint=endpoint, + params=params, + page_size=page_size, + merge=merge, + ) + return pager.page() + except PagerException as e: + raise ClientException(str(e)) def get_sqlview( self, diff --git a/dhis2/pager.py b/dhis2/pager.py new file mode 100644 index 0000000..a15acec --- /dev/null +++ b/dhis2/pager.py @@ -0,0 +1,111 @@ +from itertools import chain +from typing import Union, List, Generator, Callable + + +class PagerException(Exception): + """Paging exceptions.""" + + +class Pager: + """Base pager class.""" + + def __init__( + self, + *, + get: Callable, + endpoint: str, + params: Union[dict, List[tuple]] = None, + page_size: Union[int, str] = 50, + merge: bool = False, + ): + try: + if not isinstance(page_size, (str, int)) or int(page_size) < 1: + raise ValueError + except ValueError: + raise PagerException("page_size must be > 1") + + params = {} if not params else params + if "paging" in params: + raise PagerException( + "Can't set paging manually in `params` when using `get_paged`" + ) + params["pageSize"] = page_size # type: ignore + params["page"] = 1 # type: ignore + params["totalPages"] = True # type: ignore + + self._get = get + self._endpoint = endpoint + self._params = params + self._merge = merge + + def page_generator(self) -> Generator[dict, dict, None]: + """This method should return a generator that allows page iteration.""" + + raise NotImplementedError("Each Pager class should implement page_generator()") + + def merge(self): + """This method should loop over the pages yielded by page_generator() and merge the results""" + + raise NotImplementedError("Each Pager class should implement merge()") + + def page(self) -> Union[Generator[dict, dict, None], dict]: + """Returns the paginated results taking the merge option into account""" + + if not self._merge: + return self.page_generator() + else: + return self.merge() + + +class CollectionPager(Pager): + """Pager class for regular DHIS2 collections (data elements, indicators, etc...)""" + + def page_generator(self) -> Generator[dict, dict, None]: + page = self._get( + endpoint=self._endpoint, file_type="json", params=self._params + ).json() + page_count = page["pager"]["pageCount"] + + yield page + + while page["pager"]["page"] < page_count: + self._params["page"] += 1 # type: ignore + page = self._get( + endpoint=self._endpoint, file_type="json", params=self._params + ).json() + yield page + + def merge(self): + collection = self._endpoint.split("/")[ + 0 + ] # only use e.g. events when submitting events/query as endpoint + data = [] + for p in self.page_generator(): + data.append(p[collection]) + return {collection: list(chain.from_iterable(data))} + + +class AnalyticsPager(Pager): + """Pager class for the analytics endpoint (data elements, indicators, etc...)""" + + def page_generator(self) -> Generator[dict, dict, None]: + page = self._get( + endpoint=self._endpoint, file_type="json", params=self._params + ).json() + page_count = page["metaData"]["pager"]["pageCount"] + + yield page + + while page["metaData"]["pager"]["page"] < page_count: + self._params["page"] += 1 # type: ignore + page = self._get( + endpoint=self._endpoint, file_type="json", params=self._params + ).json() + yield page + + def merge(self): + data = [] + for p in self.page_generator(): + data.append(p["rows"]) + + return {"rows": list(chain.from_iterable(data))} diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..649df18 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,7 @@ +version: "3.8" + +services: + dhis2: + build: . + volumes: + - .:/code \ No newline at end of file diff --git a/tests/test_paging.py b/tests/test_paging.py index 0f3b059..1670a94 100644 --- a/tests/test_paging.py +++ b/tests/test_paging.py @@ -1,4 +1,5 @@ import uuid +from urllib.parse import urlencode import pytest import responses @@ -202,3 +203,127 @@ def test_paging_with_params(api): with pytest.raises(exceptions.ClientException): params = {"paging": False} api.get_paged("organisationUnits", params=params) + + +@responses.activate +def test_paging_analytics(api): + dx = "eTDtyyaSA7f;FbKK4ofIv5R" + pe = "2016Q1;2016Q2" + ou = "ImspTQPwCqd" + r_base = { + "headers": [ + {"name": "dx", "column": "Data", "meta": True, "type": "java.lang.String"}, + { + "name": "pe", + "column": "Period", + "meta": True, + "type": "java.lang.String", + }, + { + "name": "value", + "column": "Value", + "meta": False, + "type": "java.lang.Double", + }, + ], + "height": 2, + "metaData": { + "pe": ["2016Q1", "2016Q2"], + "ou": ["ImspTQPwCqd"], + "names": { + "2016Q1": "Jan to Mar 2016", + "2016Q2": "Apr to Jun 2016", + "FbKK4ofIv5R": "Measles Coverage <1 y", + "ImspTQPwCqd": "Sierra Leone", + "eTDtyyaSA7f": "Fully Immunized Coverage", + }, + "pager": { + "total": 4, + "pageSize": 2, + "pageCount": 2 + }, + }, + "width": 3, + } + + # first page + responses.add( + responses.GET, + "{}/analytics.json?{}&page={}&pageSize=2&totalPages=True".format( + API_URL, + urlencode( + [ + ("dimension", "dx:{}".format(dx)), + ("dimension", "pe:{}".format(pe)), + ("filter", "ou:{}".format(ou)), + ], + ), + 1, + ), + match_querystring=True, + json={ + **r_base, + "metaData": { + **r_base["metaData"], + "pager": { + **r_base["metaData"]["pager"], + "page": 1 + } + }, + "rows": [ + ["eTDtyyaSA7f", "2016Q2", "81.1"], + ["eTDtyyaSA7f", "2016Q1", "74.7"], + ], + + }, + status=200, + ) + + # Second page + responses.add( + responses.GET, + "{}/analytics.json?{}&page={}&pageSize=2&totalPages=True".format( + API_URL, + urlencode( + [ + ("dimension", "dx:{}".format(dx)), + ("dimension", "pe:{}".format(pe)), + ("filter", "ou:{}".format(ou)), + ], + ), + 2, + ), + match_querystring=True, + json={ + **r_base, + "metaData": { + **r_base["metaData"], + "pager": { + **r_base["metaData"]["pager"], + "page": 2 + } + }, + "rows": [ + ["FbKK4ofIv5R", "2016Q2", "88.9"], + ["FbKK4ofIv5R", "2016Q1", "84.0"], + ], + }, + status=200, + ) + + data = api.get_paged( + "analytics", + params={ + "dimension": [ + "dx:{}".format(dx), + "pe:{}".format(pe), + ], + "filter": [ + "ou:{}".format(ou), + ] + }, + merge=True, + page_size=2, + ) + assert len(data["rows"]) == 4 + assert len(responses.calls) == 2