-
Notifications
You must be signed in to change notification settings - Fork 16
/
model.py
executable file
·99 lines (74 loc) · 2.73 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import jade
import numpy as np
import json
def parse_RGB(message):
    """Run ICA on a buffered three-channel signal and return the result as JSON.

    Args:
        message: Mapping with
            ``"bufferWindow"`` -- number of samples per channel (int), and
            ``"array"`` -- the samples, laid out channel after channel so
            that they fill a ``(3, bufferWindow)`` matrix in row-major order.

    Returns:
        str: JSON encoding of the dict built by ``parse_ICA_results``.

    Raises:
        ValueError: if ``"array"`` holds fewer than ``3 * bufferWindow``
            samples (raised by ``reshape``).
    """
    buffer_window = message["bufferWindow"]

    # # ** FOR GREEN CHANNEL ONLY **
    # X = np.array(message["array"])
    # X = normalize_array(X)
    # return json.dumps(parse_ICA_results(X, buffer_window))

    # ** FOR RGB CHANNELS & ICA **
    # Convert to float *before* shaping.  Handing an integer array to
    # ``np.ndarray(..., buffer=...)`` reinterprets its raw bytes as float64,
    # which silently turns integer samples into meaningless values.
    samples = np.array(message["array"], dtype=float).ravel()
    # Only the first 3 * buffer_window samples are used; extra trailing
    # samples are ignored, as they were with the buffer-based construction.
    X = samples[:3 * buffer_window].reshape(3, buffer_window)
    X = normalize_matrix(X)
    ICA = jade.main(X)
    return json.dumps(parse_ICA_results(ICA, buffer_window))  # message["time"]
def parse_ICA_results(ICA, buffer_window):  # time
    """Select one of three ICA components and package its squared transform.

    Each of the first three columns of ``ICA`` is multiplied by a Hamming
    window, passed through ``np.fft.irfft`` and squared.  The component whose
    ``sum / max`` ratio of that squared output is largest is reported.

    Args:
        ICA: 2-D array-like that supports ``ICA[:, k]`` for ``k`` in 0..2
            (an ``ndarray`` or ``np.matrix``).  Presumably the unmixed
            sources returned by ``jade.main`` -- confirm against the caller.
        buffer_window: Echoed back unchanged under ``"bufferWindow"``.

    Returns:
        dict: ``{"id": "ICA", "bufferWindow": buffer_window, "array": [...]}``
        where ``"array"`` is a plain list of floats.
    """
    # NOTE(review): the transform applied here is the *inverse* real FFT
    # (irfft) of a time-domain signal; a forward rfft is the usual choice for
    # a power spectrum.  Left unchanged because it alters the output length
    # -- confirm the intent.
    spectra = []
    power_ratio = []
    for column in range(3):
        component = np.squeeze(np.asarray(ICA[:, column]))
        windowed = np.hamming(len(component)) * component
        spectrum = np.absolute(np.square(np.fft.irfft(windowed))).astype(float)

        peak = np.amax(spectrum)
        if peak > 0:
            power_ratio.append(np.sum(spectrum) / peak)
        else:
            # An all-zero (or NaN) component has no usable peak.  Dividing
            # would give NaN, which np.argmax treats as the maximum, so the
            # dead component would win.  Rank it last instead.
            power_ratio.append(float("-inf"))
        spectra.append(spectrum.tolist())

    # Ties resolve to the lowest index, matching the original if/elif order.
    best = int(np.argmax(power_ratio))

    # print power_ratio
    return {
        "id": "ICA",
        "bufferWindow": buffer_window,
        "array": spectra[best],
    }
# # ** FOR GREEN CHANNEL ONLY **
# hamming = (np.hamming(len(ICA)) * ICA)
# fft = np.fft.rfft(hamming)
# fft = np.absolute(np.square(fft))
# signals["array"] = fft.astype(float).tolist()
# return signals
# ** experiments **
# ** for interpolation and hamming **
# even_times = np.linspace(time[0], time[-1], len(time))
# interpolated_two = np.interp(even_times, time, np.squeeze(np.asarray(ICA[:, 1])).tolist())
# interpolated_two = np.hamming(len(time)) * interpolated_two
# interpolated_two = interpolated_two - np.mean(interpolated_two)
# signals["two"] = interpolated_two.tolist()
# #fft = np.fft.rfft(np.squeeze(np.asarray(ICA[:, 1])))
# #signals["two"] = fft.astype(float).tolist()
def normalize_matrix(matrix):
    """Standardise every row of ``matrix`` in place (zero mean, unit std).

    Each row is replaced by ``(row - mean(row)) / std(row)`` using the
    population standard deviation (``np.std`` default).  A row with zero
    standard deviation (all values equal) is only centred, so it becomes all
    zeros instead of NaN.

    Args:
        matrix: Iterable of mutable rows, e.g. a 2-D float ``ndarray``.  The
            result is written back into each row, so an integer-typed array
            will have the normalised values cast to integers.

    Returns:
        The same ``matrix`` object, modified in place.
    """
    # ** for matrix
    for row in matrix:
        values = np.asarray(row, dtype=float)
        if values.size == 0:
            continue  # nothing to normalise
        centered = values - values.mean()
        std_dev = values.std()
        if std_dev > 0:
            centered /= std_dev
        # Slice assignment keeps the update in place for both ndarray row
        # views and plain lists.
        row[:] = centered
    return matrix
def normalize_array(array):
    """Standardise ``array`` in place to zero mean and unit standard deviation.

    Every element is replaced by ``(x - mean) / std`` using the population
    standard deviation (``np.std`` default).  When the standard deviation is
    zero (all values equal) the data is only centred, giving zeros instead
    of NaN.  An empty input is returned untouched.

    Args:
        array: Mutable sequence of numbers (``ndarray`` or list).  The result
            is written back into it, so an integer-typed ``ndarray`` will
            have the normalised values cast to integers.

    Returns:
        The same ``array`` object, modified in place.
    """
    # ** for array
    values = np.asarray(array, dtype=float)
    if values.size == 0:
        return array
    centered = values - values.mean()
    std_dev = values.std()
    if std_dev > 0:
        centered /= std_dev
    # Slice assignment keeps the update in place for ndarrays and lists alike.
    array[:] = centered
    return array