11#!/usr/bin/env python3
22
33import argparse
4+ import csv
45import json
56import os
7+ from collections import Counter
68from pathlib import Path
79import shutil
10+ import time
11+ from zipfile import ZIP_DEFLATED , ZipFile
812
913from saleae import automation
1014
@@ -30,16 +34,227 @@ def configure_custom_analyzers_path(analyzers_dir: Path) -> Path:
3034 return config_path
3135
3236
37+ def add_flexray_analyzer (capture : automation .Capture , args : argparse .Namespace ) -> automation .AnalyzerHandle :
38+ return capture .add_analyzer (
39+ "FlexRay" ,
40+ settings = {
41+ "Input Channel" : args .channel ,
42+ "Bit Rate (Bits/s)" : args .bitrate ,
43+ "CRC Channel" : 0 if args .crc_channel .upper () == "A" else 1 ,
44+ "Sample Point (%)" : args .sample_point ,
45+ "Invert Input" : args .invert ,
46+ },
47+ )
48+
49+
50+ def close_capture_if_present (capture : automation .Capture | None ) -> None :
51+ if capture is None :
52+ return
53+
54+ try :
55+ capture .close ()
56+ except automation .errors .InvalidRequestError as exc :
57+ if 'does not exist' not in str (exc ):
58+ raise
59+
60+
61+ def create_capture_without_saved_analyzers (source_path : Path , output_path : Path ) -> Path :
62+ with ZipFile (source_path , 'r' ) as source_zip :
63+ meta = json .loads (source_zip .read ('meta.json' ))
64+ meta ['data' ]['analyzers' ] = []
65+ meta ['data' ]['highLevelAnalyzers' ] = []
66+
67+ with ZipFile (output_path , 'w' , compression = ZIP_DEFLATED ) as output_zip :
68+ for info in source_zip .infolist ():
69+ if info .filename == 'meta.json' :
70+ continue
71+
72+ output_zip .writestr (info .filename , source_zip .read (info .filename ))
73+
74+ output_zip .writestr ('meta.json' , json .dumps (meta , separators = (',' , ':' )).encode ('utf-8' ))
75+
76+ return output_path
77+
78+
79+ def export_capture_data (
80+ capture : automation .Capture , analyzer : automation .AnalyzerHandle , output_dir : Path , channel : int
81+ ) -> tuple [Path , Path , Path ]:
82+ legacy_csv = output_dir / "flexray_export.csv"
83+ table_csv = output_dir / "flexray_table.csv"
84+ raw_dir = output_dir / "raw"
85+ raw_dir .mkdir (parents = True , exist_ok = True )
86+
87+ capture .legacy_export_analyzer (str (legacy_csv ), analyzer , automation .RadixType .HEXADECIMAL )
88+ capture .export_data_table (
89+ str (table_csv ),
90+ [automation .DataTableExportConfiguration (analyzer = analyzer , radix = automation .RadixType .HEXADECIMAL )],
91+ )
92+ capture .export_raw_data_csv (str (raw_dir ), digital_channels = [channel ])
93+ return legacy_csv , table_csv , raw_dir
94+
95+
96+ def summarize_legacy_export (export_path : Path ) -> dict [str , object ]:
97+ frame_ids : Counter [str ] = Counter ()
98+ summary = {
99+ "rows" : 0 ,
100+ "frames" : 0 ,
101+ "errors" : 0 ,
102+ "symbols" : 0 ,
103+ "frame_ids" : frame_ids ,
104+ }
105+
106+ with export_path .open (newline = "" ) as handle :
107+ reader = csv .DictReader (handle )
108+ for row in reader :
109+ summary ["rows" ] += 1
110+ row_type = row ["Type" ]
111+
112+ if row_type == "frame" :
113+ summary ["frames" ] += 1
114+ frame_id = row ["Frame ID" ]
115+ if frame_id and frame_id != "-" :
116+ frame_ids [frame_id ] += 1
117+ elif row_type == "error" :
118+ summary ["errors" ] += 1
119+ elif row_type == "symbol" :
120+ summary ["symbols" ] += 1
121+
122+ return summary
123+
124+
125+ def find_simulation_device (manager : automation .Manager ) -> automation .DeviceDesc :
126+ devices = manager .get_devices (include_simulation_devices = True )
127+
128+ for device in devices :
129+ if device .is_simulation :
130+ return device
131+
132+ raise RuntimeError ("No Logic 2 simulation device was reported by the automation API." )
133+
134+
135+ def validate_demo_summary (summary : dict [str , object ]) -> None :
136+ frame_count = int (summary ["frames" ])
137+ frame_ids : Counter [str ] = summary ["frame_ids" ] # type: ignore[assignment]
138+
139+ if frame_count == 0 :
140+ raise RuntimeError ("Demo capture produced zero decoded FlexRay frames." )
141+
142+ if len (frame_ids ) < 2 :
143+ observed = ", " .join (frame_ids .keys ()) if frame_ids else "none"
144+ raise RuntimeError (f"Demo capture only decoded one synthetic frame type. Observed frame IDs: { observed } " )
145+
146+
147+ def run_demo_capture (
148+ manager : automation .Manager ,
149+ args : argparse .Namespace ,
150+ output_dir : Path ,
151+ manual_rerun_capture_path : Path ,
152+ ) -> tuple [automation .Capture , automation .AnalyzerHandle , str ]:
153+ device = find_simulation_device (manager )
154+ capture = manager .start_capture (
155+ device_id = device .device_id ,
156+ device_configuration = automation .LogicDeviceConfiguration (
157+ enabled_digital_channels = [args .channel ],
158+ digital_sample_rate = args .sample_rate ,
159+ ),
160+ capture_configuration = automation .CaptureConfiguration (
161+ buffer_size_megabytes = args .buffer_size ,
162+ capture_mode = automation .TimedCaptureMode (duration_seconds = args .duration ),
163+ ),
164+ )
165+ capture .wait ()
166+
167+ add_flexray_analyzer (capture , args )
168+
169+ if args .demo_pause_seconds > 0 :
170+ print (
171+ f"First-pass demo analyzer added at { args .bitrate } bit/s. "
172+ f"Sleeping { args .demo_pause_seconds :.1f} s before continuing."
173+ )
174+ print ("You can inspect Logic 2 now, and press Run manually during this pause if needed." )
175+ time .sleep (args .demo_pause_seconds )
176+
177+ if args .manual_rerun :
178+ prepared_capture_path = manual_rerun_capture_path
179+ print (f"Analyzer added to the demo capture at { args .bitrate } bit/s." )
180+ print (f"Rerun the capture in the Logic 2 GUI, save it to { prepared_capture_path } , then press Enter here." )
181+ input ()
182+
183+ if prepared_capture_path .exists () is False :
184+ raise FileNotFoundError (f"Manual rerun capture not found: { prepared_capture_path } " )
185+ else :
186+ prepared_capture_path = output_dir / "flexray_demo_auto.sal"
187+ try :
188+ capture .save_capture (str (prepared_capture_path ))
189+ except automation .errors .InvalidRequestError as exc :
190+ if 'does not exist' not in str (exc ):
191+ raise
192+
193+ prepared_capture_path = manual_rerun_capture_path
194+ print ("The demo capture was rerun in the GUI during the pause, so the original automation capture no longer exists." )
195+ print (f"Save the current Logic 2 capture to { prepared_capture_path } , then press Enter here." )
196+ input ()
197+
198+ if prepared_capture_path .exists () is False :
199+ raise FileNotFoundError (f"Fallback demo capture not found: { prepared_capture_path } " ) from exc
200+
201+ prepared_capture_for_api = create_capture_without_saved_analyzers (
202+ prepared_capture_path ,
203+ output_dir / f"{ prepared_capture_path .stem } _api{ prepared_capture_path .suffix } " ,
204+ )
205+
206+ close_capture_if_present (capture )
207+ capture = manager .load_capture (str (prepared_capture_for_api ))
208+ analyzer = add_flexray_analyzer (capture , args )
209+
210+ source_description = (
211+ f"Demo capture: { prepared_capture_path } \n "
212+ f"API capture: { prepared_capture_for_api } \n "
213+ f"Sample rate: { args .sample_rate } \n "
214+ f"Bit rate: { args .bitrate } "
215+ )
216+
217+ if args .manual_rerun is False :
218+ source_description += "\n Flow: automated two-pass demo setup"
219+
220+ return capture , analyzer , source_description
221+
222+
33223def main () -> int :
34224 root = Path (__file__ ).resolve ().parents [1 ]
35225
36- parser = argparse .ArgumentParser (description = "Validate the FlexRay analyzer against a Logic 2 capture." )
226+ parser = argparse .ArgumentParser (description = "Validate the FlexRay analyzer against a Logic 2 capture or demo simulation ." )
37227 parser .add_argument ("--logic2" , help = "Path to the Logic 2 binary or AppImage." )
228+ parser .add_argument ("--mode" , choices = ["capture" , "demo" ], default = "capture" , help = "Load a .sal capture or record from the Logic 2 simulation device." )
38229 parser .add_argument ("--capture" , default = str (root / "assets" / "SP2018_FlexRay.sal" ), help = "Path to the .sal capture." )
39230 parser .add_argument ("--output-dir" , default = str (root / "build" / "automation" ), help = "Directory for exported CSV files." )
40231 parser .add_argument ("--analyzers-dir" , default = str (root / "build" / "Analyzers" ), help = "Directory containing libFlexRayAnalyzer.so." )
41232 parser .add_argument ("--channel" , type = int , default = 0 , help = "Digital channel index for the FlexRay signal." )
42233 parser .add_argument ("--bitrate" , type = int , default = 10_000_000 , choices = [2_500_000 , 5_000_000 , 10_000_000 ], help = "FlexRay bit rate." )
234+ parser .add_argument (
235+ "--sample-rate" ,
236+ type = int ,
237+ default = 40_000_000 ,
238+ help = "Digital sample rate for demo mode. Defaults to 40 MS/s to give the analyzer simulator more timing margin." ,
239+ )
240+ parser .add_argument ("--duration" , type = float , default = 0.010 , help = "Capture duration in seconds for demo mode." )
241+ parser .add_argument ("--buffer-size" , type = int , default = 32 , help = "Capture buffer size in megabytes for demo mode." )
242+ parser .add_argument (
243+ "--demo-pause-seconds" ,
244+ type = float ,
245+ default = 2.0 ,
246+ help = "Pause after the first demo analyzer is added so you can observe Logic 2 before the second-pass load/export. Set 0 to disable." ,
247+ )
248+ parser .add_argument ("--save-capture" , action = "store_true" , help = "Save the recorded demo capture as .sal in the output directory." )
249+ parser .add_argument (
250+ "--manual-rerun" ,
251+ action = "store_true" ,
252+ help = "For demo mode: pause after adding the analyzer so you can rerun the capture in the GUI, save it, and then export that saved capture." ,
253+ )
254+ parser .add_argument (
255+ "--manual-rerun-capture" ,
256+ help = "Path to the .sal file you will save from the GUI during --manual-rerun. Defaults to <output-dir>/flexray_demo_manual.sal." ,
257+ )
43258 parser .add_argument ("--crc-channel" , default = "A" , choices = ["A" , "B" , "a" , "b" ], help = "FlexRay CRC channel." )
44259 parser .add_argument ("--sample-point" , type = int , default = 62 , help = "Sampling point percentage." )
45260 parser .add_argument ("--invert" , action = "store_true" , help = "Invert the digital input before decoding." )
@@ -51,6 +266,7 @@ def main() -> int:
51266 capture_path = Path (args .capture ).expanduser ().resolve ()
52267 output_dir = Path (args .output_dir ).expanduser ().resolve ()
53268 analyzers_dir = Path (args .analyzers_dir ).expanduser ().resolve ()
269+ manual_rerun_capture_path = Path (args .manual_rerun_capture ).expanduser ().resolve () if args .manual_rerun_capture else output_dir / "flexray_demo_manual.sal"
54270 output_dir .mkdir (parents = True , exist_ok = True )
55271
56272 if analyzers_dir .exists () is False :
@@ -65,41 +281,44 @@ def main() -> int:
65281 port = args .port ,
66282 )
67283
284+ capture : automation .Capture | None = None
285+
68286 try :
69- capture = manager .load_capture (str (capture_path ))
287+ if args .mode == "capture" :
288+ capture = manager .load_capture (str (capture_path ))
289+ source_description = f"Capture: { capture_path } "
290+ analyzer = add_flexray_analyzer (capture , args )
291+ else :
292+ capture , analyzer , source_description = run_demo_capture (manager , args , output_dir , manual_rerun_capture_path )
293+
294+ if args .save_capture :
295+ final_demo_capture_path = output_dir / "flexray_demo.sal"
296+ capture .save_capture (str (final_demo_capture_path ))
297+ source_description += f"\n Saved capture: { final_demo_capture_path } "
298+
70299 try :
71- analyzer = capture .add_analyzer (
72- "FlexRay" ,
73- settings = {
74- "Input Channel" : args .channel ,
75- "Bit Rate (Bits/s)" : args .bitrate ,
76- "CRC Channel" : 0 if args .crc_channel .upper () == "A" else 1 ,
77- "Sample Point (%)" : args .sample_point ,
78- "Invert Input" : args .invert ,
79- },
80- )
81-
82- legacy_csv = output_dir / "flexray_export.csv"
83- table_csv = output_dir / "flexray_table.csv"
84- raw_dir = output_dir / "raw"
85- raw_dir .mkdir (parents = True , exist_ok = True )
86-
87- capture .legacy_export_analyzer (str (legacy_csv ), analyzer , automation .RadixType .HEXADECIMAL )
88- capture .export_data_table (
89- str (table_csv ),
90- [automation .DataTableExportConfiguration (analyzer = analyzer , radix = automation .RadixType .HEXADECIMAL )],
91- )
92- capture .export_raw_data_csv (str (raw_dir ), digital_channels = [args .channel ])
300+ legacy_csv , table_csv , raw_dir = export_capture_data (capture , analyzer , output_dir , args .channel )
301+ summary = summarize_legacy_export (legacy_csv )
302+
303+ if args .mode == "demo" :
304+ validate_demo_summary (summary )
305+
306+ top_ids = ", " .join (f"{ frame_id } x{ count } " for frame_id , count in summary ["frame_ids" ].most_common (8 )) or "-"
93307
94308 print (f"Logic 2 binary: { logic2_path } " )
95309 print (f"Config file: { config_path } " )
96310 print (f"Analyzers dir: { analyzers_dir } " )
97- print (f"Capture: { capture_path } " )
98- print (f"Legacy export: { legacy_csv } " )
99- print (f"Data table: { table_csv } " )
100- print (f"Raw export: { raw_dir / 'digital.csv' } " )
311+ print (source_description )
312+ print (f"Legacy export: { legacy_csv } " )
313+ print (f"Data table: { table_csv } " )
314+ print (f"Raw export: { raw_dir / 'digital.csv' } " )
315+ print (f"Decoded rows: { summary ['rows' ]} " )
316+ print (f"Frame rows: { summary ['frames' ]} " )
317+ print (f"Symbol rows: { summary ['symbols' ]} " )
318+ print (f"Error rows: { summary ['errors' ]} " )
319+ print (f"Frame IDs: { top_ids } " )
101320 finally :
102- capture . close ( )
321+ close_capture_if_present ( capture )
103322 finally :
104323 manager .close ()
105324
0 commit comments