Skip to content

Commit a089f0a

Browse files
committed
http: api: telemetry: implement timeseries
1 parent 43fc18f commit a089f0a

17 files changed

Lines changed: 506 additions & 7 deletions

src/enapter/cli/http/api/telemetry_command.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from enapter import cli
44

55
from .telemetry_latest_command import TelemetryLatestCommand
6+
from .telemetry_timeseries_command import TelemetryTimeseriesCommand
67

78

89
class TelemetryCommand(cli.Command):
@@ -15,6 +16,7 @@ def register(parent: cli.Subparsers) -> None:
1516
subparsers = parser.add_subparsers(dest="telemetry_command", required=True)
1617
for command in [
1718
TelemetryLatestCommand,
19+
TelemetryTimeseriesCommand,
1820
]:
1921
command.register(subparsers)
2022

@@ -23,5 +25,7 @@ async def run(args: argparse.Namespace) -> None:
2325
match args.telemetry_command:
2426
case "latest":
2527
await TelemetryLatestCommand.run(args)
28+
case "timeseries":
29+
await TelemetryTimeseriesCommand.run(args)
2630
case _:
2731
raise NotImplementedError(args.telemetry_command)

src/enapter/cli/http/api/telemetry_latest_command.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@ def register(parent: cli.Subparsers) -> None:
1212
"latest", formatter_class=argparse.ArgumentDefaultsHelpFormatter
1313
)
1414
parser.add_argument(
15-
"device_attrs",
15+
"selectors",
1616
metavar="device:attr1,attr2,...,attrN",
1717
nargs="+",
18-
help="Device attributes to get the latest telemetry for",
18+
help="Device ID or slug followed by a colon and a comma-separated list of attribute names to fetch the telemetry for",
1919
)
2020

