Skip to content

Commit 9d05b58

Browse files
authored
Merge pull request #44 from harp-tech/unified-io
Refactor IO API for consistency and symmetry
2 parents 52dbc5d + 63194f5 commit 9d05b58

5 files changed

Lines changed: 77 additions & 153 deletions

File tree

harp/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
1-
from harp.io import REFERENCE_EPOCH, MessageType, parse, read
1+
from harp.io import REFERENCE_EPOCH, MessageType, read, to_buffer, to_file
22
from harp.reader import create_reader
33
from harp.schema import read_schema
44

5-
__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"]
5+
__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "to_buffer", "to_file", "create_reader", "read_schema"]

harp/io.py

Lines changed: 40 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@
88
import pandas as pd
99
from pandas._typing import Axes
1010

11-
from harp.typing import BufferLike
11+
from harp.typing import _BufferLike, _FileLike
1212

1313
REFERENCE_EPOCH = datetime(1904, 1, 1)
1414
"""The reference epoch for UTC harp time."""
@@ -41,30 +41,30 @@ class MessageType(IntEnum):
4141

4242

4343
def read(
44-
file: Union[str, bytes, PathLike[Any], BinaryIO],
44+
file_or_buf: Union[_FileLike, _BufferLike],
4545
address: Optional[int] = None,
4646
dtype: Optional[np.dtype] = None,
4747
length: Optional[int] = None,
4848
columns: Optional[Axes] = None,
4949
epoch: Optional[datetime] = None,
5050
keep_type: bool = False,
5151
):
52-
"""Read single-register Harp data from the specified file.
52+
"""Read single-register Harp data from the specified file or buffer.
5353
5454
Parameters
5555
----------
56-
file
57-
Open file object or filename containing binary data from
56+
file_or_buf
57+
File path, open file object, or buffer containing binary data from
5858
a single device register.
5959
address
6060
Expected register address. If specified, the address of
61-
the first message in the file is used for validation.
61+
the first message is used for validation.
6262
dtype
6363
Expected data type of the register payload. If specified, the
64-
payload type of the first message in the file is used for validation.
64+
payload type of the first message is used for validation.
6565
length
6666
Expected number of elements in register payload. If specified, the
67-
payload length of the first message in the file is used for validation.
67+
payload length of the first message is used for validation.
6868
columns
6969
The optional column labels to use for the data values.
7070
epoch
@@ -77,60 +77,13 @@ def read(
7777
-------
7878
A pandas data frame containing message data, sorted by time.
7979
"""
80-
data = np.fromfile(file, dtype=np.uint8)
81-
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)
82-
83-
84-
def parse(
85-
buffer: BufferLike,
86-
address: Optional[int] = None,
87-
dtype: Optional[np.dtype] = None,
88-
length: Optional[int] = None,
89-
columns: Optional[Axes] = None,
90-
epoch: Optional[datetime] = None,
91-
keep_type: bool = False,
92-
):
93-
"""Parse single-register Harp data from the specified buffer.
94-
95-
Parameters
96-
----------
97-
buffer
98-
An object that exposes a buffer interface containing binary data from
99-
a single device register.
100-
address
101-
Expected register address. If specified, the address of
102-
the first message in the buffer is used for validation.
103-
dtype
104-
Expected data type of the register payload. If specified, the
105-
payload type of the first message in the buffer is used for validation.
106-
length
107-
Expected number of elements in register payload. If specified, the
108-
payload length of the first message in the buffer is used for validation.
109-
columns
110-
The optional column labels to use for the data values.
111-
epoch
112-
Reference datetime at which time zero begins. If specified,
113-
the result data frame will have a datetime index.
114-
keep_type
115-
Specifies whether to include a column with the message type.
116-
117-
Returns
118-
-------
119-
A pandas data frame containing message data, sorted by time.
120-
"""
121-
data = np.frombuffer(buffer, dtype=np.uint8)
122-
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)
123-
80+
if isinstance(file_or_buf, (str, PathLike, BinaryIO)) or hasattr(file_or_buf, "readinto"):
81+
# TODO: in the below we ignore the type as otherwise
82+
# we have no way to runtime check _IOProtocol
83+
data = np.fromfile(file_or_buf, dtype=np.uint8) # type: ignore
84+
else:
85+
data = np.frombuffer(file_or_buf, dtype=np.uint8)
12486

125-
def _fromraw(
126-
data: npt.NDArray[np.uint8],
127-
address: Optional[int] = None,
128-
dtype: Optional[np.dtype] = None,
129-
length: Optional[int] = None,
130-
columns: Optional[Axes] = None,
131-
epoch: Optional[datetime] = None,
132-
keep_type: bool = False,
133-
):
13487
if len(data) == 0:
13588
return pd.DataFrame(
13689
columns=columns,
@@ -185,11 +138,12 @@ def _fromraw(
185138
return result
186139

187140

188-
def write(
189-
file: Union[str, bytes, PathLike[Any], BinaryIO],
141+
def to_file(
190142
data: pd.DataFrame,
143+
file: _FileLike,
191144
address: int,
192145
dtype: Optional[np.dtype] = None,
146+
length: Optional[int] = None,
193147
port: Optional[int] = None,
194148
epoch: Optional[datetime] = None,
195149
message_type: Optional[MessageType] = None,
@@ -198,16 +152,19 @@ def write(
198152
199153
Parameters
200154
----------
201-
file
202-
Open file object or filename where to store binary data from
203-
a single device register.
204155
data
205156
Pandas data frame containing message payload.
157+
file
158+
File path, or open file object in which to store binary data from
159+
a single device register.
206160
address
207161
Register address used to identify all formatted Harp messages.
208162
dtype
209163
Data type of the register payload. If specified, all data will
210164
be converted before formatting the binary payload.
165+
length
166+
Expected number of elements in register payload. If specified, the
167+
number of columns in the input data frame is validated.
211168
port
212169
Optional port value used for all formatted Harp messages.
213170
epoch
@@ -217,19 +174,20 @@ def write(
217174
Optional message type used for all formatted Harp messages.
218175
If not specified, data must contain a MessageType column.
219176
"""
220-
buffer = format(data, address, dtype, port, epoch, message_type)
177+
buffer = to_buffer(data, address, dtype, port, length, epoch, message_type)
221178
buffer.tofile(file)
222179

223180

224-
def format(
181+
def to_buffer(
225182
data: pd.DataFrame,
226183
address: int,
227184
dtype: Optional[np.dtype] = None,
185+
length: Optional[int] = None,
228186
port: Optional[int] = None,
229187
epoch: Optional[datetime] = None,
230188
message_type: Optional[MessageType] = None,
231189
) -> npt.NDArray[np.uint8]:
232-
"""Format single-register Harp data as a flat binary buffer.
190+
"""Convert single-register Harp data to a flat binary buffer.
233191
234192
Parameters
235193
----------
@@ -240,6 +198,9 @@ def format(
240198
dtype
241199
Data type of the register payload. If specified, all data will
242200
be converted before formatting the binary payload.
201+
length
202+
Expected number of elements in register payload. If specified, the
203+
number of columns in the input data frame is validated.
243204
port
244205
Optional port value used for all formatted Harp messages.
245206
epoch
@@ -254,12 +215,13 @@ def format(
254215
An array object containing message data formatted according
255216
to the Harp binary protocol.
256217
"""
257-
if len(data) == 0:
218+
nrows = len(data)
219+
if nrows == 0:
258220
return np.empty(0, dtype=np.uint8)
259221

260-
if "MessageType" in data.columns:
261-
msgtype = data["MessageType"].cat.codes
262-
payload = data[data.columns.drop("MessageType")].values
222+
if MessageType.__name__ in data.columns:
223+
msgtype = data[MessageType.__name__].cat.codes
224+
payload = data[data.columns.drop(MessageType.__name__)].values
263225
elif message_type is not None:
264226
msgtype = message_type
265227
payload = data.values
@@ -278,17 +240,20 @@ def format(
278240
if dtype is not None:
279241
payload = payload.astype(dtype, copy=False)
280242

243+
ncols = payload.shape[1]
244+
if length is not None and ncols != length:
245+
raise ValueError(f"expected payload length {length} but got {ncols}")
246+
281247
if port is None:
282248
port = 255
283249

284250
payloadtype = _payloadtypefromdtype[payload.dtype]
285-
payloadlength = payload.shape[1] * payload.dtype.itemsize
251+
payloadlength = ncols * payload.dtype.itemsize
286252
stride = payloadlength + 6
287253
if is_timestamped:
288254
payloadtype |= _PAYLOAD_TIMESTAMP_MASK
289255
stride += 6
290256

291-
nrows = len(data)
292257
buffer = np.empty((nrows, stride), dtype=np.uint8)
293258
buffer[:, 0] = msgtype
294259
buffer[:, 1:5] = [stride - 2, address, port, payloadtype]

harp/reader.py

Lines changed: 13 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -6,38 +6,29 @@
66
from math import log2
77
from os import PathLike
88
from pathlib import Path
9-
from typing import Any, BinaryIO, Callable, Iterable, Mapping, Optional, Protocol, Union
9+
from typing import Callable, Iterable, Mapping, Optional, Protocol, Union
1010

1111
from numpy import dtype
1212
from pandas import DataFrame, Series
1313
from pandas._typing import Axes
1414

15-
from harp.io import MessageType, parse, read
15+
from harp.io import MessageType, read
1616
from harp.model import BitMask, GroupMask, Model, PayloadMember, Register
1717
from harp.schema import read_schema
18-
from harp.typing import BufferLike
18+
from harp.typing import _BufferLike, _FileLike
1919

2020

2121
@dataclass
2222
class _ReaderParams:
23-
path: Path
23+
base_path: Path
2424
epoch: Optional[datetime] = None
2525
keep_type: bool = False
2626

2727

2828
class _ReadRegister(Protocol):
2929
def __call__(
3030
self,
31-
file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
32-
epoch: Optional[datetime] = None,
33-
keep_type: bool = False,
34-
) -> DataFrame: ...
35-
36-
37-
class _ParseRegister(Protocol):
38-
def __call__(
39-
self,
40-
buffer: BufferLike,
31+
file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None,
4132
epoch: Optional[datetime] = None,
4233
keep_type: bool = False,
4334
) -> DataFrame: ...
@@ -46,17 +37,14 @@ def __call__(
4637
class RegisterReader:
4738
register: Register
4839
read: _ReadRegister
49-
parse: _ParseRegister
5040

5141
def __init__(
5242
self,
5343
register: Register,
5444
read: _ReadRegister,
55-
parse: _ParseRegister,
5645
) -> None:
5746
self.register = register
5847
self.read = read
59-
self.parse = parse
6048

6149

6250
class RegisterMap(UserDict[str, RegisterReader]):
@@ -180,16 +168,16 @@ def parser(df: DataFrame):
180168

181169
def _create_register_reader(register: Register, params: _ReaderParams):
182170
def reader(
183-
file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
171+
file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None,
184172
columns: Optional[Axes] = None,
185173
epoch: Optional[datetime] = params.epoch,
186174
keep_type: bool = params.keep_type,
187175
):
188-
if file is None:
189-
file = f"{params.path}_{register.address}.bin"
176+
if file_or_buf is None:
177+
file_or_buf = f"{params.base_path}_{register.address}.bin"
190178

191179
data = read(
192-
file,
180+
file_or_buf,
193181
address=register.address,
194182
dtype=dtype(register.type),
195183
length=register.length,
@@ -202,46 +190,23 @@ def reader(
202190
return reader
203191

204192

205-
def _create_register_parser(register: Register, params: _ReaderParams):
206-
def parser(
207-
buffer: BufferLike,
208-
columns: Optional[Axes] = None,
209-
epoch: Optional[datetime] = params.epoch,
210-
keep_type: bool = params.keep_type,
211-
):
212-
return parse(
213-
buffer,
214-
address=register.address,
215-
dtype=dtype(register.type),
216-
length=register.length,
217-
columns=columns,
218-
epoch=epoch,
219-
keep_type=keep_type,
220-
)
221-
222-
return parser
223-
224-
225193
def _create_register_handler(device: Model, name: str, params: _ReaderParams):
226194
register = device.registers[name]
227195
reader = _create_register_reader(register, params)
228-
parser = _create_register_parser(register, params)
229196

230197
if register.maskType is not None:
231198
key = register.maskType.root
232199
bitMask = None if device.bitMasks is None else device.bitMasks.get(key)
233200
if bitMask is not None:
234201
bitmask_parser = _create_bitmask_parser(bitMask)
235202
reader = _compose_parser(bitmask_parser, reader, params)
236-
parser = _compose_parser(bitmask_parser, parser, params)
237-
return RegisterReader(register, reader, parser)
203+
return RegisterReader(register, reader)
238204

239205
groupMask = None if device.groupMasks is None else device.groupMasks.get(key)
240206
if groupMask is not None:
241207
groupmask_parser = _create_groupmask_parser(name, groupMask)
242208
reader = _compose_parser(groupmask_parser, reader, params)
243-
parser = _compose_parser(groupmask_parser, parser, params)
244-
return RegisterReader(register, reader, parser)
209+
return RegisterReader(register, reader)
245210

246211
if register.payloadSpec is not None:
247212
member_parsers = [
@@ -253,17 +218,15 @@ def payload_parser(df: DataFrame):
253218
return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index)
254219

255220
reader = _compose_parser(payload_parser, reader, params)
256-
parser = _compose_parser(payload_parser, parser, params)
257-
return RegisterReader(register, reader, parser)
221+
return RegisterReader(register, reader)
258222

259223
columns = (
260224
[name]
261225
if register.length is None or register.length == 1
262226
else [f"{name}_{i}" for i in range(register.length)]
263227
)
264228
reader = partial(reader, columns=columns)
265-
parser = partial(parser, columns=columns)
266-
return RegisterReader(register, reader, parser)
229+
return RegisterReader(register, reader)
267230

268231

269232
def create_reader(

0 commit comments

Comments
 (0)