Skip to content

Commit 645bd5e

Browse files
committed
Add support for parsing data from a memory buffer
1 parent 6408c0b commit 645bd5e

4 files changed

Lines changed: 123 additions & 19 deletions

File tree

harp/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from harp.io import REFERENCE_EPOCH, MessageType, read
1+
from harp.io import REFERENCE_EPOCH, MessageType, parse, read
22
from harp.reader import create_reader
33
from harp.schema import read_schema
44

5-
__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "create_reader", "read_schema"]
5+
__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"]

harp/io.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
from typing import Any, BinaryIO, Optional, Union
55

66
import numpy as np
7+
import numpy.typing as npt
78
import pandas as pd
89
from pandas._typing import Axes
910

11+
from harp.typing import BufferLike
12+
1013
REFERENCE_EPOCH = datetime(1904, 1, 1)
1114
"""The reference epoch for UTC harp time."""
1215

@@ -73,6 +76,59 @@ def read(
7376
A pandas data frame containing message data, sorted by time.
7477
"""
7578
data = np.fromfile(file, dtype=np.uint8)
79+
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)
80+
81+
82+
def parse(
83+
buffer: BufferLike,
84+
address: Optional[int] = None,
85+
dtype: Optional[np.dtype] = None,
86+
length: Optional[int] = None,
87+
columns: Optional[Axes] = None,
88+
epoch: Optional[datetime] = None,
89+
keep_type: bool = False,
90+
):
91+
"""Parse single-register Harp data from the specified buffer.
92+
93+
Parameters
94+
----------
95+
buffer
96+
An object that exposes a buffer interface containing binary data from
97+
a single device register.
98+
address
99+
Expected register address. If specified, the address of
100+
the first message in the buffer is used for validation.
101+
dtype
102+
Expected data type of the register payload. If specified, the
103+
payload type of the first message in the buffer is used for validation.
104+
length
105+
Expected number of elements in register payload. If specified, the
106+
payload length of the first message in the buffer is used for validation.
107+
columns
108+
The optional column labels to use for the data values.
109+
epoch
110+
Reference datetime at which time zero begins. If specified,
111+
the result data frame will have a datetime index.
112+
keep_type
113+
Specifies whether to include a column with the message type.
114+
115+
Returns
116+
-------
117+
A pandas data frame containing message data, sorted by time.
118+
"""
119+
data = np.frombuffer(buffer, dtype=np.uint8)
120+
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)
121+
122+
123+
def _fromraw(
124+
data: npt.NDArray[np.uint8],
125+
address: Optional[int] = None,
126+
dtype: Optional[np.dtype] = None,
127+
length: Optional[int] = None,
128+
columns: Optional[Axes] = None,
129+
epoch: Optional[datetime] = None,
130+
keep_type: bool = False,
131+
):
76132
if len(data) == 0:
77133
return pd.DataFrame(columns=columns, index=pd.Index([], dtype=np.float64, name="Time"))
78134

harp/reader.py

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
from pandas import DataFrame, Series
1313
from pandas._typing import Axes
1414

15-
from harp.io import MessageType, read
15+
from harp.io import MessageType, parse, read
1616
from harp.model import BitMask, GroupMask, Model, PayloadMember, Register
1717
from harp.schema import read_schema
18+
from harp.typing import BufferLike
1819

1920

2021
@dataclass
@@ -33,17 +34,29 @@ def __call__(
3334
) -> DataFrame: ...
3435

3536

37+
class _ParseRegister(Protocol):
38+
def __call__(
39+
self,
40+
buffer: BufferLike,
41+
epoch: Optional[datetime] = None,
42+
keep_type: bool = False,
43+
) -> DataFrame: ...
44+
45+
3646
class RegisterReader:
3747
register: Register
3848
read: _ReadRegister
49+
parse: _ParseRegister
3950

4051
def __init__(
4152
self,
4253
register: Register,
4354
read: _ReadRegister,
55+
parse: _ParseRegister,
4456
) -> None:
4557
self.register = register
4658
self.read = read
59+
self.parse = parse
4760

4861

4962
class RegisterMap(UserDict[str, RegisterReader]):
@@ -81,12 +94,12 @@ def _compose_parser(
8194
params: _ReaderParams,
8295
) -> Callable[..., DataFrame]:
8396
def parser(
84-
file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
97+
data,
8598
columns: Optional[Axes] = None,
8699
epoch: Optional[datetime] = params.epoch,
87100
keep_type: bool = params.keep_type,
88101
):
89-
df = g(file, columns, epoch, keep_type)
102+
df = g(data, columns, epoch, keep_type)
90103
result = f(df)
91104
type_col = df.get(MessageType.__name__)
92105
if type_col is not None:
@@ -189,38 +202,63 @@ def reader(
189202
return reader
190203

191204

192-
def _create_register_parser(device: Model, name: str, params: _ReaderParams):
205+
def _create_register_parser(register: Register, params: _ReaderParams):
206+
def parser(
207+
buffer: BufferLike,
208+
columns: Optional[Axes] = None,
209+
epoch: Optional[datetime] = params.epoch,
210+
keep_type: bool = params.keep_type,
211+
):
212+
return parse(
213+
buffer,
214+
address=register.address,
215+
dtype=dtype(register.type),
216+
length=register.length,
217+
columns=columns,
218+
epoch=epoch,
219+
keep_type=keep_type,
220+
)
221+
222+
return parser
223+
224+
225+
def _create_register_handler(device: Model, name: str, params: _ReaderParams):
193226
register = device.registers[name]
194227
reader = _create_register_reader(register, params)
228+
parser = _create_register_parser(register, params)
195229

196230
if register.maskType is not None:
197231
key = register.maskType.root
198232
bitMask = None if device.bitMasks is None else device.bitMasks.get(key)
199233
if bitMask is not None:
200-
parser = _create_bitmask_parser(bitMask)
201-
reader = _compose_parser(parser, reader, params)
202-
return RegisterReader(register, reader)
234+
bitmask_parser = _create_bitmask_parser(bitMask)
235+
reader = _compose_parser(bitmask_parser, reader, params)
236+
parser = _compose_parser(bitmask_parser, parser, params)
237+
return RegisterReader(register, reader, parser)
203238

204239
groupMask = None if device.groupMasks is None else device.groupMasks.get(key)
205240
if groupMask is not None:
206-
parser = _create_groupmask_parser(name, groupMask)
207-
reader = _compose_parser(parser, reader, params)
208-
return RegisterReader(register, reader)
241+
groupmask_parser = _create_groupmask_parser(name, groupMask)
242+
reader = _compose_parser(groupmask_parser, reader, params)
243+
parser = _compose_parser(groupmask_parser, parser, params)
244+
return RegisterReader(register, reader, parser)
209245

210246
if register.payloadSpec is not None:
211-
payload_parsers = [
247+
member_parsers = [
212248
(key, _create_payloadmember_parser(device, member))
213249
for key, member in register.payloadSpec.items()
214250
]
215251

216-
def parser(df: DataFrame):
217-
return DataFrame({n: f(df) for n, f in payload_parsers}, index=df.index)
252+
def payload_parser(df: DataFrame):
253+
return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index)
218254

219-
reader = _compose_parser(parser, reader, params)
220-
return RegisterReader(register, reader)
255+
reader = _compose_parser(payload_parser, reader, params)
256+
parser = _compose_parser(payload_parser, parser, params)
257+
return RegisterReader(register, reader, parser)
221258

222259
reader = partial(reader, columns=[name])
223-
return RegisterReader(register, reader)
260+
parser = partial(parser, columns=[name])
261+
return RegisterReader(register, reader, parser)
224262

225263

226264
def create_reader(
@@ -265,7 +303,7 @@ def create_reader(
265303
base_path = path / device.device if is_dir else path.parent / device.device
266304

267305
reg_readers = {
268-
name: _create_register_parser(device, name, _ReaderParams(base_path, epoch, keep_type))
306+
name: _create_register_handler(device, name, _ReaderParams(base_path, epoch, keep_type))
269307
for name in device.registers.keys()
270308
}
271309
return DeviceReader(device, reg_readers)

harp/typing.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import mmap
2+
import sys
3+
from typing import Any, Union
4+
5+
from numpy.typing import NDArray
6+
7+
if sys.version_info >= (3, 12):
8+
from collections.abc import Buffer as BufferLike
9+
else:
10+
BufferLike = Union[bytes, bytearray, memoryview, mmap.mmap, NDArray[Any]]

0 commit comments

Comments
 (0)