-
Notifications
You must be signed in to change notification settings - Fork 351
Expand file tree
/
Copy pathami430.py
More file actions
197 lines (159 loc) · 6.87 KB
/
ami430.py
File metadata and controls
197 lines (159 loc) · 6.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import re
import time
from datetime import datetime
from typing import ClassVar
class MockAMI430:
    """Minimal in-process mock of an AMI430 magnet power supply programmer.

    The mock understands a small subset of the text protocol: a message
    ending in "?" is a query ("get"), anything else is a command ("set").
    Each supported command is described by an entry in ``self.handlers``
    holding a "get" and a "set" handler. A handler is either a constant
    (returned as is), a callable taking exactly one positional argument
    (the parsed argument string, or None), or None when that direction is
    not supported.
    """

    # Human-readable status names mapped to the code string stored in
    # ``_state`` and returned for a "STATE?" query.
    states: ClassVar[dict[str, str]] = {
        "RAMPING to target field/current": "1",
        "HOLDING at the target field/current": "2",
        "PAUSED": "3",
        "Ramping in MANUAL UP mode": "4",
        "Ramping in MANUAL DOWN mode": "5",
        "ZEROING CURRENT (in progress)": "6",
        "Quench detected": "7",
        "At ZERO current": "8",
        "Heating persistent switch": "9",
        "Cooling persistent switch": "10",
    }
    # Code strings for the supported unit selections and quench flags.
    field_units: ClassVar[dict[str, str]] = {"tesla": "1", "kilogauss": "0"}
    ramp_rate_units: ClassVar[dict[str, str]] = {"A/s": "0", "A/min": "1"}
    quench_state: ClassVar[dict[bool, str]] = {False: "0", True: "1"}

    def __init__(self, name):
        """Create a mock that starts in the HOLDING state at zero field.

        Args:
            name: label used as a prefix in the log messages.
        """
        self.name = name
        self.log_messages = []

        self._field_mag = 0
        self._field_target = 0
        self._state = MockAMI430.states["HOLDING at the target field/current"]

        # Iteration order matters: _handle_messages uses the first key
        # that matches the incoming message.
        self.handlers = {
            "RAMP:RATE:UNITS": {
                "get": MockAMI430.ramp_rate_units["A/s"],
                "set": None,
            },
            "FIELD:UNITS": {"get": MockAMI430.field_units["tesla"], "set": None},
            "*IDN": {"get": "v0.1 Mock", "set": None},
            "STATE": {
                "get": self._getter("_state"),
                "set": self._setter("_state"),
            },
            "FIELD:MAG": {"get": self._getter("_field_mag"), "set": None},
            "QU": {
                # We are never in a quenching state so always return the
                # same value
                "get": MockAMI430.quench_state[False],
                "set": None,
            },
            "PERS": {"get": "0", "set": None},
            "PAUSE": {"get": self._is_paused, "set": self._do_pause},
            "CONF:FIELD:TARG": {
                # To get the field target, send a message "FIELD:TARG?"
                "get": None,
                "set": self._setter("_field_target"),
            },
            "FIELD:TARG": {"get": self._getter("_field_target"), "set": None},
            "PS": {
                "get": "0",  # The heater is off
                "set": None,
            },
            "RAMP": {"set": self._do_ramp, "get": None},
            "RAMP:RATE:CURRENT": {"get": "0.1000,50.0000", "set": None},
            "COIL": {"get": "1", "set": None},
        }

    @staticmethod
    def message_parser(gs, msg_str, key):
        """Decide whether a message addresses a handler key and extract args.

        * If gs = "get":
          Suppose key = "RAMP:RATE:UNITS". If msg_str = "RAMP:RATE:UNITS?"
          then match is True and args = None. If msg_str =
          "RAMP:RATE:UNITS:10?" then match = True and args = "10". On the
          other hand, if key = "RAMP" then both "RAMP:RATE:UNITS?" and
          "RAMP:RATE:UNITS:10?" will cause match to be False.
        * If gs = "set":
          If key = "STATE" and msg_str = "STATE 2,1" then match = True
          and args = "2,1". If key = "STATE" and msg_str = "STATE:ELSE 2,1"
          then match is False.

        Consult [1] for a complete description of the AMI430 protocol.

        [1]
        http://www.americanmagnetics.com/support/manuals/mn-4Q06125PS-430.pdf

        Args:
            gs (string): "get" or "set".
            msg_str (string): the message string the mock instrument gets.
            key (string): one of the keys in self.handlers.

        Returns:
            match (bool): True if the key and the msg_str match.
            args (string or None): any arguments present in the message
                string, with the ":" separator and surrounding whitespace
                removed. Always None when match = False.

        Raises:
            KeyError: if gs is neither "get" nor "set" (and the message is
                not an exact match of the key).
        """
        # If the message string matches a key exactly we have a match
        # with no arguments
        if msg_str == key:
            return True, None

        # Get and set messages use different regular expression patterns
        # to determine a match. A get is the key, an optional single
        # ":argument" and a closing "?". A set is the key followed by at
        # least one character, none of which may be ":" directly after the
        # key (that would address a longer, different key).
        suffix = {"get": r"(:[^:]*)?\?$", "set": "([^:]+)"}[gs]

        # The key is matched literally, so characters that are special in
        # regular expressions (e.g. the "*" in "*IDN") must be escaped.
        found = re.search("^" + re.escape(key) + suffix, msg_str)
        if found is None:
            return False, None

        args = found.group(1)
        if args is not None:
            # Drop the ":" separator of a get argument and the whitespace
            # that separates a set command from its argument, so that
            # "STATE 2" yields "2" rather than " 2".
            args = args.strip(":").strip()
        return True, args

    def _getter(self, attribute):
        """Return a one-argument handler that reads ``self.<attribute>``."""
        return lambda _: getattr(self, attribute)

    def _setter(self, attribute):
        """Return a one-argument handler that assigns ``self.<attribute>``."""
        return lambda value: setattr(self, attribute, value)

    def _log(self, msg):
        """Append a timestamped line, prefixed with the mock's name, to the log."""
        timestamp = datetime.now().strftime("%d:%m:%Y-%H:%M:%S.%f")
        self.log_messages.append(f"[{timestamp}] {self.name}: {msg}")

    def _handle_messages(self, msg):
        """Dispatch one message to the first handler whose key matches.

        Args:
            msg (string): a message received through the socket
                communication layer

        Returns:
            rval (string or None): If the type of message requests a
                value (a get message) then this value is returned by this
                function. A set message will return a None value.
        """
        # A "get" message ends with a "?" and will invoke the get
        # part of the handler defined in self.handlers.
        gs = "get" if msg.endswith("?") else "set"

        rval = None
        handler = None

        # Find which handler is suitable to handle the message
        for key, entry in self.handlers.items():
            match, args = MockAMI430.message_parser(gs, msg, key)
            if not match:
                continue

            handler = entry[gs]
            # Callable handlers all accept exactly one positional argument;
            # anything else is a constant reply.
            rval = handler(args) if callable(handler) else handler
            break

        # Reached both when no key matched and when the matching key does
        # not support this direction (its handler is None).
        if handler is None:
            self._log(f"Command {msg} unknown")

        return rval

    def _do_pause(self, _):
        """Handler for the "PAUSE" command: switch to the PAUSED state."""
        self._state = MockAMI430.states["PAUSED"]

    def _is_paused(self, _=None):
        """Handler for the "PAUSE?" query: report whether the state is PAUSED.

        The ignored argument is required because _handle_messages calls
        every callable handler with the parsed argument string.
        """
        return self._state == MockAMI430.states["PAUSED"]

    def _do_ramp(self, _):
        """Handler for the "RAMP" command: move the field to the target."""
        self._log(f"Ramping to {self._field_target}")
        self._state = MockAMI430.states["RAMPING to target field/current"]
        time.sleep(0.1)  # Lets pretend to be ramping for a bit
        self._field_mag = self._field_target
        self._state = MockAMI430.states["HOLDING at the target field/current"]

    def get_log_messages(self):
        """Return the list of log lines recorded so far."""
        return self.log_messages

    def ask(self, msg):
        """Process a message and return the handler's reply (may be None)."""
        return self._handle_messages(msg)

    def write(self, msg):
        """Process a message and discard any reply."""
        self._handle_messages(msg)