Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

logger = logging.getLogger("doipclient")

try:
from socket import if_nametoindex
except ImportError:
if_nametoindex = None


class Parser:
"""Implements state machine for DoIP transport layer.
Expand Down Expand Up @@ -217,13 +222,15 @@ def _create_udp_socket(
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)

# IPv6 version always uses link-local scope multicast address (FF02 16 ::1)
sock.bind((LINK_LOCAL_MULTICAST_ADDRESS, udp_port))
sock.bind(("::", udp_port))

if source_interface is None:
# 0 is the "default multicast interface" which is unlikely to be correct, but it will do
interface_index = 0
elif if_nametoindex is not None:
interface_index = if_nametoindex(source_interface)
else:
interface_index = socket.if_nametoindex(source_interface)
raise OSError("source_interface requires socket.if_nametoindex()")

# Join the group so that packets are delivered
mc_addr = ipaddress.IPv6Address(LINK_LOCAL_MULTICAST_ADDRESS)
Expand Down Expand Up @@ -328,7 +335,12 @@ def await_vehicle_announcement(

@classmethod
def get_entity(
cls, ecu_ip_address="255.255.255.255", protocol_version=0x02, eid=None, vin=None
cls,
ecu_ip_address="255.255.255.255",
protocol_version=0x02,
eid=None,
vin=None,
source_interface=None,
):
"""Sends a VehicleIdentificationRequest and awaits a VehicleIdentificationResponse from the ECU,
either with a specified VIN, EIN, or nothing. Equivalent to the request_vehicle_identification() method
Expand All @@ -345,12 +357,25 @@ def get_entity(
:type eid: bytes, optional
:param vin: VIN of the Vehicle
:type vin: str, optional
:param source_interface: Interface name (like "eth0") to bind to for use with IPv6. Defaults to None which
will use the default interface (which may not be the one connected to the ECU). Does nothing for IPv4,
which will bind to all interfaces uses INADDR_ANY.
:type source_interface: str, optional
:return: The vehicle identification response message
:rtype: VehicleIdentificationResponse
"""

# UDP_TEST_EQUIPMENT_REQUEST is dynamically assigned using udp_port=0
sock = cls._create_udp_socket(udp_port=0, timeout=A_DOIP_CTRL)
ipv6 = False
if type(ipaddress.ip_address(ecu_ip_address)) == ipaddress.IPv6Address:
ipv6 = True

sock = cls._create_udp_socket(
ipv6=ipv6,
udp_port=0,
timeout=A_DOIP_CTRL,
source_interface=source_interface,
)

if eid:
message = VehicleIdentificationRequestWithEID(eid)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setuptools.setup(
name="doipclient",
version="1.2.0",
version="1.3.0",
description="A Diagnostic over IP (DoIP) client implementing ISO-13400-2.",
long_description=long_description,
long_description_content_type="text/x-rst",
Expand Down
23 changes: 21 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ def test_request_vehicle_identification_with_vin(mock_socket):
assert result.vin_sync_status == 0x00


def test_get_entity(mock_socket):
def test_get_entity_ipv4(mock_socket):
mock_socket.rx_queue.append(vehicle_identification_response)
_, result = DoIPClient.get_entity()
assert mock_socket.tx_queue[-1] == vehicle_identification_request
Expand All @@ -786,6 +786,25 @@ def test_get_entity(mock_socket):
assert result.vin_sync_status == 0x00


def test_get_entity_ipv6(mock_socket, mocker):
mocker.patch("doipclient.client.if_nametoindex", return_value=1)
mock_socket.rx_queue.append(vehicle_identification_response)
_, result = DoIPClient.get_entity(
ecu_ip_address="2001:db8::", source_interface="eth0"
)
assert mock_socket.tx_queue[-1] == vehicle_identification_request
assert mock_socket._bound_ip == "::"
assert mock_socket.opts[IPPROTO_IPV6][socket.IPV6_JOIN_GROUP] == bytes.fromhex(
"ff02000000000000000000000000000101000000"
)
assert result.vin == "1" * 17
assert result.logical_address == 0x1234
assert result.eid == b"1" * 6
assert result.gid == b"2" * 6
assert result.further_action_required == 0x00
assert result.vin_sync_status == 0x00


def test_get_entity_with_ein(mock_socket):
mock_socket.rx_queue.append(vehicle_identification_response)
_, result = DoIPClient.get_entity(eid=b"1" * 6)
Expand Down Expand Up @@ -881,7 +900,7 @@ def test_await_ipv6(mock_socket):
except TimeoutError:
pass
assert mock_socket._network == socket.AF_INET6
assert mock_socket._bound_ip == "ff02::1"
assert mock_socket._bound_ip == "::"
assert mock_socket._bound_port == 13400
assert mock_socket.opts == {
socket.SOL_SOCKET: {socket.SO_REUSEADDR: True},
Expand Down
Loading