44from typing import Any , BinaryIO , Optional , Union
55
66import numpy as np
7+ import numpy .typing as npt
78import pandas as pd
89from pandas ._typing import Axes
910
11+ from harp .typing import _BufferLike , _FileLike
12+
1013REFERENCE_EPOCH = datetime (1904 , 1 , 1 )
1114"""The reference epoch for UTC harp time."""
1215
@@ -21,8 +24,9 @@ class MessageType(IntEnum):
2124
2225
2326_SECONDS_PER_TICK = 32e-6
27+ _PAYLOAD_TIMESTAMP_MASK = 0x10
2428_messagetypes = [type .name for type in MessageType ]
25- _payloadtypes = {
29+ _dtypefrompayloadtype = {
2630 1 : np .dtype (np .uint8 ),
2731 2 : np .dtype (np .uint16 ),
2832 4 : np .dtype (np .uint32 ),
@@ -33,33 +37,34 @@ class MessageType(IntEnum):
3337 136 : np .dtype (np .int64 ),
3438 68 : np .dtype (np .float32 ),
3539}
40+ _payloadtypefromdtype = {v : k for k , v in _dtypefrompayloadtype .items ()}
3641
3742
3843def read (
39- file : Union [str , bytes , PathLike [ Any ], BinaryIO ],
44+ file_or_buf : Union [_FileLike , _BufferLike ],
4045 address : Optional [int ] = None ,
4146 dtype : Optional [np .dtype ] = None ,
4247 length : Optional [int ] = None ,
4348 columns : Optional [Axes ] = None ,
4449 epoch : Optional [datetime ] = None ,
4550 keep_type : bool = False ,
4651):
47- """Read single-register Harp data from the specified file.
52+ """Read single-register Harp data from the specified file or buffer .
4853
4954 Parameters
5055 ----------
51- file
52- Open file object or filename containing binary data from
56+ file_or_buf
57+ File path, open file object, or buffer containing binary data from
5358 a single device register.
5459 address
5560 Expected register address. If specified, the address of
56- the first message in the file is used for validation.
61+ the first message is used for validation.
5762 dtype
5863 Expected data type of the register payload. If specified, the
59- payload type of the first message in the file is used for validation.
64+ payload type of the first message is used for validation.
6065 length
6166 Expected number of elements in register payload. If specified, the
62- payload length of the first message in the file is used for validation.
67+ payload length of the first message is used for validation.
6368 columns
6469 The optional column labels to use for the data values.
6570 epoch
@@ -72,9 +77,20 @@ def read(
7277 -------
7378 A pandas data frame containing message data, sorted by time.
7479 """
75- data = np .fromfile (file , dtype = np .uint8 )
80+ if isinstance (file_or_buf , (str , PathLike , BinaryIO )) or hasattr (file_or_buf , "readinto" ):
81+ # TODO: in the below we ignore the type as otherwise
82+ # we have no way to runtime check _IOProtocol
83+ data = np .fromfile (file_or_buf , dtype = np .uint8 ) # type: ignore
84+ else :
85+ data = np .frombuffer (file_or_buf , dtype = np .uint8 )
86+
7687 if len (data ) == 0 :
77- return pd .DataFrame (columns = columns , index = pd .Index ([], dtype = np .float64 , name = "Time" ))
88+ return pd .DataFrame (
89+ columns = columns ,
90+ index = pd .DatetimeIndex ([], name = "Time" )
91+ if epoch
92+ else pd .Index ([], dtype = np .float64 , name = "Time" ),
93+ )
7894
7995 if address is not None and address != data [2 ]:
8096 raise ValueError (f"expected address { address } but got { data [2 ]} " )
@@ -84,20 +100,20 @@ def read(
84100 nrows = len (data ) // stride
85101 payloadtype = data [4 ]
86102 payloadoffset = 5
87- if payloadtype & 0x10 != 0 :
103+ if payloadtype & _PAYLOAD_TIMESTAMP_MASK != 0 :
88104 seconds = np .ndarray (nrows , dtype = np .uint32 , buffer = data , offset = payloadoffset , strides = stride )
89105 payloadoffset += 4
90106 micros = np .ndarray (nrows , dtype = np .uint16 , buffer = data , offset = payloadoffset , strides = stride )
91107 payloadoffset += 2
92108 time = micros * _SECONDS_PER_TICK + seconds
93- payloadtype = payloadtype & ~ np .uint8 (0x10 )
109+ payloadtype = payloadtype & ~ np .uint8 (_PAYLOAD_TIMESTAMP_MASK )
94110 if epoch is not None :
95111 time = epoch + pd .to_timedelta (time , "s" ) # type: ignore
96112 index = pd .Series (time )
97113 index .name = "Time"
98114
99115 payloadsize = stride - payloadoffset - 1
100- payloadtype = _payloadtypes [payloadtype ]
116+ payloadtype = _dtypefrompayloadtype [payloadtype ]
101117 if dtype is not None and dtype != payloadtype :
102118 raise ValueError (f"expected payload type { dtype } but got { payloadtype } " )
103119
@@ -120,3 +136,139 @@ def read(
120136 msgtype = pd .Categorical .from_codes (msgtype , categories = _messagetypes ) # type: ignore
121137 result [MessageType .__name__ ] = msgtype
122138 return result
139+
140+
141+ def to_file (
142+ data : pd .DataFrame ,
143+ file : _FileLike ,
144+ address : int ,
145+ dtype : Optional [np .dtype ] = None ,
146+ length : Optional [int ] = None ,
147+ port : Optional [int ] = None ,
148+ epoch : Optional [datetime ] = None ,
149+ message_type : Optional [MessageType ] = None ,
150+ ):
151+ """Write single-register Harp data to the specified file.
152+
153+ Parameters
154+ ----------
155+ data
156+ Pandas data frame containing message payload.
157+ file
158+ File path, or open file object in which to store binary data from
159+ a single device register.
160+ address
161+ Register address used to identify all formatted Harp messages.
162+ dtype
163+ Data type of the register payload. If specified, all data will
164+ be converted before formatting the binary payload.
165+ length
166+ Expected number of elements in register payload. If specified, the
167+ number of columns in the input data frame is validated.
168+ port
169+ Optional port value used for all formatted Harp messages.
170+ epoch
171+ Reference datetime at which time zero begins. If specified,
172+ the input data frame must have a datetime index.
173+ message_type
174+ Optional message type used for all formatted Harp messages.
175+ If not specified, data must contain a MessageType column.
176+ """
177+ buffer = to_buffer (data , address , dtype , port , length , epoch , message_type )
178+ buffer .tofile (file )
179+
180+
181+ def to_buffer (
182+ data : pd .DataFrame ,
183+ address : int ,
184+ dtype : Optional [np .dtype ] = None ,
185+ length : Optional [int ] = None ,
186+ port : Optional [int ] = None ,
187+ epoch : Optional [datetime ] = None ,
188+ message_type : Optional [MessageType ] = None ,
189+ ) -> npt .NDArray [np .uint8 ]:
190+ """Convert single-register Harp data to a flat binary buffer.
191+
192+ Parameters
193+ ----------
194+ data
195+ Pandas data frame containing message payload.
196+ address
197+ Register address used to identify all formatted Harp messages.
198+ dtype
199+ Data type of the register payload. If specified, all data will
200+ be converted before formatting the binary payload.
201+ length
202+ Expected number of elements in register payload. If specified, the
203+ number of columns in the input data frame is validated.
204+ port
205+ Optional port value used for all formatted Harp messages.
206+ epoch
207+ Reference datetime at which time zero begins. If specified,
208+ the input data frame must have a datetime index.
209+ message_type
210+ Optional message type used for all formatted Harp messages.
211+ If not specified, data must contain a MessageType column.
212+
213+ Returns
214+ -------
215+ An array object containing message data formatted according
216+ to the Harp binary protocol.
217+ """
218+ nrows = len (data )
219+ if nrows == 0 :
220+ return np .empty (0 , dtype = np .uint8 )
221+
222+ if MessageType .__name__ in data .columns :
223+ msgtype = data [MessageType .__name__ ].cat .codes
224+ payload = data [data .columns .drop (MessageType .__name__ )].values
225+ elif message_type is not None :
226+ msgtype = message_type
227+ payload = data .values
228+ else :
229+ raise ValueError (f"message type must be specified either in the data or as argument" )
230+
231+ time = data .index
232+ is_timestamped = True
233+ if epoch is not None :
234+ if not isinstance (time , pd .DatetimeIndex ):
235+ raise ValueError (f"expected datetime index to encode with epoch but got { time .inferred_type } " )
236+ time = (time - epoch ).total_seconds ()
237+ elif isinstance (time , pd .RangeIndex ):
238+ is_timestamped = False
239+
240+ if dtype is not None :
241+ payload = payload .astype (dtype , copy = False )
242+
243+ ncols = payload .shape [1 ]
244+ if length is not None and ncols != length :
245+ raise ValueError (f"expected payload length { length } but got { ncols } " )
246+
247+ if port is None :
248+ port = 255
249+
250+ payloadtype = _payloadtypefromdtype [payload .dtype ]
251+ payloadlength = ncols * payload .dtype .itemsize
252+ stride = payloadlength + 6
253+ if is_timestamped :
254+ payloadtype |= _PAYLOAD_TIMESTAMP_MASK
255+ stride += 6
256+
257+ buffer = np .empty ((nrows , stride ), dtype = np .uint8 )
258+ buffer [:, 0 ] = msgtype
259+ buffer [:, 1 :5 ] = [stride - 2 , address , port , payloadtype ]
260+
261+ payloadoffset = 5
262+ if is_timestamped :
263+ seconds = time .astype (np .uint32 )
264+ micros = np .around (((time - seconds ) / _SECONDS_PER_TICK ).values ).astype (np .uint16 )
265+ buffer [:, 5 :9 ] = np .ndarray ((nrows , 4 ), dtype = np .uint8 , buffer = seconds .values )
266+ buffer [:, 9 :11 ] = np .ndarray ((nrows , 2 ), dtype = np .uint8 , buffer = micros )
267+ payloadoffset += 6
268+
269+ payloadstop = payloadoffset + payloadlength
270+ buffer [:, payloadoffset :payloadstop ] = np .ndarray (
271+ (nrows , payloadlength ), dtype = np .uint8 , buffer = np .ascontiguousarray (payload )
272+ )
273+ buffer [:, - 1 ] = np .sum (buffer [:, 0 :- 1 ], axis = 1 , dtype = np .uint8 )
274+ return buffer .reshape (- 1 )
0 commit comments