Skip to content

Commit cfd33fc

Browse files
committed
python: Add interactive terminal
Signed-off-by: Nick Brook <nrbrook@gmail.com>
1 parent 55547bd commit cfd33fc

1 file changed

Lines changed: 251 additions & 0 deletions

File tree

host/min_terminal.py

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
"""
2+
Interactive terminal program for sending and receiving MIN frames.
3+
Supports both hex and string input modes.
4+
"""
5+
import argparse
6+
from struct import unpack
7+
from time import sleep
8+
import threading
9+
import logging
10+
import serial.tools.list_ports
11+
12+
from min import ThreadsafeTransportMINSerialHandler
13+
14+
# Set up logger for this module
15+
logger = logging.getLogger(__name__)
16+
17+
18+
def bytes_to_int32(data: bytes, big_endian=True) -> int:
19+
"""Convert 4 bytes to a 32-bit integer."""
20+
if len(data) != 4:
21+
raise ValueError("int32 should be exactly 4 bytes")
22+
if big_endian:
23+
return unpack('>I', data)[0]
24+
else:
25+
return unpack('<I', data)[0]
26+
27+
28+
def parse_hex_input(hex_str: str) -> bytes:
29+
"""Convert a hex string to bytes, handling spaces and 0x prefixes."""
30+
# Remove spaces and 0x prefixes
31+
hex_str = hex_str.replace(' ', '').replace('0x', '')
32+
return bytes.fromhex(hex_str)
33+
34+
35+
def log_and_print(message, level=logging.INFO):
36+
"""Print message to console and log it."""
37+
print(message)
38+
logger.log(level, message)
39+
40+
41+
def receive_frames_thread(
42+
min_handler, hex_mode: bool, stop_event: threading.Event,
43+
callback=None, print_prompt=True
44+
):
45+
"""Thread function to continuously poll for and display received frames."""
46+
while not stop_event.is_set():
47+
frames = min_handler.poll()
48+
for frame in frames:
49+
# Run callback if provided before standard handling
50+
if callback and callback(frame):
51+
# Skip standard handling if callback returns True
52+
continue
53+
54+
if hex_mode:
55+
data = frame.payload.hex()
56+
else:
57+
try:
58+
data = frame.payload.decode('ascii')
59+
except UnicodeDecodeError:
60+
data = frame.payload.hex()
61+
msg = "Frame received: min ID={0} {1}".format(frame.min_id, data)
62+
log_and_print(msg)
63+
64+
# Reprint the input prompt
65+
if print_prompt:
66+
if hex_mode:
67+
print("Enter hex payload: ", end='', flush=True)
68+
else:
69+
print("Enter string payload: ", end='', flush=True)
70+
71+
sleep(0.05) # Small delay to prevent CPU hogging
72+
73+
74+
def parse_log_level(level_name):
75+
"""Convert a log level name to the corresponding logging level."""
76+
levels = {
77+
'debug': logging.DEBUG,
78+
'info': logging.INFO,
79+
'warning': logging.WARNING,
80+
'error': logging.ERROR,
81+
'critical': logging.CRITICAL
82+
}
83+
level_name = level_name.lower()
84+
if level_name not in levels:
85+
raise ValueError(f"Invalid log level: {level_name}")
86+
return levels[level_name]
87+
88+
89+
def setup_min_handler(port, baudrate, loglevel=logging.ERROR):
90+
"""Set up and return a MIN handler with the given parameters."""
91+
# Set up logging configuration
92+
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
93+
logging.basicConfig(level=loglevel, format=log_format)
94+
95+
min_handler = ThreadsafeTransportMINSerialHandler(
96+
port=port,
97+
baudrate=baudrate,
98+
loglevel=loglevel
99+
)
100+
return min_handler
101+
102+
103+
def start_min_terminal(min_handler, hex_mode=False, min_id=0x00, frame_callback=None):
104+
"""Start an interactive MIN terminal."""
105+
log_and_print(f"Mode: {'Hex' if hex_mode else 'String'}")
106+
log_and_print(f"Using MIN ID: 0x{min_id:02X}")
107+
log_and_print("Press Ctrl+C to exit")
108+
if hex_mode:
109+
log_and_print("Input format: '0x01 02 03' or '01 02 03'")
110+
111+
# Create a stop event and a thread for receiving frames
112+
stop_event = threading.Event()
113+
receive_thread = threading.Thread(
114+
target=receive_frames_thread,
115+
args=(min_handler, hex_mode, stop_event, frame_callback),
116+
daemon=True
117+
)
118+
receive_thread.start()
119+
120+
try:
121+
while True:
122+
# Get input from user
123+
if hex_mode:
124+
user_input = input("Enter hex payload: ")
125+
try:
126+
payload = parse_hex_input(user_input)
127+
except ValueError as e:
128+
log_and_print(f"Invalid hex input: {e}", logging.ERROR)
129+
continue
130+
else:
131+
user_input = input("Enter string payload: ")
132+
payload = user_input.encode('ascii')
133+
134+
# Send the frame using the specified MIN ID
135+
min_handler.queue_frame(min_id=min_id, payload=payload)
136+
137+
except KeyboardInterrupt:
138+
log_and_print("\nTerminating...")
139+
stop_event.set() # Signal the thread to stop
140+
return stop_event
141+
except Exception as e:
142+
log_and_print(f"Error: {e}", logging.ERROR)
143+
return stop_event
144+
145+
146+
def list_available_ports():
147+
"""List all available serial ports and return the selected port."""
148+
ports = list(serial.tools.list_ports.comports())
149+
if not ports:
150+
print("No serial ports found")
151+
return None
152+
153+
print("\nAvailable serial ports:")
154+
for i, port in enumerate(ports, 1):
155+
print(f"{i}. {port.device} - {port.description}")
156+
157+
while True:
158+
try:
159+
choice = input("\nSelect port number (or 'q' to quit): ")
160+
if choice.lower() == 'q':
161+
return None
162+
choice = int(choice)
163+
if 1 <= choice <= len(ports):
164+
return ports[choice - 1].device
165+
print("Invalid selection. Please try again.")
166+
except ValueError:
167+
print("Please enter a valid number or 'q' to quit.")
168+
169+
170+
def parse_args():
171+
"""Parse command line arguments for MIN terminal functionality."""
172+
parser = argparse.ArgumentParser(description='Interactive MIN terminal')
173+
parser.add_argument(
174+
'port',
175+
nargs='?',
176+
help='Serial port (e.g., /dev/tty.usbmodem1421)'
177+
)
178+
parser.add_argument(
179+
'--hex',
180+
action='store_true',
181+
help='Use hex input mode'
182+
)
183+
parser.add_argument(
184+
'--min-id',
185+
type=lambda x: int(x, 0), # Allows for hex (0x01) or decimal input
186+
default=0x00,
187+
help='MIN ID to use when sending frames (default: 0x01)'
188+
)
189+
parser.add_argument(
190+
'--baudrate',
191+
type=int,
192+
default=9600,
193+
help='Baudrate for serial communication (default: 9600)'
194+
)
195+
parser.add_argument(
196+
'--log-level',
197+
type=parse_log_level,
198+
default=logging.ERROR,
199+
help='Set logging level: debug, info, warning, error, critical '
200+
'(default: error)'
201+
)
202+
args = parser.parse_args()
203+
204+
# If no port specified, list available ports and let user choose
205+
if not args.port:
206+
args.port = list_available_ports()
207+
if not args.port:
208+
parser.error("No port selected")
209+
210+
# Validate MIN ID range (0-63 as per the spec)
211+
if args.min_id not in range(64):
212+
parser.error("MIN ID must be in range 0-63")
213+
214+
return args
215+
216+
217+
def main():
218+
"""Run the MIN terminal."""
219+
args = parse_args()
220+
221+
# Set up and connect MIN handler
222+
min_handler = setup_min_handler(
223+
port=args.port,
224+
baudrate=args.baudrate,
225+
loglevel=args.log_level
226+
)
227+
228+
log_and_print(f"Connected to {args.port} at {args.baudrate} baud")
229+
# Use a dictionary to map logging levels to their names
230+
level_names = {
231+
logging.DEBUG: 'DEBUG',
232+
logging.INFO: 'INFO',
233+
logging.WARNING: 'WARNING',
234+
logging.ERROR: 'ERROR',
235+
logging.CRITICAL: 'CRITICAL'
236+
}
237+
log_and_print(f"Log level: {level_names.get(args.log_level, 'UNKNOWN')}")
238+
239+
# Start the interactive terminal
240+
stop_event = start_min_terminal(
241+
min_handler=min_handler,
242+
hex_mode=args.hex,
243+
min_id=args.min_id
244+
)
245+
246+
# Cleanup if the terminal exits
247+
stop_event.set()
248+
249+
250+
if __name__ == "__main__":
251+
main()

0 commit comments

Comments
 (0)