1- from candle_api .candle_bus import CandleBus as CandleBusBase
1+ from typing import Optional , Tuple
2+ import can
3+ import candle_api as api
24
35
4- class CandleBus (CandleBusBase ):
5- pass
6+ ISO_DLC = (0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 12 , 16 , 20 , 24 , 32 , 48 , 64 )
67
78
8- __all__ = ['CandleBus' ]
9+ class CandleBus (can .bus .BusABC ):
10+ def __init__ (self , channel : int , can_filters : Optional [can .typechecking .CanFilters ] = None ,
11+ bitrate : int = 1000000 , sample_point : float = 87.5 ,
12+ data_bitrate : int = 5000000 , data_sample_point : float = 87.5 ,
13+ fd : bool = False , loop_back : bool = False , listen_only : bool = False ,
14+ triple_sample : bool = False , one_shot : bool = False , bit_error_reporting : bool = False ,
15+ termination : Optional [bool ] = None , vid : Optional [int ] = None , pid : Optional [int ] = None ,
16+ manufacture : Optional [str ] = None , product : Optional [str ] = None ,
17+ serial_number : Optional [str ] = None , ** kwargs ) -> None :
18+
19+ for dev in api .list_device ():
20+ if vid is not None and dev .vendor_id != vid :
21+ continue
22+ if pid is not None and dev .product_id != pid :
23+ continue
24+ if manufacture is not None and dev .manufacturer != manufacture :
25+ continue
26+ if product is not None and dev .product != product :
27+ continue
28+ if serial_number is not None and dev .serial_number != serial_number :
29+ continue
30+
31+ self ._device = dev
32+ break
33+ else :
34+ raise can .exceptions .CanInitializationError ('Device not found!' )
35+
36+ # Open the device.
37+ self ._device .open ()
38+
39+ # Get the channel.
40+ self ._channel_number = channel
41+ self ._channel = self ._device [channel ]
42+ self .channel_info = f'[{ self ._device .serial_number } ]: channel { channel } '
43+
44+ # Reset channel.
45+ self ._channel .reset ()
46+
47+ # Set termination.
48+ if termination is not None :
49+ self ._channel .set_termination (termination )
50+
51+ # Set bit timing.
52+ props_seg = 1
53+ if self ._channel .feature .fd :
54+ bit_timing_fd = can .BitTimingFd .from_sample_point (
55+ f_clock = self ._channel .clock_frequency ,
56+ nom_bitrate = bitrate ,
57+ nom_sample_point = sample_point ,
58+ data_bitrate = data_bitrate ,
59+ data_sample_point = data_sample_point
60+ )
61+
62+ self ._channel .set_bit_timing (
63+ props_seg ,
64+ bit_timing_fd .nom_tseg1 - props_seg ,
65+ bit_timing_fd .nom_tseg2 ,
66+ bit_timing_fd .nom_sjw ,
67+ bit_timing_fd .nom_brp
68+ )
69+
70+ self ._channel .set_data_bit_timing (
71+ props_seg ,
72+ bit_timing_fd .data_tseg1 - props_seg ,
73+ bit_timing_fd .data_tseg2 ,
74+ bit_timing_fd .data_sjw ,
75+ bit_timing_fd .data_brp
76+ )
77+ else :
78+ bit_timing = can .BitTiming .from_sample_point (
79+ f_clock = self ._channel .clock_frequency ,
80+ bitrate = bitrate ,
81+ sample_point = sample_point ,
82+ )
83+
84+ self ._channel .set_bit_timing (
85+ props_seg ,
86+ bit_timing .tseg1 - props_seg ,
87+ bit_timing .tseg2 ,
88+ bit_timing .sjw ,
89+ bit_timing .brp
90+ )
91+
92+ # Open the channel.
93+ self ._channel .start (
94+ hardware_timestamp = self ._channel .feature .hardware_timestamp ,
95+ fd = fd ,
96+ loop_back = loop_back ,
97+ listen_only = listen_only ,
98+ triple_sample = triple_sample ,
99+ one_shot = one_shot ,
100+ bit_error_reporting = bit_error_reporting
101+ )
102+
103+ super ().__init__ (
104+ channel = channel ,
105+ can_filters = can_filters ,
106+ ** kwargs ,
107+ )
108+
109+ def _recv_internal (
110+ self , timeout : Optional [float ]
111+ ) -> Tuple [Optional [can .Message ], bool ]:
112+ if timeout is None :
113+ frame = self ._channel .receive_nowait ()
114+ else :
115+ try :
116+ frame = self ._channel .receive (timeout )
117+ except TimeoutError :
118+ frame = None
119+
120+ if frame is not None :
121+ msg = can .Message (
122+ timestamp = frame .timestamp ,
123+ arbitration_id = frame .can_id ,
124+ is_extended_id = frame .frame_type .extended_id ,
125+ is_remote_frame = frame .frame_type .remote_frame ,
126+ is_error_frame = frame .frame_type .error_frame ,
127+ channel = self ._channel_number ,
128+ dlc = frame .size , # https://github.com/hardbyte/python-can/issues/749
129+ data = bytearray (frame ),
130+ is_fd = frame .frame_type .fd ,
131+ is_rx = frame .frame_type .rx ,
132+ bitrate_switch = frame .frame_type .bitrate_switch ,
133+ error_state_indicator = frame .frame_type .error_state_indicator
134+ )
135+ return msg , False
136+ return None , False
137+
138+ def send (self , msg : can .Message , timeout : Optional [float ] = None ) -> None :
139+ if timeout is None :
140+ timeout = 1.0
141+
142+ frame = api .CandleCanFrame (
143+ api .CandleFrameType (
144+ rx = msg .is_rx ,
145+ extended_id = msg .is_extended_id ,
146+ remote_frame = msg .is_remote_frame ,
147+ error_frame = msg .is_error_frame ,
148+ fd = msg .is_fd ,
149+ bitrate_switch = msg .bitrate_switch ,
150+ error_state_indicator = msg .error_state_indicator
151+ ),
152+ msg .arbitration_id ,
153+ ISO_DLC .index (msg .dlc ),
154+ msg .data
155+ )
156+
157+ try :
158+ self ._channel .send (frame , timeout )
159+ except TimeoutError as exc :
160+ raise can .CanOperationError ("The message could not be sent" ) from exc
161+
162+ def shutdown (self ):
163+ self ._channel .reset ()
164+ super ().shutdown ()
165+
166+
167+ __all__ = ['CandleBus' ]
0 commit comments