1- from typing import Optional , Tuple
1+ from typing import Optional , Tuple , List , Union
22import can
33import candle_api as api
44
5-
65ISO_DLC = (0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 12 , 16 , 20 , 24 , 32 , 48 , 64 )
76
87
98class CandleBus (can .bus .BusABC ):
10- def __init__ (self , channel : int , can_filters : Optional [can .typechecking .CanFilters ] = None ,
9+ def __init__ (self , channel : Union [ int , str ] , can_filters : Optional [can .typechecking .CanFilters ] = None ,
1110 bitrate : int = 1000000 , sample_point : float = 87.5 ,
1211 data_bitrate : int = 5000000 , data_sample_point : float = 87.5 ,
1312 fd : bool = False , loop_back : bool = False , listen_only : bool = False ,
@@ -16,30 +15,24 @@ def __init__(self, channel: int, can_filters: Optional[can.typechecking.CanFilte
1615 manufacture : Optional [str ] = None , product : Optional [str ] = None ,
1716 serial_number : Optional [str ] = None , ** kwargs ) -> None :
1817
19- for dev in api .list_device ():
20- if vid is not None and dev .vendor_id != vid :
21- continue
22- if pid is not None and dev .product_id != pid :
23- continue
24- if manufacture is not None and dev .manufacturer != manufacture :
25- continue
26- if product is not None and dev .product != product :
27- continue
28- if serial_number is not None and dev .serial_number != serial_number :
29- continue
30-
31- self ._device = dev
32- break
18+ # Parse channel.
19+ if isinstance (channel , str ):
20+ serial_number , channel_number = channel .split (':' )
21+ self ._channel_number = int (channel_number )
22+ elif isinstance (channel , int ):
23+ self ._channel_number = channel
3324 else :
34- raise can .exceptions .CanInitializationError ('Device not found!' )
25+ raise TypeError ("channel must be of type str or int" )
26+
27+ # Find the device.
28+ self ._device = self ._find_device (vid , pid , manufacture , product , serial_number )
3529
3630 # Open the device.
3731 self ._device .open ()
3832
3933 # Get the channel.
40- self ._channel_number = channel
41- self ._channel = self ._device [channel ]
42- self .channel_info = f'[{ self ._device .serial_number } ]: channel { channel } '
34+ self ._channel = self ._device [self ._channel_number ]
35+ self .channel_info = f'{ self ._device .serial_number } :{ self ._channel_number } '
4336
4437 # Reset channel.
4538 self ._channel .reset ()
@@ -106,16 +99,35 @@ def __init__(self, channel: int, can_filters: Optional[can.typechecking.CanFilte
10699 ** kwargs ,
107100 )
108101
102+ @staticmethod
103+ def _find_device (vid : Optional [int ] = None , pid : Optional [int ] = None , manufacture : Optional [str ] = None ,
104+ product : Optional [str ] = None , serial_number : Optional [str ] = None ) -> api .CandleDevice :
105+ for dev in api .list_device ():
106+ if vid is not None and dev .vendor_id != vid :
107+ continue
108+ if pid is not None and dev .product_id != pid :
109+ continue
110+ if manufacture is not None and dev .manufacturer != manufacture :
111+ continue
112+ if product is not None and dev .product != product :
113+ continue
114+ if serial_number is not None and dev .serial_number != serial_number :
115+ continue
116+ return dev
117+ else :
118+ raise can .exceptions .CanInitializationError ('Device not found!' )
119+
109120 def _recv_internal (
110- self , timeout : Optional [float ]
121+ self , timeout : Optional [float ]
111122 ) -> Tuple [Optional [can .Message ], bool ]:
123+ frame : Optional [api .CandleCanFrame ] = None
112124 if timeout is None :
113125 frame = self ._channel .receive_nowait ()
114126 else :
115127 try :
116128 frame = self ._channel .receive (timeout )
117129 except TimeoutError :
118- frame = None
130+ pass
119131
120132 if frame is not None :
121133 msg = can .Message (
@@ -125,7 +137,7 @@ def _recv_internal(
125137 is_remote_frame = frame .frame_type .remote_frame ,
126138 is_error_frame = frame .frame_type .error_frame ,
127139 channel = self ._channel_number ,
128- dlc = frame .size , # https://github.com/hardbyte/python-can/issues/749
140+ dlc = frame .size , # https://github.com/hardbyte/python-can/issues/749
129141 data = bytearray (frame ),
130142 is_fd = frame .frame_type .fd ,
131143 is_rx = frame .frame_type .rx ,
@@ -163,5 +175,12 @@ def shutdown(self):
163175 self ._channel .reset ()
164176 super ().shutdown ()
165177
178+ @staticmethod
179+ def _detect_available_configs () -> List [can .typechecking .AutoDetectedConfig ]:
180+ return [{
181+ 'interface' : 'candle' ,
182+ 'channel' : f'{ d .serial_number } :{ i } '
183+ } for d in api .list_device () for i in range (len (d ))]
184+
166185
167186__all__ = ['CandleBus' ]
0 commit comments