2121
@staticmethod
2222
async def run(args: argparse.Namespace) -> None:
23+
attributes_by_device = {}
24+
for selector in args.selectors:
25+
device, attributes = selector.split(":", 1)
26+
attributes_by_device[device] = attributes.split(",")
2327
async with http.api.Client(http.api.Config.from_env()) as client:
24-
attributes_by_device = {}
25-
for device_attrs in args.device_attrs:
26-
device, attrs_str = device_attrs.split(":", 1)
27-
attributes_by_device[device] = attrs_str.split(",")
2828
telemetry = await client.telemetry.latest(attributes_by_device)
2929
dto = {
3030
device: {
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import argparse
2+
import datetime
3+
import json
4+
5+
from enapter import cli, http
6+
7+
8+
class TelemetryTimeseriesCommand(cli.Command):
9+
10+
@staticmethod
11+
def register(parent: cli.Subparsers) -> None:
12+
parser = parent.add_parser(
13+
"timeseries", formatter_class=argparse.ArgumentDefaultsHelpFormatter
14+
)
15+
parser.add_argument(
16+
"-f",
17+
"--time-from",
18+
required=True,
19+
help="Start time for the telemetry data (ISO 8601 format)",
20+
)
21+
parser.add_argument(
22+
"-t", "--time-to", help="End time for the telemetry data (ISO 8601 format)"
23+
)
24+
parser.add_argument(
25+
"-s",
26+
"--shape",
27+
choices=["w", "wide", "l", "long"],
28+
default="long",
29+
help="Shape of the output data",
30+
)
31+
parser.add_argument(
32+
"-g",
33+
"--granularity",
34+
type=int,
35+
default=60 * 60,
36+
help="Granularity of the telemetry data in seconds",
37+
)
38+
parser.add_argument(
39+
"selectors",
40+
metavar="device:attr1,attr2,...,attrN",
41+
nargs="+",
42+
help="Device ID or slug followed by a colon and a comma-separated list of attribute names to fetch the telemetry for",
43+
)
44+
45+
@staticmethod
46+
async def run(args: argparse.Namespace) -> None:
47+
attributes_by_device = {}
48+
for selector in args.selectors:
49+
device, attributes = selector.split(":", 1)
50+
attributes_by_device[device] = attributes.split(",")
51+
async with http.api.Client(http.api.Config.from_env()) as client:
52+
time_to = (
53+
datetime.datetime.fromisoformat(args.time_to)
54+
if args.time_to is not None
55+
else datetime.datetime.now(datetime.timezone.utc)
56+
)
57+
time_from = (
58+
datetime.datetime.fromisoformat(args.time_from)
59+
if args.time_from is not None
60+
else time_to - datetime.timedelta(minutes=10)
61+
)
62+
match args.shape:
63+
case "l" | "long":
64+
await TelemetryTimeseriesCommand._handle_long_timeseries(
65+
args, client, time_from, time_to, attributes_by_device
66+
)
67+
case "w" | "wide":
68+
await TelemetryTimeseriesCommand._handle_wide_timeseries(
69+
args, client, time_from, time_to, attributes_by_device
70+
)
71+
case _:
72+
raise NotImplementedError(args.shape)
73+
74+
@staticmethod
75+
async def _handle_long_timeseries(
76+
args: argparse.Namespace,
77+
client: http.api.Client,
78+
time_from: datetime.datetime,
79+
time_to: datetime.datetime,
80+
attributes_by_device: dict[str, list[str]],
81+
) -> None:
82+
async with client.telemetry.long_timeseries(
83+
from_=time_from,
84+
to=time_to,
85+
granularity=args.granularity,
86+
selectors=[
87+
http.api.telemetry.Selector(device=device, attributes=attributes)
88+
for device, attributes in attributes_by_device.items()
89+
],
90+
) as stream:
91+
async for row in stream:
92+
print(json.dumps(row.to_dto()))
93+
94+
@staticmethod
95+
async def _handle_wide_timeseries(
96+
args: argparse.Namespace,
97+
client: http.api.Client,
98+
time_from: datetime.datetime,
99+
time_to: datetime.datetime,
100+
attributes_by_device: dict[str, list[str]],
101+
) -> None:
102+
wide_timeseries = await client.telemetry.wide_timeseries(
103+
from_=time_from,
104+
to=time_to,
105+
granularity=args.granularity,
106+
selectors=[
107+
http.api.telemetry.Selector(device=device, attributes=attributes)
108+
for device, attributes in attributes_by_device.items()
109+
],
110+
)
111+
print(json.dumps(wide_timeseries.to_dto()))
Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,33 @@
1+
from .aggregation import Aggregation
12
from .client import Client
3+
from .data_type import DataType
4+
from .gap_filling import GapFilling
5+
from .gap_filling_method import GapFillingMethod
6+
from .labels import Labels
27
from .latest_datapoint import LatestDatapoint
8+
from .long_timeseries_row import LongTimeseriesRow
9+
from .long_timeseries_stream import LongTimeseriesStream
10+
from .raw_timeseries_row import RawTimeseriesRow
11+
from .raw_timeseries_stream import RawTimeseriesStream
12+
from .selector import Selector
13+
from .wide_timeseries import WideTimeseries
14+
from .wide_timeseries_column import WideTimeseriesColumn
315

4-
__all__ = ["Client", "LatestDatapoint"]
16+
__all__ = [
17+
"Aggregation",
18+
"Client",
19+
"DataType",
20+
"GapFilling",
21+
"GapFillingMethod",
22+
"Labels",
23+
"LatestDatapoint",
24+
"LongTimeseriesRow",
25+
"LongTimeseriesStream",
26+
"RawTimeseriesRow",
27+
"RawTimeseriesStream",
28+
"Selector",
29+
"TimeseriesStream",
30+
"TimeseriesStreamRow",
31+
"WideTimeseries",
32+
"WideTimeseriesColumn",
33+
]
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import enum
2+
3+
4+
class Aggregation(enum.Enum):
5+
6+
AUTO = "AUTO"
7+
AVG = "AVG"
8+
LAST = "LAST"
9+
MIN = "MIN"
10+
MAX = "MAX"
11+
BOOL_OR = "BOOL_OR"

src/enapter/http/api/telemetry/client.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1+
import contextlib
12
import datetime
3+
from typing import AsyncGenerator
24

35
import httpx
46

57
from enapter.http import api
68

9+
from .data_type import DataType
710
from .latest_datapoint import LatestDatapoint
11+
from .long_timeseries_stream import LongTimeseriesStream
12+
from .raw_timeseries_stream import RawTimeseriesStream
13+
from .selector import Selector
14+
from .wide_timeseries import WideTimeseries
815

916

1017
class Client:
@@ -39,3 +46,60 @@ async def latest(
3946
}
4047
for device, datapoints in response.json().get("telemetry", {}).items()
4148
}
49+
50+
@contextlib.asynccontextmanager
51+
async def long_timeseries(
52+
self,
53+
from_: datetime.datetime,
54+
to: datetime.datetime,
55+
granularity: int | datetime.timedelta,
56+
selectors: list[Selector],
57+
) -> AsyncGenerator[LongTimeseriesStream, None]:
58+
async with self.raw_timeseries(
59+
from_=from_, to=to, granularity=granularity, selectors=selectors
60+
) as stream:
61+
yield LongTimeseriesStream.from_raw_stream(stream)
62+
63+
async def wide_timeseries(
64+
self,
65+
from_: datetime.datetime,
66+
to: datetime.datetime,
67+
granularity: int | datetime.timedelta,
68+
selectors: list[Selector],
69+
) -> WideTimeseries:
70+
async with self.raw_timeseries(
71+
from_=from_, to=to, granularity=granularity, selectors=selectors
72+
) as stream:
73+
return await WideTimeseries.from_raw_stream(stream)
74+
75+
@contextlib.asynccontextmanager
76+
async def raw_timeseries(
77+
self,
78+
from_: datetime.datetime,
79+
to: datetime.datetime,
80+
granularity: int | datetime.timedelta,
81+
selectors: list[Selector],
82+
) -> AsyncGenerator[RawTimeseriesStream, None]:
83+
if to <= from_:
84+
raise ValueError("`to` must be greater than `from_`")
85+
if isinstance(granularity, datetime.timedelta):
86+
granularity = int(granularity.total_seconds())
87+
payload = {
88+
"from": from_.isoformat(),
89+
"to": to.isoformat(),
90+
"granularity": str(granularity) + "s",
91+
"telemetry": [selector.to_dto() for selector in selectors],
92+
}
93+
async with self._client.stream(
94+
"POST",
95+
"v3/telemetry/query_timeseries",
96+
headers={"Accept": "text/csv"},
97+
json=payload,
98+
) as response:
99+
api.check_error(response)
100+
data_types = [
101+
DataType(dt.strip().upper())
102+
for dt in response.headers["X-Enapter-Timeseries-Data-Types"].split(",")
103+
]
104+
lines = response.aiter_lines()
105+
yield await RawTimeseriesStream.new(data_types, lines)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import enum
2+
3+
4+
class DataType(enum.Enum):
5+
6+
FLOAT = "FLOAT"
7+
INTEGER = "INTEGER"
8+
STRING = "STRING"
9+
STRING_ARRAY = "STRING_ARRAY"
10+
BOOLEAN = "BOOLEAN"
11+
12+
def parse_value(self, s: str) -> float | int | str | list[str] | bool | None:
13+
if not s:
14+
return None
15+
match self:
16+
case DataType.FLOAT:
17+
return float(s)
18+
case DataType.INTEGER:
19+
return int(s)
20+
case DataType.STRING:
21+
return s
22+
case DataType.STRING_ARRAY:
23+
raise NotImplementedError(self)
24+
case DataType.BOOLEAN:
25+
if s.lower() in ("true", "1"):
26+
return True
27+
elif s.lower() in ("false", "0"):
28+
return False
29+
else:
30+
raise ValueError(f"invalid boolean value: {s}")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import dataclasses
2+
import datetime
3+
from typing import Any
4+
5+
from .gap_filling_method import GapFillingMethod
6+
7+
8+
@dataclasses.dataclass
9+
class GapFilling:
10+
11+
method: GapFillingMethod
12+
look_around: datetime.timedelta
13+
14+
def to_dto(self) -> dict[str, Any]:
15+
return {
16+
"method": self.method.value.lower(),
17+
"look_around": str(int(self.look_around.total_seconds())) + "s",
18+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import enum
2+
3+
4+
class GapFillingMethod(enum.Enum):
5+
6+
NONE = "NONE"
7+
LOCF = "LOCF"
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import collections
2+
from typing import Self
3+
4+
5+
class Labels(collections.UserDict):
6+
7+
@classmethod
8+
def parse(cls, s: str) -> Self:
9+
return cls(kv.split("=") for kv in s.split(" "))
10+
11+
@property
12+
def device(self) -> str:
13+
return self.data["device"]
14+
15+
@property
16+
def telemetry(self) -> str:
17+
return self.data["telemetry"]

0 commit comments

Comments
 (0)