-
-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathmetrics_response.py
More file actions
264 lines (237 loc) · 11 KB
/
metrics_response.py
File metadata and controls
264 lines (237 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import copy
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
from .metrics_shared import OpenPAYGOMetricsShared
from .models import MetricsDataFormat
class MetricsResponseHandler(object):
def __init__(
self,
received_metrics: str,
data_format: Optional[Union[Dict[str, Any], MetricsDataFormat]] = None,
secret_key: Optional[str] = None,
last_request_count: Optional[int] = None,
last_request_timestamp: Optional[int] = None,
) -> None:
self.received_metrics = received_metrics
self.request_dict = json.loads(received_metrics)
# We convert the base variable names to simple
self.request_dict = OpenPAYGOMetricsShared.convert_dict_keys_to_simple(
self.request_dict
)
# We add the reception timestamp if not timestamp was provided
self.request_timestamp = self.request_dict.get("timestamp")
if not self.request_dict.get("timestamp"):
self.timestamp = int(datetime.now().timestamp())
else:
self.timestamp = self.request_dict.get("timestamp")
self.response_dict: Dict[str, Any] = {}
self.secret_key = secret_key
self.last_request_count = last_request_count
self.last_request_timestamp = last_request_timestamp
if data_format is not None:
self.data_format: Optional[Dict[str, Any]] = dict(data_format)
else:
self.data_format = None
if not self.data_format and self.request_dict.get("data_format"):
self.data_format = self.request_dict.get("data_format")
def get_device_serial(self) -> str:
return self.request_dict.get("serial_number")
def get_data_format_id(self) -> Optional[int]:
return self.request_dict.get("data_format_id")
def data_format_available(self) -> bool:
return self.data_format is not None
def set_device_parameters(
self,
secret_key: Optional[str] = None,
data_format: Optional[Union[Dict[str, Any], MetricsDataFormat]] = None,
last_request_count: Optional[int] = None,
last_request_timestamp: Optional[int] = None,
) -> None:
if secret_key:
self.secret_key = secret_key
if data_format is not None:
self.data_format = dict(data_format)
if last_request_count:
self.last_request_count = last_request_count
if last_request_timestamp:
self.last_request_timestamp = last_request_timestamp
def is_auth_valid(self) -> bool:
auth_string = self.request_dict.get("auth", None)
if not auth_string:
return False
elif not self.secret_key:
raise ValueError("Secret key is required to check the auth.")
self.auth_method = auth_string[:2]
new_signature = OpenPAYGOMetricsShared.generate_request_signature_from_data(
self.request_dict, self.auth_method, self.secret_key
)
if auth_string == new_signature:
request_count = self.get_request_count()
if (
request_count
and self.last_request_count
and request_count <= self.last_request_count
):
return False
timestamp = self.get_request_timestamp()
if (
timestamp
and self.last_request_timestamp
and timestamp <= self.last_request_timestamp
):
return False
# Either the request count or timestamp is required
if request_count or timestamp:
return True
return False
def get_simple_metrics(self) -> Dict[str, Any]:
# We start the process by making a copy of the dict to work with
simple_dict = copy.deepcopy(self.request_dict)
simple_dict.pop("auth") if "auth" in simple_dict else None # We remove the auth
# We process the data and replace it
simple_dict["data"] = self._get_simple_data()
# We process the historical data
simple_dict["historical_data"] = self._get_simple_historical_data()
# We fill in the timestamps for each time step
simple_dict["historical_data"] = self._fill_timestamp_in_historical_data(
simple_dict["historical_data"]
)
return simple_dict
def get_data_timestamp(self) -> int:
return self.request_dict.get("data_collection_timestamp", self.timestamp)
def get_request_timestamp(self) -> Optional[int]:
return self.request_timestamp
def get_request_count(self) -> Optional[int]:
return self.request_dict.get("request_count")
def get_token_count(self) -> Optional[int]:
data = self._get_simple_data()
return data.get("token_count")
def expects_token_answer(self) -> bool:
return self.get_token_count() is not None
def add_tokens_to_answer(self, token_list: List[str]) -> None:
self.response_dict["token_list"] = token_list
def expects_time_answer(self) -> bool:
data = self._get_simple_data()
if data.get("active_until_timestamp_requested", False) or data.get(
"active_seconds_left_requested", False
):
return True
return False
def add_time_to_answer(self, target_datetime: datetime) -> None:
data = self._get_simple_data()
if data.get("active_until_timestamp_requested", False):
target_timestamp = 0
if target_datetime:
if target_datetime.year > 1970:
target_timestamp = int(target_datetime.timestamp())
self.response_dict["active_until_timestamp"] = target_timestamp
elif data.get("active_seconds_left_requested", False):
seconds_left = (
(datetime.now() - target_datetime).total_seconds()
if target_datetime
else 0
)
self.response_dict["active_seconds_left"] = (
seconds_left if seconds_left > 0 else 0
)
else:
raise ValueError("No time requested")
def add_new_base_url_to_answer(self, new_base_url: str) -> None:
self.add_settings_to_answer({"base_url": new_base_url})
def add_settings_to_answer(self, settings_dict: Dict[str, Any]) -> None:
if not self.response_dict.get("settings"):
self.response_dict["settings"] = {}
self.response_dict["settings"].update(settings_dict)
def add_extra_data_to_answer(self, extra_data_dict: Dict[str, Any]) -> None:
if not self.response_dict.get("extra_data"):
self.response_dict["extra_data"] = {}
self.response_dict["extra_data"].update(extra_data_dict)
def get_answer_payload(self) -> str:
payload = self.get_answer_dict()
return OpenPAYGOMetricsShared.convert_to_metrics_json(payload)
def get_answer_dict(self) -> Dict[str, Any]:
# If there is not data format, we just return the full response
condensed_answer = copy.deepcopy(self.response_dict)
if self.secret_key:
condensed_answer["auth"] = (
OpenPAYGOMetricsShared.generate_response_signature_from_data(
serial_number=self.request_dict.get("serial_number"),
request_count=self.request_dict.get("request_count"),
data=condensed_answer,
timestamp=self.request_dict.get("timestamp"),
secret_key=self.secret_key,
)
)
return OpenPAYGOMetricsShared.convert_dict_keys_to_condensed(condensed_answer)
def _get_simple_data(self):
data = copy.deepcopy(self.request_dict.get("data"))
# If no data or not condensed in list, we just return it
if not data:
return {}
if not isinstance(data, list):
return data
data_order = self.data_format.get("data_order")
if not data_order:
raise ValueError("Data Format does not contain data_order")
clean_data = {}
data_len = len(data)
for idx, var in enumerate(data_order):
clean_data[var] = data[idx] if idx < data_len else None
data = data[data_len:]
if len(data) > 0:
raise ValueError(
"Additional variables not present in the data format: " + str(data)
)
return OpenPAYGOMetricsShared.convert_dict_keys_to_simple(clean_data)
def _get_simple_historical_data(self):
historical_data = copy.deepcopy(self.request_dict.get("historical_data"))
if not historical_data:
return []
historical_data_order = self.data_format.get("historical_data_order")
clean_historical_data = []
for time_step in historical_data:
time_step_data = {}
if isinstance(time_step, list):
if not historical_data_order:
raise ValueError(
"Data Format does not contain historical_data_order"
)
timse_step_len = len(time_step)
for idx, var in enumerate(historical_data_order):
if idx < timse_step_len:
time_step_data[var] = time_step[idx]
time_step = time_step[timse_step_len:]
if len(time_step) > 0:
raise ValueError(
"Additional variables not present in the historical data format"
": " + str(time_step)
)
elif isinstance(time_step, dict):
for key in time_step:
if key.isdigit() and int(key) < len(historical_data_order):
time_step_data[historical_data_order[int(key)]] = time_step[key]
else:
time_step_data[key] = time_step[key]
else:
raise ValueError("Invalid historical data step type: " + str(time_step))
clean_historical_data.append(time_step_data)
return clean_historical_data
def _fill_timestamp_in_historical_data(self, historical_data):
last_timestamp = datetime.fromtimestamp(self.get_data_timestamp())
for idx, time_step in enumerate(historical_data):
if time_step.get("relative_time") is not None:
last_timestamp = last_timestamp + timedelta(
seconds=int(time_step.get("relative_time"))
)
historical_data[idx]["timestamp"] = int(last_timestamp.timestamp())
del historical_data[idx]["relative_time"]
elif time_step.get("timestamp"):
last_timestamp = datetime.fromtimestamp(time_step.get("timestamp"))
else:
if idx != 0:
last_timestamp = last_timestamp + timedelta(
seconds=int(self.data_format.get("historical_data_interval"))
)
historical_data[idx]["timestamp"] = int(last_timestamp.timestamp())
return historical_data