From e23fdd5a135fa6123abb922077a943a03fd4135f Mon Sep 17 00:00:00 2001 From: Jacob Schaer Date: Tue, 16 Dec 2025 23:20:17 -0800 Subject: [PATCH 1/2] Update ipv6 and unit tests Per recommendation in #64 --- doipclient/client.py | 14 +++++++++++--- setup.py | 2 +- tests/test_client.py | 19 +++++++++++++++++-- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/doipclient/client.py b/doipclient/client.py index 7f35c38..0617b60 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -217,7 +217,7 @@ def _create_udp_socket( sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) # IPv6 version always uses link-local scope multicast address (FF02 16 ::1) - sock.bind((LINK_LOCAL_MULTICAST_ADDRESS, udp_port)) + sock.bind(("::", udp_port)) if source_interface is None: # 0 is the "default multicast interface" which is unlikely to be correct, but it will do @@ -328,7 +328,7 @@ def await_vehicle_announcement( @classmethod def get_entity( - cls, ecu_ip_address="255.255.255.255", protocol_version=0x02, eid=None, vin=None + cls, ecu_ip_address="255.255.255.255", protocol_version=0x02, eid=None, vin=None, source_interface=None ): """Sends a VehicleIdentificationRequest and awaits a VehicleIdentificationResponse from the ECU, either with a specified VIN, EIN, or nothing. Equivalent to the request_vehicle_identification() method @@ -345,12 +345,20 @@ def get_entity( :type eid: bytes, optional :param vin: VIN of the Vehicle :type vin: str, optional + :param source_interface: Interface name (like "eth0") to bind to for use with IPv6. Defaults to None which + will use the default interface (which may not be the one connected to the ECU). Does nothing for IPv4, + which will bind to all interfaces uses INADDR_ANY. + :type source_interface: str, optional :return: The vehicle identification response message :rtype: VehicleIdentificationResponse """ # UDP_TEST_EQUIPMENT_REQUEST is dynamically assigned using udp_port=0 - sock = cls._create_udp_socket(udp_port=0, timeout=A_DOIP_CTRL) + ipv6 = False + if type(ipaddress.ip_address(ecu_ip_address)) == ipaddress.IPv6Address: + ipv6 = True + + sock = cls._create_udp_socket(ipv6=ipv6, udp_port=0, timeout=A_DOIP_CTRL, source_interface=source_interface) if eid: message = VehicleIdentificationRequestWithEID(eid) diff --git a/setup.py b/setup.py index e4c9479..38260d8 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name="doipclient", - version="1.2.0", + version="1.3.0", description="A Diagnostic over IP (DoIP) client implementing ISO-13400-2.", long_description=long_description, long_description_content_type="text/x-rst", diff --git a/tests/test_client.py b/tests/test_client.py index 7ae9c4c..f8fa55c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -774,7 +774,7 @@ def test_request_vehicle_identification_with_vin(mock_socket): assert result.vin_sync_status == 0x00 -def test_get_entity(mock_socket): +def test_get_entity_ipv4(mock_socket): mock_socket.rx_queue.append(vehicle_identification_response) _, result = DoIPClient.get_entity() assert mock_socket.tx_queue[-1] == vehicle_identification_request @@ -786,6 +786,21 @@ def test_get_entity(mock_socket): assert result.vin_sync_status == 0x00 +def test_get_entity_ipv6(mock_socket, mocker): + mocker.patch("socket.if_nametoindex", return_value=1) + mock_socket.rx_queue.append(vehicle_identification_response) + _, result = DoIPClient.get_entity(ecu_ip_address="2001:db8::", source_interface="eth0") + assert mock_socket.tx_queue[-1] == vehicle_identification_request + assert mock_socket._bound_ip == "::" + assert mock_socket.opts[IPPROTO_IPV6][socket.IPV6_JOIN_GROUP] == bytes.fromhex("ff02000000000000000000000000000101000000") + assert result.vin == "1" * 17 + assert result.logical_address == 0x1234 + assert result.eid == b"1" * 6 + assert result.gid == b"2" * 6 + assert result.further_action_required == 0x00 + assert result.vin_sync_status == 0x00 + + def test_get_entity_with_ein(mock_socket): mock_socket.rx_queue.append(vehicle_identification_response) _, result = DoIPClient.get_entity(eid=b"1" * 6) @@ -881,7 +896,7 @@ def test_await_ipv6(mock_socket): except TimeoutError: pass assert mock_socket._network == socket.AF_INET6 - assert mock_socket._bound_ip == "ff02::1" + assert mock_socket._bound_ip == "::" assert mock_socket._bound_port == 13400 assert mock_socket.opts == { socket.SOL_SOCKET: {socket.SO_REUSEADDR: True}, From 576b428056284a75e61a29287b25c0e742d6e9a7 Mon Sep 17 00:00:00 2001 From: Jacob Schaer Date: Sun, 10 May 2026 23:31:19 -0700 Subject: [PATCH 2/2] Fix Windows Python <= 3.8.0 --- doipclient/client.py | 23 ++++++++++++++++++++--- tests/test_client.py | 10 +++++++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/doipclient/client.py b/doipclient/client.py index 0617b60..81bae66 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -17,6 +17,11 @@ logger = logging.getLogger("doipclient") +try: + from socket import if_nametoindex +except ImportError: + if_nametoindex = None + class Parser: """Implements state machine for DoIP transport layer. @@ -222,8 +227,10 @@ def _create_udp_socket( if source_interface is None: # 0 is the "default multicast interface" which is unlikely to be correct, but it will do interface_index = 0 + elif if_nametoindex is not None: + interface_index = if_nametoindex(source_interface) else: - interface_index = socket.if_nametoindex(source_interface) + raise OSError("source_interface requires socket.if_nametoindex()") # Join the group so that packets are delivered mc_addr = ipaddress.IPv6Address(LINK_LOCAL_MULTICAST_ADDRESS) @@ -328,7 +335,12 @@ def await_vehicle_announcement( @classmethod def get_entity( - cls, ecu_ip_address="255.255.255.255", protocol_version=0x02, eid=None, vin=None, source_interface=None + cls, + ecu_ip_address="255.255.255.255", + protocol_version=0x02, + eid=None, + vin=None, + source_interface=None, ): """Sends a VehicleIdentificationRequest and awaits a VehicleIdentificationResponse from the ECU, either with a specified VIN, EIN, or nothing. Equivalent to the request_vehicle_identification() method @@ -358,7 +370,12 @@ def get_entity( if type(ipaddress.ip_address(ecu_ip_address)) == ipaddress.IPv6Address: ipv6 = True - sock = cls._create_udp_socket(ipv6=ipv6, udp_port=0, timeout=A_DOIP_CTRL, source_interface=source_interface) + sock = cls._create_udp_socket( + ipv6=ipv6, + udp_port=0, + timeout=A_DOIP_CTRL, + source_interface=source_interface, + ) if eid: message = VehicleIdentificationRequestWithEID(eid) diff --git a/tests/test_client.py b/tests/test_client.py index f8fa55c..c5aa69f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -787,12 +787,16 @@ def test_get_entity_ipv4(mock_socket): def test_get_entity_ipv6(mock_socket, mocker): - mocker.patch("socket.if_nametoindex", return_value=1) + mocker.patch("doipclient.client.if_nametoindex", return_value=1) mock_socket.rx_queue.append(vehicle_identification_response) - _, result = DoIPClient.get_entity(ecu_ip_address="2001:db8::", source_interface="eth0") + _, result = DoIPClient.get_entity( + ecu_ip_address="2001:db8::", source_interface="eth0" + ) assert mock_socket.tx_queue[-1] == vehicle_identification_request assert mock_socket._bound_ip == "::" - assert mock_socket.opts[IPPROTO_IPV6][socket.IPV6_JOIN_GROUP] == bytes.fromhex("ff02000000000000000000000000000101000000") + assert mock_socket.opts[IPPROTO_IPV6][socket.IPV6_JOIN_GROUP] == bytes.fromhex( + "ff02000000000000000000000000000101000000" + ) assert result.vin == "1" * 17 assert result.logical_address == 0x1234 assert result.eid == b"1" * 6