From 63c807bee329b627182363a94ec60bba1b1437c5 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Fri, 19 Dec 2025 18:04:44 +0100 Subject: [PATCH 01/14] [135-make-pololu-maestro-driver-uart-mode] MAde changes, but unit-testing UART creates issues. This is due to the import structure of the adafruit-blinka --- qmi/instruments/pololu/maestro.py | 192 +++++++++++++++-------- qmi/instruments/pololu/qmi_uart.py | 98 ++++++++++++ tests/instruments/pololu/test_maestro.py | 93 +++++++---- 3 files changed, 286 insertions(+), 97 deletions(-) create mode 100644 qmi/instruments/pololu/qmi_uart.py diff --git a/qmi/instruments/pololu/maestro.py b/qmi/instruments/pololu/maestro.py index 8fd41bc8..9fcd829b 100644 --- a/qmi/instruments/pololu/maestro.py +++ b/qmi/instruments/pololu/maestro.py @@ -1,38 +1,71 @@ """Instrument driver for the Pololu maestro servo controller""" import logging -from typing import Dict, Generator, List, Optional, Tuple +from typing import TYPE_CHECKING, Generator from time import sleep from qmi.core.context import QMI_Context from qmi.core.instrument import QMI_Instrument, QMI_InstrumentIdentification from qmi.core.rpc import rpc_method -from qmi.core.transport import create_transport +from qmi.core.transport import create_transport, SerialTransportDescriptorParser, QMI_Transport from qmi.core.exceptions import QMI_InstrumentException import warnings +# Lazy import of the adafruit-blinka module 'board' and 'QMI_Uart'. See the function _import_modules() below. +if TYPE_CHECKING: + import board + from qmi.instruments.pololu.qmi_uart import QMI_Uart +else: + board = None + QMI_Uart = None + # Global variable holding the logger for this module. _logger = logging.getLogger(__name__) -class Pololu_Maestro(QMI_Instrument): - """Instrument driver for the Pololu Maestro servo controller.""" +def _import_modules() -> None: + """Import the adafruit-blinka module and busio extension. - # Baudrate of instrument. - BAUDRATE = 9600 + This import is done in a function, instead of at the top-level, + to avoid an unnecessary dependency for programs that do not access + the instrument directly. + """ + global board, QMI_Uart + if board is None or QMI_Uart is None: + import board + from qmi.instruments.pololu.qmi_uart import QMI_Uart - # Instrument should respond within 2 seconds. - RESPONSE_TIMEOUT = 2.0 - # Default number of channels. +class Pololu_Maestro(QMI_Instrument): + """Instrument driver for the Pololu Maestro servo controller. + + Note that new devices are delivered standard in UART mode. If the user wishes to use the device + in USB Dual Port or USB Chained serial mode, the mode has to be changed with the Pololu's + Maestro Control Center program. + + Attributes: + BAUDRATE: Default Baudrate of instrument with serial connection. + RESPONSE_TIMEOUT: Default response timeout for I/O actions. + DEFAULT_NUM_CHANNELS: Default number of channels. + DEFAULT_MIN_VALUE: Default minimum settable target and speed values. + DEFAULT_MAX_VALUE: Default maximum settable target and speed values. + DEFAULT_MIN_ACCELERATION: Default minimum acceleration. + DEFAULT_MAX_ACCELERATION: Default maximum acceleration. + """ + _rpc_constants = [ + "BAUDRATE", + "RESPONSE_TIMEOUT", + "DEFAULT_NUM_CHANNELS", + "DEFAULT_MIN_VALUE", + "DEFAULT_MAX_VALUE", + "DEFAULT_MIN_ACCELERATION", + "DEFAULT_MAX_ACCELERATION", + ] + BAUDRATE = 9600 + RESPONSE_TIMEOUT = 2.0 DEFAULT_NUM_CHANNELS = 6 - - # Default minimum and maximum settable values. - # These are determined by the bit lengths for the values (16 bits) DEFAULT_MIN_VALUE = 0 DEFAULT_MAX_VALUE = 16383 - - # Default minimum and maximum accleration. DEFAULT_MIN_ACCELERATION = 0 DEFAULT_MAX_ACCELERATION = 255 @@ -72,11 +105,12 @@ class Pololu_Maestro(QMI_Instrument): } def __init__( - self, context: QMI_Context, name: str, transport: str, num_channels: int = DEFAULT_NUM_CHANNELS, - channels_min_max_targets: Optional[Dict[int, Tuple[int, int]]] = None, - channels_min_max_speeds: Optional[Dict[int, Tuple[int, int]]] = None, - channels_min_max_accelerations: Optional[Dict[int, Tuple[int, int]]] = None) -> None: - """Initialize driver. + self, context: QMI_Context, name: str, transport: str, num_channels: int = DEFAULT_NUM_CHANNELS, + channels_min_max_targets: None | dict[int, tuple[int, int]] = None, + channels_min_max_speeds: None | dict[int, tuple[int, int]] = None, + channels_min_max_accelerations: None | dict[int, tuple[int, int]] = None + ) -> None: + """Initialize driver. This driver can be used for Pololu Micro Maestro in UART or Dual Port USB modes. Parameters: context: The QMI context @@ -91,41 +125,47 @@ def __init__( of the min and max. """ super().__init__(context, name) - self._transport = create_transport(transport, default_attributes={ - "baudrate": self.BAUDRATE}) + self._transport: QMI_Transport | QMI_Uart + if transport.lower().startswith("uart"): + # import modules + _import_modules() + # Do a manual parsing of the transport string + transport = transport.lower().replace("uart", "serial") + parsed_transport_dict = SerialTransportDescriptorParser.parse_parameter_strings(transport) + baudrate = parsed_transport_dict["baudrate"] if "baudrate" in parsed_transport_dict else self.BAUDRATE + self._transport = QMI_Uart(board.TX, board.RX, baudrate) + else: + self._transport = create_transport(transport, default_attributes={"baudrate": self.BAUDRATE}) + # This device number defaults to 0x0C (or 12 in decimal). device_nr = 0x0C # Command lead-in and device number are sent for each Pololu serial command. self._cmd_lead_in = chr(0xAA) + chr(device_nr) # Track the target value of each channel - self._targets: List[int] = [0] * num_channels + self._targets: list[int] = [0] * num_channels # Store the minimum and maximum target values of each channel - self._min_target_values: List[int] = [ - self.DEFAULT_MIN_VALUE] * num_channels - self._max_target_values: List[int] = [ - self.DEFAULT_MAX_VALUE] * num_channels - - channels_min_max_targets = channels_min_max_targets if channels_min_max_targets else {} - channels_min_max_speeds = channels_min_max_speeds if channels_min_max_speeds else {} - channels_min_max_accelerations = channels_min_max_accelerations if channels_min_max_accelerations else {} + self._min_target_values: list[int] = [self.DEFAULT_MIN_VALUE] * num_channels + self._max_target_values: list[int] = [self.DEFAULT_MAX_VALUE] * num_channels + + channels_min_max_targets = channels_min_max_targets or {} + channels_min_max_speeds = channels_min_max_speeds or {} + channels_min_max_accelerations = channels_min_max_accelerations or {} # Overwrite channels that have a provided min/max target value for c in channels_min_max_targets.keys(): self._min_target_values[c] = channels_min_max_targets[c][0] self._max_target_values[c] = channels_min_max_targets[c][1] + # Store the minimum and maximum speeds of each channel - self._min_speeds: List[int] = [ - self.DEFAULT_MIN_VALUE] * num_channels - self._max_speeds: List[int] = [ - self.DEFAULT_MAX_VALUE] * num_channels + self._min_speeds: list[int] = [self.DEFAULT_MIN_VALUE] * num_channels + self._max_speeds: list[int] = [self.DEFAULT_MAX_VALUE] * num_channels # Overwrite channels that have a provided min/max speed for c in channels_min_max_speeds.keys(): self._min_speeds[c] = channels_min_max_speeds[c][0] self._max_speeds[c] = channels_min_max_speeds[c][1] + # Store the minimum and maximum accelerations of each channel - self._min_accelerations: List[int] = [ - self.DEFAULT_MIN_ACCELERATION] * num_channels - self._max_accelerations: List[int] = [ - self.DEFAULT_MAX_ACCELERATION] * num_channels + self._min_accelerations: list[int] = [self.DEFAULT_MIN_ACCELERATION] * num_channels + self._max_accelerations: list[int] = [self.DEFAULT_MAX_ACCELERATION] * num_channels # Overwrite channels that have a provided min/max acceleration for c in channels_min_max_accelerations.keys(): self._min_accelerations[c] = channels_min_max_accelerations[c][0] @@ -141,6 +181,9 @@ def _compile_write_command(cmd: int, channel: int, value: int) -> str: Parameters: channel: Channel number. value: Value for the command. + + Returns: + command: Four-character compiled command string "". """ lsb = value & 0x7F msb = (value >> 7) & 0x7F @@ -157,17 +200,16 @@ def _get_high_bits(self, num: int) -> Generator[int, None, None]: yield bit num ^= bit - def _get_errors(self) -> List[str]: + def _get_errors(self) -> list[str]: """Get errors. Returns: - List of errors. + list of errors. """ self._transport.discard_read() # Discard possible read buffer cmd = self._cmd_lead_in + chr(0x21) self._transport.write(bytes(cmd, "latin-1")) - bin_err = self._transport.read( - nbytes=2, timeout=self.RESPONSE_TIMEOUT) + bin_err = self._transport.read_until_timeout(nbytes=2, timeout=self.RESPONSE_TIMEOUT) int_err = int.from_bytes(bin_err, "big") errors = [] # loop over bits and log the errors @@ -191,7 +233,8 @@ def _write(self, cmd: str) -> None: if errs: formatted_errs = '\n'.join(map(str, errs)) raise QMI_InstrumentException( - f"Pololu Maestro servo command {cmd} resulted in the following error(s).\n{formatted_errs}") + f"Pololu Maestro servo command {cmd} resulted in the following error(s).\n{formatted_errs}" + ) def _ask(self, cmd: str) -> int: """Send ask command to instrument. @@ -200,26 +243,25 @@ def _ask(self, cmd: str) -> int: cmd: The command string to be written. Raises: - QMI_InstrumentException if the servo command returns an error. + QMI_InstrumentException: If the servo command returns an error. Returns: - The queried value. + sum_msb_lsb: The queried value. """ cmd = self._cmd_lead_in + cmd self._transport.write(bytes(cmd, "latin-1")) # Sleep over the bits to be sent sleep(len(cmd) / self.BAUDRATE * 100) # Read back the response bits - lsb = ord(self._transport.read( - nbytes=1, timeout=self.RESPONSE_TIMEOUT)) - msb = ord(self._transport.read( - nbytes=1, timeout=self.RESPONSE_TIMEOUT)) + lsb = ord(self._transport.read_until_timeout(nbytes=1, timeout=self.RESPONSE_TIMEOUT)) + msb = ord(self._transport.read_until_timeout(nbytes=1, timeout=self.RESPONSE_TIMEOUT)) errs = self._get_errors() if errs: formatted_errs = '\n'.join(map(str, errs)) raise QMI_InstrumentException( - f"Pololu Maestro servo command {cmd} resulted in the following error(s).\n{formatted_errs}") + f"Pololu Maestro servo command {cmd} resulted in the following error(s).\n{formatted_errs}" + ) return (msb << 8) + lsb @@ -260,7 +302,7 @@ def get_idn(self) -> QMI_InstrumentIdentification: def set_target(self, channel: int, value: int) -> None: """Set channel to a specified target value. The target value is of units 0.25us. For example, if you want to set the target to 1500 us, you need to give as input 6000: - (1500×4 = 6000 = 01011101110000 in binary) + (1500×4 = 6000 = 01011101110000 in binary). Servo will begin moving based on speed and acceleration parameters previously set. This method will check against the maximum and minimum target value of the channel. If the value to be set is out of the min/max range, the method will set the target value to the min/max. @@ -287,11 +329,11 @@ def set_target(self, channel: int, value: int) -> None: self._targets[channel] = target @rpc_method - def get_targets(self) -> List[int]: + def get_targets(self) -> list[int]: """Get the current targets for each channel. Returns: - The target values for each channel. + self_targets: The target values for each channel. """ return self._targets @@ -303,7 +345,7 @@ def get_position(self, channel: int) -> int: channel: The channel number. Returns: - The position in quarter-microsecond units. + position: The position in quarter-microsecond units. """ self._check_is_open() self._check_channel(channel) @@ -321,7 +363,7 @@ def set_speed(self, channel: int, speed: int) -> None: speed: The target speed. Raises: - ValueError if speed is out of allowable range. + ValueError: If speed is out of allowable range. """ self._check_is_open() self._check_channel(channel) @@ -344,7 +386,7 @@ def set_acceleration(self, channel: int, acceleration: int) -> None: acceleration: The target acceleration. Raises: - ValueError if acceleration not within 0 to 255 + ValueError: If acceleration not within 0 to 255. """ self._check_is_open() self._check_channel(channel) @@ -358,7 +400,7 @@ def set_acceleration(self, channel: int, acceleration: int) -> None: self._write(cmd) @rpc_method - def go_home(self): + def go_home(self) -> None: """Send all servos and outputs to their home values.""" self._check_is_open() self._write(chr(0x22)) @@ -370,11 +412,15 @@ def set_min_target(self, channel: int, value: int) -> None: Parameters: channel: The channel number. value: The new minimum target value. + + Raises: + ValueError: If minimum target value is not in valid range. """ if value < self.DEFAULT_MIN_VALUE or value > self._max_target_values[channel]: raise ValueError( - f"Minimum value invalid, should be in range [{self.DEFAULT_MIN_VALUE},\ - {self._max_target_values[channel]}") + f"Minimum value invalid, should be in range [{self.DEFAULT_MIN_VALUE}," + + f"{self._max_target_values[channel]}]" + ) self._min_target_values[channel] = value @@ -384,6 +430,9 @@ def get_min_target(self, channel: int) -> int: Parameters: channel: The channel number. + + Returns: + min_target_value: The queried value. """ return self._min_target_values[channel] @@ -394,11 +443,15 @@ def set_max_target(self, channel: int, value: int) -> None: Parameters: channel: The channel number. value: The new maximum target value. + + Raises: + ValueError: If given maximum target value is not in valid range. """ if value <= self._min_target_values[channel] or value > self.DEFAULT_MAX_VALUE: raise ValueError( - f"Maximum value invalid, should be in range [{self._min_target_values[channel] + 1},\ - {self.DEFAULT_MAX_VALUE}]") + f"Maximum value invalid, should be in range [{self._min_target_values[channel] + 1}," + + f"{self.DEFAULT_MAX_VALUE}]" + ) self._max_target_values[channel] = value @@ -408,6 +461,9 @@ def get_max_target(self, channel: int) -> int: Parameters: channel: The channel number. + + Returns: + max_target_value: The queried maximum target value. """ return self._max_target_values[channel] @@ -421,7 +477,8 @@ def set_max(self, channel: int, value: int) -> None: """ warnings.warn( f"{self.set_max.__name__} has been deprecated. Please use {self.set_max_target.__name__}.", - DeprecationWarning) + DeprecationWarning + ) self.set_max_target(channel, value) @rpc_method @@ -434,14 +491,15 @@ def set_min(self, channel: int, value: int) -> None: """ warnings.warn( f"{self.set_min.__name__} has been deprecated. Please use {self.set_min_target.__name__}.", - DeprecationWarning) + DeprecationWarning + ) self.set_min_target(channel, value) @rpc_method def set_target_value(self, channel: int, value: int) -> None: """Set channel to a specified target value. The target value is of units 0.25us. For example, if you want to set the target to 1500 us, you need to give as input 6000: - (1500×4 = 6000 = 01011101110000 in binary) + (1500×4 = 6000 = 01011101110000 in binary). Servo will begin moving based on speed and acceleration parameters previously set. This method will check against the maximum and minimum target value of the channel. If the value to be set is out of the min/max range, the method will set the target value to the min/max. @@ -479,7 +537,8 @@ def move_up(self, channel: int) -> None: """ warnings.warn( f"{self.move_up.__name__} has been deprecated. There is no replacement for this.", - DeprecationWarning) + DeprecationWarning + ) @rpc_method def move_down(self, channel: int) -> None: @@ -490,4 +549,5 @@ def move_down(self, channel: int) -> None: """ warnings.warn( f"{self.move_down.__name__} has been deprecated. There is no replacement for this.", - DeprecationWarning) + DeprecationWarning + ) diff --git a/qmi/instruments/pololu/qmi_uart.py b/qmi/instruments/pololu/qmi_uart.py new file mode 100644 index 00000000..8ca76bc0 --- /dev/null +++ b/qmi/instruments/pololu/qmi_uart.py @@ -0,0 +1,98 @@ +import time + +import busio + +from qmi.core.exceptions import QMI_InvalidOperationException + + +class QMI_Uart(busio.UART): + """Extension of the class to make compatible with QMI_Transport calls. + + Attributes: + READ_BYTE_BATCH_SIZE: The default size of the read buffer, based on . + """ + READ_BYTE_BATCH_SIZE = 2 + + def __init__(self, *args, **kwargs): + super().__init__(args, kwargs) + self._is_open = True + + def _check_is_open(self) -> None: + """Verify that the transport is open, otherwise raise exception.""" + if not self._is_open: + raise QMI_InvalidOperationException( + f"Operation not allowed on closed transport {type(self).__name__}") + + def open(self) -> None: + """The instrument is opened already at the __init__.""" + pass + + def close(self) -> None: + """Close the transport and de-initialize the device.""" + self._check_is_open() + self.deinit() + self._is_open = False + + def write(self, data: bytes) -> None: + """Write a sequence of bytes to the transport. + + When this method returns, all bytes are written to the transport + or queued to be written to the transport. + + An exception is raised if the transport is closed from the remote + side before all bytes could be written. + + Subclasses must override this method, if applicable. + """ + self._check_is_open() + super().write(data) + + def read_until_timeout(self, nbytes: int, timeout: float) -> bytes: + """Read a sequence of bytes from the transport. + + This method blocks until either the specified number of bytes + are available or the timeout (in seconds) expires, whichever occurs + sooner. + + If timeout occurs, the partial sequence of available bytes is returned. + This sequence may be empty if timeout occurs before any byte was available. + + If the transport has been closed on the remote side, any remaining + input bytes are returned (up to the maximum number of bytes requested). + If there are no more bytes to read, QMI_EndOfInputException is raised. + + Subclasses must override this method, if applicable. + + Parameters: + nbytes: Maximum number of bytes to read. + timeout: Maximum time to wait (in seconds). + + Returns: + Received bytes. + + Raises: + ~qmi.core.exceptions.QMI_EndOfInputException: If the transport has been closed on + the remote side and there are no more bytes to read. + """ + self._check_is_open() + buffer = bytearray() + batch = self.READ_BYTE_BATCH_SIZE + bytes_read = 0 + start_time = time.time() + while bytes_read < nbytes: + # Extend buffer, for now try in batches of 4 bytes. + self.readinto(buffer, nbytes=batch) + bytes_read += batch + # Check on remaining buffer and adjust read batch size if necessary. + if bytes_read + batch > nbytes: + self.readinto(buffer, nbytes=nbytes - bytes_read) + break + + if time.time() - start_time > timeout: + break + + return bytes(buffer) + + def discard_read(self) -> None: + """Discard all bytes that are immediately available for reading.""" + self.readline() # Warning! This might block if there is nothing to read. diff --git a/tests/instruments/pololu/test_maestro.py b/tests/instruments/pololu/test_maestro.py index 6cb5c727..2dddf07b 100644 --- a/tests/instruments/pololu/test_maestro.py +++ b/tests/instruments/pololu/test_maestro.py @@ -2,21 +2,57 @@ from unittest.mock import Mock, patch, call import logging -import qmi -from qmi.core.transport import QMI_TcpTransport -from qmi.instruments.pololu import Pololu_Maestro +board_mock = Mock() +board_mock.TX = Mock() +board_mock.RX = Mock() + +from qmi.core.transport import QMI_SerialTransport from qmi.core.exceptions import QMI_InstrumentException +with patch("typing.TYPE_CHECKING", True): + import qmi.instruments + import qmi.instruments.pololu + with patch("qmi.instruments.pololu"): + with patch("qmi.instruments.pololu.maestro.board", board_mock), patch( + "pkg_resources._typeshed"): + from qmi.instruments.pololu import Pololu_Maestro + +from tests.patcher import PatcherQmiContext as QMI_Context + + +class PololuMaestroUartOpenCloseTestCase(unittest.TestCase): + + def setUp(self) -> None: #, imp) -> None: + self.baudrate = 115200 + ctx = QMI_Context("pololu_unit_test") + transport = f"uart:COM1:baudrate={self.baudrate}" + self.instr = Pololu_Maestro(ctx, "Pololu", transport) + + self.addCleanup(self.board) + + def test_open_close(self): + """Test opening and closing the instrument""" + self.instr.open() + self.assertTrue(self.instr.is_open()) + self.instr.close() + self.assertFalse(self.instr.is_open()) + self.board.assert_called_once_with( + "COM1", + baudrate=self.baudrate, # The rest are defaults + bytesize=8, + parity='N', + rtscts=False, + stopbits=1.0, + timeout=0.04 + ) -class PololuMaestroOpenCloseTestCase(unittest.TestCase): + +class PololuMaestroSerialOpenCloseTestCase(unittest.TestCase): def setUp(self) -> None: - qmi.start("pololu_unit_test") + ctx = QMI_Context("pololu_unit_test") transport = "serial:COM1" - self.instr = qmi.make_instrument("Pololu", Pololu_Maestro, transport) - - def tearDown(self) -> None: - qmi.stop() + self.instr = Pololu_Maestro(ctx, "Pololu", transport) def test_open_close(self): """Test opening and closing the instrument""" @@ -50,20 +86,19 @@ def setUp(self) -> None: logging.CRITICAL) self._cmd_lead = chr(0xAA) + chr(0x0C) self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1") - qmi.start("pololu_unit_test") + ctx = QMI_Context("pololu_unit_test") patcher = patch( - 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport) + 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport) self._transport_mock: Mock = patcher.start().return_value self.addCleanup(patcher.stop) - self.instr: Pololu_Maestro = qmi.make_instrument( - "Pololu", Pololu_Maestro, self.TRANSPORT_STR, + self.instr: Pololu_Maestro = Pololu_Maestro( + ctx, "Pololu", self.TRANSPORT_STR, channels_min_max_targets={1: (self.CHANNEL1_MIN, self.CHANNEL1_MAX), 3: (self.CHANNEL3_MIN, self.CHANNEL3_MAX)}) self.instr.open() def tearDown(self) -> None: self.instr.close() - qmi.stop() def test_setting_channel_min_and_max_targets(self): """Test setting the min and max targets of channels.""" @@ -102,19 +137,18 @@ def setUp(self) -> None: logging.CRITICAL) self._cmd_lead = chr(0xAA) + chr(0x0C) self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1") - qmi.start("pololu_unit_test") + ctx = QMI_Context("pololu_unit_test") patcher = patch( - 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport) + 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport) self._transport_mock: Mock = patcher.start().return_value self.addCleanup(patcher.stop) - self.instr: Pololu_Maestro = qmi.make_instrument( - "Pololu", Pololu_Maestro, self.TRANSPORT_STR, + self.instr: Pololu_Maestro = Pololu_Maestro( + ctx, "Pololu", self.TRANSPORT_STR, channels_min_max_speeds={1: (self.CHANNEL1_MIN, self.CHANNEL1_MAX)}) self.instr.open() def tearDown(self) -> None: self.instr.close() - qmi.stop() def test_setting_channel_min_and_max_speed(self): """Test setting the min and max speeds of channels.""" @@ -138,19 +172,18 @@ def setUp(self) -> None: logging.CRITICAL) self._cmd_lead = chr(0xAA) + chr(0x0C) self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1") - qmi.start("pololu_unit_test") + ctx = QMI_Context("pololu_unit_test") patcher = patch( - 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport) + 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport) self._transport_mock: Mock = patcher.start().return_value self.addCleanup(patcher.stop) - self.instr: Pololu_Maestro = qmi.make_instrument( - "Pololu", Pololu_Maestro, self.TRANSPORT_STR, + self.instr: Pololu_Maestro = Pololu_Maestro( + ctx, "Pololu", self.TRANSPORT_STR, channels_min_max_accelerations={1: (self.CHANNEL1_MIN, self.CHANNEL1_MAX)}) self.instr.open() def tearDown(self) -> None: self.instr.close() - qmi.stop() def test_setting_channel_min_and_max_acceleration(self): """Test setting the min and max accelerations of channels.""" @@ -172,18 +205,16 @@ def setUp(self) -> None: logging.CRITICAL) self._cmd_lead = chr(0xAA) + chr(0x0C) self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1") - qmi.start("pololu_unit_test") + ctx = QMI_Context("pololu_unit_test") patcher = patch( - 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport) + 'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport) self._transport_mock: Mock = patcher.start().return_value self.addCleanup(patcher.stop) - self.instr: Pololu_Maestro = qmi.make_instrument( - "Pololu", Pololu_Maestro, self.TRANSPORT_STR) + self.instr: Pololu_Maestro = Pololu_Maestro(ctx, "Pololu", self.TRANSPORT_STR) self.instr.open() def tearDown(self) -> None: self.instr.close() - qmi.stop() def test_get_idn(self): """Test getting the QMI instrument ID.""" @@ -278,7 +309,7 @@ def test_get_position(self): cmd = chr(0x10) + chr(channel) expected_command = bytes(self._cmd_lead + cmd, "latin-1") low_bit, high_bit = expected - (expected // 256 * 256), expected // 256 - self._transport_mock.read.side_effect = [chr(low_bit), + self._transport_mock.read_until_timeout.side_effect = [chr(low_bit), chr(high_bit), b'\x00\x00'] expected_calls = [ call(expected_command), @@ -413,7 +444,7 @@ def test_command_error_check_excepts(self): # Arrange cmd = chr(0x22) expected_command = bytes(self._cmd_lead + cmd, "latin-1") - self._transport_mock.read.return_value = b'\x00\xF0' + self._transport_mock.read_until_timeout.return_value = b'\x00\xF0' expected_calls = [ call(expected_command), call(self._error_check_cmd) From f1554cd05a22b3d5348d5312049341d5e0ced55c Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 22 Dec 2025 15:37:50 +0100 Subject: [PATCH 02/14] [135-make-pololu-maestro-driver-uart-mode] Now managed to fix the unit-tests as well. --- qmi/instruments/pololu/qmi_uart.py | 15 ++-- tests/instruments/pololu/test_maestro.py | 99 +++++++++++++++++------- 2 files changed, 81 insertions(+), 33 deletions(-) diff --git a/qmi/instruments/pololu/qmi_uart.py b/qmi/instruments/pololu/qmi_uart.py index 8ca76bc0..3554ad2e 100644 --- a/qmi/instruments/pololu/qmi_uart.py +++ b/qmi/instruments/pololu/qmi_uart.py @@ -14,7 +14,7 @@ class QMI_Uart(busio.UART): READ_BYTE_BATCH_SIZE = 2 def __init__(self, *args, **kwargs): - super().__init__(args, kwargs) + super().__init__(*args, **kwargs) self._is_open = True def _check_is_open(self) -> None: @@ -76,22 +76,25 @@ def read_until_timeout(self, nbytes: int, timeout: float) -> bytes: """ self._check_is_open() buffer = bytearray() - batch = self.READ_BYTE_BATCH_SIZE + batch = self.READ_BYTE_BATCH_SIZE if nbytes > self.READ_BYTE_BATCH_SIZE else nbytes bytes_read = 0 start_time = time.time() while bytes_read < nbytes: - # Extend buffer, for now try in batches of 4 bytes. - self.readinto(buffer, nbytes=batch) + # Extend buffer, for now try in batches (of 1 or more bytes). + buffer = self.readinto(buffer, nbytes=batch) bytes_read += batch + if bytes_read == nbytes: + break + # Check on remaining buffer and adjust read batch size if necessary. if bytes_read + batch > nbytes: - self.readinto(buffer, nbytes=nbytes - bytes_read) + buffer = self.readinto(buffer, nbytes=nbytes - bytes_read) break if time.time() - start_time > timeout: break - return bytes(buffer) + return buffer def discard_read(self) -> None: """Discard all bytes that are immediately available for reading.""" diff --git a/tests/instruments/pololu/test_maestro.py b/tests/instruments/pololu/test_maestro.py index 2dddf07b..62d946b4 100644 --- a/tests/instruments/pololu/test_maestro.py +++ b/tests/instruments/pololu/test_maestro.py @@ -2,33 +2,33 @@ from unittest.mock import Mock, patch, call import logging -board_mock = Mock() -board_mock.TX = Mock() -board_mock.RX = Mock() - from qmi.core.transport import QMI_SerialTransport from qmi.core.exceptions import QMI_InstrumentException -with patch("typing.TYPE_CHECKING", True): - import qmi.instruments - import qmi.instruments.pololu - with patch("qmi.instruments.pololu"): - with patch("qmi.instruments.pololu.maestro.board", board_mock), patch( - "pkg_resources._typeshed"): - from qmi.instruments.pololu import Pololu_Maestro +with patch("qmi.instruments.pololu.maestro.TYPE_CHECKING", True): + import qmi.instruments.pololu.qmi_uart # We need this import, do not remove + from qmi.instruments.pololu import Pololu_Maestro from tests.patcher import PatcherQmiContext as QMI_Context class PololuMaestroUartOpenCloseTestCase(unittest.TestCase): + board_mock = Mock() + board_mock.TX = 2 + board_mock.RX = 3 + busio_mock = Mock() + uart_ports_mock = Mock() def setUp(self) -> None: #, imp) -> None: self.baudrate = 115200 + self._cmd_lead = chr(0xAA) + chr(0x0C) ctx = QMI_Context("pololu_unit_test") transport = f"uart:COM1:baudrate={self.baudrate}" - self.instr = Pololu_Maestro(ctx, "Pololu", transport) - - self.addCleanup(self.board) + with patch.dict("sys.modules", { + "board": self.board_mock, "busio": self.busio_mock, "machine": Mock(), "microcontroller.pin": self.uart_ports_mock + }) as self.sys_patch: + self.uart_ports_mock.uartPorts = ([[10, 2, 3]]) + self.instr = Pololu_Maestro(ctx, "Pololu", transport) def test_open_close(self): """Test opening and closing the instrument""" @@ -36,15 +36,57 @@ def test_open_close(self): self.assertTrue(self.instr.is_open()) self.instr.close() self.assertFalse(self.instr.is_open()) - self.board.assert_called_once_with( - "COM1", - baudrate=self.baudrate, # The rest are defaults - bytesize=8, - parity='N', - rtscts=False, - stopbits=1.0, - timeout=0.04 - ) + + def test_write_with_set_target_value(self): + """Test writing by setting a target value.""" + # Arrange + target, channel = 5000, 1 + lsb = target & 0x7F # 7 bits for least significant byte + msb = (target >> 7) & 0x7F # shift 7 and take next 7 bits for msb + cmd = chr(0x04) + chr(channel) + chr(lsb) + chr(msb) + expected_command = bytes(self._cmd_lead + cmd, "latin-1") + self.instr._transport._uart.readinto.return_value = b'\x00\x00' + expected_calls = [ + call(expected_command), call(bytes(self._cmd_lead + '!', "latin-1")) + ] + + # Act + self.instr.open() + self.instr.set_target(channel, target) + + # Assert + self.instr._transport._uart.write.assert_has_calls(expected_calls) + self.instr._transport._uart.readline.assert_has_calls([call()]) + + def test_read_with_get_position(self): + """Test get position returns expected value.""" + # Arrange + expected, channel = 5000, 0 + cmd = chr(0x10) + chr(channel) + expected_command = bytes(self._cmd_lead + cmd, "latin-1") + low_bit, high_bit = expected - (expected // 256 * 256), expected // 256 + self.instr._transport._uart.readinto.side_effect = [ + chr(low_bit), + chr(high_bit), + b'\x00\x00', + ] + expected_write_calls = [ + call(expected_command), call(bytes(self._cmd_lead + '!', "latin-1")) + ] + expected_read_calls = [ + call(bytearray(b''), 1), + call(bytearray(b''), 1), + call(bytearray(b''), 2), + ] + + # Act + self.instr.open() + result = self.instr.get_position(channel) + + # Assert + self.instr._transport._uart.write.assert_has_calls(expected_write_calls) + self.instr._transport._uart.readinto.assert_has_calls(expected_read_calls) + self.assertEqual(expected, result) class PololuMaestroSerialOpenCloseTestCase(unittest.TestCase): @@ -309,9 +351,12 @@ def test_get_position(self): cmd = chr(0x10) + chr(channel) expected_command = bytes(self._cmd_lead + cmd, "latin-1") low_bit, high_bit = expected - (expected // 256 * 256), expected // 256 - self._transport_mock.read_until_timeout.side_effect = [chr(low_bit), - chr(high_bit), b'\x00\x00'] - expected_calls = [ + self._transport_mock.read_until_timeout.side_effect = [ + chr(low_bit), + chr(high_bit), + b'\x00\x00' + ] + expected_write_calls = [ call(expected_command), call(self._error_check_cmd) ] @@ -320,7 +365,7 @@ def test_get_position(self): result = self.instr.get_position(channel) # Assert - self._transport_mock.write.assert_has_calls(expected_calls) + self._transport_mock.write.assert_has_calls(expected_write_calls) self.assertEqual(result, expected) def test_set_speed(self): From ee0d11392bed07e1da1cb5969aee3953914c7c7d Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 22 Dec 2025 15:46:01 +0100 Subject: [PATCH 03/14] [135-make-pololu-maestro-driver-uart-mode] Set mypy to ignore import errors. --- CHANGELOG.md | 1 + qmi/instruments/pololu/maestro.py | 2 +- qmi/instruments/pololu/qmi_uart.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ad1a19a..f0f586d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.52.0-beta.0] - Unreleased ### Added +- Pololu QMI driver can now be used also in UART mode. For this, use of special transport line "uart:
[:baudrate=...]" is needed. ### Changed diff --git a/qmi/instruments/pololu/maestro.py b/qmi/instruments/pololu/maestro.py index 9fcd829b..fed0b07f 100644 --- a/qmi/instruments/pololu/maestro.py +++ b/qmi/instruments/pololu/maestro.py @@ -13,7 +13,7 @@ # Lazy import of the adafruit-blinka module 'board' and 'QMI_Uart'. See the function _import_modules() below. if TYPE_CHECKING: - import board + import board # mypy: ignore from qmi.instruments.pololu.qmi_uart import QMI_Uart else: board = None diff --git a/qmi/instruments/pololu/qmi_uart.py b/qmi/instruments/pololu/qmi_uart.py index 3554ad2e..65b6f16f 100644 --- a/qmi/instruments/pololu/qmi_uart.py +++ b/qmi/instruments/pololu/qmi_uart.py @@ -1,6 +1,6 @@ import time -import busio +import busio # mypy: ignore from qmi.core.exceptions import QMI_InvalidOperationException From 6ce513e7f2da2cf1192a7cf90a4ade205d78c881 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 22 Dec 2025 15:51:44 +0100 Subject: [PATCH 04/14] [135-make-pololu-maestro-driver-uart-mode] Set mypy to ignore import errors. --- qmi/instruments/pololu/maestro.py | 2 +- qmi/instruments/pololu/qmi_uart.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/qmi/instruments/pololu/maestro.py b/qmi/instruments/pololu/maestro.py index fed0b07f..aaabdaee 100644 --- a/qmi/instruments/pololu/maestro.py +++ b/qmi/instruments/pololu/maestro.py @@ -13,7 +13,7 @@ # Lazy import of the adafruit-blinka module 'board' and 'QMI_Uart'. See the function _import_modules() below. if TYPE_CHECKING: - import board # mypy: ignore + import board # type: ignore from qmi.instruments.pololu.qmi_uart import QMI_Uart else: board = None diff --git a/qmi/instruments/pololu/qmi_uart.py b/qmi/instruments/pololu/qmi_uart.py index 65b6f16f..4d196cd1 100644 --- a/qmi/instruments/pololu/qmi_uart.py +++ b/qmi/instruments/pololu/qmi_uart.py @@ -1,6 +1,6 @@ import time -import busio # mypy: ignore +import busio # type: ignore from qmi.core.exceptions import QMI_InvalidOperationException From f2deb7043627300413a1eefea7cd52f15285f844 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 22 Dec 2025 16:49:59 +0100 Subject: [PATCH 05/14] [135-make-pololu-maestro-driver-uart-mode] Issues still with unit-tests. Now setting more patching for imports in `test_maestro.py``but this means we need to separately test `qmi_uart.py` as we mock most of it. --- tests/instruments/pololu/test_maestro.py | 30 ++++++++++++++---------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/tests/instruments/pololu/test_maestro.py b/tests/instruments/pololu/test_maestro.py index 62d946b4..e31d9ebb 100644 --- a/tests/instruments/pololu/test_maestro.py +++ b/tests/instruments/pololu/test_maestro.py @@ -6,7 +6,6 @@ from qmi.core.exceptions import QMI_InstrumentException with patch("qmi.instruments.pololu.maestro.TYPE_CHECKING", True): - import qmi.instruments.pololu.qmi_uart # We need this import, do not remove from qmi.instruments.pololu import Pololu_Maestro from tests.patcher import PatcherQmiContext as QMI_Context @@ -17,15 +16,19 @@ class PololuMaestroUartOpenCloseTestCase(unittest.TestCase): board_mock.TX = 2 board_mock.RX = 3 busio_mock = Mock() + uart_mock = Mock() uart_ports_mock = Mock() - def setUp(self) -> None: #, imp) -> None: + def setUp(self) -> None: + self.busio_mock.UART = Mock(return_value=None) self.baudrate = 115200 self._cmd_lead = chr(0xAA) + chr(0x0C) ctx = QMI_Context("pololu_unit_test") transport = f"uart:COM1:baudrate={self.baudrate}" with patch.dict("sys.modules", { - "board": self.board_mock, "busio": self.busio_mock, "machine": Mock(), "microcontroller.pin": self.uart_ports_mock + "qmi.instruments.pololu.qmi_uart": self.uart_mock, + "board": self.board_mock, "busio": self.busio_mock, + "machine": Mock(), "microcontroller.pin": self.uart_ports_mock }) as self.sys_patch: self.uart_ports_mock.uartPorts = ([[10, 2, 3]]) self.instr = Pololu_Maestro(ctx, "Pololu", transport) @@ -45,18 +48,19 @@ def test_write_with_set_target_value(self): msb = (target >> 7) & 0x7F # shift 7 and take next 7 bits for msb cmd = chr(0x04) + chr(channel) + chr(lsb) + chr(msb) expected_command = bytes(self._cmd_lead + cmd, "latin-1") - self.instr._transport._uart.readinto.return_value = b'\x00\x00' - expected_calls = [ + self.instr._transport.read_until_timeout.side_effect = [b'\x00\x00'] * 2 + expected_write_calls = [ call(expected_command), call(bytes(self._cmd_lead + '!', "latin-1")) ] + expected_read_call = [call(nbytes=2, timeout=Pololu_Maestro.RESPONSE_TIMEOUT)] # Act self.instr.open() self.instr.set_target(channel, target) # Assert - self.instr._transport._uart.write.assert_has_calls(expected_calls) - self.instr._transport._uart.readline.assert_has_calls([call()]) + self.instr._transport.write.assert_has_calls(expected_write_calls) + self.instr._transport.read_until_timeout.assert_has_calls(expected_read_call) def test_read_with_get_position(self): """Test get position returns expected value.""" @@ -65,7 +69,7 @@ def test_read_with_get_position(self): cmd = chr(0x10) + chr(channel) expected_command = bytes(self._cmd_lead + cmd, "latin-1") low_bit, high_bit = expected - (expected // 256 * 256), expected // 256 - self.instr._transport._uart.readinto.side_effect = [ + self.instr._transport.read_until_timeout.side_effect = [ chr(low_bit), chr(high_bit), b'\x00\x00', @@ -74,9 +78,9 @@ def test_read_with_get_position(self): call(expected_command), call(bytes(self._cmd_lead + '!', "latin-1")) ] expected_read_calls = [ - call(bytearray(b''), 1), - call(bytearray(b''), 1), - call(bytearray(b''), 2), + call(nbytes=1, timeout=Pololu_Maestro.RESPONSE_TIMEOUT), + call(nbytes=1, timeout=Pololu_Maestro.RESPONSE_TIMEOUT), + call(nbytes=2, timeout=Pololu_Maestro.RESPONSE_TIMEOUT), ] # Act @@ -84,8 +88,8 @@ def test_read_with_get_position(self): result = self.instr.get_position(channel) # Assert - self.instr._transport._uart.write.assert_has_calls(expected_write_calls) - self.instr._transport._uart.readinto.assert_has_calls(expected_read_calls) + self.instr._transport.write.assert_has_calls(expected_write_calls) + self.instr._transport.read_until_timeout.assert_has_calls(expected_read_calls) self.assertEqual(expected, result) From 889c25f2fcf4bb74507ff09b35808485605ff1e0 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 22 Dec 2025 17:40:19 +0100 Subject: [PATCH 06/14] [135-make-pololu-maestro-driver-uart-mode] Issues still with unit-tests. Now setting more patching for imports in `test_maestro.py``but this means we need to separately test `qmi_uart.py` as we mock most of it. --- tests/instruments/pololu/test_qmi_uart.py | 52 +++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 tests/instruments/pololu/test_qmi_uart.py diff --git a/tests/instruments/pololu/test_qmi_uart.py b/tests/instruments/pololu/test_qmi_uart.py new file mode 100644 index 00000000..cf241b83 --- /dev/null +++ b/tests/instruments/pololu/test_qmi_uart.py @@ -0,0 +1,52 @@ +import unittest +from unittest.mock import Mock, patch, call + + +class UART_stub: + def __init__(self, *args, **kwargs): + self.deinit_called = False + self.write_buffer = bytearray() + self.read_buffer = bytearray() + + def deinit(self): + self.deinit_called = True + + def write(self, data: bytes): + self.write_buffer += list(data) + + def readinto(self, buf: bytearray, nbytes: int): + return buf + bytearray(int(f"{self.read_buffer.pop(0)}").to_bytes()) + + def readline(self): + try: + return self.read_buffer + finally: + self.read_buffer = bytearray() + + +busio_mock = Mock() +busio_mock.UART = UART_stub +with patch.dict("sys.modules", {"busio": busio_mock}) as sys_patch: + from qmi.instruments.pololu.qmi_uart import QMI_Uart + + +class QmiUartTestCase(unittest.TestCase): + def setUp(self): + self.uart_mock = Mock() + + def test_something(self): + busio_mock.UART = self.uart_mock + qmi_uart = QMI_Uart() + qmi_uart.open() + + self.assertTrue(qmi_uart._is_open) + + qmi_uart.close() + + self.assertFalse(qmi_uart._is_open) + + busio_mock.UART.deinit.assert_called_once_with() + + +if __name__ == '__main__': + unittest.main() From af0d239b159e1ad8d5f29fd963de442a5515d9dd Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Wed, 24 Dec 2025 12:00:09 +0100 Subject: [PATCH 07/14] [135-make-pololu-maestro-driver-uart-mode] Made all unit-tests, but due to high level of mocking, a hardware testing should be also done before review. --- qmi/instruments/pololu/qmi_uart.py | 45 ++++++---- tests/instruments/pololu/test_qmi_uart.py | 105 +++++++++++++++++----- 2 files changed, 110 insertions(+), 40 deletions(-) diff --git a/qmi/instruments/pololu/qmi_uart.py b/qmi/instruments/pololu/qmi_uart.py index 4d196cd1..c56383b0 100644 --- a/qmi/instruments/pololu/qmi_uart.py +++ b/qmi/instruments/pololu/qmi_uart.py @@ -1,4 +1,5 @@ import time +from queue import Empty import busio # type: ignore @@ -8,6 +9,14 @@ class QMI_Uart(busio.UART): """Extension of the class to make compatible with QMI_Transport calls. + The UART class opens a serial connection behind the scenes, see class binhoHostAdapter in binhoHostAdapter.py. + The default read and write timeouts are: timeout=0.025, write_timeout=0.05. These are not changeable through the + API, but would need a workaround through the 'serial' module interface. + + The docstring of busio.readline says a 'line' is read until a _newline_ character, but in `uart.py` we can see that + ``` while out != "\r":``` is used. Thus, the correct docstring should be that a line is read until a + _carriage return_ character. + Attributes: READ_BYTE_BATCH_SIZE: The default size of the read buffer, based on . """ @@ -24,7 +33,9 @@ def _check_is_open(self) -> None: f"Operation not allowed on closed transport {type(self).__name__}") def open(self) -> None: - """The instrument is opened already at the __init__.""" + """The instrument is opened already at the __init__. Note that if close() -> self.deinit() was called, + we cannot simply 're-open', but need to make a new instance to open the connection again. + """ pass def close(self) -> None: @@ -39,10 +50,8 @@ def write(self, data: bytes) -> None: When this method returns, all bytes are written to the transport or queued to be written to the transport. - An exception is raised if the transport is closed from the remote - side before all bytes could be written. - - Subclasses must override this method, if applicable. + Parameters: + data: Bytes to write. """ self._check_is_open() super().write(data) @@ -59,20 +68,13 @@ def read_until_timeout(self, nbytes: int, timeout: float) -> bytes: If the transport has been closed on the remote side, any remaining input bytes are returned (up to the maximum number of bytes requested). - If there are no more bytes to read, QMI_EndOfInputException is raised. - - Subclasses must override this method, if applicable. Parameters: nbytes: Maximum number of bytes to read. timeout: Maximum time to wait (in seconds). Returns: - Received bytes. - - Raises: - ~qmi.core.exceptions.QMI_EndOfInputException: If the transport has been closed on - the remote side and there are no more bytes to read. + data: Received bytes. """ self._check_is_open() buffer = bytearray() @@ -83,6 +85,9 @@ def read_until_timeout(self, nbytes: int, timeout: float) -> bytes: # Extend buffer, for now try in batches (of 1 or more bytes). buffer = self.readinto(buffer, nbytes=batch) bytes_read += batch + if time.time() - start_time > timeout: + break + if bytes_read == nbytes: break @@ -91,11 +96,15 @@ def read_until_timeout(self, nbytes: int, timeout: float) -> bytes: buffer = self.readinto(buffer, nbytes=nbytes - bytes_read) break - if time.time() - start_time > timeout: - break - return buffer def discard_read(self) -> None: - """Discard all bytes that are immediately available for reading.""" - self.readline() # Warning! This might block if there is nothing to read. + """Discard all bytes that are immediately available for reading. As using the read methods from uart.py use + `Queue.get()`, which means blocking until something is received, we work around this by calling the queue + without blocking. + """ + while True: + try: + self._uart._nova._rxdQueue.get(block=False) + except Empty: + break diff --git a/tests/instruments/pololu/test_qmi_uart.py b/tests/instruments/pololu/test_qmi_uart.py index cf241b83..81201b79 100644 --- a/tests/instruments/pololu/test_qmi_uart.py +++ b/tests/instruments/pololu/test_qmi_uart.py @@ -1,9 +1,31 @@ +from queue import Empty import unittest -from unittest.mock import Mock, patch, call +from unittest.mock import Mock, patch class UART_stub: + + class _Uart: + + class Nova: + + class RxdQueue: + block_count = 0 + def get(self, block): + if self.block_count: + raise Empty() + + self.block_count += 1 + return b"\r" + + def __init__(self): + self._rxdQueue = self.RxdQueue() + + def __init__(self): + self._nova = self.Nova() + def __init__(self, *args, **kwargs): + self._uart = self._Uart() self.deinit_called = False self.write_buffer = bytearray() self.read_buffer = bytearray() @@ -12,16 +34,13 @@ def deinit(self): self.deinit_called = True def write(self, data: bytes): - self.write_buffer += list(data) + self.write_buffer += bytearray(data) def readinto(self, buf: bytearray, nbytes: int): - return buf + bytearray(int(f"{self.read_buffer.pop(0)}").to_bytes()) + for _ in range(nbytes): + buf += bytearray(int(f"{self.read_buffer.pop(0)}").to_bytes()) - def readline(self): - try: - return self.read_buffer - finally: - self.read_buffer = bytearray() + return buf busio_mock = Mock() @@ -32,20 +51,62 @@ def readline(self): class QmiUartTestCase(unittest.TestCase): def setUp(self): - self.uart_mock = Mock() - - def test_something(self): - busio_mock.UART = self.uart_mock - qmi_uart = QMI_Uart() - qmi_uart.open() - - self.assertTrue(qmi_uart._is_open) - - qmi_uart.close() - - self.assertFalse(qmi_uart._is_open) - - busio_mock.UART.deinit.assert_called_once_with() + self.qmi_uart = QMI_Uart() + + def test_open_close(self): + """Simple open-close test.""" + self.qmi_uart.open() + + self.assertTrue(self.qmi_uart._is_open) + self.assertFalse(self.qmi_uart.deinit_called) + + self.qmi_uart.close() + + self.assertFalse(self.qmi_uart._is_open) + self.assertTrue(self.qmi_uart.deinit_called) + + def test_write(self): + """Test write function.""" + data = b"01234" + expected_write = bytearray(data) + self.qmi_uart.write(data) + self.qmi_uart.close() + + self.assertEqual(expected_write, self.qmi_uart.write_buffer) + + def test_read_until_timeout(self): + """Test read_until_timeout function.""" + expected_read = b"01234" + data = bytearray(expected_read) + self.qmi_uart.read_buffer = data + + # First read two bytes + retval = self.qmi_uart.read_until_timeout(2, 1.0) + self.assertEqual(expected_read[:2], retval) + # Then try to read 3 bytes, where we are limited by timeout and batch size, and get back only 2. + retval_2 = self.qmi_uart.read_until_timeout(3, 0.0) + self.assertEqual(expected_read[2:4], retval_2) + # Replenish buffer + self.qmi_uart.read_buffer = data + + def test_read_until_timeout_batch_size_check(self): + """Test read_until_timeout function with read 5 bytes, + which triggers the check on remaining buffer size vs batch size. + """ + expected_read = b"01234" + data = bytearray(expected_read) + self.qmi_uart.read_buffer = data + + retval = self.qmi_uart.read_until_timeout(5, 1.0) + self.assertEqual(expected_read, retval) + + self.qmi_uart.close() + + def test_discard_read(self): + """Test discard_read to see it reads from "queue".""" + self.assertEqual(0, self.qmi_uart._uart._nova._rxdQueue.block_count) + self.qmi_uart.discard_read() + self.assertEqual(1, self.qmi_uart._uart._nova._rxdQueue.block_count) if __name__ == '__main__': From cbcd57c8f91ad1468379b3147d83f3c299c4cfd5 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Wed, 24 Dec 2025 12:32:56 +0100 Subject: [PATCH 08/14] [135-make-pololu-maestro-driver-uart-mode] Fixing an issue in `proc.py` since changes in `psutil` version 7.2.0. --- CHANGELOG.md | 1 + qmi/tools/proc.py | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0f586d9..48952ac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed ### Fixed +- `psutil._common` does not contain `snicaddr` namedtuple since version 7.2.0. It has been moved to `psutil._ntuples`. Fixed this in `proc.py`. ### Removed diff --git a/qmi/tools/proc.py b/qmi/tools/proc.py index 19d4c61f..9795b9d5 100644 --- a/qmi/tools/proc.py +++ b/qmi/tools/proc.py @@ -253,7 +253,12 @@ def is_local_host(host: str) -> bool: # Return True if a local IP address matches the specified host. for addrs in if_addrs.values(): for addr in addrs: - assert isinstance(addr, psutil._common.snicaddr) + v, r, _ = map(int, psutil.__version__.split(".")) + if (v == 7 and r >= 2) or v > 7: + assert isinstance(addr, psutil._ntuples.snicaddr) + else: + assert isinstance(addr, psutil._common.snicaddr) + if addr.address in host_ips: return True From f61180d3442f3ce8fcf661a7e2bde94d367a29b3 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Wed, 24 Dec 2025 12:43:31 +0100 Subject: [PATCH 09/14] [135-make-pololu-maestro-driver-uart-mode] Fixing an issue in `proc.py` since changes in `psutil` version 7.2.0. --- tests/tools/test_proc.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/tools/test_proc.py b/tests/tools/test_proc.py index 303597c9..32ae7693 100644 --- a/tests/tools/test_proc.py +++ b/tests/tools/test_proc.py @@ -389,7 +389,11 @@ def test_is_local_host_assert_exceptions(self, psutil_patch, socket_patch): "some_if": ["foute_addr"] }) psutil_patch.net_if_addrs = net_if_addrs - psutil_patch._common.snicaddr = psutil._common.snicaddr + v, r, _ = map(int, psutil.__version__.split(".")) + if (v == 7 and r >= 2) or v > 7: + psutil_patch._ntuples.snicaddr = psutil._ntuples.snicaddr + else: + psutil_patch._common.snicaddr = psutil._common.snicaddr # Except first the second assertion with self.assertRaises(AssertionError) as ass_err_2: proc.is_local_host("123.45.67.89") From 7ae4ae83cedd65fb11b40cbd90285849158fc055 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Wed, 24 Dec 2025 12:51:38 +0100 Subject: [PATCH 10/14] [135-make-pololu-maestro-driver-uart-mode] Fixing an issue in `proc.py` since changes in `psutil` version 7.2.0. --- qmi/tools/proc.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/qmi/tools/proc.py b/qmi/tools/proc.py index 9795b9d5..17f7aece 100644 --- a/qmi/tools/proc.py +++ b/qmi/tools/proc.py @@ -53,6 +53,8 @@ else: _logger = logging.getLogger(__name__) +# psutil version info +V, R, _ = map(int, psutil.__version__.split(".")) # Result from shutdown_context(). ShutdownResult = NamedTuple("ShutdownResult", [ @@ -253,8 +255,7 @@ def is_local_host(host: str) -> bool: # Return True if a local IP address matches the specified host. for addrs in if_addrs.values(): for addr in addrs: - v, r, _ = map(int, psutil.__version__.split(".")) - if (v == 7 and r >= 2) or v > 7: + if (V == 7 and R >= 2) or V > 7: assert isinstance(addr, psutil._ntuples.snicaddr) else: assert isinstance(addr, psutil._common.snicaddr) From eb6255a8bc1d23ab57f96ee71a426a30ea80a8e5 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 5 Jan 2026 08:53:48 +0100 Subject: [PATCH 11/14] [135-make-pololu-maestro-driver-uart-mode] Fixing an issue in `proc.py` since changes in `psutil` version 7.2.0. --- tests/tools/test_proc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tools/test_proc.py b/tests/tools/test_proc.py index 32ae7693..aac78988 100644 --- a/tests/tools/test_proc.py +++ b/tests/tools/test_proc.py @@ -389,6 +389,7 @@ def test_is_local_host_assert_exceptions(self, psutil_patch, socket_patch): "some_if": ["foute_addr"] }) psutil_patch.net_if_addrs = net_if_addrs + psutil_patch.__version__ = psutil.__version__ v, r, _ = map(int, psutil.__version__.split(".")) if (v == 7 and r >= 2) or v > 7: psutil_patch._ntuples.snicaddr = psutil._ntuples.snicaddr From 3ba5f71d5aec9f876635b3d79bde095ee2e206c9 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Tue, 6 Jan 2026 08:57:30 +0100 Subject: [PATCH 12/14] [135-make-pololu-maestro-driver-uart-mode] Docstring edits. --- qmi/instruments/pololu/maestro.py | 12 +++++++----- qmi/instruments/pololu/qmi_uart.py | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/qmi/instruments/pololu/maestro.py b/qmi/instruments/pololu/maestro.py index aaabdaee..4abbc35c 100644 --- a/qmi/instruments/pololu/maestro.py +++ b/qmi/instruments/pololu/maestro.py @@ -24,11 +24,10 @@ def _import_modules() -> None: - """Import the adafruit-blinka module and busio extension. + """Import the adafruit-blinka modules 'board', and 'busio' in qmi_uart.py. - This import is done in a function, instead of at the top-level, - to avoid an unnecessary dependency for programs that do not access - the instrument directly. + This import is done in a function, instead of at the top-level, to avoid an unnecessary dependency for programs + that do not access the instrument directly. """ global board, QMI_Uart if board is None or QMI_Uart is None: @@ -39,7 +38,9 @@ def _import_modules() -> None: class Pololu_Maestro(QMI_Instrument): """Instrument driver for the Pololu Maestro servo controller. - Note that new devices are delivered standard in UART mode. If the user wishes to use the device + Note that new devices are delivered standard in UART mode. The UART mode can be used by this driver by providing a + special transport string, f.ex. "uart:COMx:baudrate=115200". The device address (COM port number in this example) + is ignored by the driver in UART mode, but needed for parsing the string. If the user wishes to use the device in USB Dual Port or USB Chained serial mode, the mode has to be changed with the Pololu's Maestro Control Center program. @@ -134,6 +135,7 @@ def __init__( parsed_transport_dict = SerialTransportDescriptorParser.parse_parameter_strings(transport) baudrate = parsed_transport_dict["baudrate"] if "baudrate" in parsed_transport_dict else self.BAUDRATE self._transport = QMI_Uart(board.TX, board.RX, baudrate) + else: self._transport = create_transport(transport, default_attributes={"baudrate": self.BAUDRATE}) diff --git a/qmi/instruments/pololu/qmi_uart.py b/qmi/instruments/pololu/qmi_uart.py index c56383b0..bdc47ac8 100644 --- a/qmi/instruments/pololu/qmi_uart.py +++ b/qmi/instruments/pololu/qmi_uart.py @@ -74,7 +74,7 @@ def read_until_timeout(self, nbytes: int, timeout: float) -> bytes: timeout: Maximum time to wait (in seconds). Returns: - data: Received bytes. + buffer: Received bytes. """ self._check_is_open() buffer = bytearray() From 101328625c899a323a005554215217694a12f102 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 26 Jan 2026 10:01:36 +0100 Subject: [PATCH 13/14] [135-make-pololu-maestro-driver] Moving literal check inside conditional in scheduled-full-ci.yml. --- .github/workflows/scheduled-full-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/scheduled-full-ci.yml b/.github/workflows/scheduled-full-ci.yml index 1dfb87f6..1276b606 100644 --- a/.github/workflows/scheduled-full-ci.yml +++ b/.github/workflows/scheduled-full-ci.yml @@ -43,7 +43,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Check py-xdrlib installation for Python 3.13 - if: ${{ matrix.python-version }} == "3.13" + if: ${{ matrix.python-version == "3.13" }} run: python -m pip install py-xdrlib - name: Run unit tests and generate report From d55a71c67513efb84084e62f4d0e95ea04b8c171 Mon Sep 17 00:00:00 2001 From: Henri Ervasti Date: Mon, 26 Jan 2026 10:02:54 +0100 Subject: [PATCH 14/14] [135-make-pololu-maestro-driver] Moving literal check inside conditional in scheduled-full-ci.yml. --- .github/workflows/scheduled-full-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/scheduled-full-ci.yml b/.github/workflows/scheduled-full-ci.yml index 1276b606..83e599ac 100644 --- a/.github/workflows/scheduled-full-ci.yml +++ b/.github/workflows/scheduled-full-ci.yml @@ -43,7 +43,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Check py-xdrlib installation for Python 3.13 - if: ${{ matrix.python-version == "3.13" }} + if: ${{ matrix.python-version == 3.13 }} run: python -m pip install py-xdrlib - name: Run unit tests and generate report