|
| 1 | +import can |
| 2 | +import time |
| 3 | +import random |
| 4 | +import sys |
| 5 | +from collections import defaultdict |
| 6 | + |
| 7 | +class FrameCounter(can.Listener): |
| 8 | + """A can.Listener that counts frames received and sent, with per-channel stats.""" |
| 9 | + def __init__(self): |
| 10 | + super().__init__() |
| 11 | + self.rx_cnt = defaultdict(int) |
| 12 | + self.tx_cnt = defaultdict(int) |
| 13 | + self.err_cnt = 0 |
| 14 | + self.st = time.time() |
| 15 | + |
| 16 | + def on_message_received(self, msg: can.Message): |
| 17 | + print(f'\033[2K\r{msg}', end='', flush=True) |
| 18 | + if msg.is_error_frame: |
| 19 | + self.err_cnt += 1 |
| 20 | + return |
| 21 | + |
| 22 | + ch = msg.channel |
| 23 | + if msg.is_rx: |
| 24 | + self.rx_cnt[ch] += 1 |
| 25 | + else: |
| 26 | + # TX echoes are not always enabled, but we count them if they appear. |
| 27 | + # The main count comes from the sending side. |
| 28 | + self.tx_cnt[ch] += 1 |
| 29 | + |
| 30 | + def stop(self): |
| 31 | + total_tx = sum(self.tx_cnt.values()) |
| 32 | + total_rx = sum(self.rx_cnt.values()) |
| 33 | + print(f'\ntx: {total_tx}, rx: {total_rx}, err: {self.err_cnt}, dt: {time.time() - self.st:.2f} s') |
| 34 | + if self.rx_cnt: |
| 35 | + print('RX counts per channel:') |
| 36 | + for ch, count in sorted(self.rx_cnt.items()): |
| 37 | + print(f' - Channel {ch}: {count}') |
| 38 | + if self.tx_cnt: |
| 39 | + print('TX counts (from echoes) per channel:') |
| 40 | + for ch, count in sorted(self.tx_cnt.items()): |
| 41 | + print(f' - Channel {ch}: {count}') |
| 42 | + |
| 43 | + |
| 44 | +def run_stress_test(channels_to_test: list, num_messages: int = 10000): |
| 45 | + """Runs a stress test on a given list of channels.""" |
| 46 | + frame_counter = FrameCounter() |
| 47 | + |
| 48 | + if not channels_to_test: |
| 49 | + print("No channels provided for stress test.") |
| 50 | + return |
| 51 | + |
| 52 | + try: |
| 53 | + # Note: `can.Bus` will select the `CandleBus` based on `interface='candle'`. |
| 54 | + # The modified `CandleBus` accepts a list of channels. |
| 55 | + # Loopback is enabled to receive the frames we send. |
| 56 | + with can.Bus(interface='candle', channel=channels_to_test, fd=True, loop_back=True, bitrate=1000000) as bus: |
| 57 | + notifier = can.Notifier(bus, [frame_counter]) |
| 58 | + |
| 59 | + # Extract integer channel numbers for sending |
| 60 | + channel_numbers = [int(ch.split(':')[1]) for ch in channels_to_test] |
| 61 | + |
| 62 | + print(f"Sending {num_messages} messages across {len(channel_numbers)} channels...") |
| 63 | + for i in range(num_messages): |
| 64 | + # Round-robin send to each channel |
| 65 | + target_channel = channel_numbers[i % len(channel_numbers)] |
| 66 | + msg = can.Message( |
| 67 | + arbitration_id=random.randrange(0, 1 << 11), |
| 68 | + is_extended_id=False, |
| 69 | + data=random.randbytes(random.randint(1, 8)), |
| 70 | + is_fd=True, |
| 71 | + channel=target_channel # Set target channel for CandleBus to route it |
| 72 | + ) |
| 73 | + bus.send(msg) |
| 74 | + |
| 75 | + print("\nWaiting for messages to be received...") |
| 76 | + time.sleep(2) # Give some time for all messages to be received via loopback |
| 77 | + notifier.stop() |
| 78 | + frame_counter.stop() |
| 79 | + |
| 80 | + # Verification |
| 81 | + total_sent = num_messages |
| 82 | + total_received = sum(frame_counter.rx_cnt.values()) |
| 83 | + if total_received >= total_sent: |
| 84 | + print(f"PASS: Sent {total_sent} and received {total_received} messages.") |
| 85 | + else: |
| 86 | + print(f"FAIL: Sent {total_sent} but only received {total_received} messages.") |
| 87 | + |
| 88 | + except Exception as e: |
| 89 | + print(f"FAIL: An error occurred during the stress test: {e}") |
| 90 | + |
| 91 | + |
| 92 | +def main(): |
| 93 | + """ |
| 94 | + Detects available candle devices and runs multi-channel stress tests. |
| 95 | + """ |
| 96 | + print("Detecting available candle devices...") |
| 97 | + try: |
| 98 | + # Use the static method from CandleBus to find channels |
| 99 | + available_configs = can.detect_available_configs(interfaces='candle') |
| 100 | + except Exception as e: |
| 101 | + print(f"Could not detect candle channels: {e}") |
| 102 | + sys.exit(1) |
| 103 | + |
| 104 | + if not available_configs: |
| 105 | + print("No candle devices found.") |
| 106 | + sys.exit(0) |
| 107 | + |
| 108 | + channels_by_device = defaultdict(list) |
| 109 | + for config in available_configs: |
| 110 | + serial, _ = config['channel'].split(':') |
| 111 | + channels_by_device[serial].append(config['channel']) |
| 112 | + |
| 113 | + print("Found candle devices and channels:") |
| 114 | + for serial, channels in channels_by_device.items(): |
| 115 | + print(f" - Device {serial}: Channels {[ch.split(':')[1] for ch in channels]}") |
| 116 | + |
| 117 | + # --- Test 1: Single device with multiple channels --- |
| 118 | + print("\n--- Test 1: Single device, multiple channels ---") |
| 119 | + tested_single_multi = False |
| 120 | + for serial, channels in channels_by_device.items(): |
| 121 | + if len(channels) > 1: |
| 122 | + print(f"\n=> Testing device {serial} with all its channels: {channels}") |
| 123 | + run_stress_test(channels) |
| 124 | + tested_single_multi = True |
| 125 | + break # Only test the first one found |
| 126 | + if not tested_single_multi: |
| 127 | + print("Skipped: No single device with multiple channels found.") |
| 128 | + |
| 129 | + # --- Test 2: All channels of each device sequentially --- |
| 130 | + print("\n--- Test 2: Each device's channels tested individually ---") |
| 131 | + if len(channels_by_device) > 1: |
| 132 | + for serial, channels in channels_by_device.items(): |
| 133 | + print(f"\n=> Testing device {serial} with its channels: {channels}") |
| 134 | + run_stress_test(channels) |
| 135 | + else: |
| 136 | + print("Skipped: Fewer than two devices found, or already tested above.") |
| 137 | + |
| 138 | + # --- Test 3: Invalid multi-device configuration --- |
| 139 | + print("\n--- Test 3: Invalid multi-device configuration ---") |
| 140 | + if len(channels_by_device) > 1: |
| 141 | + # Take one channel from the first two devices |
| 142 | + channels_from_different_devices = [ |
| 143 | + list(channels_by_device.values())[0][0], |
| 144 | + list(channels_by_device.values())[1][0] |
| 145 | + ] |
| 146 | + print(f"\n=> Attempting to initialize bus with channels from different devices: {channels_from_different_devices}") |
| 147 | + try: |
| 148 | + with can.Bus(interface='candle', channel=channels_from_different_devices): |
| 149 | + print("FAIL: can.CanInitializationError was NOT raised when it should have been.") |
| 150 | + except can.CanInitializationError as e: |
| 151 | + print(f"PASS: Caught expected error: {e}") |
| 152 | + except Exception as e: |
| 153 | + print(f"FAIL: Caught unexpected error: {e}") |
| 154 | + else: |
| 155 | + print("Skipped: Fewer than two devices found.") |
| 156 | + |
| 157 | + |
| 158 | +if __name__ == '__main__': |
| 159 | + main() |
0 commit comments