Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,13 +741,23 @@ def send_diagnostic_to_address(
else:
result = self.read_doip()
if type(result) == DiagnosticMessageNegativeAcknowledgement:
raise IOError(
"Diagnostic request rejected with negative acknowledge code: {}".format(
result.nack_code
if result.source_address == address and result.target_address == self._client_logical_address:
raise IOError(
"Diagnostic request rejected with negative acknowledge code: {}".format(
result.nack_code
)
)
else:
logger.warning(
"Received DiagnosticMessageNegativeAcknowledgement, but source and target addresses don't match expected values. Ignoring."
)
)
elif type(result) == DiagnosticMessagePositiveAcknowledgement:
return
if result.source_address == address and result.target_address == self._client_logical_address:
return
else:
logger.warning(
"Received DiagnosticMessagePositiveAcknowledgement, but source and target addresses don't match expected values. Ignoring."
)
elif result:
logger.warning(
"Received unexpected DoIP message type {}. Ignoring".format(
Expand All @@ -772,7 +782,17 @@ def receive_diagnostic(self, timeout=None):
else:
result = self.read_doip()
if type(result) == DiagnosticMessage:
return result.user_data
if result.source_address == self._ecu_logical_address and result.target_address == self._client_logical_address:
return result.user_data
elif result.source_address != self._ecu_logical_address and result.target_address == self._client_logical_address:
logger.warning(
"Received DiagnosticMessage with expected target address, but source address doesn't match expected ECU logical address. Ignoring."
)
start_time = time.time()
else:
logger.warning(
"Received DiagnosticMessage, but source and target addresses don't match expected values. Ignoring."
)
elif result:
logger.warning(
"Received unexpected DoIP message type {}. Ignoring".format(
Expand Down
194 changes: 177 additions & 17 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,44 @@
alive_check_response = bytearray(
[int(x, 16) for x in "02 fd 00 08 00 00 00 02 0e 00".split(" ")]
)
diagnostic_negative_response = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 00 00 00 00 05".split(" ")]
diagnostic_nack = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 00 01 0e 00 05".split(" ")]
)
diagnostic_positive_response = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 00 00 00 00 00".split(" ")]
diagnostic_nack_to_address = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 12 34 0e 00 05".split(" ")]
)
diagnostic_nack_to_address_invalid_source = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 12 35 0e 00 05".split(" ")]
)
diagnostic_nack_to_address_invalid_target = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 12 34 0e 01 05".split(" ")]
)
diagnostic_nack_invalid_source = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 00 02 0e 00 05".split(" ")]
)
diagnostic_nack_invalid_target = bytearray(
[int(x, 16) for x in "02 fd 80 03 00 00 00 05 00 01 0e 01 05".split(" ")]
)
diagnostic_ack = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 00 01 0e 00 00".split(" ")]
)
diagnostic_ack_to_address = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 12 34 0e 00 00".split(" ")]
)
diagnostic_ack_to_address_invalid_source = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 12 35 0e 00 00".split(" ")]
)
diagnostic_ack_to_address_invalid_target = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 12 34 0e 01 00".split(" ")]
)
diagnostic_ack_invalid_source = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 00 02 0e 00 00".split(" ")]
)
diagnostic_ack_invalid_target = bytearray(
[int(x, 16) for x in "02 fd 80 02 00 00 00 05 00 01 0e 01 00".split(" ")]
)
diagnostic_result = bytearray(
[int(x, 16) for x in "02 fd 80 01 00 00 00 08 00 e0 00 55 00 01 02 03".split(" ")]
[int(x, 16) for x in "02 fd 80 01 00 00 00 08 00 01 0e 00 00 01 02 03".split(" ")]
)
entity_status_response = bytearray(
[int(x, 16) for x in "02 fd 40 02 00 00 00 03 01 10 1".split(" ")]
Expand Down Expand Up @@ -361,7 +391,7 @@ def test_resend_reactivate_closed_socket(mock_socket, mocker):
sut = DoIPClient(test_ip, test_logical_address, auto_reconnect_tcp=True)
mock_socket.rx_queue.append(bytearray())
mock_socket.rx_queue.append(successful_activation_response)
mock_socket.rx_queue.append(diagnostic_positive_response)
mock_socket.rx_queue.append(diagnostic_ack)
assert None == sut.send_diagnostic(bytearray([0, 1, 2]))
assert request_activation_spy.call_count == 2
assert reconnect_spy.call_count == 1
Expand All @@ -374,7 +404,7 @@ def test_resend_reactivate_broken_socket(mock_socket, mocker):
sut = DoIPClient(test_ip, test_logical_address, auto_reconnect_tcp=True)
mock_socket.rx_queue.append(ConnectionResetError(""))
mock_socket.rx_queue.append(successful_activation_response)
mock_socket.rx_queue.append(diagnostic_positive_response)
mock_socket.rx_queue.append(diagnostic_ack)
assert None == sut.send_diagnostic(bytearray([0, 1, 2]))
assert request_activation_spy.call_count == 2
assert reconnect_spy.call_count == 1
Expand All @@ -386,7 +416,7 @@ def test_no_resend_reactivate_broken_socket(mock_socket, mocker):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(ConnectionResetError(""))
mock_socket.rx_queue.append(successful_activation_response)
mock_socket.rx_queue.append(diagnostic_positive_response)
mock_socket.rx_queue.append(diagnostic_ack)
with pytest.raises(ConnectionResetError):
sut.send_diagnostic(bytearray([0, 1, 2]))
assert request_activation_spy.call_count == 1
Expand All @@ -401,7 +431,7 @@ def test_connect_with_bind(mock_socket):

def test_context_manager(mock_socket, mocker):
close_spy = mocker.spy(DoIPClient, "close")
mock_socket.rx_queue.append(diagnostic_positive_response)
mock_socket.rx_queue.append(diagnostic_ack)

with DoIPClient(test_ip, test_logical_address) as sut:
assert None == sut.send_diagnostic(bytearray([0, 1, 2]))
Expand Down Expand Up @@ -486,47 +516,177 @@ def test_request_entity_status(mock_socket):
assert result.currently_open_sockets == 1


def test_send_diagnostic_postive(mock_socket):
def test_send_diagnostic_ack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_positive_response)
mock_socket.rx_queue.append(diagnostic_ack)
assert None == sut.send_diagnostic(bytearray([0, 1, 2]))
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_negative(mock_socket):
def test_send_diagnostic_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_negative_response)
mock_socket.rx_queue.append(diagnostic_nack)
with pytest.raises(
IOError, match=r"Diagnostic request rejected with negative acknowledge code"
):
result = sut.send_diagnostic(bytearray([0, 1, 2]))
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_to_address_positive(mock_socket):
def test_send_diagnostic_ignores_invalid_source_ack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_invalid_source)
mock_socket.rx_queue.append(diagnostic_ack)
assert None == sut.send_diagnostic(bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_ignores_invalid_source_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_nack_invalid_source)
mock_socket.rx_queue.append(diagnostic_nack)
with pytest.raises(
IOError, match=r"Diagnostic request rejected with negative acknowledge code"
):
sut.send_diagnostic(bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_ignores_invalid_target_ack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_invalid_target)
mock_socket.rx_queue.append(diagnostic_ack)
assert None == sut.send_diagnostic(bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_ignores_invalid_target_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_nack_invalid_target)
mock_socket.rx_queue.append(diagnostic_nack)
with pytest.raises(
IOError, match=r"Diagnostic request rejected with negative acknowledge code"
):
sut.send_diagnostic(bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_times_out_when_only_invalid_source_ack_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_invalid_source)
mock_socket.rx_queue.append(diagnostic_nack_invalid_source)
with pytest.raises(TimeoutError, match=r"ECU failed to respond in time"):
sut.send_diagnostic(bytearray([0, 1, 2]), timeout=0.01)
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_times_out_when_only_invalid_target_ack_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_positive_response)
mock_socket.rx_queue.append(diagnostic_ack_invalid_target)
mock_socket.rx_queue.append(diagnostic_nack_invalid_target)
with pytest.raises(TimeoutError, match=r"ECU failed to respond in time"):
sut.send_diagnostic(bytearray([0, 1, 2]), timeout=0.01)
assert mock_socket.tx_queue[-1] == diagnostic_request


