diff --git a/example/logging.conf b/example/logging.conf new file mode 100644 index 0000000..44f22f0 --- /dev/null +++ b/example/logging.conf @@ -0,0 +1,27 @@ +[loggers] +keys=root,featurehub_sdk + +[handlers] +keys=consoleHandler + +[formatters] +keys=simpleFormatter + +[logger_root] +level=DEBUG +handlers=consoleHandler + +[logger_featurehub_sdk] +level=DEBUG +handlers=consoleHandler +qualname=featurehub_sdk +propagate=0 + +[handler_consoleHandler] +class=StreamHandler +level=DEBUG +formatter=simpleFormatter +args=(sys.stdout,) + +[formatter_simpleFormatter] +format=%(asctime)s - %(name)s - %(levelname)s - %(message)s \ No newline at end of file diff --git a/example/main.py b/example/main.py index 3d82a0f..7c2d910 100755 --- a/example/main.py +++ b/example/main.py @@ -6,12 +6,14 @@ """ import os import asyncio +import logging.config from typing import List from flask import Flask, jsonify, abort, request, Response from flask_cors import CORS from featurehub_sdk.client_context import ClientContext +from featurehub_sdk.version import sdk_version from featurehub_sdk.featurehub_config import FeatureHubConfig app = None @@ -34,6 +36,8 @@ def to_dict(self): 'resolved': self.resolved} def create_app(config=None): + logging.config.fileConfig('logging.conf') + app = Flask(__name__) # See http://flask.pocoo.org/docs/latest/config/ @@ -44,15 +48,14 @@ def create_app(config=None): # https://flask-cors.readthedocs.io/en/latest/ CORS(app) + print(f"Using featurehub version {sdk_version}") # Create FeatureHub configuration - edge_url = "http://localhost:8085" # cnf.edge_url - client_eval_key = 'default/845717ab-357e-4ce6-953c-2cb139974f2d/Zmv6OWy9K76IqnfeglTwSJoHbAAqhf*rjXljuNvAtoPVM4tpPIn' # cnf.client_eval_key - # edge_url = "https://zjbisc.demo.featurehub.io" # cnf.edge_url - # client_eval_key = "default/9b71f803-da79-4c04-8081-e5c0176dda87/CtVlmUHirgPd9Qz92Y0IQauUMUv3Wb*4dacoo47oYp6hSFFjVkG" # cnf.client_eval_key + edge_url = os.environ.get("FEATUREHUB_EDGE_URL", "http://localhost:8085") # cnf.edge_url + client_eval_key = os.environ.get("FEATUREHUB_CLIENT_API_KEY", "default/845717ab-357e-4ce6-953c-2cb139974f2d/Zmv6OWy9K76IqnfeglTwSJoHbAAqhf*rjXljuNvAtoPVM4tpPIn") # cnf.client_eval_key fh_config = FeatureHubConfig(edge_url, [client_eval_key]) # to use polling - # fh_config.use_polling_edge_service() + fh_config.use_polling_edge_service() # it takes a parameter uses the environment variable FEATUREHUB_POLL_INTERVAL if set print("starting featurehub") diff --git a/featurehub_sdk/CHANGELOG.adoc b/featurehub_sdk/CHANGELOG.adoc new file mode 100644 index 0000000..e9b60aa --- /dev/null +++ b/featurehub_sdk/CHANGELOG.adoc @@ -0,0 +1,4 @@ += Changelog + +* 1.0.0 - added in support for server based poll interval changes, stale environments, consistent logging name (now "featurehub_sdk"), updated example to match. Production grade SDK. +* 0.0.2 - initial release diff --git a/featurehub_sdk/featurehub_config.py b/featurehub_sdk/featurehub_config.py index 592c267..1f12ce8 100644 --- a/featurehub_sdk/featurehub_config.py +++ b/featurehub_sdk/featurehub_config.py @@ -3,7 +3,7 @@ import typing import os import logging -import sys + from featurehub_sdk.client_context import ClientContext, ClientEvalFeatureContext, ServerEvalFeatureContext, \ InternalFeatureRepository from featurehub_sdk.edge_service import EdgeService @@ -12,8 +12,7 @@ from featurehub_sdk.polling_edge_service import PollingEdgeService from featurehub_sdk.streaming_edge_service import StreamingEdgeClient -log = logging.getLogger(sys.modules[__name__].__name__) - +log = logging.getLogger('featurehub_sdk') class FeatureHubConfig: _api_keys: List[str] diff --git a/featurehub_sdk/polling_edge_service.py b/featurehub_sdk/polling_edge_service.py index 7c0b308..f7de596 100644 --- a/featurehub_sdk/polling_edge_service.py +++ b/featurehub_sdk/polling_edge_service.py @@ -1,18 +1,18 @@ from typing import Optional +import re import urllib3 import threading import logging -import sys import json import asyncio +from hashlib import sha256 from typing import List from featurehub_sdk.edge_service import EdgeService from featurehub_sdk.featurehub_repository import FeatureHubRepository from featurehub_sdk.version import sdk_version -log = logging.getLogger(sys.modules[__name__].__name__) - +log = logging.getLogger('featurehub_sdk') class PollingEdgeService(EdgeService): _interval: int @@ -20,7 +20,11 @@ class PollingEdgeService(EdgeService): _cancel: bool _thread: Optional[threading.Timer] _client_eval: bool + _stopped: bool _http: urllib3.PoolManager + _cache_control_pattern: re.Pattern + _sha_context: Optional[str] + _etag: Optional[str] def __init__(self, edge_url: str, api_keys: List[str], repository: FeatureHubRepository, @@ -28,10 +32,14 @@ def __init__(self, edge_url: str, api_keys: List[str], self._interval = interval self._repository = repository self._cancel = False + self._stopped = False self._thread = None self._client_eval = '*' in api_keys[0] self._context = None + self._etag = None + self._sha_context = None self._http = urllib3.PoolManager() + self._cache_control_pattern = re.compile('max-age=(\\d+)') self._url = f"{edge_url}features?" + "&".join(map(lambda i: 'apiKey=' + i, api_keys)) log.debug(f"polling url {self._url}") @@ -47,13 +55,36 @@ def update_interval(self, interval: int): # this does the business, calls the remote service and gets the features back async def _get_updates(self): # TODO: set timeout of tcp requests to 12 seconds, or give users control over it using environ vars - url = self._url if self._context is None else f"{self._url}&{self._context}" - log.log(5, "polling ", url) - resp = self._http.request(method='GET', url=url, headers={'X-SDK': 'Python', 'X-SDK-Version': sdk_version}) - log.log(5, "polling status", resp.status) + sha_context = "0" if self._sha_context is None else self._sha_context + url = f"{self._url}&contextSha={sha_context}" + + log.debug("polling %s", url) + headers = { + 'X-SDK': 'Python', + 'X-SDK-Version': sdk_version + } + + if self._etag: + headers['if-none-match'] = self._etag + + if self._context: + headers['x-featurehub'] = self._context + + resp = self._http.request(method='GET', url=url, headers=headers) + log.debug("polling status %s", resp.status) + + if resp.status == 200 or resp.status == 236: + if 'etag' in resp.headers: + self._etag = resp.headers['etag'] + + if 'cache-control' in resp.headers: + self._cache_control_polling_interval(resp.headers['cache-control']) - if resp.status == 200: self._process_successful_results(json.loads(resp.data.decode('utf-8'))) + + # if it is a 236, we have been told to stop + if resp.status == 236: + self._stopped = True elif resp.status == 404: # no such key self._repository.notify("failed", None) self._cancel = True @@ -63,12 +94,19 @@ async def _get_updates(self): return # otherwise its likely a transient failure, so keep trying + def _cache_control_polling_interval(self, cache_control: str): + max_age = re.findall(self._cache_control_pattern, cache_control) + if max_age: # not none and not empty + new_interval = int(max_age[0]) + if new_interval > 0: + self._interval = new_interval + # this is essentially a repeating task because it "calls itself" # another way to do this is with a separate class that is itself a thread descendant # which waits for the requisite time, then triggers a callback and then essentially does the same thing # if we need a repeating task elsewhere, we should consider refactoring this async def poll_with_interval(self): - if not self._cancel: + if not self._cancel and not self._stopped: await self._get_updates() if not self._cancel and self._interval > 0: self._thread = threading.Timer(self._interval, self.poll_again) @@ -97,6 +135,7 @@ def close(self): async def context_change(self, header: str): old_context = self._context self._context = header + self._sha_context = sha256(header.encode('utf-8')).hexdigest() if old_context != header: await self._get_updates() @@ -104,10 +143,18 @@ async def context_change(self, header: str): def cancelled(self): return self._cancel + @property + def stopped(self): + return self._stopped + + @property + def interval(self): + return self._interval + # we get returned a bunch of environments for a GET/Poll API so we need to cycle through them # the result is different for streaming def _process_successful_results(self, data): - log.log(5, "featurehub polling data was %s", data) + log.debug("featurehub polling data was %s", data) for feature_apikey in data: if feature_apikey: self._repository.notify("features", feature_apikey['features']) diff --git a/featurehub_sdk/streaming_edge_service.py b/featurehub_sdk/streaming_edge_service.py index bee262a..8bb07d5 100644 --- a/featurehub_sdk/streaming_edge_service.py +++ b/featurehub_sdk/streaming_edge_service.py @@ -12,14 +12,14 @@ from featurehub_sdk.edge_service import EdgeService from featurehub_sdk.version import sdk_version -log = logging.getLogger(sys.modules[__name__].__name__) - +log = logging.getLogger('featurehub_sdk') class _StreamingThread(threading.Thread): _cancel: bool _http: urllib3.PoolManager _client: Optional[sseclient.SSEClient] = None _repository: InternalFeatureRepository + _stopped: bool _url: str def __init__(self, edge_url: str, api_keys: List[str], @@ -28,15 +28,16 @@ def __init__(self, edge_url: str, api_keys: List[str], self._url = f"{edge_url}features/{api_keys[0]}" self._repository = repository self._cancel = False + self._stopped = False self._http = urllib3.PoolManager() def run(self): headers = {'Accept': 'text/event-stream', 'X-SDK': 'Python', 'X-SDK-Version': sdk_version} # headers = {'Accept': 'text/event-stream'} last_event_id = None - while not self._cancel: + while not self._cancel and not self._stopped: try: - log.log(5, "featurehub starting request: %s", self._url) + log.debug("featurehub starting request: %s", self._url) if last_event_id is not None: headers['Last-Event-Id'] = last_event_id resp = self._http.request('GET', self._url, preload_content=False, headers=headers) @@ -44,14 +45,26 @@ def run(self): self._client = sseclient.SSEClient(resp) for event in self._client.events(): last_event_id = event.id - log.log(5, "received data %s: %s", event.event, event.data) - self._repository.notify(event.event, self._check_data(event.data)) + log.debug("received data %s: %s", event.event, event.data) + + if event.event == 'config': + self._process_config(event.data) + else: + self._repository.notify(event.event, self._check_data(event.data)) + if self._cancel: self._client.close() elif resp.status == 404: + log.error("key provided for featurehub is invalid") self._cancel = True - except (ValueError, urllib3.exceptions.ProtocolError): - pass + except (ValueError, urllib3.exceptions.ProtocolError) as err: + log.error("failed to communicate with featurehub", err) + + def _process_config(self, data): + payload = json.loads(data) + if payload['edge.stale'] is not None: + log.warning("environment is stale, stopped requesting updates") + self._stopped = True def _check_data(self, data): if data and (data.startswith('{') or data.startswith('[')): @@ -59,6 +72,10 @@ def _check_data(self, data): return data + @property + def stopped(self): + return self._stopped + def cancel(self): self._cancel = True @@ -91,6 +108,10 @@ def close(self): def client_evaluated(self): return self._client_evaluated + @property + def stopped(self): + return self._streaming_thread.stopped + # not supported async def context_change(self, header: str): pass \ No newline at end of file diff --git a/featurehub_sdk/test/test_polling_edge.py b/featurehub_sdk/test/test_polling_edge.py index 5c145b7..c61a6b8 100644 --- a/featurehub_sdk/test/test_polling_edge.py +++ b/featurehub_sdk/test/test_polling_edge.py @@ -7,6 +7,9 @@ import asyncio import unittest +from featurehub_sdk.version import sdk_version + + class PollingEdgeServiceTest(TestCase): def test_success_base_case(self): with patch("urllib3.PoolManager") as http_class_mock: @@ -15,6 +18,7 @@ def test_success_base_case(self): data_mock.decode.return_value = '[{"features":[{"a":1}]}]' resp = MagicMock(name="http-response") resp.status = 200 + resp.headers = {} resp.data = data_mock http_mock.request.return_value = resp repo = MagicMock(spec=FeatureHubRepository) @@ -24,6 +28,53 @@ def test_success_base_case(self): repo.notify.assert_called_with('features', [{'a': 1}]) self.assertFalse(poller.cancelled) + def _data_mock(self): + data_mock = MagicMock(name='data-mock') + data_mock.decode.return_value = '{}' + return data_mock + + def test_success_has_cache_control(self): + with patch("urllib3.PoolManager") as http_class_mock: + http_mock = http_class_mock.return_value + resp = MagicMock(name="http-response") + resp.status = 200 + resp.headers = {'cache-control': 'private, max-age=20', 'etag': 'abcde'} + resp.data = self._data_mock() + http_mock.request.return_value = resp + repo = MagicMock(spec=FeatureHubRepository) + + poller = PollingEdgeService('http://localhost/', ['123'], repo, 0) + asyncio.run(poller.poll()) + self.assertEqual(poller.interval, 20) + resp.headers = {'cache-control': 'private, max-age=16'} + resp.status = 236 + asyncio.run(poller.poll()) + http_mock.request.assert_called_with(method='GET', url='http://localhost/features?apiKey=123&contextSha=0', + headers={'X-SDK': 'Python', + 'X-SDK-Version': sdk_version, + 'if-none-match': 'abcde' + }) + self.assertEqual(poller.interval, 16) + self.assertTrue(poller.stopped) + + def test_poller_uses_server_eval_key(self): + with patch("urllib3.PoolManager") as http_class_mock: + http_mock = http_class_mock.return_value + resp = MagicMock(name="http-response") + resp.status = 200 + resp.headers = {} + resp.data = self._data_mock() + http_mock.request.return_value = resp + repo = MagicMock(spec=FeatureHubRepository) + + poller = PollingEdgeService('http://localhost/', ['123'], repo, 0) + asyncio.run(poller.context_change('1234')) + http_mock.request.assert_called_with(method='GET', url='http://localhost/features?apiKey=123&contextSha=03ac674216f3e15c761ee1a5e255f067953623c8b388b4459e13f978d7c846f4', + headers={'X-SDK': 'Python', + 'X-SDK-Version': sdk_version, + 'x-featurehub': '1234' + }) + def test_with_failure(self): with patch("urllib3.PoolManager") as http_class_mock: http_mock = http_class_mock.return_value @@ -48,6 +99,24 @@ def test_with_dacha_not_ready(self): repo.notify.assert_not_called() self.assertFalse(poller.cancelled) + def test_with_environment_stale(self): + with patch("urllib3.PoolManager") as http_class_mock: + http_mock = http_class_mock.return_value + resp = MagicMock(name="http-response") + resp.status = 236 + resp.headers = {} + data_mock = MagicMock(name='data-mock') + data_mock.decode.return_value = '[{"features":[{"a":1}]}]' + + resp.data = data_mock + http_mock.request.return_value = resp + repo = MagicMock(spec=FeatureHubRepository) + + poller = PollingEdgeService('http://localhost', ['123'], repo, 0) + asyncio.run(poller.poll()) + repo.notify.assert_called_with('features', [{'a': 1}]) + self.assertFalse(poller.cancelled) + self.assertTrue(poller.stopped) if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/featurehub_sdk/version.py b/featurehub_sdk/version.py index 8a94b7c..09c61f8 100644 --- a/featurehub_sdk/version.py +++ b/featurehub_sdk/version.py @@ -1 +1 @@ -sdk_version="0.0.2" +sdk_version="1.0.0"