Skip to content

Commit 75fe619

Browse files
committed
combine _scan_gs_usb_devices() and _find_gs_usb_device() (@zariiii9003)
1 parent ea01de5 commit 75fe619

2 files changed

Lines changed: 65 additions & 79 deletions

File tree

can/interfaces/gs_usb.py

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from typing import Any
23

34
import usb
45
from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG
@@ -12,53 +13,49 @@
1213
logger = logging.getLogger(__name__)
1314

1415

15-
def _scan_gs_usb_devices() -> list[GsUsb]:
16-
"""Scan for gs_usb devices using auto-detected backend.
16+
def _find_gs_usb_devices(
17+
bus: int | None = None, address: int | None = None
18+
) -> list[GsUsb]:
19+
"""Find gs_usb devices using auto-detected backend.
1720
1821
Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend,
1922
allowing ``pyusb`` to auto-detect the best available backend. This enables
2023
support for WinUSB on Windows in addition to libusbK.
24+
25+
:param bus: number of the bus that the device is connected to
26+
:param address: address of the device on the bus it is connected to
27+
:return: a list of found GsUsb devices
2128
"""
29+
kwargs = {}
30+
if bus is not None:
31+
kwargs["bus"] = bus
32+
if address is not None:
33+
kwargs["address"] = address
34+
2235
return [
2336
GsUsb(dev)
2437
for dev in (
2538
usb.core.find(
2639
find_all=True,
2740
custom_match=GsUsb.is_gs_usb_device,
41+
**kwargs,
2842
)
2943
or []
3044
)
3145
]
3246

3347

34-
def _find_gs_usb_device(bus: int, address: int) -> GsUsb | None:
35-
"""Find a specific gs_usb device using auto-detected backend.
36-
37-
Unlike :meth:`GsUsb.find`, this does not force the ``libusb1`` backend,
38-
allowing ``pyusb`` to auto-detect the best available backend. This enables
39-
support for WinUSB on Windows in addition to libusbK.
40-
"""
41-
dev = usb.core.find(
42-
custom_match=GsUsb.is_gs_usb_device,
43-
bus=bus,
44-
address=address,
45-
)
46-
if dev:
47-
return GsUsb(dev)
48-
return None
49-
50-
5148
class GsUsbBus(can.BusABC):
5249
def __init__(
5350
self,
54-
channel,
51+
channel: can.typechecking.Channel,
5552
bitrate: int = 500_000,
56-
index=None,
57-
bus=None,
58-
address=None,
59-
can_filters=None,
60-
**kwargs,
61-
):
53+
index: int | None = None,
54+
bus: int | None = None,
55+
address: int | None = None,
56+
can_filters: can.typechecking.CanFilters | None = None,
57+
**kwargs: Any,
58+
) -> None:
6259
"""
6360
:param channel: usb device name
6461
:param index: device number if using automatic scan, starting from 0.
@@ -74,24 +71,35 @@ def __init__(
7471
)
7572

7673
if index is None and address is None and bus is None:
77-
index = channel
74+
_index: Any = channel
75+
else:
76+
_index = index
7877

79-
self._index = None
80-
if index is not None:
81-
devs = _scan_gs_usb_devices()
82-
if len(devs) <= index:
78+
self._index: int | None = None
79+
if _index is not None:
80+
if not isinstance(_index, int):
81+
try:
82+
_index = int(_index)
83+
except (ValueError, TypeError):
84+
raise CanInitializationError(
85+
f"index must be an integer, but got {type(_index).__name__} ({_index})"
86+
) from None
87+
88+
devs = _find_gs_usb_devices()
89+
if len(devs) <= _index:
8390
raise CanInitializationError(
84-
f"Cannot find device {index}. Devices found: {len(devs)}"
91+
f"Cannot find device {_index}. Devices found: {len(devs)}"
8592
)
86-
gs_usb = devs[index]
87-
self._index = index
93+
gs_usb = devs[_index]
94+
self._index = _index
8895
else:
89-
gs_usb = _find_gs_usb_device(bus=bus, address=address)
90-
if not gs_usb:
96+
devs = _find_gs_usb_devices(bus=bus, address=address)
97+
if not devs:
9198
raise CanInitializationError(f"Cannot find device {channel}")
99+
gs_usb = devs[0]
92100

93101
self.gs_usb = gs_usb
94-
self.channel_info = channel
102+
self.channel_info = str(channel)
95103
self._can_protocol = can.CanProtocol.CAN_20
96104

