Skip to content

Commit a331cea

Browse files
committed
Allow formatting data as binary protocol messages
1 parent ac49256 commit a331cea

1 file changed

Lines changed: 130 additions & 4 deletions

File tree

harp/io.py

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ class MessageType(IntEnum):
2424

2525

2626
_SECONDS_PER_TICK = 32e-6
27+
_PAYLOAD_TIMESTAMP_MASK = 0x10
2728
_messagetypes = [type.name for type in MessageType]
28-
_payloadtypes = {
29+
_dtypefrompayloadtype = {
2930
1: np.dtype(np.uint8),
3031
2: np.dtype(np.uint16),
3132
4: np.dtype(np.uint32),
@@ -36,6 +37,7 @@ class MessageType(IntEnum):
3637
136: np.dtype(np.int64),
3738
68: np.dtype(np.float32),
3839
}
40+
_payloadtypefromdtype = {v: k for k, v in _dtypefrompayloadtype.items()}
3941

4042

4143
def read(
@@ -140,20 +142,20 @@ def _fromraw(
140142
nrows = len(data) // stride
141143
payloadtype = data[4]
142144
payloadoffset = 5
143-
if payloadtype & 0x10 != 0:
145+
if payloadtype & _PAYLOAD_TIMESTAMP_MASK != 0:
144146
seconds = np.ndarray(nrows, dtype=np.uint32, buffer=data, offset=payloadoffset, strides=stride)
145147
payloadoffset += 4
146148
micros = np.ndarray(nrows, dtype=np.uint16, buffer=data, offset=payloadoffset, strides=stride)
147149
payloadoffset += 2
148150
time = micros * _SECONDS_PER_TICK + seconds
149-
payloadtype = payloadtype & ~np.uint8(0x10)
151+
payloadtype = payloadtype & ~np.uint8(_PAYLOAD_TIMESTAMP_MASK)
150152
if epoch is not None:
151153
time = epoch + pd.to_timedelta(time, "s") # type: ignore
152154
index = pd.Series(time)
153155
index.name = "Time"
154156

155157
payloadsize = stride - payloadoffset - 1
156-
payloadtype = _payloadtypes[payloadtype]
158+
payloadtype = _dtypefrompayloadtype[payloadtype]
157159
if dtype is not None and dtype != payloadtype:
158160
raise ValueError(f"expected payload type {dtype} but got {payloadtype}")
159161

@@ -176,3 +178,127 @@ def _fromraw(
176178
msgtype = pd.Categorical.from_codes(msgtype, categories=_messagetypes) # type: ignore
177179
result[MessageType.__name__] = msgtype
178180
return result
181+
182+
183+
def write(
184+
file: Union[str, bytes, PathLike[Any], BinaryIO],
185+
data: pd.DataFrame,
186+
address: int,
187+
dtype: Optional[np.dtype] = None,
188+
port: Optional[int] = None,
189+
epoch: Optional[datetime] = None,
190+
message_type: Optional[MessageType] = None,
191+
):
192+
"""Write single-register Harp data to the specified file.
193+
194+
Parameters
195+
----------
196+
file
197+
Open file object or filename where to store binary data from
198+
a single device register.
199+
data
200+
Pandas data frame containing message payload.
201+
address
202+
Register address used to identify all formatted Harp messages.
203+
dtype
204+
Data type of the register payload. If specified, all data will
205+
be converted before formatting the binary payload.
206+
port
207+
Optional port value used for all formatted Harp messages.
208+
epoch
209+
Reference datetime at which time zero begins. If specified,
210+
the input data frame must have a datetime index.
211+
message_type
212+
Optional message type used for all formatted Harp messages.
213+
If not specified, data must contain a MessageType column.
214+
"""
215+
buffer = format(data, address, dtype, port, epoch, message_type)
216+
buffer.tofile(file)
217+
218+
219+
def format(
220+
data: pd.DataFrame,
221+
address: int,
222+
dtype: Optional[np.dtype] = None,
223+
port: Optional[int] = None,
224+
epoch: Optional[datetime] = None,
225+
message_type: Optional[MessageType] = None,
226+
) -> npt.NDArray[np.uint8]:
227+
"""Format single-register Harp data as a flat binary buffer.
228+
229+
Parameters
230+
----------
231+
data
232+
Pandas data frame containing message payload.
233+
address
234+
Register address used to identify all formatted Harp messages.
235+
dtype
236+
Data type of the register payload. If specified, all data will
237+
be converted before formatting the binary payload.
238+
port
239+
Optional port value used for all formatted Harp messages.
240+
epoch
241+
Reference datetime at which time zero begins. If specified,
242+
the input data frame must have a datetime index.
243+
message_type
244+
Optional message type used for all formatted Harp messages.
245+
If not specified, data must contain a MessageType column.
246+
247+
Returns
248+
-------
249+
An array object containing message data formatted according
250+
to the Harp binary protocol.
251+
"""
252+
if len(data) == 0:
253+
return np.empty(0, dtype=np.uint8)
254+
255+
if "MessageType" in data.columns:
256+
msgtype = data["MessageType"].cat.codes
257+
payload = data.iloc[:, 0:-1].values
258+
elif message_type is not None:
259+
msgtype = message_type
260+
payload = data.values
261+
else:
262+
raise ValueError(f"message type must be specified either in the data or as argument")
263+
264+
time = data.index
265+
is_timestamped = True
266+
if epoch is not None:
267+
if not isinstance(time, pd.DatetimeIndex):
268+
raise ValueError(f"expected datetime index to encode with epoch but got {time.inferred_type}")
269+
time = (time - epoch).total_seconds()
270+
elif isinstance(time, pd.RangeIndex):
271+
is_timestamped = False
272+
273+
if dtype is not None:
274+
payload = payload.astype(dtype)
275+
276+
if port is None:
277+
port = 255
278+
279+
payloadtype = _payloadtypefromdtype[payload.dtype]
280+
payloadlength = payload.shape[1] * payload.dtype.itemsize
281+
stride = payloadlength + 6
282+
if is_timestamped:
283+
payloadtype |= _PAYLOAD_TIMESTAMP_MASK
284+
stride += 6
285+
286+
nrows = len(data)
287+
buffer = np.empty((nrows, stride), dtype=np.uint8)
288+
buffer[:, 0] = msgtype
289+
buffer[:, 1:5] = [stride - 2, address, port, payloadtype]
290+
291+
payloadoffset = 5
292+
if is_timestamped:
293+
seconds = time.astype(np.uint32)
294+
micros = np.around(((time - seconds) / _SECONDS_PER_TICK).values).astype(np.uint16)
295+
buffer[:, 5:9] = np.ndarray((nrows, 4), dtype=np.uint8, buffer=seconds.values)
296+
buffer[:, 9:11] = np.ndarray((nrows, 2), dtype=np.uint8, buffer=micros)
297+
payloadoffset += 6
298+
299+
payloadstop = payloadoffset + payloadlength
300+
buffer[:, payloadoffset:payloadstop] = np.ndarray(
301+
(nrows, payloadlength), dtype=np.uint8, buffer=np.ascontiguousarray(payload)
302+
)
303+
buffer[:, -1] = np.sum(buffer[:, 0:-1], axis=1, dtype=np.uint8)
304+
return buffer.reshape(-1)

0 commit comments

Comments
 (0)