Skip to content

Commit c1b0089

Browse files
committed
python: Add interactive terminal
Signed-off-by: Nick Brook <nrbrook@gmail.com>
1 parent 55547bd commit c1b0089

1 file changed

Lines changed: 219 additions & 0 deletions

File tree

host/min_terminal.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
"""
2+
Interactive terminal program for sending and receiving MIN frames.
3+
Supports both hex and string input modes.
4+
"""
5+
import argparse
6+
from struct import unpack
7+
from time import sleep
8+
import threading
9+
import logging
10+
11+
from min import ThreadsafeTransportMINSerialHandler
12+
13+
# Set up logger for this module
14+
logger = logging.getLogger(__name__)
15+
16+
17+
def bytes_to_int32(data: bytes, big_endian=True) -> int:
18+
"""Convert 4 bytes to a 32-bit integer."""
19+
if len(data) != 4:
20+
raise ValueError("int32 should be exactly 4 bytes")
21+
if big_endian:
22+
return unpack('>I', data)[0]
23+
else:
24+
return unpack('<I', data)[0]
25+
26+
27+
def parse_hex_input(hex_str: str) -> bytes:
28+
"""Convert a hex string to bytes, handling spaces and 0x prefixes."""
29+
# Remove spaces and 0x prefixes
30+
hex_str = hex_str.replace(' ', '').replace('0x', '')
31+
return bytes.fromhex(hex_str)
32+
33+
34+
def log_and_print(message, level=logging.INFO):
35+
"""Print message to console and log it."""
36+
print(message)
37+
logger.log(level, message)
38+
39+
40+
def receive_frames_thread(
41+
min_handler, hex_mode: bool, stop_event: threading.Event,
42+
callback=None, print_prompt=True
43+
):
44+
"""Thread function to continuously poll for and display received frames."""
45+
while not stop_event.is_set():
46+
frames = min_handler.poll()
47+
for frame in frames:
48+
# Run callback if provided before standard handling
49+
if callback and callback(frame):
50+
# Skip standard handling if callback returns True
51+
continue
52+
53+
if hex_mode:
54+
data = frame.payload.hex()
55+
else:
56+
try:
57+
data = frame.payload.decode('ascii')
58+
except UnicodeDecodeError:
59+
data = frame.payload.hex()
60+
msg = "Frame received: min ID={0} {1}".format(frame.min_id, data)
61+
log_and_print(msg)
62+
63+
# Reprint the input prompt
64+
if print_prompt:
65+
if hex_mode:
66+
print("Enter hex payload: ", end='', flush=True)
67+
else:
68+
print("Enter string payload: ", end='', flush=True)
69+
70+
sleep(0.05) # Small delay to prevent CPU hogging
71+
72+
73+
def parse_log_level(level_name):
74+
"""Convert a log level name to the corresponding logging level."""
75+
levels = {
76+
'debug': logging.DEBUG,
77+
'info': logging.INFO,
78+
'warning': logging.WARNING,
79+
'error': logging.ERROR,
80+
'critical': logging.CRITICAL
81+
}
82+
level_name = level_name.lower()
83+
if level_name not in levels:
84+
raise ValueError(f"Invalid log level: {level_name}")
85+
return levels[level_name]
86+
87+
88+
def setup_min_handler(port, baudrate, loglevel=logging.ERROR):
89+
"""Set up and return a MIN handler with the given parameters."""
90+
# Set up logging configuration
91+
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
92+
logging.basicConfig(level=loglevel, format=log_format)
93+
94+
min_handler = ThreadsafeTransportMINSerialHandler(
95+
port=port,
96+
baudrate=baudrate,
97+
loglevel=loglevel
98+
)
99+
return min_handler
100+
101+
102+
def start_min_terminal(min_handler, hex_mode=False, min_id=0x00, frame_callback=None):
103+
"""Start an interactive MIN terminal."""
104+
log_and_print(f"Mode: {'Hex' if hex_mode else 'String'}")
105+
log_and_print(f"Using MIN ID: 0x{min_id:02X}")
106+
log_and_print("Press Ctrl+C to exit")
107+
if hex_mode:
108+
log_and_print("Input format: '0x01 02 03' or '01 02 03'")
109+
110+
# Create a stop event and a thread for receiving frames
111+
stop_event = threading.Event()
112+
receive_thread = threading.Thread(
113+
target=receive_frames_thread,
114+
args=(min_handler, hex_mode, stop_event, frame_callback),
115+
daemon=True
116+
)
117+
receive_thread.start()
118+
119+
try:
120+
while True:
121+
# Get input from user
122+
if hex_mode:
123+
user_input = input("Enter hex payload: ")
124+
try:
125+
payload = parse_hex_input(user_input)
126+
except ValueError as e:
127+
log_and_print(f"Invalid hex input: {e}", logging.ERROR)
128+
continue
129+
else:
130+
user_input = input("Enter string payload: ")
131+
payload = user_input.encode('ascii')
132+
133+
# Send the frame using the specified MIN ID
134+
min_handler.queue_frame(min_id=min_id, payload=payload)
135+
136+
except KeyboardInterrupt:
137+
log_and_print("\nTerminating...")
138+
stop_event.set() # Signal the thread to stop
139+
return stop_event
140+
except Exception as e:
141+
log_and_print(f"Error: {e}", logging.ERROR)
142+
return stop_event
143+
144+
145+
def parse_args():
146+
"""Parse command line arguments for MIN terminal functionality."""
147+
parser = argparse.ArgumentParser(description='Interactive MIN terminal')
148+
parser.add_argument(
149+
'port',
150+
help='Serial port (e.g., /dev/tty.usbmodem1421)'
151+
)
152+
parser.add_argument(
153+
'--hex',
154+
action='store_true',
155+
help='Use hex input mode'
156+
)
157+
parser.add_argument(
158+
'--min-id',
159+
type=lambda x: int(x, 0), # Allows for hex (0x01) or decimal input
160+
default=0x00,
161+
help='MIN ID to use when sending frames (default: 0x01)'
162+
)
163+
parser.add_argument(
164+
'--baudrate',
165+
type=int,
166+
default=9600,
167+
help='Baudrate for serial communication (default: 9600)'
168+
)
169+
parser.add_argument(
170+
'--log-level',
171+
type=parse_log_level,
172+
default=logging.ERROR,
173+
help='Set logging level: debug, info, warning, error, critical '
174+
'(default: error)'
175+
)
176+
args = parser.parse_args()
177+
178+
# Validate MIN ID range (0-63 as per the spec)
179+
if args.min_id not in range(64):
180+
parser.error("MIN ID must be in range 0-63")
181+
182+
return args
183+
184+
185+
def main():
186+
"""Run the MIN terminal."""
187+
args = parse_args()
188+
189+
# Set up and connect MIN handler
190+
min_handler = setup_min_handler(
191+
port=args.port,
192+
baudrate=args.baudrate,
193+
loglevel=args.log_level
194+
)
195+
196+
log_and_print(f"Connected to {args.port} at {args.baudrate} baud")
197+
# Use a dictionary to map logging levels to their names
198+
level_names = {
199+
logging.DEBUG: 'DEBUG',
200+
logging.INFO: 'INFO',
201+
logging.WARNING: 'WARNING',
202+
logging.ERROR: 'ERROR',
203+
logging.CRITICAL: 'CRITICAL'
204+
}
205+
log_and_print(f"Log level: {level_names.get(args.log_level, 'UNKNOWN')}")
206+
207+
# Start the interactive terminal
208+
stop_event = start_min_terminal(
209+
min_handler=min_handler,
210+
hex_mode=args.hex,
211+
min_id=args.min_id
212+
)
213+
214+
# Cleanup if the terminal exits
215+
stop_event.set()
216+
217+
218+
if __name__ == "__main__":
219+
main()

0 commit comments

Comments
 (0)