97105
bit_timing = can.BitTiming.from_sample_point(
@@ -116,7 +124,7 @@ def __init__(
116124
**kwargs,
117125
)
118126

119-
def send(self, msg: can.Message, timeout: float | None = None):
127+
def send(self, msg: can.Message, timeout: float | None = None) -> None:
120128
"""Transmit a message to the CAN bus.
121129
122130
:param Message msg: A message object.
@@ -200,9 +208,10 @@ def shutdown(self):
200208

201209
self.gs_usb.stop()
202210
if self._index is not None:
203-
# Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail
204-
# the next time the device is opened in __init__()
205-
devs = _scan_gs_usb_devices()
211+
# Avoid errors on subsequent __init() by repeating the .scan() and
212+
# .start() that would otherwise fail the next time the device is
213+
# opened in __init__()
214+
devs = _find_gs_usb_devices()
206215
if self._index < len(devs):
207216
gs_usb = devs[self._index]
208217
try:

test/test_interface_gs_usb.py

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@
66

77
from can.interfaces.gs_usb import (
88
GsUsbBus,
9-
_find_gs_usb_device,
10-
_scan_gs_usb_devices,
9+
_find_gs_usb_devices,
1110
)
1211

1312

1413
@patch("can.interfaces.gs_usb.usb.core.find")
15-
def test_scan_does_not_force_backend(mock_find):
16-
"""Verify that _scan_gs_usb_devices does not pass a backend argument,
14+
def test_find_devices_does_not_force_backend(mock_find):
15+
"""Verify that _find_gs_usb_devices does not pass a backend argument,
1716
allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
1817
mock_find.return_value = []
1918

20-
_scan_gs_usb_devices()
19+
_find_gs_usb_devices()
2120

2221
mock_find.assert_called_once()
2322
call_kwargs = mock_find.call_args[1]
@@ -28,12 +27,11 @@ def test_scan_does_not_force_backend(mock_find):
2827

2928

3029
@patch("can.interfaces.gs_usb.usb.core.find")
31-
def test_find_does_not_force_backend(mock_find):
32-
"""Verify that _find_gs_usb_device does not pass a backend argument,
33-
allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
34-
mock_find.return_value = None
30+
def test_find_devices_with_args_does_not_force_backend(mock_find):
31+
"""Verify that _find_gs_usb_devices with bus/address does not pass a backend argument."""
32+
mock_find.return_value = []
3533

36-
_find_gs_usb_device(bus=1, address=2)
34+
_find_gs_usb_devices(bus=1, address=2)
3735

3836
mock_find.assert_called_once()
3937
call_kwargs = mock_find.call_args[1]
@@ -42,49 +40,28 @@ def test_find_does_not_force_backend(mock_find):
4240
), "backend should not be specified so pyusb can auto-detect"
4341
assert call_kwargs["bus"] == 1
4442
assert call_kwargs["address"] == 2
43+
assert call_kwargs["find_all"] is True
4544

4645

4746
@patch("can.interfaces.gs_usb.usb.core.find")
48-
def test_scan_returns_gs_usb_devices(mock_find):
49-
"""Verify that _scan_gs_usb_devices wraps found USB devices in GsUsb objects."""
47+
def test_find_devices_returns_gs_usb_devices(mock_find):
48+
"""Verify that _find_gs_usb_devices wraps found USB devices in GsUsb objects."""
5049
mock_dev1 = MagicMock()
5150
mock_dev2 = MagicMock()
5251
mock_find.return_value = [mock_dev1, mock_dev2]
5352

54-
devices = _scan_gs_usb_devices()
53+
devices = _find_gs_usb_devices()
5554

5655
assert len(devices) == 2
5756
assert devices[0].gs_usb is mock_dev1
5857
assert devices[1].gs_usb is mock_dev2
5958

6059

6160
@patch("can.interfaces.gs_usb.usb.core.find")
62-
def test_find_returns_gs_usb_device(mock_find):
63-
"""Verify that _find_gs_usb_device wraps the found USB device in a GsUsb object."""
64-
mock_dev = MagicMock()
65-
mock_find.return_value = mock_dev
66-
67-
device = _find_gs_usb_device(bus=1, address=2)
68-
69-
assert device is not None
70-
assert device.gs_usb is mock_dev
71-
72-
73-
@patch("can.interfaces.gs_usb.usb.core.find")
74-
def test_find_returns_none_when_no_device(mock_find):
75-
"""Verify that _find_gs_usb_device returns None when no device is found."""
76-
mock_find.return_value = None
77-
78-
device = _find_gs_usb_device(bus=1, address=2)
79-
80-
assert device is None
81-
82-
83-
@patch("can.interfaces.gs_usb.usb.core.find")
84-
def test_scan_returns_empty_list_when_no_devices(mock_find):
85-
"""Verify that _scan_gs_usb_devices returns an empty list when no devices are found."""
61+
def test_find_devices_returns_empty_list_when_no_devices(mock_find):
62+
"""Verify that _find_gs_usb_devices returns an empty list when no devices are found."""
8663
mock_find.return_value = []
8764

88-
devices = _scan_gs_usb_devices()
65+
devices = _find_gs_usb_devices()
8966

9067
assert devices == []

0 commit comments

Comments
 (0)