Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions khly6517Sup/khly6517.db
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,40 @@ record(bo, "$(P)DISABLE")
field(ONAM, "COMMS DISABLED")
}

record(bi, "$(P)ZEROCHECK") {
field(DESC, "Zero-check status")
field(DTYP, "stream")
field(INP, "@khly6517.proto getZeroCheck $(PORT)")
field(SCAN, "2 second")
field(ZNAM, "OFF")
field(ONAM, "ON")
}

record(bo, "$(P)ZEROCHECK:SP") {
field(DESC, "Zero-check set")
field(DTYP, "stream")
field(OUT, "@khly6517.proto setZeroCheck $(PORT)")
field(ZNAM, "OFF")
field(ONAM, "ON")
}

record(bi, "$(P)CURR:AUTORANGE") {
field(DESC, "Current autorange")
field(DTYP, "stream")
field(INP, "@khly6517.proto getCurrAutorange $(PORT)")
field(SCAN, "2 second")
field(ZNAM, "OFF")
field(ONAM, "ON")
}

record(bo, "$(P)CURR:AUTORANGE:SP") {
field(DESC, "Current autorange set")
field(DTYP, "stream")
field(OUT, "@khly6517.proto setCurrAutorange $(PORT)")
field(ZNAM, "OFF")
field(ONAM, "ON")
}

