-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathwebapp.py
executable file
·120 lines (92 loc) · 4.02 KB
/
webapp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
# ----------------------------------------------------------------------
# Numenta Platform for Intelligent Computing (NuPIC)
# Copyright (C) 2015, Numenta, Inc. Unless you have purchased from
# Numenta, Inc. a separate commercial license for this software code, the
# following terms and conditions apply:
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses.
#
# http://numenta.org/licenses/
# ----------------------------------------------------------------------
import calendar
import json
import os
from gevent import pywsgi
from nta.utils import message_bus_connector
from nta.utils.config import Config
from htmengine import repository
from htmengine.adapters.datasource import createDatasourceAdapter
from htmengine.exceptions import MetricAlreadyMonitored
from htmengine.repository import schema
# Application settings from "application.conf". The second argument is read
# from the APPLICATION_CONFIG_PATH environment variable, so importing this
# module raises KeyError when that variable is unset.
# NOTE(review): presumably the directory containing the file -- confirm
# against nta.utils.config.Config.
appConfig = Config("application.conf", os.environ["APPLICATION_CONFIG_PATH"])
# Queue name that sendSample() publishes to: the "queue_name" option under
# "metric_listener" in the application config.
MESSAGE_QUEUE_NAME = appConfig.get("metric_listener", "queue_name")
# Module-level message bus connector; handler() passes it to sendSample()
# for every POSTed sample.
bus = message_bus_connector.MessageBusConnector()
def sendSample(bus, metricName, value, epochTimestamp):
    """Publish a single metric sample on the message bus.

    The sample is rendered as the text line
    "<metricName> <repr(value)> <epochTimestamp as integer>", wrapped in a
    JSON object with "protocol" set to "plain" and the line as the only
    entry of "data", and published as a persistent message to the queue
    named by MESSAGE_QUEUE_NAME.

    :param bus: object exposing publish(mqName=..., body=..., persistent=...)
    :param metricName: name of the metric the sample belongs to
    :param value: sample value (formatted with %r)
    :param epochTimestamp: sample time (formatted with %d)
    """
    sampleLine = "%s %r %d" % (metricName, value, epochTimestamp)
    envelope = {"protocol": "plain", "data": [sampleLine]}
    bus.publish(mqName=MESSAGE_QUEUE_NAME,
                body=json.dumps(envelope),
                persistent=True)
def handler(environ, start_response):
    """WSGI application exposing a custom metric over HTTP.

    The request path (PATH_INFO) is used verbatim as the metric name. The
    behavior depends on the request method:

    * PUT  -- create a model for the metric. The request body must be a JSON
      object; its members are merged into the model spec's "modelParams".
      Responds "201 Created" on success, or "400 Bad Request" when the body
      cannot be parsed or the metric is already monitored.
    * POST -- submit samples. Each line of the request body must be
      "<value> <timestamp>" separated by a single space; every sample is
      published through sendSample() and echoed back in the response.
    * GET  -- stream the stored metric data ordered by ascending timestamp,
      one "<metric> <value> <epoch seconds> <anomaly score>" line per row.
    * anything else -- "405 Method Not Allowed".

    :param environ: WSGI environment dict
    :param start_response: WSGI start_response callable
    :returns: generator yielding the response body chunks
    """
    metricName = environ["PATH_INFO"]
    method = environ["REQUEST_METHOD"]
    # Built per request and handed to exactly one start_response() call.
    htmlHeaders = [("Content-Type", "text/html")]

    if method == "PUT":
        # Trigger model creation...
        modelSpec = {
            "datasource": "custom",
            "metricSpec": {
                "metric": metricName
            },
            "modelParams": {}
        }

        try:
            modelSpec["modelParams"].update(json.load(environ["wsgi.input"]))
        except Exception:
            start_response("400 Bad Request", htmlHeaders)
            yield "Unable to parse request"
            # Stop here: falling through would create the model with empty
            # params and call start_response() a second time for a request
            # that has already been answered with a 400.
            return

        adapter = createDatasourceAdapter(modelSpec["datasource"])
        try:
            modelId = adapter.monitorMetric(modelSpec)
        except MetricAlreadyMonitored:
            start_response("400 Bad Request", htmlHeaders)
            yield "Model already exists for %s" % metricName
        else:
            start_response("201 Created", htmlHeaders)
            yield "Created %s\n" % modelId

    elif method == "POST":
        # Send data...
        start_response("200 OK", htmlHeaders)

        for sample in environ["wsgi.input"]:
            rawValue, rawTimestamp = sample.split(" ")
            # Parse once and reuse for both publishing and the echo line.
            value = float(rawValue)
            epochTimestamp = int(rawTimestamp)
            sendSample(bus, metricName=metricName, value=value,
                       epochTimestamp=epochTimestamp)

            yield "Saved %s %f @ %d\n" % (metricName, value, epochTimestamp)

    elif method == "GET":
        with repository.engineFactory(appConfig).connect() as conn:
            fields = (schema.metric_data.c.metric_value,
                      schema.metric_data.c.timestamp,
                      schema.metric_data.c.rowid,
                      schema.metric_data.c.anomaly_score)
            sort = schema.metric_data.c.timestamp.asc()

            # Resolve the metric's uid from its name, then fetch its rows.
            metricObj = repository.getCustomMetricByName(
                conn, metricName, fields=[schema.metric.c.uid])
            result = repository.getMetricData(conn,
                                              metricId=metricObj.uid,
                                              fields=fields,
                                              sort=sort)

            start_response("200 OK", htmlHeaders)

            for row in result:
                # timegm() interprets the row's time tuple as UTC.
                yield " ".join((
                    metricName,
                    str(row.metric_value),
                    str(calendar.timegm(row.timestamp.timetuple())),
                    str(row.anomaly_score))) + "\n"

    else:
        # Without this branch start_response() is never called for other
        # methods, which a WSGI server treats as an application error.
        start_response("405 Method Not Allowed",
                       [("Content-Type", "text/html"),
                        ("Allow", "GET, POST, PUT")])
        yield "Method %s not allowed\n" % method
# Serve handler() with gevent's WSGI server on port 8080; the empty host
# string leaves the bind address unspecified (all interfaces).
server = pywsgi.WSGIServer(('', 8080), handler)
# Runs as a plain module-level statement (no __main__ guard), so executing
# or importing this module starts serving and blocks here.
server.serve_forever()