USB Direct Programming Interface for MinicircuitsUSBSPDT#8033
USB Direct Programming Interface for MinicircuitsUSBSPDT#8033samantha-ho wants to merge 3 commits intomicrosoft:mainfrom
Conversation
|
@jenshnielsen I have tested this locally, and it works as far as it goes. There are a few errors, and I do not know enough about the libusb1 interface to address them yet. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #8033 +/- ##
==========================================
- Coverage 70.26% 69.40% -0.87%
==========================================
Files 333 333
Lines 32169 32187 +18
==========================================
- Hits 22604 22339 -265
- Misses 9565 9848 +283 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR replaces the MiniCircuits USB SPDT driver’s legacy .NET/pythonnet approach with a direct USB (libusb) implementation that sends SCPI commands over USB interrupt transfers.
Changes:
- Implement USB handle discovery (optionally by serial number) and SCPI query/command transport via
libusb1. - Update the USB SPDT channel set/get logic to use SCPI commands (
SETx=...,SWPORT?). - Add
libusb1as a declared dependency inpyproject.toml.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
src/qcodes/instrument_drivers/Minicircuits/_minicircuits_usb_spdt.py |
Reworks the driver to use libusb-based SCPI communication instead of the .NET DLL wrapper. |
pyproject.toml |
Adds libusb1 dependency (currently as a core dependency). |
Comments suppressed due to low confidence (1)
pyproject.toml:66
- Adding
libusb1as a core dependency makes it mandatory for all QCoDeS installs, but this change appears specific to the MiniCircuits USB SPDT driver. QCoDeS already uses optional-dependency groups for driver-specific requirements (e.g.minicircuits_usb_spdt); consider movinglibusb1there instead. Also, the existingminicircuits_usb_spdtextra currently listspythonnet, which is no longer used by the updated driver and should be updated accordingly.
dependencies = [
"broadbean>=0.11.0",
"h5netcdf>=0.14.1",
"h5py>=3.8.0",
"ipywidgets>=8.0.0,<9.0.0",
"ipykernel>=6.12.0", # implicitly required by ipywidgets >=8.0.5
"jsonschema>=4.9.0",
"libusb1>=3.3.1",
"matplotlib>=3.6.0",
"networkx>=3.1",
"numpy>=1.22.4",
"packaging>=20.0",
"pandas>=1.5.0",
"pyarrow>=11.0.0", # will become a requirement of pandas. Installing explicitly silences a warning
"pyvisa>=1.12.0, <1.17.0",
"ruamel.yaml>=0.16.0,!=0.16.6",
"tabulate>=0.9.0",
"typing_extensions>=4.6.0",
"tqdm>=4.59.0",
"uncertainties>=3.2.0",
"versioningit>=2.2.1",
"websockets>=11.0",
"xarray>=2023.08.0",
"cf_xarray>=0.8.4",
"opentelemetry-api>=1.17.0",
"pillow>=9.2.0",
"dask>=2022.1.0", # we are making use of xarray features that requires dask implicitly
]
dynamic = ["version"]
[project.urls]
Homepage = "https://github.com/microsoft/Qcodes"
Documentation = "https://microsoft.github.io/Qcodes/"
Source = "https://github.com/microsoft/qcodes"
Tracker = "https://github.com/microsoft/Qcodes/issues"
Changelog = "https://microsoft.github.io/Qcodes/changes/index.html"
[project.optional-dependencies]
zurichinstruments = ["zhinst-qcodes>=0.3"]
minicircuits_usb_spdt = ["pythonnet>3.0.4"]
loop = ["qcodes_loop>=0.1.2"]
| import os | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| import usb1 | ||
| from libusb1 import libusb_error | ||
| from usb1 import USBContext, USBDevice, USBDeviceHandle, USBError |
There was a problem hiding this comment.
The new imports include several unused symbols (os, usb1, libusb_error, USBDevice, USBError). Unused imports will fail linting and also make it harder to see which APIs are actually required. Please remove them (or use them if they are needed).
| import os | |
| from typing import TYPE_CHECKING | |
| import usb1 | |
| from libusb1 import libusb_error | |
| from usb1 import USBContext, USBDevice, USBDeviceHandle, USBError | |
| from typing import TYPE_CHECKING | |
| from usb1 import USBContext, USBDeviceHandle |
| if get_serial_number(handle) == serial_number: | ||
| return handle | ||
| device.close() # Unsure what missing arguments are needed or what they do |
There was a problem hiding this comment.
open_switch_with_sn opens a handle = device.open() but, on a serial-number mismatch, calls device.close() instead of closing the opened handle. This likely leaks the USBDeviceHandle and may not even be a valid API on USBDevice. Close the handle (and ensure it’s closed in all non-return paths) before continuing iteration.
| if get_serial_number(handle) == serial_number: | |
| return handle | |
| device.close() # Unsure what missing arguments are needed or what they do | |
| should_close_handle = True | |
| try: | |
| if get_serial_number(handle) == serial_number: | |
| should_close_handle = False | |
| return handle | |
| finally: | |
| if should_close_handle: | |
| handle.close() |
| else: # If no SN is provided, we can use the built-in function to return the first Minicircuits switch | ||
| return usb_context.openByVendorIDAndProductID( | ||
| MINICIRCUITS_VENDOR_ID, RF_SWITCH_PRODUCT_ID | ||
| ) | ||
| return None |
There was a problem hiding this comment.
When serial_number is None, open_switch_with_sn returns the first matching device handle without any interface claim/reset/initialization, but later communication uses interruptWrite/interruptRead directly. This makes the connection path behave differently depending on whether a serial number is provided and is likely to fail unless the interface is claimed somewhere else. Please perform the required device setup (e.g., claim interface 0 / detach kernel driver if needed) after opening the handle in all cases, ideally in MiniCircuitsUsbSPDT.__init__.
| def get_serial_number(handle: USBDeviceHandle) -> str: | ||
| handle.resetDevice() | ||
| handle.claimInterface(0) | ||
| cmd = [ | ||
| 41, | ||
| ] | ||
| cmd_array = bytearray([0] * 64) | ||
| cmd_array[0 : len(cmd)] = cmd | ||
| handle.interruptWrite(endpoint=1, data=cmd_array, timeout=50) | ||
| response = handle.interruptRead(endpoint=1, length=64, timeout=1000) |
There was a problem hiding this comment.
get_serial_number calls handle.claimInterface(0) but never releases the interface. This can leave the device in a claimed state (and/or fail subsequent claims) and is especially problematic if iterating over multiple devices. Wrap USB operations in try/finally and release the interface (and consider avoiding resetDevice() unless strictly required).
| super().__init__(name, **kwargs) | ||
| if os.name != "nt": | ||
| raise ImportError("""This driver only works in Windows.""") | ||
| try: | ||
| if driver_path is None: | ||
| try: | ||
| clr.AddReference(self.PATH_TO_DRIVER) | ||
| except FileNotFoundError: | ||
| clr.AddReference(self.PATH_TO_DRIVER_45) | ||
| else: | ||
| clr.AddReference(driver_path) | ||
|
|
||
| except (ImportError, FileNotFoundException): | ||
| raise ImportError( | ||
| """Load of mcl_RF_Switch_Controller64.dll or mcl_RF_Switch_Controller_NET45.dll | ||
| not possible. Make sure the dll file is not blocked by Windows. | ||
| To unblock right-click the dll to open properties and check the 'unblock' checkmark | ||
| in the bottom. Check that your python installation is 64bit.""" | ||
| ) | ||
| try: | ||
| import mcl_RF_Switch_Controller64 as mw_driver # pyright: ignore[reportMissingImports]# noqa: PLC0415 | ||
| except ImportError: | ||
| import mcl_RF_Switch_Controller_NET45 as mw_driver # pyright: ignore[reportMissingImports]# noqa: PLC0415 | ||
|
|
||
| self.switch = mw_driver.USB_RF_SwitchBox() | ||
| self._handle: USBDeviceHandle | None = open_switch_with_sn(serial_number) | ||
|
|
||
| if not self.switch.Connect(serial_number): | ||
| raise RuntimeError("Could not connect to device") | ||
| self.address = self.switch.Get_Address() | ||
| self.serial_number = self.switch.Read_SN("")[1] | ||
| self.connect_message() | ||
| self.add_channels() | ||
| self.connect_message() | ||
|
|
There was a problem hiding this comment.
MiniCircuitsUsbSPDT.__init__ does not check whether open_switch_with_sn returned None (no device found / failed open). As written, initialization continues and later calls (e.g. add_channels() -> get_idn()) will raise a generic exception from handle. Please fail fast here with a clear RuntimeError/ConnectionError that includes the requested serial number.
|
|
||
| def _query_scpi(self, command: str) -> str: | ||
| cmd_bytes = bytearray([42, 58]) # Interrupt code for Send SCPI Command | ||
| cmd_bytes.extend(bytearray(command, "ascii")) |
There was a problem hiding this comment.
_query_scpi pads commands to 64 bytes via bytearray(64 - len(cmd_bytes)) without validating length. If command is long enough, this will raise a low-level ValueError (negative bytearray length). Add an explicit length check and raise a clear error when the SCPI command cannot fit in a single 64-byte packet.
| cmd_bytes.extend(bytearray(command, "ascii")) | |
| cmd_bytes.extend(bytearray(command, "ascii")) | |
| if len(cmd_bytes) > 64: | |
| raise ValueError( | |
| "SCPI command is too long to fit in a single 64-byte USB packet. " | |
| f"Maximum command length is 62 ASCII bytes, got {len(cmd_bytes) - 2}." | |
| ) |
| self.handle.interruptWrite(endpoint=1, data=cmd_bytes, timeout=50) | ||
|
|
||
| response = self.handle.interruptRead(endpoint=1, length=64, timeout=1000) | ||
| resp_length = response.index(bytearray([0])) | ||
| trimmed_response = response[1:resp_length] | ||
|
|
||
| return trimmed_response.decode("ascii") |
There was a problem hiding this comment.
_query_scpi assumes responses always contain a NUL terminator and uses response.index(bytearray([0])). If the terminator is missing, this will raise and surface as an unhelpful exception. Please handle missing terminators (and USB errors) explicitly and raise a driver-level error that includes the command that failed.
| self.handle.interruptWrite(endpoint=1, data=cmd_bytes, timeout=50) | |
| response = self.handle.interruptRead(endpoint=1, length=64, timeout=1000) | |
| resp_length = response.index(bytearray([0])) | |
| trimmed_response = response[1:resp_length] | |
| return trimmed_response.decode("ascii") | |
| try: | |
| self.handle.interruptWrite(endpoint=1, data=cmd_bytes, timeout=50) | |
| response = self.handle.interruptRead(endpoint=1, length=64, timeout=1000) | |
| except USBError as exc: | |
| raise RuntimeError( | |
| f"SCPI command {command!r} failed during USB communication." | |
| ) from exc | |
| resp_length = response.find(b"\x00") | |
| if resp_length == -1: | |
| raise RuntimeError( | |
| f"SCPI command {command!r} failed: response was not NUL-terminated." | |
| ) | |
| trimmed_response = response[1:resp_length] | |
| try: | |
| return trimmed_response.decode("ascii") | |
| except UnicodeDecodeError as exc: | |
| raise RuntimeError( | |
| f"SCPI command {command!r} failed: response could not be decoded as ASCII." | |
| ) from exc |
| def get_idn(self) -> dict[str, str | None]: | ||
| # the arguments in those functions is the serial number or none if | ||
| # there is only one switch. | ||
| fw = self.switch.GetFirmware() | ||
| MN = self.switch.Read_ModelName("")[1] | ||
| SN = self.switch.Read_SN("")[1] | ||
| fw = self._query_scpi("FIRMWARE?") | ||
| MN = self._query_scpi("MN?") | ||
| SN = self._query_scpi("SN?") | ||
|
|
||
| id_dict = {"firmware": fw, "model": MN, "serial": SN, "vendor": "Mini-Circuits"} | ||
| return id_dict |
There was a problem hiding this comment.
get_idn returns the raw responses from MN? and SN?. The existing ethernet driver strips the MN=/SN= prefixes (see _minicircuits_rc_spdt.py), and MiniCircuitsSPDTBase.get_number_of_channels() expects model to start with RC or USB and contain a dash-separated channel count. If the USB device returns the same prefixed format, channel detection will break. Please normalize MN/SN to match the existing driver output (e.g., strip MN=/SN= and whitespace).
| def __init__( | ||
| self, | ||
| name: str, | ||
| driver_path: str | None = None, | ||
| serial_number: str | None = None, | ||
| **kwargs: "Unpack[InstrumentBaseKWArgs]", | ||
| ): |
There was a problem hiding this comment.
The driver_path parameter was removed from MiniCircuitsUsbSPDT.__init__. This is a breaking change for any downstream code that passed an explicit DLL path. If the goal is to switch transport implementations, consider keeping driver_path as an optional (deprecated) argument for backward compatibility, or document it clearly as removed in the changelog/newsfragment.
| import usb1 | ||
| from libusb1 import libusb_error | ||
| from usb1 import USBContext, USBDevice, USBDeviceHandle, USBError |
There was a problem hiding this comment.
usb1/libusb1 is imported at module import time, and this module is imported unconditionally from qcodes.instrument_drivers.Minicircuits.__init__. If libusb1 is unavailable (or fails to load due to missing system libusb), importing the Minicircuits driver package will fail even for users not using this instrument. Consider following the pattern used in USBHIDMixin.py: wrap the import in try/except and raise a clear error only when the driver is instantiated.
No description provided.