Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit e980a0f

Browse files
committed
sigrok: csv and vcd parsing
1 parent 953cc5c commit e980a0f

9 files changed

Lines changed: 1061 additions & 4 deletions

File tree

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
from jumpstarter_driver_sigrok.common import CaptureConfig, CaptureResult, DecoderConfig
1+
from jumpstarter_driver_sigrok.common import (
2+
CaptureConfig,
3+
CaptureResult,
4+
DecoderConfig,
5+
OutputFormat,
6+
Sample,
7+
)
28
from jumpstarter_driver_sigrok.driver import Sigrok
39

4-
__all__ = ["Sigrok", "CaptureConfig", "CaptureResult", "DecoderConfig"]
10+
__all__ = ["Sigrok", "CaptureConfig", "CaptureResult", "DecoderConfig", "OutputFormat", "Sample"]
511

packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,27 @@
55
from pydantic import BaseModel, Field
66

77

8+
class OutputFormat:
9+
"""Constants for sigrok output formats."""
10+
CSV = "csv"
11+
BITS = "bits"
12+
ASCII = "ascii"
13+
BINARY = "binary"
14+
SRZIP = "srzip"
15+
VCD = "vcd"
16+
17+
@classmethod
18+
def all(cls) -> list[str]:
19+
return [cls.CSV, cls.BITS, cls.ASCII, cls.BINARY, cls.SRZIP, cls.VCD]
20+
21+
22+
class Sample(BaseModel):
23+
"""A single sample with timing information."""
24+
sample: int # Sample index
25+
time_ns: int # Time in nanoseconds
26+
values: dict[str, int | float] # Channel values (digital: 0/1, analog: voltage)
27+
28+
829
class DecoderConfig(BaseModel):
930
"""Protocol decoder configuration (real-time during capture)."""
1031

@@ -47,3 +68,59 @@ def data(self) -> bytes:
4768
from base64 import b64decode
4869
return b64decode(self.data_b64)
4970