record(stringin, "$(P)FUNC")
{
field(SCAN, "Passive")
Expand Down
18 changes: 18 additions & 0 deletions khly6517Sup/khly6517.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,21 @@ setVoltage {
setCurrent {
out "FUNC CURR"
}

getZeroCheck {
out ":SYST:ZCH?";
in "%#{OFF=0|0=0|ON=1|1=1}";
}

setZeroCheck {
out ":SYST:ZCH %{OFF|ON}";
}

getCurrAutorange {
out ":SENS:CURR:RANG:AUTO?";
in "%#{OFF=0|0=0|ON=1|1=1}";
}

setCurrAutorange {
out ":SENS:CURR:RANG:AUTO %{OFF|ON}";
}
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lewis
37 changes: 21 additions & 16 deletions system_tests/lewis_emulators/Khly6517/device.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import queue
import random
from collections import OrderedDict
from typing import Callable

from lewis.core.statemachine import State
from lewis.devices import StateMachineDevice

from .states import DefaultState


class Measurement:
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
self.range = 0
self.values = queue.Queue()


class SimulatedKhly6517(StateMachineDevice):
def _initialize_data(self):
def _initialize_data(self) -> None:
self.latest_reading = 0
self.idle = True

Expand All @@ -26,72 +28,75 @@ def _initialize_data(self):

self.error_queue = []

self.curr_autorange = False
self.zero_check = False

# Mock data
self.random_mode = True

def _get_state_handlers(self):
def _get_state_handlers(self) -> dict[str, State]:
return {
"default": DefaultState(),
}

def _get_initial_state(self):
def _get_initial_state(self) -> str:
return "default"

def _get_transition_handlers(self):
def _get_transition_handlers(self) -> dict[tuple[str, str], Callable[[], bool]]:
return OrderedDict([])

# This command does not trigger a new reading, only return last reading
def fetch(self):
def fetch(self) -> float:
return self.latest_reading

def abort(self):
def abort(self) -> None:
self.idle = True
self.latest_reading = 0

def initiate(self):
def initiate(self) -> None:
self.idle = False
self.measurement()

def get_selected_ranged_value(self):
def get_selected_ranged_value(self) -> float:
value = self.selected_data.values.get()
if value > self.selected_data.range:
# Parameter data out of range
self.add_error(-222)
return max(min(value, self.selected_data.range), 0)

def add_random_reading(self):
def add_random_reading(self) -> None:
self.selected_data.values.put(random.random() * self.selected_data.range)

def measurement(self):
def measurement(self) -> None:
if self.random_mode:
self.add_random_reading()
if not self.selected_data.values.empty():
self.latest_reading = self.get_selected_ranged_value()
else:
self.latest_reading = 0

def add_error(self, error):
def add_error(self, error: int) -> None:
if len(self.error_queue) < 9:
self.error_queue.append(error)
elif len(self.error_queue) == 9:
# Queue overflow
self.error_queue.append(-350)

def get_error(self):
def get_error(self) -> int:
if len(self.error_queue) > 0:
error = self.error_queue[0]
self.error_queue = self.error_queue[1:]
return error
else:
return 0

def clear_error_queue(self):
def clear_error_queue(self) -> None:
self.error_queue = []

def add_mock_errors(self):
def add_mock_errors(self) -> None:
self.add_error(2)
self.add_error(5)
self.add_error(10)

def insert_mock_readings(self, readings, mode):
def insert_mock_readings(self, readings: list[float], mode: str) -> None:
[self.mode_dict[mode.lower()].values.put(reading) for reading in readings]
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import logging
import typing

from lewis.adapters.stream import StreamInterface
from lewis.core.logging import has_log
from lewis.utils.command_builder import CmdBuilder

if typing.TYPE_CHECKING:
from ..device import SimulatedKhly6517


@has_log
class Khly6517StreamInterface(StreamInterface):
Expand All @@ -13,7 +19,8 @@ class Khly6517StreamInterface(StreamInterface):
CmdBuilder("set_func").escape("FUNC '").arg("CURR|VOLT").escape(":DC'").eos().build(),
CmdBuilder("set_rang").arg("CURR|VOLT").escape(":DC:RANG ").float().eos().build(),
CmdBuilder("set_rang").arg("curr|volt").escape(":dc:rang ").float().eos().build(),
# According to the manual, SYST:ERR? and STAT:QUE? perform the same function (14-124 in manual)
# According to the manual, SYST:ERR? and STAT:QUE?
# perform the same function (14-124 in manual)
CmdBuilder("get_err")
.escape(":SYST")
.optional("em")
Expand All @@ -30,49 +37,68 @@ class Khly6517StreamInterface(StreamInterface):
.escape("?")
.eos()
.build(),
CmdBuilder("get_zero_check").escape(":SYST:ZCH?").eos().build(),
CmdBuilder("set_zero_check").escape(":SYST:ZCH ").any().eos().build(),
CmdBuilder("get_curr_autorange").escape(":SENS:CURR:RANG:AUTO?").eos().build(),
CmdBuilder("set_curr_autorange").escape(":SENS:CURR:RANG:AUTO ").any().eos().build(),
}

in_terminator = "\r\n"
out_terminator = "\r\n"

def handle_error(self, request, error):
_device: "SimulatedKhly6517"
log: logging.Logger

def handle_error(self, request: str, error: Exception) -> None:
self.log.error("An error occurred at request " + repr(request) + ": " + repr(error))

def catch_all(self, command):
def catch_all(self, command: str) -> None:
pass

def get_func(self):
def get_func(self) -> str:
return "{}".format(self._device.selected_data.name.upper())

def get_read(self):
def get_read(self) -> str:
self._device.abort()
self._device.initiate()
return "{}".format(self._device.fetch())

def get_rang(self, mode):
def get_rang(self, mode: str) -> float:
try:
return self._device.mode_dict[mode.lower()].range
except KeyError:
# Program syntax error
self._device.add_error(-285)
return 0

def set_rang(self, mode, rang):
def set_rang(self, mode: str, rang: float) -> None:
try:
self._device.mode_dict[mode.lower()].range = rang
except KeyError:
# Program syntax error
self._device.add_error(-285)

def clear_error_queue(self):
def clear_error_queue(self) -> None:
self._device.clear_error_queue()

def set_func(self, mode):
def set_func(self, mode: str) -> None:
try:
self._device.selected_data = self._device.mode_dict[mode.lower()]
except KeyError:
# Program syntax error
self._device.add_error(-285)

def get_err(self):
def get_err(self) -> int:
return self._device.get_error()

def get_curr_autorange(self) -> str:
return "ON" if self._device.curr_autorange else "OFF"

def set_curr_autorange(self, curr_autorange: str) -> None:
self._device.curr_autorange = curr_autorange in ("ON", 1)

def get_zero_check(self) -> str:
return "ON" if self._device.zero_check else "OFF"

def set_zero_check(self, zero_check: str) -> None:
self._device.zero_check = zero_check in ("ON", 1)
8 changes: 8 additions & 0 deletions system_tests/tests/khly6517.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,11 @@ def test_WHEN_continuous_polling_set_THEN_read_at_intervals(self):

self.button_and_check_reading("BTN:CURR", False)
self.button_and_check_reading("BTN:VOLT", False)

def test_curr_autorange(self):
self.ca.assert_setting_setpoint_sets_readback("ON", "CURR:AUTORANGE")
self.ca.assert_setting_setpoint_sets_readback("OFF", "CURR:AUTORANGE")

def test_zero_check(self):
self.ca.assert_setting_setpoint_sets_readback("ON", "ZEROCHECK")
self.ca.assert_setting_setpoint_sets_readback("OFF", "ZEROCHECK")
Loading