From dfb3ef7a47061833573eababbb3fddecfaf42d20 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 4 Jun 2026 12:38:51 +0100 Subject: [PATCH 1/2] Add zero check & current autorange --- khly6517Sup/khly6517.db | 34 +++++++++++++++++++ khly6517Sup/khly6517.proto | 18 ++++++++++ .../lewis_emulators/Khly6517/device.py | 3 ++ .../Khly6517/interfaces/stream_interface.py | 16 +++++++++ system_tests/tests/khly6517.py | 8 +++++ 5 files changed, 79 insertions(+) diff --git a/khly6517Sup/khly6517.db b/khly6517Sup/khly6517.db index 16151f7..d3bb467 100644 --- a/khly6517Sup/khly6517.db +++ b/khly6517Sup/khly6517.db @@ -17,6 +17,40 @@ record(bo, "$(P)DISABLE") field(ONAM, "COMMS DISABLED") } +record(bi, "$(P)ZEROCHECK") { + field(DESC, "Zero-check status") + field(DTYP, "stream") + field(INP, "@khly6517.proto getZeroCheck $(PORT)") + field(SCAN, "2 second") + field(ZNAM, "OFF") + field(ONAM, "ON") +} + +record(bo, "$(P)ZEROCHECK:SP") { + field(DESC, "Zero-check set") + field(DTYP, "stream") + field(OUT, "@khly6517.proto setZeroCheck $(PORT)") + field(ZNAM, "OFF") + field(ONAM, "ON") +} + +record(bi, "$(P)CURR:AUTORANGE") { + field(DESC, "Current autorange") + field(DTYP, "stream") + field(INP, "@khly6517.proto getCurrAutorange $(PORT)") + field(SCAN, "2 second") + field(ZNAM, "OFF") + field(ONAM, "ON") +} + +record(bo, "$(P)CURR:AUTORANGE:SP") { + field(DESC, "Current autorange set") + field(DTYP, "stream") + field(OUT, "@khly6517.proto setCurrAutorange $(PORT)") + field(ZNAM, "OFF") + field(ONAM, "ON") +} + record(stringin, "$(P)FUNC") { field(SCAN, "Passive") diff --git a/khly6517Sup/khly6517.proto b/khly6517Sup/khly6517.proto index de02eed..9bedbfa 100644 --- a/khly6517Sup/khly6517.proto +++ b/khly6517Sup/khly6517.proto @@ -59,3 +59,21 @@ setVoltage { setCurrent { out "FUNC CURR" } + +getZeroCheck { + out ":SYST:ZCH?"; + in "%#{OFF=0|0=0|ON=1|1=1}"; +} + +setZeroCheck { + out ":SYST:ZCH %{OFF|ON}"; +} + +getCurrAutorange { + out ":SENS:CURR:RANG:AUTO?"; + in "%#{OFF=0|0=0|ON=1|1=1}"; +} + +setCurrAutorange { + out ":SENS:CURR:RANG:AUTO %{OFF|ON}"; +} diff --git a/system_tests/lewis_emulators/Khly6517/device.py b/system_tests/lewis_emulators/Khly6517/device.py index 89be23b..2e1e655 100644 --- a/system_tests/lewis_emulators/Khly6517/device.py +++ b/system_tests/lewis_emulators/Khly6517/device.py @@ -26,6 +26,9 @@ def _initialize_data(self): self.error_queue = [] + self.curr_autorange = False + self.zero_check = False + # Mock data self.random_mode = True diff --git a/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py b/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py index d2bb72b..a53df3f 100644 --- a/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py +++ b/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py @@ -30,6 +30,10 @@ class Khly6517StreamInterface(StreamInterface): .escape("?") .eos() .build(), + CmdBuilder("get_zero_check").escape(":SYST:ZCH?").eos().build(), + CmdBuilder("set_zero_check").escape(":SYST:ZCH ").any().eos().build(), + CmdBuilder("get_curr_autorange").escape(":SENS:CURR:RANG:AUTO?").eos().build(), + CmdBuilder("set_curr_autorange").escape(":SENS:CURR:RANG:AUTO ").any().eos().build(), } in_terminator = "\r\n" @@ -76,3 +80,15 @@ def set_func(self, mode): def get_err(self): return self._device.get_error() + + def get_curr_autorange(self): + return "ON" if self._device.curr_autorange else "OFF" + + def set_curr_autorange(self, curr_autorange): + self._device.curr_autorange = curr_autorange in ("ON", 1) + + def get_zero_check(self): + return "ON" if self._device.zero_check else "OFF" + + def set_zero_check(self, zero_check): + self._device.zero_check = zero_check in ("ON", 1) diff --git a/system_tests/tests/khly6517.py b/system_tests/tests/khly6517.py index 8a7dfb2..8974d52 100644 --- a/system_tests/tests/khly6517.py +++ b/system_tests/tests/khly6517.py @@ -177,3 +177,11 @@ def test_WHEN_continuous_polling_set_THEN_read_at_intervals(self): self.button_and_check_reading("BTN:CURR", False) self.button_and_check_reading("BTN:VOLT", False) + + def test_curr_autorange(self): + self.ca.assert_setting_setpoint_sets_readback("ON", "CURR:AUTORANGE") + self.ca.assert_setting_setpoint_sets_readback("OFF", "CURR:AUTORANGE") + + def test_zero_check(self): + self.ca.assert_setting_setpoint_sets_readback("ON", "ZEROCHECK") + self.ca.assert_setting_setpoint_sets_readback("OFF", "ZEROCHECK") From b0f2a9055055f43a203491510f1a65362e1c9385 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 4 Jun 2026 12:47:25 +0100 Subject: [PATCH 2/2] ruff --- requirements.txt | 1 + .../lewis_emulators/Khly6517/device.py | 34 +++++++++-------- .../Khly6517/interfaces/stream_interface.py | 38 ++++++++++++------- 3 files changed, 43 insertions(+), 30 deletions(-) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..496988f --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +lewis diff --git a/system_tests/lewis_emulators/Khly6517/device.py b/system_tests/lewis_emulators/Khly6517/device.py index 2e1e655..5df0734 100644 --- a/system_tests/lewis_emulators/Khly6517/device.py +++ b/system_tests/lewis_emulators/Khly6517/device.py @@ -1,21 +1,23 @@ import queue import random from collections import OrderedDict +from typing import Callable +from lewis.core.statemachine import State from lewis.devices import StateMachineDevice from .states import DefaultState class Measurement: - def __init__(self, name): + def __init__(self, name: str) -> None: self.name = name self.range = 0 self.values = queue.Queue() class SimulatedKhly6517(StateMachineDevice): - def _initialize_data(self): + def _initialize_data(self) -> None: self.latest_reading = 0 self.idle = True @@ -32,40 +34,40 @@ def _initialize_data(self): # Mock data self.random_mode = True - def _get_state_handlers(self): + def _get_state_handlers(self) -> dict[str, State]: return { "default": DefaultState(), } - def _get_initial_state(self): + def _get_initial_state(self) -> str: return "default" - def _get_transition_handlers(self): + def _get_transition_handlers(self) -> dict[tuple[str, str], Callable[[], bool]]: return OrderedDict([]) # This command does not trigger a new reading, only return last reading - def fetch(self): + def fetch(self) -> float: return self.latest_reading - def abort(self): + def abort(self) -> None: self.idle = True self.latest_reading = 0 - def initiate(self): + def initiate(self) -> None: self.idle = False self.measurement() - def get_selected_ranged_value(self): + def get_selected_ranged_value(self) -> float: value = self.selected_data.values.get() if value > self.selected_data.range: # Parameter data out of range self.add_error(-222) return max(min(value, self.selected_data.range), 0) - def add_random_reading(self): + def add_random_reading(self) -> None: self.selected_data.values.put(random.random() * self.selected_data.range) - def measurement(self): + def measurement(self) -> None: if self.random_mode: self.add_random_reading() if not self.selected_data.values.empty(): @@ -73,14 +75,14 @@ def measurement(self): else: self.latest_reading = 0 - def add_error(self, error): + def add_error(self, error: int) -> None: if len(self.error_queue) < 9: self.error_queue.append(error) elif len(self.error_queue) == 9: # Queue overflow self.error_queue.append(-350) - def get_error(self): + def get_error(self) -> int: if len(self.error_queue) > 0: error = self.error_queue[0] self.error_queue = self.error_queue[1:] @@ -88,13 +90,13 @@ def get_error(self): else: return 0 - def clear_error_queue(self): + def clear_error_queue(self) -> None: self.error_queue = [] - def add_mock_errors(self): + def add_mock_errors(self) -> None: self.add_error(2) self.add_error(5) self.add_error(10) - def insert_mock_readings(self, readings, mode): + def insert_mock_readings(self, readings: list[float], mode: str) -> None: [self.mode_dict[mode.lower()].values.put(reading) for reading in readings] diff --git a/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py b/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py index a53df3f..1df5237 100644 --- a/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py +++ b/system_tests/lewis_emulators/Khly6517/interfaces/stream_interface.py @@ -1,7 +1,13 @@ +import logging +import typing + from lewis.adapters.stream import StreamInterface from lewis.core.logging import has_log from lewis.utils.command_builder import CmdBuilder +if typing.TYPE_CHECKING: + from ..device import SimulatedKhly6517 + @has_log class Khly6517StreamInterface(StreamInterface): @@ -13,7 +19,8 @@ class Khly6517StreamInterface(StreamInterface): CmdBuilder("set_func").escape("FUNC '").arg("CURR|VOLT").escape(":DC'").eos().build(), CmdBuilder("set_rang").arg("CURR|VOLT").escape(":DC:RANG ").float().eos().build(), CmdBuilder("set_rang").arg("curr|volt").escape(":dc:rang ").float().eos().build(), - # According to the manual, SYST:ERR? and STAT:QUE? perform the same function (14-124 in manual) + # According to the manual, SYST:ERR? and STAT:QUE? + # perform the same function (14-124 in manual) CmdBuilder("get_err") .escape(":SYST") .optional("em") @@ -39,21 +46,24 @@ class Khly6517StreamInterface(StreamInterface): in_terminator = "\r\n" out_terminator = "\r\n" - def handle_error(self, request, error): + _device: "SimulatedKhly6517" + log: logging.Logger + + def handle_error(self, request: str, error: Exception) -> None: self.log.error("An error occurred at request " + repr(request) + ": " + repr(error)) - def catch_all(self, command): + def catch_all(self, command: str) -> None: pass - def get_func(self): + def get_func(self) -> str: return "{}".format(self._device.selected_data.name.upper()) - def get_read(self): + def get_read(self) -> str: self._device.abort() self._device.initiate() return "{}".format(self._device.fetch()) - def get_rang(self, mode): + def get_rang(self, mode: str) -> float: try: return self._device.mode_dict[mode.lower()].range except KeyError: @@ -61,34 +71,34 @@ def get_rang(self, mode): self._device.add_error(-285) return 0 - def set_rang(self, mode, rang): + def set_rang(self, mode: str, rang: float) -> None: try: self._device.mode_dict[mode.lower()].range = rang except KeyError: # Program syntax error self._device.add_error(-285) - def clear_error_queue(self): + def clear_error_queue(self) -> None: self._device.clear_error_queue() - def set_func(self, mode): + def set_func(self, mode: str) -> None: try: self._device.selected_data = self._device.mode_dict[mode.lower()] except KeyError: # Program syntax error self._device.add_error(-285) - def get_err(self): + def get_err(self) -> int: return self._device.get_error() - def get_curr_autorange(self): + def get_curr_autorange(self) -> str: return "ON" if self._device.curr_autorange else "OFF" - def set_curr_autorange(self, curr_autorange): + def set_curr_autorange(self, curr_autorange: str) -> None: self._device.curr_autorange = curr_autorange in ("ON", 1) - def get_zero_check(self): + def get_zero_check(self) -> str: return "ON" if self._device.zero_check else "OFF" - def set_zero_check(self, zero_check): + def set_zero_check(self, zero_check: str) -> None: self._device.zero_check = zero_check in ("ON", 1)