def test_send_diagnostic_to_address_ack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_to_address)
assert None == sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]))
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_negative(mock_socket):
def test_send_diagnostic_to_address_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_negative_response)
mock_socket.rx_queue.append(diagnostic_nack_to_address)
with pytest.raises(
IOError, match=r"Diagnostic request rejected with negative acknowledge code"
):
result = sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]))
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_ignores_invalid_source_ack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_to_address_invalid_source)
mock_socket.rx_queue.append(diagnostic_ack_to_address)
assert None == sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_ignores_invalid_target_ack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_to_address_invalid_target)
mock_socket.rx_queue.append(diagnostic_ack_to_address)
assert None == sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_ignores_invalid_source_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_nack_to_address_invalid_source)
mock_socket.rx_queue.append(diagnostic_nack_to_address)
with pytest.raises(
IOError, match=r"Diagnostic request rejected with negative acknowledge code"
):
sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_ignores_invalid_target_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_nack_to_address_invalid_target)
mock_socket.rx_queue.append(diagnostic_nack_to_address)
with pytest.raises(
IOError, match=r"Diagnostic request rejected with negative acknowledge code"
):
sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]))
assert len(mock_socket.rx_queue) == 0
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_times_out_when_only_invalid_source_ack_nack(
mock_socket,
):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_to_address_invalid_source)
mock_socket.rx_queue.append(diagnostic_nack_to_address_invalid_source)
with pytest.raises(TimeoutError, match=r"ECU failed to respond in time"):
sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]), timeout=0.01)
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_send_diagnostic_to_address_times_out_when_only_invalid_target_ack_nack(
mock_socket,
):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_ack_to_address_invalid_target)
mock_socket.rx_queue.append(diagnostic_nack_to_address_invalid_target)
with pytest.raises(TimeoutError, match=r"ECU failed to respond in time"):
sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2]), timeout=0.01)
assert mock_socket.tx_queue[-1] == diagnostic_request_to_address


def test_receive_diagnostic(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(diagnostic_result)
result = sut.receive_diagnostic()
assert result == bytearray([0, 1, 2, 3])


def test_receive_diagnostic_timeout(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
with pytest.raises(TimeoutError, match=r"ECU failed to respond in time"):
sut.receive_diagnostic(timeout=0.01)


def test_request_vehicle_identification(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(vehicle_identification_response)
Expand Down
Loading