forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrobotell.py
More file actions
403 lines (353 loc) · 15.4 KB
/
robotell.py
File metadata and controls
403 lines (353 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
"""
Interface for Chinese Robotell compatible interfaces (win32/linux).
"""
import io
import logging
import time
from can import BusABC, CanProtocol, Message
from ..exceptions import CanInterfaceNotImplementedError, CanOperationError
logger = logging.getLogger(__name__)
try:
import serial
except ImportError:
logger.warning(
"You won't be able to use the Robotell can backend without "
"the serial module installed!"
)
serial = None
class robotellBus(BusABC):
"""
robotell interface
"""
_PACKET_HEAD = 0xAA # Frame starts with 2x FRAME_HEAD bytes
_PACKET_TAIL = 0x55 # Frame ends with 2x FRAME_END bytes
_PACKET_ESC = (
0xA5 # Escape char before any HEAD, TAIL or ESC chat (including in checksum)
)
_CAN_CONFIG_CHANNEL = 0xFF # Configuration channel of CAN
_CAN_SERIALBPS_ID = 0x01FFFE90 # USB Serial port speed
_CAN_ART_ID = 0x01FFFEA0 # Automatic retransmission
_CAN_ABOM_ID = 0x01FFFEB0 # Automatic bus management
_CAN_RESET_ID = 0x01FFFEC0 # ID for initialization
_CAN_BAUD_ID = 0x01FFFED0 # CAN baud rate
_CAN_FILTER_BASE_ID = 0x01FFFEE0 # ID for first filter (filter0)
_CAN_FILTER_MAX_ID = 0x01FFFEE0 + 13 # ID for the last filter (filter13)
_CAN_INIT_FLASH_ID = 0x01FFFEFF # Restore factory settings
_CAN_READ_SERIAL1 = 0x01FFFFF0 # Read first part of device serial number
_CAN_READ_SERIAL2 = 0x01FFFFF1 # Read first part of device serial number
_MAX_CAN_BAUD = 1000000 # Maximum supported CAN baud rate
_FILTER_ID_MASK = 0x0000000F # Filter ID mask
_CAN_FILTER_EXTENDED = 0x40000000 # Enable mask
_CAN_FILTER_ENABLE = 0x80000000 # Enable filter
_CAN_STANDARD_FMT = 0 # Standard message ID
_CAN_EXTENDED_FMT = 1 # 29 Bit extended format ID
_CAN_DATA_FRAME = 0 # Send data frame
_CAN_REMOTE_FRAME = 1 # Request remote frame
def __init__(
self, channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs
):
"""
:param str channel:
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
:param int ttyBaudrate:
baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
:param int bitrate:
CAN Bitrate in bit/s. Value is stored in the adapter and will be used as default if no bitrate is specified
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
if serial is None:
raise CanInterfaceNotImplementedError("The serial module is not installed")
if not channel: # if None or empty
raise TypeError("Must specify a serial port.")
if "@" in channel:
channel, ttyBaudrate = channel.split("@")
self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, rtscts=rtscts
)
# Disable flushing queued config ACKs on lookup channel (for unit tests)
self._loopback_test = channel == "loop://"
self._rxbuffer = bytearray() # raw bytes from the serial port
self._rxmsg = [] # extracted CAN messages waiting to be read
self._configmsg = [] # extracted config channel messages
self._writeconfig(self._CAN_RESET_ID, 0) # Not sure if this is really necessary
if bitrate is not None:
self.set_bitrate(bitrate)
self._can_protocol = CanProtocol.CAN_20
self.channel_info = (
f"Robotell USB-CAN s/n {self.get_serial_number(1)} on {channel}"
)
logger.info("Using device: %s", self.channel_info)
super().__init__(channel=channel, **kwargs)
def set_bitrate(self, bitrate):
"""
:raise ValueError: if *bitrate* is greater than 1000000
:param int bitrate:
Bitrate in bit/s
"""
if bitrate <= self._MAX_CAN_BAUD:
self._writeconfig(self._CAN_BAUD_ID, bitrate)
else:
raise ValueError(f"Invalid bitrate, must be less than {self._MAX_CAN_BAUD}")
def set_auto_retransmit(self, retrans_flag):
"""
:param bool retrans_flag:
Enable/disable automatic retransmission of unacknowledged CAN frames
"""
self._writeconfig(self._CAN_ART_ID, 1 if retrans_flag else 0)
def set_auto_bus_management(self, auto_man):
"""
:param bool auto_man:
Enable/disable automatic bus management
"""
# Not sure what "automatic bus management" does. Does not seem to control
# automatic ACK of CAN frames (listen only mode)
self._writeconfig(self._CAN_ABOM_ID, 1 if auto_man else 0)
def set_serial_rate(self, serial_bps):
"""
:param int serial_bps:
Set the baud rate of the serial port (not CAN) interface
"""
self._writeconfig(self._CAN_SERIALBPS_ID, serial_bps)
def set_hw_filter(self, filterid, enabled, msgid_value, msgid_mask, extended_msg):
"""
:raise ValueError: if *filterid* is not between 1 and 14
:param int filterid:
ID of filter (1-14)
:param bool enabled:
This filter is enabled
:param int msgid_value:
CAN message ID to filter on. The test unit does not accept an extented message ID unless bit 31 of the ID was set.
:param int msgid_mask:
Mask to apply to CAN messagge ID
:param bool extended_msg:
Filter operates on extended format messages
"""
if filterid < 1 or filterid > 14:
raise ValueError("Invalid filter ID. ID must be between 0 and 13")
else:
configid = self._CAN_FILTER_BASE_ID + (filterid - 1)
msgid_value += self._CAN_FILTER_ENABLE if enabled else 0
msgid_value += self._CAN_FILTER_EXTENDED if extended_msg else 0
self._writeconfig(configid, msgid_value, msgid_mask)
def _getconfigsize(self, configid):
if configid == self._CAN_ART_ID or configid == self._CAN_ABOM_ID:
return 1
if configid == self._CAN_BAUD_ID or configid == self._CAN_INIT_FLASH_ID:
return 4
if configid == self._CAN_SERIALBPS_ID:
return 4
if configid == self._CAN_READ_SERIAL1 or configid <= self._CAN_READ_SERIAL2:
return 8
if self._CAN_FILTER_BASE_ID <= configid <= self._CAN_FILTER_MAX_ID:
return 8
return 0
def _readconfig(self, configid, timeout):
self._writemessage(
msgid=configid,
msgdata=bytearray(8),
datalen=self._getconfigsize(configid),
msgchan=self._CAN_CONFIG_CHANNEL,
msgformat=self._CAN_EXTENDED_FMT,
msgtype=self._CAN_REMOTE_FRAME,
)
# Read message from config channel with result. Flush any previously pending config messages
newmsg = self._readmessage(not self._loopback_test, True, timeout)
if newmsg is None:
logger.warning(
f"Timeout waiting for response when reading config value {configid:04X}."
)
return None
return newmsg[4:12]
def _writeconfig(self, configid, value, value2=0):
configsize = self._getconfigsize(configid)
configdata = bytearray(configsize)
if configsize >= 1:
configdata[0] = value & 0xFF
if configsize >= 4:
configdata[1] = (value >> 8) & 0xFF
configdata[2] = (value >> 16) & 0xFF
configdata[3] = (value >> 24) & 0xFF
if configsize >= 8:
configdata[4] = value2 & 0xFF
configdata[5] = (value2 >> 8) & 0xFF
configdata[6] = (value2 >> 16) & 0xFF
configdata[7] = (value2 >> 24) & 0xFF
self._writemessage(
msgid=configid,
msgdata=configdata,
datalen=configsize,
msgchan=self._CAN_CONFIG_CHANNEL,
msgformat=self._CAN_EXTENDED_FMT,
msgtype=self._CAN_DATA_FRAME,
)
# Read message from config channel to verify. Flush any previously pending config messages
newmsg = self._readmessage(not self._loopback_test, True, 1)
if newmsg is None:
logger.warning(
"Timeout waiting for response when writing config value %d", configid
)
def _readmessage(self, flushold, cfgchannel, timeout):
header = bytearray([self._PACKET_HEAD, self._PACKET_HEAD])
terminator = bytearray([self._PACKET_TAIL, self._PACKET_TAIL])
msgqueue = self._configmsg if cfgchannel else self._rxmsg
if flushold:
del msgqueue[:]
# read what is already in serial port receive buffer - unless we are doing loopback testing
if not self._loopback_test:
while self.serialPortOrig.in_waiting:
self._rxbuffer += self.serialPortOrig.read()
# loop until we have read an appropriate message
start = time.time()
time_left = timeout
while True:
# make sure first bytes in RX buffer is a new packet header
headpos = self._rxbuffer.find(header)
if headpos > 0:
# data does not start with expected header bytes. Log error and ignore garbage
logger.warning("Ignoring extra " + str(headpos) + " garbage bytes")
del self._rxbuffer[:headpos]
headpos = self._rxbuffer.find(header) # should now be at index 0!
# check to see if we have a complete packet in the RX buffer
termpos = self._rxbuffer.find(terminator)
if headpos == 0 and termpos > headpos:
# copy packet into message structure and un-escape bytes
newmsg = bytearray()
idx = headpos + len(header)
while idx < termpos:
if self._rxbuffer[idx] == self._PACKET_ESC:
idx += 1
newmsg.append(self._rxbuffer[idx])
idx += 1
del self._rxbuffer[: termpos + len(terminator)]
# Check one - make sure message structure is the correct length
if len(newmsg) == 17:
# Check two - verify the checksum
cs = 0
for idx in range(16):
cs = (cs + newmsg[idx]) & 0xFF
if newmsg[16] == cs:
# OK, valid message - place it in the correct queue
if newmsg[13] == 0xFF: # Check for config channel
self._configmsg.append(newmsg)
else:
self._rxmsg.append(newmsg)
else:
logger.warning("Incorrect message checksum, discarded message")
else:
logger.warning(
"Invalid message structure length %d, ignoring message",
len(newmsg),
)
# Check if we have a message in the desired queue - if so copy and return
if len(msgqueue) > 0:
newmsg = msgqueue[0]
del msgqueue[:1]
return newmsg
# if we still don't have a complete message, do a blocking read
self.serialPortOrig.timeout = time_left
byte = self.serialPortOrig.read()
if byte:
self._rxbuffer += byte
# If there is time left, try next one with reduced timeout
if timeout is not None:
time_left = timeout - (time.time() - start)
if time_left <= 0:
return None
def _writemessage(self, msgid, msgdata, datalen, msgchan, msgformat, msgtype):
msgbuf = bytearray(17) # Message structure plus checksum byte
msgbuf[0] = msgid & 0xFF
msgbuf[1] = (msgid >> 8) & 0xFF
msgbuf[2] = (msgid >> 16) & 0xFF
msgbuf[3] = (msgid >> 24) & 0xFF
if msgtype == self._CAN_DATA_FRAME:
for idx in range(datalen):
msgbuf[idx + 4] = msgdata[idx]
msgbuf[12] = datalen
msgbuf[13] = msgchan
msgbuf[14] = msgformat
msgbuf[15] = msgtype
cs = 0
for idx in range(16):
cs = (cs + msgbuf[idx]) & 0xFF
msgbuf[16] = cs
packet = bytearray()
packet.append(self._PACKET_HEAD)
packet.append(self._PACKET_HEAD)
for msgbyte in msgbuf:
if (
msgbyte == self._PACKET_ESC
or msgbyte == self._PACKET_HEAD
or msgbyte == self._PACKET_TAIL
):
packet.append(self._PACKET_ESC)
packet.append(msgbyte)
packet.append(self._PACKET_TAIL)
packet.append(self._PACKET_TAIL)
self.serialPortOrig.write(packet)
self.serialPortOrig.flush()
def flush(self):
del self._rxbuffer[:]
del self._rxmsg[:]
del self._configmsg[:]
while self.serialPortOrig.in_waiting:
self.serialPortOrig.read()
def _recv_internal(self, timeout):
msgbuf = self._readmessage(False, False, timeout)
if msgbuf is not None:
msg = Message(
arbitration_id=msgbuf[0]
+ (msgbuf[1] << 8)
+ (msgbuf[2] << 16)
+ (msgbuf[3] << 24),
is_extended_id=(msgbuf[14] == self._CAN_EXTENDED_FMT),
timestamp=time.time(), # Better than nothing...
is_remote_frame=(msgbuf[15] == self._CAN_REMOTE_FRAME),
dlc=msgbuf[12],
data=msgbuf[4 : 4 + msgbuf[12]],
)
return msg, False
return None, False
def send(self, msg, timeout=None):
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout
self._writemessage(
msg.arbitration_id,
msg.data,
msg.dlc,
0,
self._CAN_EXTENDED_FMT if msg.is_extended_id else self._CAN_STANDARD_FMT,
self._CAN_REMOTE_FRAME if msg.is_remote_frame else self._CAN_DATA_FRAME,
)
def shutdown(self):
super().shutdown()
self.serialPortOrig.close()
def fileno(self):
try:
return self.serialPortOrig.fileno()
except io.UnsupportedOperation:
raise NotImplementedError(
"fileno is not implemented using current CAN bus on this platform"
) from None
except Exception as exception:
raise CanOperationError("Cannot fetch fileno") from exception
def get_serial_number(self, timeout: int | None) -> str | None:
"""Get serial number of the slcan interface.
:param timeout:
seconds to wait for serial number or None to wait indefinitely
:return:
None on timeout or a str object.
"""
sn1 = self._readconfig(self._CAN_READ_SERIAL1, timeout)
if sn1 is None:
return None
sn2 = self._readconfig(self._CAN_READ_SERIAL2, timeout)
if sn2 is None:
return None
serial = ""
for idx in range(0, 8, 2):
serial += f"{sn1[idx]:02X}{sn1[idx + 1]:02X}-"
for idx in range(0, 4, 2):
serial += f"{sn2[idx]:02X}{sn2[idx + 1]:02X}-"
return serial[:-1]