Skip to content

Commit

Permalink
[feature/sdk-compliance] See changelog (#18)
Browse files Browse the repository at this point in the history
Adds in support for cache control header override, better example,
stale environments, better logging
  • Loading branch information
rvowles authored Nov 5, 2022
1 parent a05da67 commit b025242
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 27 deletions.
27 changes: 27 additions & 0 deletions example/logging.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[loggers]
keys=root,featurehub_sdk

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[logger_featurehub_sdk]
level=DEBUG
handlers=consoleHandler
qualname=featurehub_sdk
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
13 changes: 8 additions & 5 deletions example/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
"""
import os
import asyncio
import logging.config
from typing import List

from flask import Flask, jsonify, abort, request, Response
from flask_cors import CORS

from featurehub_sdk.client_context import ClientContext
from featurehub_sdk.version import sdk_version
from featurehub_sdk.featurehub_config import FeatureHubConfig

app = None
Expand All @@ -34,6 +36,8 @@ def to_dict(self):
'resolved': self.resolved}

def create_app(config=None):
logging.config.fileConfig('logging.conf')

app = Flask(__name__)

# See http://flask.pocoo.org/docs/latest/config/
Expand All @@ -44,15 +48,14 @@ def create_app(config=None):
# https://flask-cors.readthedocs.io/en/latest/
CORS(app)

print(f"Using featurehub version {sdk_version}")
# Create FeatureHub configuration
edge_url = "http://localhost:8085" # cnf.edge_url
client_eval_key = 'default/845717ab-357e-4ce6-953c-2cb139974f2d/Zmv6OWy9K76IqnfeglTwSJoHbAAqhf*rjXljuNvAtoPVM4tpPIn' # cnf.client_eval_key
# edge_url = "https://zjbisc.demo.featurehub.io" # cnf.edge_url
# client_eval_key = "default/9b71f803-da79-4c04-8081-e5c0176dda87/CtVlmUHirgPd9Qz92Y0IQauUMUv3Wb*4dacoo47oYp6hSFFjVkG" # cnf.client_eval_key
edge_url = os.environ.get("FEATUREHUB_EDGE_URL", "http://localhost:8085") # cnf.edge_url
client_eval_key = os.environ.get("FEATUREHUB_CLIENT_API_KEY", "default/845717ab-357e-4ce6-953c-2cb139974f2d/Zmv6OWy9K76IqnfeglTwSJoHbAAqhf*rjXljuNvAtoPVM4tpPIn") # cnf.client_eval_key

fh_config = FeatureHubConfig(edge_url, [client_eval_key])
# to use polling
# fh_config.use_polling_edge_service()
fh_config.use_polling_edge_service()
# it takes a parameter uses the environment variable FEATUREHUB_POLL_INTERVAL if set

print("starting featurehub")
Expand Down
4 changes: 4 additions & 0 deletions featurehub_sdk/CHANGELOG.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
= Changelog

* 1.0.0 - added support for server-based poll interval changes and stale environments, consistent logging name (now "featurehub_sdk"), updated example to match. Production-grade SDK.
* 0.0.2 - initial release
5 changes: 2 additions & 3 deletions featurehub_sdk/featurehub_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import typing
import os
import logging
import sys

from featurehub_sdk.client_context import ClientContext, ClientEvalFeatureContext, ServerEvalFeatureContext, \
InternalFeatureRepository
from featurehub_sdk.edge_service import EdgeService
Expand All @@ -12,8 +12,7 @@
from featurehub_sdk.polling_edge_service import PollingEdgeService
from featurehub_sdk.streaming_edge_service import StreamingEdgeClient

log = logging.getLogger(sys.modules[__name__].__name__)

log = logging.getLogger('featurehub_sdk')

class FeatureHubConfig:
_api_keys: List[str]
Expand Down
67 changes: 57 additions & 10 deletions featurehub_sdk/polling_edge_service.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,45 @@
from typing import Optional

import re
import urllib3
import threading
import logging
import sys
import json
import asyncio
from hashlib import sha256
from typing import List
from featurehub_sdk.edge_service import EdgeService
from featurehub_sdk.featurehub_repository import FeatureHubRepository
from featurehub_sdk.version import sdk_version

log = logging.getLogger(sys.modules[__name__].__name__)

log = logging.getLogger('featurehub_sdk')

class PollingEdgeService(EdgeService):
_interval: int
_repository: FeatureHubRepository
_cancel: bool
_thread: Optional[threading.Timer]
_client_eval: bool
_stopped: bool
_http: urllib3.PoolManager
_cache_control_pattern: re.Pattern
_sha_context: Optional[str]
_etag: Optional[str]

def __init__(self, edge_url: str, api_keys: List[str],
repository: FeatureHubRepository,
interval: int):
self._interval = interval
self._repository = repository
self._cancel = False
self._stopped = False
self._thread = None
self._client_eval = '*' in api_keys[0]
self._context = None
self._etag = None
self._sha_context = None
self._http = urllib3.PoolManager()
self._cache_control_pattern = re.compile('max-age=(\\d+)')

self._url = f"{edge_url}features?" + "&".join(map(lambda i: 'apiKey=' + i, api_keys))
log.debug(f"polling url {self._url}")
Expand All @@ -47,13 +55,36 @@ def update_interval(self, interval: int):
# this does the business, calls the remote service and gets the features back
async def _get_updates(self):
# TODO: set timeout of tcp requests to 12 seconds, or give users control over it using environ vars
url = self._url if self._context is None else f"{self._url}&{self._context}"
log.log(5, "polling ", url)
resp = self._http.request(method='GET', url=url, headers={'X-SDK': 'Python', 'X-SDK-Version': sdk_version})
log.log(5, "polling status", resp.status)
sha_context = "0" if self._sha_context is None else self._sha_context
url = f"{self._url}&contextSha={sha_context}"

log.debug("polling %s", url)
headers = {
'X-SDK': 'Python',
'X-SDK-Version': sdk_version
}

if self._etag:
headers['if-none-match'] = self._etag

if self._context:
headers['x-featurehub'] = self._context

resp = self._http.request(method='GET', url=url, headers=headers)
log.debug("polling status %s", resp.status)

if resp.status == 200 or resp.status == 236:
if 'etag' in resp.headers:
self._etag = resp.headers['etag']

if 'cache-control' in resp.headers:
self._cache_control_polling_interval(resp.headers['cache-control'])

if resp.status == 200:
self._process_successful_results(json.loads(resp.data.decode('utf-8')))

# if it is a 236, we have been told to stop
if resp.status == 236:
self._stopped = True
elif resp.status == 404: # no such key
self._repository.notify("failed", None)
self._cancel = True
Expand All @@ -63,12 +94,19 @@ async def _get_updates(self):
return
# otherwise its likely a transient failure, so keep trying

def _cache_control_polling_interval(self, cache_control: str) -> None:
    """Adopt the polling interval requested by a Cache-Control header value.

    Searches ``cache_control`` with the service's compiled pattern and reads
    the first match's first capture group as a number of seconds. When that
    number is greater than zero it replaces ``self._interval``; when nothing
    matches, or the value is zero, the current interval is left untouched.

    :param cache_control: raw value of the response's cache-control header.
    """
    # Only the first directive matters, so stop at the first match instead
    # of collecting every match in the header.
    match = self._cache_control_pattern.search(cache_control)
    if match is None:
        return

    new_interval = int(match.group(1))
    if new_interval > 0:
        self._interval = new_interval

# this is essentially a repeating task because it "calls itself"
# another way to do this is with a separate class that is itself a thread descendant
# which waits for the requisite time, then triggers a callback and then essentially does the same thing
# if we need a repeating task elsewhere, we should consider refactoring this
async def poll_with_interval(self):
if not self._cancel:
if not self._cancel and not self._stopped:
await self._get_updates()
if not self._cancel and self._interval > 0:
self._thread = threading.Timer(self._interval, self.poll_again)
Expand Down Expand Up @@ -97,17 +135,26 @@ def close(self):
async def context_change(self, header: str):
    """Record a new context header and re-poll when it differs from the old one.

    The header and its SHA-256 hex digest are always stored; the features
    are only fetched again when the header actually changed.

    :param header: the new context header value.
    """
    changed = self._context != header
    self._context = header
    digest = sha256(header.encode('utf-8'))
    self._sha_context = digest.hexdigest()
    if not changed:
        return
    await self._get_updates()

@property
def cancelled(self) -> bool:
    """Current value of the service's ``_cancel`` flag."""
    return self._cancel

@property
def stopped(self) -> bool:
    """Current value of the service's ``_stopped`` flag."""
    return self._stopped

@property
def interval(self) -> int:
    """Polling interval currently in effect (the value of ``_interval``)."""
    return self._interval

# we get returned a bunch of environments for a GET/Poll API so we need to cycle through them
# the result is different for streaming
def _process_successful_results(self, data):
log.log(5, "featurehub polling data was %s", data)
log.debug("featurehub polling data was %s", data)
for feature_apikey in data:
if feature_apikey:
self._repository.notify("features", feature_apikey['features'])
37 changes: 29 additions & 8 deletions featurehub_sdk/streaming_edge_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
from featurehub_sdk.edge_service import EdgeService
from featurehub_sdk.version import sdk_version

log = logging.getLogger(sys.modules[__name__].__name__)

log = logging.getLogger('featurehub_sdk')

class _StreamingThread(threading.Thread):
_cancel: bool
_http: urllib3.PoolManager
_client: Optional[sseclient.SSEClient] = None
_repository: InternalFeatureRepository
_stopped: bool
_url: str

def __init__(self, edge_url: str, api_keys: List[str],
Expand All @@ -28,37 +28,54 @@ def __init__(self, edge_url: str, api_keys: List[str],
self._url = f"{edge_url}features/{api_keys[0]}"
self._repository = repository
self._cancel = False
self._stopped = False
self._http = urllib3.PoolManager()

def run(self):
    """Thread body: stream server-sent events from the edge into the repository.

    Keeps (re)connecting until the thread is cancelled or marked stopped.
    A 404 response means the key is invalid and cancels the thread. A
    ``config`` event is handed to ``_process_config``; every other event is
    forwarded to the repository after optional JSON decoding.
    """
    headers = {'Accept': 'text/event-stream', 'X-SDK': 'Python', 'X-SDK-Version': sdk_version}
    last_event_id = None
    while not self._cancel and not self._stopped:
        try:
            log.debug("featurehub starting request: %s", self._url)
            # resume from the last event we saw when reconnecting
            if last_event_id is not None:
                headers['Last-Event-Id'] = last_event_id
            resp = self._http.request('GET', self._url, preload_content=False, headers=headers)
            if resp.status == 200:
                self._client = sseclient.SSEClient(resp)
                for event in self._client.events():
                    last_event_id = event.id
                    log.debug("received data %s: %s", event.event, event.data)

                    if event.event == 'config':
                        self._process_config(event.data)
                    else:
                        self._repository.notify(event.event, self._check_data(event.data))

                    # cancellation is only noticed between events
                    if self._cancel:
                        self._client.close()
            elif resp.status == 404:
                log.error("key provided for featurehub is invalid")
                self._cancel = True
        except (ValueError, urllib3.exceptions.ProtocolError) as err:
            # The message needs a placeholder for err; without one the
            # logging module fails to format the record and the real
            # failure reason never reaches the log.
            log.error("failed to communicate with featurehub: %s", err)

def _process_config(self, data):
    """Handle a ``config`` event from the stream.

    Parses ``data`` as JSON and, when the payload carries a non-null
    ``edge.stale`` entry, marks the thread as stopped so no further
    updates are requested.

    :param data: raw JSON text of the config event.
    :raises ValueError: if ``data`` is not valid JSON.
    """
    payload = json.loads(data)
    # Config events without the key, or with a non-object payload, must not
    # raise here: the caller only handles ValueError/ProtocolError, so a
    # KeyError would terminate the streaming thread.
    if isinstance(payload, dict) and payload.get('edge.stale') is not None:
        log.warning("environment is stale, stopped requesting updates")
        self._stopped = True

def _check_data(self, data):
    """Decode an event payload when it looks like JSON.

    :param data: event payload text (may be empty or ``None``).
    :return: the parsed value when ``data`` is non-empty and starts with
        ``{`` or ``[``; otherwise ``data`` unchanged.
    :raises ValueError: if the payload looks like JSON but fails to parse.
    """
    # startswith accepts a tuple, so one call covers both opening brackets
    if data and data.startswith(('{', '[')):
        return json.loads(data)

    return data

@property
def stopped(self) -> bool:
    """Current value of the thread's ``_stopped`` flag."""
    return self._stopped

def cancel(self):
self._cancel = True

Expand Down Expand Up @@ -91,6 +108,10 @@ def close(self):
def client_evaluated(self):
return self._client_evaluated

@property
def stopped(self):
    """Report the ``stopped`` value of the underlying streaming thread."""
    thread = self._streaming_thread
    return thread.stopped

# not supported
async def context_change(self, header: str):
    """Accept a context header change and do nothing with it."""
    return None
Loading

0 comments on commit b025242

Please sign in to comment.