Skip to content

Commit a4f4742

Browse files
authored
Minor Typing Improvements (#1962)
1 parent afb1204 commit a4f4742

22 files changed

Lines changed: 150 additions & 113 deletions

can/bit_timing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from can.typechecking import BitTimingDict, BitTimingFdDict
88

99

10-
class BitTiming(Mapping):
10+
class BitTiming(Mapping[str, int]):
1111
"""Representation of a bit timing configuration for a CAN 2.0 bus.
1212
1313
The class can be constructed in multiple ways, depending on the information
@@ -477,7 +477,7 @@ def __hash__(self) -> int:
477477
return tuple(self._data.values()).__hash__()
478478

479479

480-
class BitTimingFd(Mapping):
480+
class BitTimingFd(Mapping[str, int]):
481481
"""Representation of a bit timing configuration for a CAN FD bus.
482482
483483
The class can be constructed in multiple ways, depending on the information

can/bus.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import contextlib
66
import logging
77
import threading
8-
from abc import ABC, ABCMeta, abstractmethod
8+
from abc import ABC, abstractmethod
99
from collections.abc import Iterator, Sequence
1010
from enum import Enum, auto
1111
from time import time
@@ -19,7 +19,6 @@
1919

2020
from typing_extensions import Self
2121

22-
import can
2322
import can.typechecking
2423
from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask
2524
from can.message import Message
@@ -44,7 +43,7 @@ class CanProtocol(Enum):
4443
CAN_XL = auto()
4544

4645

47-
class BusABC(metaclass=ABCMeta):
46+
class BusABC(ABC):
4847
"""The CAN Bus Abstract Base Class that serves as the basis
4948
for all concrete interfaces.
5049
@@ -68,7 +67,7 @@ class BusABC(metaclass=ABCMeta):
6867
@abstractmethod
6968
def __init__(
7069
self,
71-
channel: Optional[can.typechecking.Channel],
70+
channel: can.typechecking.Channel,
7271
can_filters: Optional[can.typechecking.CanFilters] = None,
7372
**kwargs: object,
7473
):
@@ -447,7 +446,6 @@ def _matches_filters(self, msg: Message) -> bool:
447446
for _filter in self._filters:
448447
# check if this filter even applies to the message
449448
if "extended" in _filter:
450-
_filter = cast("can.typechecking.CanFilterExtended", _filter)
451449
if _filter["extended"] != msg.is_extended_id:
452450
continue
453451

@@ -524,7 +522,7 @@ def protocol(self) -> CanProtocol:
524522
return self._can_protocol
525523

526524
@staticmethod
527-
def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]:
525+
def _detect_available_configs() -> Sequence[can.typechecking.AutoDetectedConfig]:
528526
"""Detect all configurations/channels that this interface could
529527
currently connect with.
530528

can/interface.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import concurrent.futures.thread
88
import importlib
99
import logging
10-
from collections.abc import Callable, Iterable
10+
from collections.abc import Callable, Iterable, Sequence
1111
from typing import Any, Optional, Union, cast
1212

1313
from . import util
@@ -142,7 +142,7 @@ def Bus( # noqa: N802
142142
def detect_available_configs(
143143
interfaces: Union[None, str, Iterable[str]] = None,
144144
timeout: float = 5.0,
145-
) -> list[AutoDetectedConfig]:
145+
) -> Sequence[AutoDetectedConfig]:
146146
"""Detect all configurations/channels that the interfaces could
147147
currently connect with.
148148
@@ -175,7 +175,7 @@ def detect_available_configs(
175175
# otherwise assume iterable of strings
176176

177177
# Collect detection callbacks
178-
callbacks: dict[str, Callable[[], list[AutoDetectedConfig]]] = {}
178+
callbacks: dict[str, Callable[[], Sequence[AutoDetectedConfig]]] = {}
179179
for interface_keyword in interfaces:
180180
try:
181181
bus_class = _get_class_for_interface(interface_keyword)

can/interfaces/cantact.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import logging
66
import time
7+
from collections.abc import Sequence
78
from typing import Any, Optional, Union
89
from unittest.mock import Mock
910

@@ -14,6 +15,7 @@
1415
CanInterfaceNotImplementedError,
1516
error_check,
1617
)
18+
from ..typechecking import AutoDetectedConfig
1719
from ..util import check_or_adjust_timing_clock, deprecated_args_alias
1820

1921
logger = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class CantactBus(BusABC):
3133
"""CANtact interface"""
3234

3335
@staticmethod
34-
def _detect_available_configs():
36+
def _detect_available_configs() -> Sequence[AutoDetectedConfig]:
3537
try:
3638
interface = cantact.Interface()
3739
except (NameError, SystemError, AttributeError):
@@ -40,7 +42,7 @@ def _detect_available_configs():
4042
)
4143
return []
4244

43-
channels = []
45+
channels: list[AutoDetectedConfig] = []
4446
for i in range(0, interface.channel_count()):
4547
channels.append({"interface": "cantact", "channel": f"ch:{i}"})
4648
return channels
@@ -121,7 +123,14 @@ def __init__(
121123
**kwargs,
122124
)
123125

124-
def _recv_internal(self, timeout):
126+
def _recv_internal(
127+
self, timeout: Optional[float]
128+
) -> tuple[Optional[Message], bool]:
129+
if timeout is None:
130+
raise TypeError(
131+
f"{self.__class__.__name__} expects a numeric `timeout` value."
132+
)
133+
125134
with error_check("Cannot receive message"):
126135
frame = self.interface.recv(int(timeout * 1000))
127136
if frame is None:
@@ -140,7 +149,7 @@ def _recv_internal(self, timeout):
140149
)
141150
return msg, False
142151

143-
def send(self, msg, timeout=None):
152+
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
144153
with error_check("Cannot send message"):
145154
self.interface.send(
146155
self.channel,
@@ -151,13 +160,13 @@ def send(self, msg, timeout=None):
151160
msg.data,
152161
)
153162

154-
def shutdown(self):
163+
def shutdown(self) -> None:
155164
super().shutdown()
156165
with error_check("Cannot shutdown interface"):
157166
self.interface.stop()
158167

159168

160-
def mock_recv(timeout):
169+
def mock_recv(timeout: int) -> Optional[dict[str, Any]]:
161170
if timeout > 0:
162171
return {
163172
"id": 0x123,

can/interfaces/ixxat/canlib.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
CyclicSendTaskABC,
1010
Message,
1111
)
12-
from can.typechecking import AutoDetectedConfig
1312

1413

1514
class IXXATBus(BusABC):
@@ -175,5 +174,5 @@ def state(self) -> BusState:
175174
return self.bus.state
176175

177176
@staticmethod
178-
def _detect_available_configs() -> list[AutoDetectedConfig]:
177+
def _detect_available_configs() -> Sequence[vcinpl.AutoDetectedIxxatConfig]:
179178
return vcinpl._detect_available_configs()

can/interfaces/ixxat/canlib_vcinpl.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -976,8 +976,8 @@ def get_ixxat_hwids():
976976
return hwids
977977

978978

979-
def _detect_available_configs() -> list[AutoDetectedConfig]:
980-
config_list = [] # list in wich to store the resulting bus kwargs
979+
def _detect_available_configs() -> Sequence["AutoDetectedIxxatConfig"]:
980+
config_list = [] # list in which to store the resulting bus kwargs
981981

982982
# used to detect HWID
983983
device_handle = HANDLE()
@@ -1026,3 +1026,7 @@ def _detect_available_configs() -> list[AutoDetectedConfig]:
10261026
pass # _canlib is None in the CI tests -> return a blank list
10271027

10281028
return config_list
1029+
1030+
1031+
class AutoDetectedIxxatConfig(AutoDetectedConfig):
1032+
unique_hardware_id: int

can/interfaces/serial/serial_can.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import io
1111
import logging
1212
import struct
13-
from typing import Any, Optional
13+
from collections.abc import Sequence
14+
from typing import Any, Optional, cast
1415

1516
from can import (
1617
BusABC,
@@ -26,21 +27,14 @@
2627
logger = logging.getLogger("can.serial")
2728

2829
try:
29-
import serial
30+
import serial.tools.list_ports
3031
except ImportError:
3132
logger.warning(
3233
"You won't be able to use the serial can backend without "
33-
"the serial module installed!"
34+
"the `pyserial` package installed!"
3435
)
3536
serial = None
3637

37-
try:
38-
from serial.tools.list_ports import comports as list_comports
39-
except ImportError:
40-
# If unavailable on some platform, just return nothing
41-
def list_comports() -> list[Any]:
42-
return []
43-
4438

4539
CAN_ERR_FLAG = 0x20000000
4640
CAN_RTR_FLAG = 0x40000000
@@ -63,8 +57,7 @@ def __init__(
6357
baudrate: int = 115200,
6458
timeout: float = 0.1,
6559
rtscts: bool = False,
66-
*args,
67-
**kwargs,
60+
**kwargs: Any,
6861
) -> None:
6962
"""
7063
:param channel:
@@ -107,7 +100,7 @@ def __init__(
107100
"could not create the serial device"
108101
) from error
109102

110-
super().__init__(channel, *args, **kwargs)
103+
super().__init__(channel, **kwargs)
111104

112105
def shutdown(self) -> None:
113106
"""
@@ -232,7 +225,7 @@ def _recv_internal(
232225

233226
def fileno(self) -> int:
234227
try:
235-
return self._ser.fileno()
228+
return cast("int", self._ser.fileno())
236229
except io.UnsupportedOperation:
237230
raise NotImplementedError(
238231
"fileno is not implemented using current CAN bus on this platform"
@@ -241,7 +234,11 @@ def fileno(self) -> int:
241234
raise CanOperationError("Cannot fetch fileno") from exception
242235

243236
@staticmethod
244-
def _detect_available_configs() -> list[AutoDetectedConfig]:
245-
return [
246-
{"interface": "serial", "channel": port.device} for port in list_comports()
247-
]
237+
def _detect_available_configs() -> Sequence[AutoDetectedConfig]:
238+
configs: list[AutoDetectedConfig] = []
239+
if serial is None:
240+
return configs
241+
242+
for port in serial.tools.list_ports.comports():
243+
configs.append({"interface": "serial", "channel": port.device})
244+
return configs

can/interfaces/socketcan/utils.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import struct
1010
import subprocess
1111
import sys
12-
from typing import Optional, cast
12+
from typing import Optional
1313

1414
from can import typechecking
1515
from can.interfaces.socketcan.constants import CAN_EFF_FLAG
@@ -28,7 +28,6 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes
2828
can_id = can_filter["can_id"]
2929
can_mask = can_filter["can_mask"]
3030
if "extended" in can_filter:
31-
can_filter = cast("typechecking.CanFilterExtended", can_filter)
3231
# Match on either 11-bit OR 29-bit messages instead of both
3332
can_mask |= CAN_EFF_FLAG
3433
if can_filter["extended"]:

can/interfaces/udp_multicast/bus.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import struct
77
import time
88
import warnings
9-
from typing import Optional, Union
9+
from typing import Any, Optional, Union
1010

1111
import can
12-
from can import BusABC, CanProtocol
12+
from can import BusABC, CanProtocol, Message
1313
from can.typechecking import AutoDetectedConfig
1414

1515
from .utils import is_msgpack_installed, pack_message, unpack_message
@@ -98,7 +98,7 @@ def __init__(
9898
hop_limit: int = 1,
9999
receive_own_messages: bool = False,
100100
fd: bool = True,
101-
**kwargs,
101+
**kwargs: Any,
102102
) -> None:
103103
is_msgpack_installed()
104104

@@ -126,7 +126,9 @@ def is_fd(self) -> bool:
126126
)
127127
return self._can_protocol is CanProtocol.CAN_FD
128128

129-
def _recv_internal(self, timeout: Optional[float]):
129+
def _recv_internal(
130+
self, timeout: Optional[float]
131+
) -> tuple[Optional[Message], bool]:
130132
result = self._multicast.recv(timeout)
131133
if not result:
132134
return None, False
@@ -204,7 +206,7 @@ def __init__(
204206

205207
# Look up multicast group address in name server and find out IP version of the first suitable target
206208
# and then get the address family of it (socket.AF_INET or socket.AF_INET6)
207-
connection_candidates = socket.getaddrinfo( # type: ignore
209+
connection_candidates = socket.getaddrinfo(
208210
group, self.port, type=socket.SOCK_DGRAM
209211
)
210212
sock = None

can/interfaces/udp_multicast/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
Defines common functions.
33
"""
44

5-
from typing import Any, Optional
5+
from typing import Any, Optional, cast
66

77
from can import CanInterfaceNotImplementedError, Message
88
from can.typechecking import ReadableBytesLike
@@ -51,7 +51,7 @@ def pack_message(message: Message) -> bytes:
5151
"bitrate_switch": message.bitrate_switch,
5252
"error_state_indicator": message.error_state_indicator,
5353
}
54-
return msgpack.packb(as_dict, use_bin_type=True)
54+
return cast("bytes", msgpack.packb(as_dict, use_bin_type=True))
5555

5656

5757
def unpack_message(

0 commit comments

Comments
 (0)