forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgs_usb.py
More file actions
146 lines (119 loc) · 4.7 KB
/
gs_usb.py
File metadata and controls
146 lines (119 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import logging
from typing import Optional, Tuple
import usb
from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG
from gs_usb.gs_usb import GsUsb
from gs_usb.gs_usb_frame import GS_USB_NONE_ECHO_ID, GsUsbFrame
import can
from ..exceptions import CanInitializationError, CanOperationError
logger = logging.getLogger(__name__)
class GsUsbBus(can.BusABC):
def __init__(
self,
channel,
bitrate,
index=None,
bus=None,
address=None,
can_filters=None,
**kwargs,
):
"""
:param channel: usb device name
:param index: device number if using automatic scan, starting from 0.
If specified, bus/address shall not be provided.
:param bus: number of the bus that the device is connected to
:param address: address of the device on the bus it is connected to
:param can_filters: not supported
:param bitrate: CAN network bandwidth (bits/s)
"""
if (index is not None) and ((bus or address) is not None):
raise CanInitializationError(
f"index and bus/address cannot be used simultaneously"
)
if index is not None:
devs = GsUsb.scan()
if len(devs) <= index:
raise CanInitializationError(
f"Cannot find device {index}. Devices found: {len(devs)}"
)
gs_usb = devs[index]
else:
gs_usb = GsUsb.find(bus=bus, address=address)
if not gs_usb:
raise CanInitializationError(f"Cannot find device {channel}")
self.gs_usb = gs_usb
self.channel_info = channel
self.gs_usb.set_bitrate(bitrate)
self.gs_usb.start()
super().__init__(
channel=channel,
can_filters=can_filters,
protocol=can.CanProtocol.CAN_20,
**kwargs,
)
def send(self, msg: can.Message, timeout: Optional[float] = None):
"""Transmit a message to the CAN bus.
:param Message msg: A message object.
:param timeout: timeout is not supported.
The function won't return until message is sent or exception is raised.
:raises CanOperationError:
if the message could not be sent
"""
can_id = msg.arbitration_id
if msg.is_extended_id:
can_id = can_id | CAN_EFF_FLAG
if msg.is_remote_frame:
can_id = can_id | CAN_RTR_FLAG
if msg.is_error_frame:
can_id = can_id | CAN_ERR_FLAG
# Pad message data
msg.data.extend([0x00] * (CAN_MAX_DLC - len(msg.data)))
frame = GsUsbFrame()
frame.can_id = can_id
frame.can_dlc = msg.dlc
frame.timestamp_us = int(msg.timestamp * 1000000)
frame.data = list(msg.data)
try:
self.gs_usb.send(frame)
except usb.core.USBError:
raise CanOperationError("The message could not be sent")
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[can.Message], bool]:
"""
Read a message from the bus and tell whether it was filtered.
This methods may be called by :meth:`~can.BusABC.recv`
to read a message multiple times if the filters set by
:meth:`~can.BusABC.set_filters` do not match and the call has
not yet timed out.
Never raises an error/exception.
:param float timeout: seconds to wait for a message,
see :meth:`~can.BusABC.send`
0 and None will be converted to minimum value 1ms.
:return:
1. a message that was read or None on timeout
2. a bool that is True if message filtering has already
been done and else False. In this interface it is always False
since filtering is not available
"""
frame = GsUsbFrame()
# Do not set timeout as None or zero here to avoid blocking
timeout_ms = round(timeout * 1000) if timeout else 1
if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
return None, False
msg = can.Message(
timestamp=frame.timestamp,
arbitration_id=frame.arbitration_id,
is_extended_id=frame.is_extended_id,
is_remote_frame=frame.is_remote_frame,
is_error_frame=frame.is_error_frame,
channel=self.channel_info,
dlc=frame.can_dlc,
data=bytearray(frame.data)[0 : frame.can_dlc],
is_rx=frame.echo_id == GS_USB_NONE_ECHO_ID,
)
return msg, False
def shutdown(self):
super().shutdown()
self.gs_usb.stop()