forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathneousys.py
More file actions
243 lines (198 loc) · 7.44 KB
/
neousys.py
File metadata and controls
243 lines (198 loc) · 7.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
""" Neousys CAN bus driver """
#
# This kind of interface can be found, for example, on the Neousys POC-551VTC.
# You need the correct drivers and DLL (shared object on Linux) from Neousys:
#
# https://www.neousys-tech.com/en/support-service/resources/category/299-poc-551vtc-driver
#
# Beware: this has only been tested on Linux with kernels newer than v5.3. It should
# be a drop-in on Windows, but you have to substitute the correctly named DLL.
#
# pylint: disable=too-few-public-methods
# pylint: disable=too-many-instance-attributes
# pylint: disable=wrong-import-position
import logging
import platform
import queue
from ctypes import (
CFUNCTYPE,
POINTER,
Structure,
byref,
c_ubyte,
c_uint,
c_ushort,
sizeof,
)
from time import time
try:
from ctypes import WinDLL
except ImportError:
from ctypes import CDLL
from can import (
BusABC,
CanInitializationError,
CanInterfaceNotImplementedError,
CanOperationError,
CanProtocol,
Message,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class NeousysCanSetup(Structure):
    """C CAN Setup struct

    Passed by reference to the driver's ``CAN_Setup`` call. The order and
    types of the fields define the binary layout, so they must not be
    reordered or retyped.
    """

    _fields_ = [
        ("bitRate", c_uint),  # requested bus bit rate
        ("recvConfig", c_uint),  # receive filter flags (NEOUSYS_CAN_MSG_USE_*_FILTER)
        ("recvId", c_uint),  # presumably the identifier the filter matches -- confirm
        ("recvMask", c_uint),  # presumably the mask applied by the filter -- confirm
    ]
class NeousysCanMsg(Structure):
    """C CAN Message struct

    Used both for frames handed to the driver when sending and for frames
    delivered to the receive callback. The order and types of the fields
    define the binary layout, so they must not be reordered or retyped.
    """

    _fields_ = [
        ("id", c_uint),  # arbitration identifier
        ("flags", c_ushort),  # NEOUSYS_CAN_MSG_* bit flags
        ("extra", c_ubyte),  # always written as 0 and never read by this module
        ("len", c_ubyte),  # number of valid bytes in ``data``
        ("data", c_ubyte * 8),  # payload buffer, at most 8 bytes
    ]
class NeousysCanBitClk(Structure):
    """C CAN BIT Clock struct

    Describes the bit timing in time quanta. The order and types of the
    fields define the binary layout, so they must not be reordered or
    retyped.
    """

    _fields_ = [
        # valid: 2~16, sum of the Synchronization, Propagation, and
        # Phase Buffer 1 segments, measured in time quanta.
        ("syncPropPhase1Seg", c_ushort),
        # valid: 1~8, the Phase Buffer 2 segment in time quanta.
        ("phase2Seg", c_ushort),
        # valid: 1~4, Resynchronization Jump Width in time quanta.
        ("jumpWidth", c_ushort),
        # valid: 1~1023, CAN_CLK divider used to determine time quanta.
        ("quantumPrescaler", c_ushort),
    ]
# C callback prototype for received frames: returns nothing and takes a
# pointer to a NeousysCanMsg plus an unsigned int.
NEOUSYS_CAN_MSG_CALLBACK = CFUNCTYPE(None, POINTER(NeousysCanMsg), c_uint)
# C callback prototype for status reports: returns nothing and takes an
# unsigned int status value.
NEOUSYS_CAN_STATUS_CALLBACK = CFUNCTYPE(None, c_uint)
# --- Message flags (bits tested against NeousysCanMsg.flags) ---------------
NEOUSYS_CAN_MSG_EXTENDED_ID = 0x0004
NEOUSYS_CAN_MSG_REMOTE_FRAME = 0x0040
NEOUSYS_CAN_MSG_DATA_NEW = 0x0080
NEOUSYS_CAN_MSG_DATA_LOST = 0x0100

# --- Receive filter options (value given as recvConfig in the setup) -------
NEOUSYS_CAN_MSG_USE_ID_FILTER = 0x00000008
# Only accept the direction specified in the message type.
NEOUSYS_CAN_MSG_USE_DIR_FILTER = 0x00000010 | NEOUSYS_CAN_MSG_USE_ID_FILTER
# Filters on only extended identifiers.
NEOUSYS_CAN_MSG_USE_EXT_FILTER = 0x00000020 | NEOUSYS_CAN_MSG_USE_ID_FILTER

# --- Controller status bits -------------------------------------------------
NEOUSYS_CAN_STATUS_BUS_OFF = 0x00000080
# CAN controller error level has reached warning level.
NEOUSYS_CAN_STATUS_EWARN = 0x00000040
# CAN controller error level has reached error passive level.
NEOUSYS_CAN_STATUS_EPASS = 0x00000020

# --- Last error code (LEC) values; extract with NEOUSYS_CAN_STATUS_LEC_MASK -
# A bit stuffing error has occurred.
NEOUSYS_CAN_STATUS_LEC_STUFF = 0x00000001
# A formatting error has occurred.
NEOUSYS_CAN_STATUS_LEC_FORM = 0x00000002
# An acknowledge error has occurred.
NEOUSYS_CAN_STATUS_LEC_ACK = 0x00000003
# The bus remained a bit level of 1 for longer than is allowed.
NEOUSYS_CAN_STATUS_LEC_BIT1 = 0x00000004
# The bus remained a bit level of 0 for longer than is allowed.
NEOUSYS_CAN_STATUS_LEC_BIT0 = 0x00000005
# A CRC error has occurred.
NEOUSYS_CAN_STATUS_LEC_CRC = 0x00000006
# This is the mask for the CAN last error code (LEC).
NEOUSYS_CAN_STATUS_LEC_MASK = 0x00000007
# Handle to the vendor library. It stays None when the library cannot be
# loaded, so importing this module does not fail on machines without the
# driver; NeousysBus checks for None before using it.
NEOUSYS_CANLIB = None

try:
    if platform.system() == "Windows":
        NEOUSYS_CANLIB = WinDLL("./WDT_DIO.dll")
    else:
        NEOUSYS_CANLIB = CDLL("libwdt_dio.so")
except OSError as error:
    logger.info("Cannot load Neousys CAN bus dll or shared object: %s", error)
else:
    logger.info("Loaded Neousys WDT_DIO Can driver")
class NeousysBus(BusABC):
    """Neousys CAN bus Class

    Received frames are delivered by the driver through a callback and are
    buffered in ``self.queue``; ``_recv_internal`` takes them from there.
    """

    def __init__(self, channel, device=0, bitrate=500000, **kwargs):
        """
        :param channel: channel number
        :param device: device number
        :param bitrate: bit rate.
        :raises CanInterfaceNotImplementedError: if the Neousys library
            could not be loaded
        :raises CanInitializationError: if registering a callback, setting up
            the channel or starting the channel fails
        """
        super().__init__(channel, **kwargs)

        if NEOUSYS_CANLIB is None:
            raise CanInterfaceNotImplementedError("Neousys WDT_DIO Can driver missing")

        self.channel = channel
        self.device = device
        self.channel_info = f"Neousys Can: device {self.device}, channel {self.channel}"
        self._can_protocol = CanProtocol.CAN_20

        # Buffer between the driver callback and _recv_internal (unbounded).
        self.queue = queue.Queue()

        # Init with accept all and wanted bitrate
        self.init_config = NeousysCanSetup(bitrate, NEOUSYS_CAN_MSG_USE_ID_FILTER, 0, 0)

        # The ctypes wrappers are stored on the instance so that they stay
        # referenced for as long as the C library may call them.
        self._neousys_recv_cb = NEOUSYS_CAN_MSG_CALLBACK(self._neousys_recv_cb)
        self._neousys_status_cb = NEOUSYS_CAN_STATUS_CALLBACK(self._neousys_status_cb)

        # Every driver call below reports failure by returning 0.
        if NEOUSYS_CANLIB.CAN_RegisterReceived(0, self._neousys_recv_cb) == 0:
            raise CanInitializationError("Neousys CAN bus Setup receive callback")

        if NEOUSYS_CANLIB.CAN_RegisterStatus(0, self._neousys_status_cb) == 0:
            raise CanInitializationError("Neousys CAN bus Setup status callback")

        if (
            NEOUSYS_CANLIB.CAN_Setup(
                channel, byref(self.init_config), sizeof(self.init_config)
            )
            == 0
        ):
            raise CanInitializationError("Neousys CAN bus Setup Error")

        if NEOUSYS_CANLIB.CAN_Start(channel) == 0:
            raise CanInitializationError("Neousys CAN bus Start Error")

    def send(self, msg, timeout=None) -> None:
        """
        :param msg: message to send
        :param timeout: timeout is not used here
        :raises CanOperationError: if the driver reports that sending failed
        """
        # NOTE(review): flags are always sent as 0, so msg.is_extended_id and
        # msg.is_remote_frame are not forwarded to the driver -- confirm
        # whether NEOUSYS_CAN_MSG_EXTENDED_ID / NEOUSYS_CAN_MSG_REMOTE_FRAME
        # are expected here.
        tx_msg = NeousysCanMsg(
            msg.arbitration_id, 0, 0, msg.dlc, (c_ubyte * 8)(*msg.data)
        )

        if NEOUSYS_CANLIB.CAN_Send(self.channel, byref(tx_msg), sizeof(tx_msg)) == 0:
            raise CanOperationError("Neousys Can can't send message")

    def _recv_internal(self, timeout):
        """
        Take the next buffered message, waiting up to ``timeout`` seconds.

        :param timeout: seconds to wait; ``None`` waits until a message arrives
        :return: ``(message, False)``, or ``(None, False)`` if nothing arrived
            in time. The second element is always ``False``: no filtering is
            done here.
        """
        try:
            return self.queue.get(block=True, timeout=timeout), False
        except queue.Empty:
            return None, False

    def _neousys_recv_cb(self, msg, sizeof_msg) -> None:
        """
        Driver callback: convert one received frame and buffer it.

        :param msg: struct CAN_MSG
        :param sizeof_msg: message number
        """
        frame = msg.contents
        msg_bytes = bytearray(frame.data)
        remote_frame = bool(frame.flags & NEOUSYS_CAN_MSG_REMOTE_FRAME)
        extended_frame = bool(frame.flags & NEOUSYS_CAN_MSG_EXTENDED_ID)

        if frame.flags & NEOUSYS_CAN_MSG_DATA_LOST:
            logger.error("_neousys_recv_cb flag CAN_MSG_DATA_LOST")

        rx_msg = Message(
            timestamp=time(),
            arbitration_id=frame.id,
            is_remote_frame=remote_frame,
            is_extended_id=extended_frame,
            channel=self.channel,
            dlc=frame.len,
            data=msg_bytes[: frame.len],
        )

        # Frames arrive in this callback, while python-can reads by polling,
        # so they are cached in the queue until they are polled.
        try:
            # Non-blocking: a blocking put() never raises queue.Full and
            # would stall the driver's callback if the queue were bounded.
            self.queue.put_nowait(rx_msg)
        except queue.Full:
            # An exception cannot propagate out of a ctypes callback, so the
            # dropped frame is reported through the logger instead.
            logger.error("Neousys message Queue is full")

    def _neousys_status_cb(self, status) -> None:
        """
        Driver callback: log a status report.

        :param status: BUS Status
        """
        # Log the readable channel description rather than the setup struct,
        # whose default repr carries no useful information.
        logger.info("%s _neousys_status_cb: %d", self.channel_info, status)

    def shutdown(self):
        """Shut the bus down and stop the driver channel."""
        super().shutdown()
        # The library is None when loading failed; then there is nothing to stop.
        if NEOUSYS_CANLIB is not None:
            NEOUSYS_CANLIB.CAN_Stop(self.channel)

    @staticmethod
    def _detect_available_configs():
        """Return the configurations this interface reports as available."""
        # There is only one channel
        return [{"interface": "neousys", "channel": 0}]