71+
def decode(self) -> list[Sample] | dict[str, list[int]] | str:
72+
"""Parse captured data based on output format.
73+
74+
Returns:
75+
- CSV format: list[Sample] with timing and all values per sample
76+
- VCD format: list[Sample] with timing and only changed values
77+
- Bits format: dict[str, list[int]] with channel→bit sequences
78+
- ASCII format: str with ASCII art visualization
79+
- Other formats: raises NotImplementedError (use .data for raw bytes)
80+
81+
Raises:
82+
NotImplementedError: For binary/srzip formats (use .data property)
83+
"""
84+
if self.output_format == OutputFormat.CSV:
85+
from .csv import parse_csv
86+
samples_data = parse_csv(self.data, self.sample_rate)
87+
return [Sample.model_validate(s) for s in samples_data]
88+
elif self.output_format == OutputFormat.VCD:
89+
from .vcd import parse_vcd
90+
samples_data = parse_vcd(self.data, self.sample_rate)
91+
return [Sample.model_validate(s) for s in samples_data]
92+
elif self.output_format == OutputFormat.BITS:
93+
return self._parse_bits()
94+
elif self.output_format == OutputFormat.ASCII:
95+
return self.data.decode("utf-8")
96+
else:
97+
raise NotImplementedError(
98+
f"Parsing not implemented for {self.output_format} format. "
99+
f"Use .data property to get raw bytes."
100+
)
101+
102+
def _parse_bits(self) -> dict[str, list[int]]:
103+
"""Parse bits format to dict of channel→bit sequences."""
104+
text = self.data.decode("utf-8")
105+
lines = [line.strip() for line in text.strip().split("\n") if line.strip()]
106+
107+
# bits format is just columns of 0/1
108+
# TODO: Need to determine channel mapping from somewhere
109+
# For now, return as generic numbered channels
110+
result: dict[str, list[int]] = {}
111+
112+
for line in lines:
113+
# Each line might be space/comma separated bits
114+
bits = [int(b) for b in line if b in "01"]
115+
if not result:
116+
# Initialize channels
117+
for i, bit in enumerate(bits):
118+
result[f"CH{i}"] = [bit]
119+
else:
120+
# Append to existing channels
121+
for i, bit in enumerate(bits):
122+
if f"CH{i}" in result:
123+
result[f"CH{i}"].append(bit)
124+
125+
return result
126+
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
"""CSV format parser for sigrok captures."""
2+
3+
from __future__ import annotations
4+
5+
import csv
6+
7+
8+
def parse_csv(data: bytes, sample_rate: str) -> list[dict]:
9+
"""Parse CSV format to list of samples with timing.
10+
11+
Args:
12+
data: Raw CSV data as bytes
13+
sample_rate: Sample rate string (e.g., "100kHz", "1MHz")
14+
15+
Returns:
16+
List of dicts with keys: sample, time_ns, values
17+
"""
18+
text = data.decode("utf-8")
19+
lines = text.strip().split("\n")
20+
21+
# Parse sample rate for timing calculation
22+
sample_rate_hz = _parse_sample_rate_hz(sample_rate)
23+
time_step_ns = int(1_000_000_000.0 / sample_rate_hz)
24+
25+
# Skip comment lines and analog preview lines (format: "A0: -10.0000 V DC")
26+
# The actual data starts after a header row with types like "logic,logic,V DC,V DC"
27+
data_lines = _extract_csv_data_lines(lines)
28+
29+
if not data_lines or len(data_lines) < 2:
30+
return []
31+
32+
# Parse the CSV data
33+
reader = csv.reader(data_lines)
34+
35+
# First row is types (logic, V DC, etc.) - use for channel name inference
36+
types_row = next(reader)
37+
38+
# Get channel names from types
39+
channel_names = _infer_channel_names(types_row)
40+
41+
# Parse data rows
42+
samples: list[dict] = []
43+
for idx, row in enumerate(reader):
44+
values = _parse_csv_row(channel_names, row)
45+
samples.append({
46+
"sample": idx,
47+
"time_ns": idx * time_step_ns,
48+
"values": values,
49+
})
50+
51+
return samples
52+
53+
54+
def _parse_sample_rate_hz(sample_rate: str) -> float:
55+
"""Parse sample rate string to Hz."""
56+
rate = sample_rate.strip().upper()
57+
multipliers = {"K": 1e3, "M": 1e6, "G": 1e9}
58+
59+
for suffix, mult in multipliers.items():
60+
if rate.endswith(f"{suffix}HZ"):
61+
return float(rate[:-3]) * mult
62+
elif rate.endswith(suffix):
63+
return float(rate[:-1]) * mult
64+
65+
# Assume Hz if no suffix
66+
return float(rate.rstrip("HZ"))
67+
68+
69+
def _extract_csv_data_lines(lines: list[str]) -> list[str]:
70+
"""Extract actual CSV data lines, skipping comments and analog preview lines."""
71+
data_lines = []
72+
73+
for _i, line in enumerate(lines):
74+
line = line.strip()
75+
# Skip comment lines
76+
if line.startswith(";"):
77+
continue
78+
# Skip analog preview lines (contain colon, not CSV comma-separated)
79+
if ":" in line and "," not in line:
80+
continue
81+
# This is CSV data
82+
data_lines.append(line)
83+
84+
return data_lines
85+
86+
87+
def _infer_channel_names(types_row: list[str]) -> list[str]:
88+
"""Infer channel names from CSV type header row.
89+
90+
Args:
91+
types_row: List of type strings like ["logic", "logic", "V DC", "V DC"]
92+
93+
Returns:
94+
List of channel names like ["D0", "D1", "A0", "A1"]
95+
"""
96+
channel_names = []
97+
digital_count = 0
98+
analog_count = 0
99+
100+
for type_str in types_row:
101+
type_lower = type_str.lower()
102+
if "logic" in type_lower:
103+
channel_names.append(f"D{digital_count}")
104+
digital_count += 1
105+
elif "v" in type_lower or "dc" in type_lower:
106+
# Analog channel
107+
channel_names.append(f"A{analog_count}")
108+
analog_count += 1
109+
else:
110+
# Unknown type, use generic name
111+
channel_names.append(f"CH{len(channel_names)}")
112+
113+
return channel_names
114+
115+
116+
def _parse_csv_row(channel_names: list[str], row: list[str]) -> dict[str, int | float]:
117+
"""Parse a CSV data row into channel values.
118+
119+
Args:
120+
channel_names: List of channel names
121+
row: List of value strings
122+
123+
Returns:
124+
Dict mapping channel name to parsed value
125+
"""
126+
values = {}
127+
128+
for channel, value in zip(channel_names, row, strict=True):
129+
value = value.strip()
130+
# Try to parse as number (analog) or binary (digital)
131+
try:
132+
if "." in value or "e" in value.lower():
133+
values[channel] = float(value)
134+
else:
135+
values[channel] = int(value)
136+
except ValueError:
137+
# Keep as string if not a number
138+
values[channel] = value
139+
140+
return values
141+
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""Tests for CSV format parser."""
2+
3+
from shutil import which
4+
5+
import pytest
6+
7+
from .client import SigrokClient
8+
from .common import CaptureConfig, CaptureResult, OutputFormat
9+
from .driver import Sigrok
10+
from jumpstarter.common.utils import serve
11+
12+
13+
@pytest.fixture
14+
def demo_driver_instance():
15+
"""Create a Sigrok driver instance configured for the demo device."""
16+
# Demo driver has 8 digital channels (D0-D7) and 5 analog (A0-A4)
17+
# Map device channels to decoder-friendly semantic names
18+
return Sigrok(
19+
driver="demo",
20+
executable="sigrok-cli",
21+
channels={
22+
"D0": "vcc",
23+
"D1": "cs",
24+
"D2": "miso",
25+
"D3": "mosi",
26+
"D4": "clk",
27+
"D5": "sda",
28+
"D6": "scl",
29+
"D7": "gnd",
30+
},
31+
)
32+
33+
34+
@pytest.fixture
35+
def demo_client(demo_driver_instance):
36+
"""Create a client for the demo Sigrok driver."""
37+
with serve(demo_driver_instance) as client:
38+
yield client
39+
40+
41+
@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed")
42+
def test_csv_format_basic(demo_client: SigrokClient):
43+
"""Test CSV format capture with demo driver."""
44+
cfg = CaptureConfig(
45+
sample_rate="50kHz",
46+
samples=50,
47+
output_format=OutputFormat.CSV,
48+
channels=["vcc", "cs"], # Select specific digital channels
49+
)
50+
51+
result = demo_client.capture(cfg)
52+
assert isinstance(result, CaptureResult)
53+
assert isinstance(result.data, bytes)
54+
decoded_data = result.decode()
55+
assert isinstance(decoded_data, list)
56+
assert len(decoded_data) > 0
57+
# Verify channel names are in the data
58+
first_sample = decoded_data[0]
59+
assert "D0" in first_sample.values or "D1" in first_sample.values
60+
61+
62+
@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed")
63+
def test_csv_format_timing(demo_client: SigrokClient):
64+
"""Test CSV format timing calculations with integer nanoseconds."""
65+
cfg = CaptureConfig(
66+
sample_rate="100kHz",
67+
samples=50,
68+
output_format=OutputFormat.CSV,
69+
channels=["D0", "D1", "D2"], # Select specific channels
70+
)
71+
72+
result = demo_client.capture(cfg)
73+
assert isinstance(result, CaptureResult)
74+
75+
# Decode the CSV data
76+
samples = result.decode()
77+
assert isinstance(samples, list)
78+
assert len(samples) > 0
79+
80+
# Verify timing progresses correctly
81+
for sample in samples:
82+
assert isinstance(sample.time_ns, int)
83+
# Verify timing progresses (1/100kHz = 10,000ns per sample)
84+
assert sample.time_ns == sample.sample * 10_000
85+
86+
87+
@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed")
88+
def test_csv_format_analog_channels(demo_client: SigrokClient):
89+
"""Test CSV capture of analog channels with voltage values."""
90+
cfg = CaptureConfig(
91+
sample_rate="100kHz",
92+
samples=20,
93+
output_format=OutputFormat.CSV,
94+
channels=["A0", "A1"], # Select specific analog channels
95+
)
96+
97+
result = demo_client.capture(cfg)
98+
assert isinstance(result, CaptureResult)
99+
assert isinstance(result.data, bytes)
100+
decoded_data = result.decode()
101+
assert isinstance(decoded_data, list)
102+
assert len(decoded_data) > 0
103+
104+
# Check first sample for analog values
105+
first_sample = decoded_data[0]
106+
assert len(first_sample.values) > 0
107+
108+
# Analog values should be floats (voltages)
109+
for _channel, value in first_sample.values.items():
110+
assert isinstance(value, (int, float))
111+
112+
113+
@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed")
114+
def test_csv_format_mixed_channels(demo_client: SigrokClient):
115+
"""Test CSV with both digital and analog channels."""
116+
cfg = CaptureConfig(
117+
sample_rate="100kHz",
118+
samples=30,
119+
output_format=OutputFormat.CSV,
120+
channels=["D0", "D1", "A0"], # Mix of digital and analog
121+
)
122+
123+
result = demo_client.capture(cfg)
124+
samples = result.decode()
125+
126+
assert isinstance(samples, list)
127+
assert len(samples) > 0
128+
129+
# Verify we have values for channels
130+
first_sample = samples[0]
131+
assert len(first_sample.values) > 0
132+

packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from shutil import which
99
from tempfile import TemporaryDirectory
1010

11-
from .common import CaptureConfig, DecoderConfig
11+
from .common import CaptureConfig, DecoderConfig, OutputFormat
1212
from jumpstarter.driver import Driver, export
1313

1414

@@ -65,7 +65,7 @@ def get_channel_map(self) -> dict[int, str]:
6565

6666
@export
6767
def list_output_formats(self) -> list[str]:
68-
return ["csv", "srzip", "vcd", "binary", "bits", "ascii"]
68+
return OutputFormat.all()
6969

7070
@export
7171
def capture(self, config: CaptureConfig | dict) -> dict:

0 commit comments

Comments
 (0)