From ab983af691ad7b78ccd7f6f2b5a10d55d76600ed Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 12:20:01 -0400 Subject: [PATCH 01/15] fix(encode): suppress AF summary for multi-row AFs and support non-zero pinned rows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The AF summary block (appended to the last AF cell when 2+ AFs exist) was being emitted even when a multi-row AF (e.g. retained timer) was present. Native Click captures omit the summary in that case — the extra 32 bytes misaligned the grid and crashed Click on paste. Also updates csv/converter.py and csv/writer.py to support pinned AF instructions (timer/counter/shift/drum) at any row position, not just row 0. This enables rungs like coil-at-row-0 + timer-at-row-1. Co-Authored-By: Claude Opus 4.6 --- src/laddercodec/_grid.py | 11 +- src/laddercodec/csv/converter.py | 27 +++-- src/laddercodec/csv/writer.py | 171 +++++++++++++++++++------------ 3 files changed, 128 insertions(+), 81 deletions(-) diff --git a/src/laddercodec/_grid.py b/src/laddercodec/_grid.py index 363de74..096bf3d 100644 --- a/src/laddercodec/_grid.py +++ b/src/laddercodec/_grid.py @@ -137,10 +137,17 @@ def _compute_rung_metadata( idx += 1 # AF summary block — needed on the last AF instruction cell when 2+ AFs - # (single-rung only; multi-rung does not use af_summary). + # are all single-row (single-rung only; multi-rung does not use + # af_summary). Suppressed when any AF is multi-row (e.g. retained + # timer with reset pin) — native captures omit it in that case. af_summary_block = b"" af_rows = sorted(af_instr_indices.keys()) - if single_rung and len(af_rows) >= 2: + any_multi_row_af = any( + af_tokens[r].cell_params().get("visual_rows", 1) > 1 + for r in af_rows + if isinstance(af_tokens[r], AfInstruction) + ) + if single_rung and len(af_rows) >= 2 and not any_multi_row_af: af_entries: list[tuple[int, int, bool]] = [] for r in af_rows: cond_count = sum(1 for t in condition_rows[r] if isinstance(t, ConditionInstruction)) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 5396fa0..23083ca 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -174,6 +174,7 @@ def convert_rung( parent_counter: Counter | None = None parent_shift: Shift | None = None parent_drum: Drum | None = None + parent_af_idx: int | None = None counter_up_conditions: list[ConditionToken] | None = None counter_top_conditions: list[ConditionToken] | None = None counter_bridge_conditions: list[ConditionToken] | None = None @@ -203,13 +204,13 @@ def convert_rung( raise ConvertError(".down() is only valid for count_up()") counter_down_conditions = conds parent_counter = replace(parent_counter, down_enabled=True) - af_tokens[0] = parent_counter + af_tokens[parent_af_idx] = parent_counter continue if pin_name == ".reset": counter_reset_conditions = conds parent_counter = replace(parent_counter, reset_enabled=True) - af_tokens[0] = parent_counter + af_tokens[parent_af_idx] = parent_counter continue raise ConvertError( @@ -223,12 +224,12 @@ def convert_rung( if pin_name == ".jump": jump_target = af_node.args[0] if af_node.args else "" parent_drum = replace(parent_drum, jump_enabled=True, jump_target=jump_target) - af_tokens[0] = parent_drum + af_tokens[parent_af_idx] = parent_drum drum_jump_conditions = conds continue if pin_name == ".jog": parent_drum = replace(parent_drum, jog_enabled=True) - af_tokens[0] = parent_drum + af_tokens[parent_af_idx] = parent_drum drum_jog_conditions = conds continue raise ConvertError( @@ -247,8 +248,7 @@ def convert_rung( if pin_name in _RETENTIVE_PINS and parent_timer is not None: # .reset() makes the parent timer retentive. parent_timer = replace(parent_timer, retained=True) - # Update af_tokens[0] with the modified timer. - af_tokens[0] = parent_timer + af_tokens[parent_af_idx] = parent_timer # Pin row contributes its conditions/wires as a normal data row, # but the AF token becomes blank (no separate instruction). @@ -274,6 +274,7 @@ def convert_rung( condition_rows.append(conds) af_tokens.append(token) parent_counter = token + parent_af_idx = len(af_tokens) - 1 counter_up_conditions = conds continue @@ -289,17 +290,21 @@ def convert_rung( condition_rows.append(conds) af_tokens.append(token) - if row_idx == 0 and isinstance(af_node, AfCall) and af_node.name.upper() != "NOP": - if isinstance(token, Timer): + if isinstance(af_node, AfCall) and af_node.name.upper() != "NOP": + if isinstance(token, Timer) and parent_timer is None: parent_timer = token - if isinstance(token, Counter): + parent_af_idx = len(af_tokens) - 1 + if isinstance(token, Counter) and parent_counter is None: parent_counter = token + parent_af_idx = len(af_tokens) - 1 counter_up_conditions = conds - if isinstance(token, Shift): + if isinstance(token, Shift) and parent_shift is None: parent_shift = token + parent_af_idx = len(af_tokens) - 1 shift_data_conditions = conds - if isinstance(token, Drum): + if isinstance(token, Drum) and parent_drum is None: parent_drum = token + parent_af_idx = len(af_tokens) - 1 # --- Drum row shaping --- if parent_drum is not None: diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index 9b3e6c9..d290efe 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -107,98 +107,133 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: condition_rows = list(rung.conditions) af_tokens = list(rung.instructions) - # Counters need custom CSV shaping. Native count_down decodes as AF row 0 - # plus a NOP bridge row; when row 0 is truly empty we collapse that spacer - # row in CSV, but preserve any real row-above content. - counter_row = next((idx for idx, af in enumerate(af_tokens) if isinstance(af, Counter)), None) - if counter_row is not None: - counter = cast(Counter, af_tokens[counter_row]) - if len(condition_rows) < 3 or len(af_tokens) < 3: + # --- Find pinned AF instruction (counter/timer/shift/drum) at any row --- + pinned_row: int | None = None + pinned_af: AfInstruction | None = None + pinned_family: str | None = None + for idx, af in enumerate(af_tokens): + if isinstance(af, AfInstruction): + fam = get_af_family_for_token(af) + if fam is not None and fam.family_name in ("counter", "timer", "shift", "drum"): + pinned_row = idx + pinned_af = af + pinned_family = fam.family_name + break + + is_retained_timer = ( + pinned_family == "timer" and isinstance(pinned_af, Timer) and pinned_af.retained + ) + is_tall = any( + isinstance(af, AfInstruction) and af.cell_params().get("visual_rows", 1) > 1 + for af in af_tokens + ) + + # Helper: emit regular data rows for indices [0, up_to). + def _emit_prefix(up_to: int) -> None: + for i in range(up_to): + marker = "R" if i == 0 else "" + rows.append( + [marker] + + [_token_to_csv(c) for c in condition_rows[i]] + + [_token_to_csv(af_tokens[i])] + ) + + if pinned_family == "counter": + assert pinned_row is not None + counter = cast(Counter, pinned_af) + needed = pinned_row + 3 + if len(condition_rows) < needed or len(af_tokens) < needed: raise WriterError( - f"{counter.counter_type} requires 3 decoded rows; got {len(condition_rows)}" + f"{counter.counter_type} requires {needed} decoded rows; got {len(condition_rows)}" ) + _emit_prefix(pinned_row) + first_marker = "R" if pinned_row == 0 else "" if counter.counter_type == "count_up": - if counter_row != 0: - raise WriterError("count_up must appear on decoded row 0") - - top_conditions = condition_rows[0] - rows.append(["R"] + [_token_to_csv(c) for c in top_conditions] + [counter.to_csv()]) - + rows.append( + [first_marker] + + [_token_to_csv(c) for c in condition_rows[pinned_row]] + + [counter.to_csv()] + ) if counter.down_enabled: - down_conditions = condition_rows[1] - rows.append([""] + [_token_to_csv(c) for c in down_conditions] + [".down()"]) - + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 1]] + [".down()"] + ) if counter.reset_enabled: - reset_conditions = condition_rows[2] - rows.append([""] + [_token_to_csv(c) for c in reset_conditions] + [".reset()"]) - + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 2]] + [".reset()"] + ) return rows - if counter_row == 0: - if af_tokens[1] != "NOP": - raise WriterError("count_down requires a NOP bridge row in the decoded rung") - elif counter_row != 1: - raise WriterError("count_down must appear on decoded row 0 or row 1") - - top_conditions = condition_rows[0] - bridge_conditions = condition_rows[1] - if _conditions_are_blank(top_conditions): - rows.append(["R"] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()]) + # count_down: NOP bridge row follows the counter. + bridge_row = pinned_row + 1 + if af_tokens[bridge_row] != "NOP": + raise WriterError("count_down requires a NOP bridge row after the counter") + counter_conditions = condition_rows[pinned_row] + bridge_conditions = condition_rows[bridge_row] + if _conditions_are_blank(counter_conditions): + rows.append( + [first_marker] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()] + ) else: - rows.append(["R"] + [_token_to_csv(c) for c in top_conditions] + [""]) + rows.append([first_marker] + [_token_to_csv(c) for c in counter_conditions] + [""]) rows.append([""] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()]) - if counter.reset_enabled: - reset_conditions = condition_rows[2] - rows.append([""] + [_token_to_csv(c) for c in reset_conditions] + [".reset()"]) - + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[bridge_row + 1]] + [".reset()"] + ) return rows - # --- Determine timer retention / tall padding --- - af0 = rung.instructions[0] if rung.instructions else None - af0_family = get_af_family_for_token(af0) if isinstance(af0, AfInstruction) else None - family_name = af0_family.family_name if af0_family is not None else None - is_retained_timer = family_name == "timer" and isinstance(af0, Timer) and af0.retained - is_tall = isinstance(af0, AfInstruction) and af0.cell_params().get("visual_rows", 1) > 1 - - if family_name == "shift": - shift = cast(Shift, af0) - if len(condition_rows) != 3 or len(af_tokens) != 3: - raise WriterError(f"shift requires 3 decoded rows; got {len(condition_rows)}") - - rows.append(["R"] + [_token_to_csv(c) for c in condition_rows[0]] + [shift.to_csv()]) - rows.append([""] + [_token_to_csv(c) for c in condition_rows[1]] + [".clock()"]) - rows.append([""] + [_token_to_csv(c) for c in condition_rows[2]] + [".reset()"]) + if pinned_family == "shift": + assert pinned_row is not None + shift = cast(Shift, pinned_af) + needed = pinned_row + 3 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"shift requires {needed} decoded rows; got {len(condition_rows)}") + _emit_prefix(pinned_row) + marker = "R" if pinned_row == 0 else "" + rows.append( + [marker] + [_token_to_csv(c) for c in condition_rows[pinned_row]] + [shift.to_csv()] + ) + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 1]] + [".clock()"] + ) + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 2]] + [".reset()"] + ) return rows - if family_name == "drum": - drum = cast(Drum, af0) - if len(condition_rows) < 4 or len(af_tokens) < 4: - raise WriterError(f"Drum requires 4 decoded rows; got {len(condition_rows)}") - - # Row 0: main + drum instruction - rows.append(["R"] + [_token_to_csv(c) for c in condition_rows[0]] + [drum.to_csv()]) - # Row 1: .reset() — always present - rows.append([""] + [_token_to_csv(c) for c in condition_rows[1]] + [".reset()"]) - # Row 2: .jump(target) if enabled + if pinned_family == "drum": + assert pinned_row is not None + drum = cast(Drum, pinned_af) + needed = pinned_row + 4 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"Drum requires {needed} decoded rows; got {len(condition_rows)}") + _emit_prefix(pinned_row) + marker = "R" if pinned_row == 0 else "" + rows.append( + [marker] + [_token_to_csv(c) for c in condition_rows[pinned_row]] + [drum.to_csv()] + ) + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 1]] + [".reset()"] + ) if drum.jump_enabled: rows.append( [""] - + [_token_to_csv(c) for c in condition_rows[2]] + + [_token_to_csv(c) for c in condition_rows[pinned_row + 2]] + [f".jump({drum.jump_target})"] ) - # Row 3: .jog() if enabled if drum.jog_enabled: - rows.append([""] + [_token_to_csv(c) for c in condition_rows[3]] + [".jog()"]) + rows.append( + [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 3]] + [".jog()"] + ) return rows if is_retained_timer: - # Retained timer: the second row becomes a .reset() pin row. - # Keep all rows — the second row carries reset-enable conditions. + # Retained timer: keep all rows — the row after the timer is a .reset() pin. pass elif is_tall: - # Tall instruction (timer/copy): strip trailing blank padding rows. + # Tall instruction (timer/copy/search): strip trailing blank padding rows. while len(condition_rows) > 1 and _is_blank_row(condition_rows[-1], af_tokens[-1]): condition_rows = condition_rows[:-1] af_tokens = af_tokens[:-1] @@ -207,8 +242,8 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: for i, (conditions, af) in enumerate(zip(condition_rows, af_tokens, strict=True)): marker = "R" if i == 0 else "" - # Pin row: retained timer's second row gets .reset() as AF. - if is_retained_timer and i == 1: + # Pin row: retained timer's row after the timer gets .reset() as AF. + if is_retained_timer and pinned_row is not None and i == pinned_row + 1: af_str = ".reset()" else: af_str = _token_to_csv(af) From 95db7ae46e78d1e3fb86a10f814cfe7d2dc00440 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 12:20:16 -0400 Subject: [PATCH 02/15] test: add instr-coil-timer-ret golden fixture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 3-row rung with coil at AF row 0 and retained timer at AF row 1 with .reset() pin on row 2. Verified via Click paste round-trip. Regenerates instr-6row-multi-output.bin (affected by AF summary suppression) — needs re-verification. Co-Authored-By: Claude Opus 4.6 --- .../golden/instr-6row-multi-output.bin | Bin 16384 -> 16384 bytes .../golden/instr-coil-timer-ret.bin | Bin 0 -> 12288 bytes .../golden/instr-coil-timer-ret.csv | 7 +++++++ .../golden/verify_progress.log | 14 +++++++------- 4 files changed, 14 insertions(+), 7 deletions(-) create mode 100644 tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.bin create mode 100644 tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.csv diff --git a/tests/fixtures/ladder_captures/golden/instr-6row-multi-output.bin b/tests/fixtures/ladder_captures/golden/instr-6row-multi-output.bin index ec296d3ba1b0530a69874f00d1d76df5bdb515ed..7069255bf9cb690d574578a1c4b230834080e0ca 100644 GIT binary patch delta 24 ecmZo@U~Fh$+#qMm#>fB!>>Cp|Gj1%u(!H6i2U9%A-&qKq0)BPsCDHtsO_SL6ty`kPs+E)RkIkrLrV;;>KM^eh^iq z68|(3j}LIq#N#}0sOz8jfV2~NUOUHgXV%V~Y?|MG-TWp=60@%^G;PB&yJJIZ+rab8 zZd+~-z1;Mkq3zqq`gY*$EpH#%cRs3b54^tZS88_6%YDCIrvLX2Z24VH!l$4&xwnZm z{chixV_6j+^NYVIPo+xbi^^Y~{&C^dsKAYU%;Rh@>eRBl7SwwWCI(dNny7 zS3A0Ns_$rcqgt(QFqwbDlB7Q#t={uDtRD9i_jlU8{LyO5-=nqZ*}B*%xaf{< zwjK_v*Fx8S6`pSm@%a`V-?Mm+p5dNx7b9j%lY6=-e?jD*6ZwlG|Gdb*Ao4GY{7WK# zN#tJ^`By~#vdF(G@~?^f>mt7_@^6Uzn~$X^lpw?+OPkzWz{cSZg^k$+$0KM?s3 zMgAj^|5)JTAxv4BG(w&BDm4PSG#Me|J1zw>oV=s zYb$kX4e!5UpLyGicBq6bOc^H%rDqHY7ktRcYsc8M7KV`_5n-%np_~jkS)YFqYfdk) z2uvgfvB)CWONGZo{*Cv`vl2+)FcF2vL=+wqQFu&5;V}_~$3zq!6H$0fMBy6VS%Bh0P@d*Huwo0j6-^+Ps9fb!3jK` zlSgr2&d-Q`>JpKUZV|CC$Q$$f#JZ0@yNIVq5tGIjW;SaPF_0M8q?|4SABl@tgvUqD z{#5rxM`6doT@czi_R@ezf` zM-(0(QFwer;qeiL$43+%A5nOGMB(ufg~vw}9v@M7d_>{#5rxM`6doT@czgtY$+P&k ztv5r8=SR!`WjXMk759-PtMDs(F9=90Jj!?w>O9owPI@;(|L>o0pz?_0k7@1CP-E=H z4bSYlKk`6SXfqw{x@!&zI#fayf{Fjf2u~5bCq7Mx!(znqYe(JDXVi{3X$*om Date: Tue, 7 Apr 2026 12:29:43 -0400 Subject: [PATCH 03/15] docs: add troubleshooting guide for encode failures Covers the inspect_cells workflow for comparing native vs encoded binaries, cell header field reference, and known pitfalls (AF summary suppression, tall instruction visual_rows). Co-Authored-By: Claude Opus 4.6 --- docs/guides/troubleshooting.md | 114 +++++++++++++++++++++++++++++++++ mkdocs.yml | 1 + 2 files changed, 115 insertions(+) create mode 100644 docs/guides/troubleshooting.md diff --git a/docs/guides/troubleshooting.md b/docs/guides/troubleshooting.md new file mode 100644 index 0000000..19e60f9 --- /dev/null +++ b/docs/guides/troubleshooting.md @@ -0,0 +1,114 @@ +# Troubleshooting Encode Failures + +When an encoded binary crashes Click on paste (or renders incorrectly), the problem is usually a structural byte difference in the cell grid — not the instruction blob content. This guide covers how to find it. + +## Tools + +| Tool | What it does | +|---|---| +| `devtools/inspect_bin.py` | Decodes a `.bin` and prints all instructions with `to_csv()` output. Good for checking logical correctness, but doesn't show structural bytes. | +| `inspect_cells()` | Dumps raw cell bytes, header fields, flags, and decoded tokens for specific cells. This is the primary debugging tool for encode failures. | + +`inspect_cells` lives in `laddercodec.decode` (not yet in the public API — import it directly): + +```python +from laddercodec.decode import inspect_cells + +data = open("capture.bin", "rb").read() +dumps = inspect_cells(data, [(0, 0, "AF"), (0, 1, "AF"), (0, 2, "AF")]) +for d in dumps: + print(d) +``` + +The second argument is a list of `(rung_index, visual_row, column_letter)` tuples. Column is `"A"` through `"AE"` for conditions, `"AF"` for the output column. + +Each `CellDump` has: + +- `offset` — absolute byte position in the buffer +- `size` — cell size (0x40 for wire cells, larger for instruction cells) +- `flags` — `(segment, right, down)` from `+0x19/+0x1D/+0x21` +- `token` — decoded instruction or wire token +- `raw` — full cell bytes +- `hex()` — formatted hex dump + +## Workflow + +### 1. Get a native capture + +Paste the same rung shape manually in Click, then copy it back to get a native `.bin`. This is your ground truth. The native capture can come from `clicknick-rung guided` or by manually copying from Click's clipboard (format 522). + +### 2. Compare with inspect_cells + +Run `inspect_cells` on both the native capture and your encoded output. Focus on the AF column and any instruction cells: + +```python +from laddercodec.decode import inspect_cells + +native = open("native.bin", "rb").read() +ours = open("encoded.bin", "rb").read() + +cells = [(0, r, "AF") for r in range(3)] + +for label, data in [("NATIVE", native), ("OURS", ours)]: + print(f"\n=== {label} ===") + for d in inspect_cells(data, cells): + hdr = d.raw[:0x25] + print(f"[{d.row}][{d.col}] size={d.size} flags={d.flags}") + print(f" row_span={hdr[0x09]} vis_rows={hdr[0x0A]}") + print(f" instr_idx={int.from_bytes(hdr[0x0D:0x11], 'little', signed=True)}") + print(f" tail: {d.raw[-16:].hex(' ')}") +``` + +### 3. What to look for + +The cell header is 0x25 (37) bytes. Key fields: + +| Offset | Field | Notes | +|---|---|---| +| +0x01 | column | 4-byte LE, should match the column index | +| +0x05 | global_row | 4-byte LE, `row + 1` (varies by rung position — ok to differ) | +| +0x09 | row_span | How many grid rows this cell occupies | +| +0x0A | visual_rows | Visual sub-row count (1 = normal, 2 = timer, 3 = retained timer) | +| +0x0D | instr_index | 4-byte LE signed. `-1` (0xFFFFFFFF) for data cells | +| +0x15 | contact_flag | 4-byte LE | +| +0x19 | segment | 4-byte LE — load-bearing flag | +| +0x1D | wire_right | 4-byte LE | +| +0x21 | wire_down | 4-byte LE | + +After the header: instruction blob (variable length), then a 16-byte tail. + +**Size differences** are the most important signal. If a cell is 32 bytes larger in your output, you're probably emitting an AF summary block that shouldn't be there — or vice versa. + +**Flag differences** (segment, wire_right, wire_down) can cause visual corruption but usually don't crash Click. + +**Tail differences** are usually cosmetic (rung index encoding, row hints). + +### 4. Don't bother with hex diffs + +Raw `xxd` / hex diffs of the two binaries are nearly useless because: + +- The global header (0x0000–0x0253) contains file paths, font tables, and MDB data that differ between every capture. +- Instruction cells are variable-length, so a single size difference shifts every subsequent byte. +- Comment RTF formatting varies slightly (e.g. `\par ` vs `\r\n\par `), shifting the entire grid region. + +`inspect_cells` handles all of this — it walks the variable-length grid correctly and gives you per-cell structural data. + +## Known pitfalls + +### AF summary block + +When a single-rung buffer has 2+ AF instructions, the encoder appends a 32-byte summary block to the last AF cell. **This summary must be suppressed when any AF is multi-row** (e.g. a retained timer with a `.reset()` pin row). + +The summary is built by `_build_af_summary()` in `encode.py` and gated by `_compute_rung_metadata()` in `_grid.py`. If your rung has a mix of single-row AFs (coils, copies) and multi-row AFs (timers), check that the summary isn't being emitted. + +Symptom: the last AF instruction cell is ~32 bytes larger than in the native capture. Click crashes on paste because the extra bytes misalign the rest of the grid. + +### Tall instruction visual_rows + +The byte at cell offset +0x0A (`visual_rows`) is not always the same as the instruction's `cell_params()["visual_rows"]`. For instructions with pin rows (retained timers, counters, shifts, drums), the native binary may use a different value that accounts for the pin rows. + +If your instruction cell is the right size but Click still renders it wrong, compare +0x09 and +0x0A against the native capture. + +### Payload space slots + +The 31 slots between the rung preamble (0x0260) and the cell grid (0x0A60) contain structural bytes that differ between our encoder and native Click. These differences are tolerated — Click reads them but doesn't crash on mismatches. Don't chase these. diff --git a/mkdocs.yml b/mkdocs.yml index 36447cf..b24149c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -13,6 +13,7 @@ nav: - Decoding: guides/decoding.md - CSV Format: guides/csv-format.md - Adding Instructions: guides/adding-instructions.md + - Troubleshooting: guides/troubleshooting.md - Internals: - Binary Format: internals/binary-format.md - Wire Rendering: internals/wire-rendering.md From 48ca13efa330767d5f41654cd81c3004a5525a8b Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:04:30 -0400 Subject: [PATCH 04/15] fix(csv): support roundtrip for multiple pinned AF blocks --- src/laddercodec/csv/converter.py | 620 ++++++++++++++++++++----------- src/laddercodec/csv/writer.py | 412 +++++++++++++------- tests/csv/test_writer.py | 183 +++++++++ 3 files changed, 854 insertions(+), 361 deletions(-) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 23083ca..a904436 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -34,7 +34,7 @@ from __future__ import annotations -from dataclasses import replace +from dataclasses import dataclass, field, replace from typing import cast from ..encode import AfToken, ConditionToken @@ -78,6 +78,372 @@ # --------------------------------------------------------------------------- CONDITION_COLUMNS = 31 +_BLOCK_FAMILIES = frozenset({"timer", "counter", "shift", "drum"}) + + +@dataclass +class _ParsedRow: + """One CSV data row converted into condition tokens plus AF classification.""" + + conditions: list[ConditionToken] + kind: str + token: AfToken = "" + family_name: str | None = None + pin_name: str | None = None + pin_args: tuple[str, ...] = () + + +@dataclass +class _ActiveBlock: + """A multi-row AF block that is still gathering continuation rows.""" + + family_name: str + token: Timer | Counter | Shift | Drum + main_conditions: list[ConditionToken] + leading_blank_conditions: list[ConditionToken] | None = None + continuations: list[_ParsedRow] = field(default_factory=list) + + +def _blank_condition_row() -> list[ConditionToken]: + """Return one all-blank condition row.""" + return cast(list[ConditionToken], [""] * CONDITION_COLUMNS) + + +def _parse_rung_row(row: object, *, strict: bool) -> _ParsedRow: + """Convert one parsed CSV row into tokens plus AF-row classification.""" + conds = [condition_node_to_token(n) for n in row.condition_nodes] + af_node = row.af_node + + if isinstance(af_node, AfCall) and af_node.name.startswith("."): + pin_name = af_node.name + if pin_name not in KNOWN_PIN_NAMES: + raise ConvertError(f"Unknown pin token: {pin_name!r}") + return _ParsedRow( + conditions=conds, + kind="pin", + pin_name=pin_name, + pin_args=af_node.args, + ) + + token = af_node_to_token(af_node, strict=strict) + if token == "": + return _ParsedRow(conditions=conds, kind="blank") + if token == "NOP": + return _ParsedRow(conditions=conds, kind="nop", token="NOP") + + family = get_af_family_for_token(token) + if family is not None and family.family_name in _BLOCK_FAMILIES: + return _ParsedRow( + conditions=conds, + kind="block_main", + token=token, + family_name=family.family_name, + ) + return _ParsedRow(conditions=conds, kind="main", token=token) + + +def _flush_plain_rows( + plain_rows: list[_ParsedRow], + condition_rows: list[list[ConditionToken]], + af_tokens: list[AfToken], +) -> None: + """Append buffered non-block rows to the final decode-ready output.""" + for row in plain_rows: + condition_rows.append(row.conditions) + af_tokens.append(row.token) + plain_rows.clear() + + +def _start_active_block( + row: _ParsedRow, + *, + leading_blank_conditions: list[ConditionToken] | None = None, +) -> _ActiveBlock: + """Create a new active multi-row AF block from its main AF row.""" + if not isinstance(row.token, (Timer, Counter, Shift, Drum)): + raise AssertionError("block_main row must carry a timer/counter/shift/drum token") + return _ActiveBlock( + family_name=cast(str, row.family_name), + token=row.token, + main_conditions=row.conditions, + leading_blank_conditions=leading_blank_conditions, + ) + + +def _consume_active_row(block: _ActiveBlock, row: _ParsedRow) -> bool: + """Consume a continuation row for the active block when it belongs there.""" + if row.kind == "blank": + block.continuations.append(row) + return True + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if isinstance(block.token, Counter): + if pin_name not in {".down", ".reset"}: + raise ConvertError( + f"Pin token {pin_name!r} is not supported for {block.token.counter_type}()" + ) + block.continuations.append(row) + return True + + if isinstance(block.token, Drum): + if pin_name not in {".reset", ".jump", ".jog"}: + raise ConvertError( + f"Pin token {pin_name!r} is not supported for {block.token.drum_kind}_drum()" + ) + block.continuations.append(row) + return True + + if isinstance(block.token, Shift): + if pin_name not in {".clock", ".reset"}: + raise ConvertError(f"Pin token {pin_name!r} is not supported for shift()") + block.continuations.append(row) + return True + + # Timers remain lenient for non-.reset() pin rows: absorb them as blank rows. + if isinstance(block.token, Timer): + block.continuations.append(row) + return True + + if isinstance(block.token, Counter) and block.token.counter_type == "count_down": + if row.kind == "nop": + block.continuations.append(row) + return True + + return False + + +def _finalize_timer_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one timer block back to its decoded 2-row span.""" + timer = cast(Timer, block.token) + if len(block.continuations) > 1: + raise ConvertError("Timer block uses more than one continuation row") + + continuation_conditions = _blank_condition_row() + if block.continuations: + continuation = block.continuations[0] + if continuation.kind == "nop": + raise ConvertError("Timer continuation rows cannot use NOP") + continuation_conditions = continuation.conditions + if continuation.kind == "pin" and continuation.pin_name in _RETENTIVE_PINS: + timer = replace(timer, retained=True) + + return [block.main_conditions, continuation_conditions], [timer, ""] + + +def _finalize_count_up_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one count_up block back to its decoded 3-row span.""" + counter = cast(Counter, block.token) + middle_conditions: list[ConditionToken] | None = None + reset_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "nop": + raise ConvertError("count_up does not support NOP bridge rows") + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".down": + counter = replace(counter, down_enabled=True) + middle_conditions = row.conditions + continue + if pin_name == ".reset": + counter = replace(counter, reset_enabled=True) + reset_conditions = row.conditions + continue + + if middle_conditions is None: + middle_conditions = row.conditions + continue + if reset_conditions is None: + reset_conditions = row.conditions + continue + raise ConvertError("count_up block uses more than two continuation rows") + + if not counter.reset_enabled: + raise ConvertError(f"{counter.counter_type} requires a .reset() pin row") + + if middle_conditions is None: + middle_conditions = _blank_condition_row() + if reset_conditions is None: + reset_conditions = _blank_condition_row() + + return [block.main_conditions, middle_conditions, reset_conditions], [counter, "", ""] + + +def _finalize_count_down_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one count_down block back to its decoded 3-row span.""" + counter = cast(Counter, block.token) + explicit_bridge_conditions: list[ConditionToken] | None = None + reset_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".down": + raise ConvertError("count_down does not support .down()") + if pin_name == ".reset": + counter = replace(counter, reset_enabled=True) + reset_conditions = row.conditions + continue + + if row.kind == "nop": + if block.leading_blank_conditions is not None: + raise ConvertError("count_down block cannot mix a leading blank top row with NOP") + if explicit_bridge_conditions is not None: + raise ConvertError("count_down block uses more than one bridge row") + explicit_bridge_conditions = row.conditions + continue + + if block.leading_blank_conditions is None and explicit_bridge_conditions is None: + explicit_bridge_conditions = row.conditions + continue + if reset_conditions is None: + reset_conditions = row.conditions + continue + raise ConvertError("count_down block uses more than two continuation rows") + + if not counter.reset_enabled: + raise ConvertError(f"{counter.counter_type} requires a .reset() pin row") + + if reset_conditions is None: + reset_conditions = _blank_condition_row() + + if block.leading_blank_conditions is not None: + condition_rows = [ + block.leading_blank_conditions, + block.main_conditions, + reset_conditions, + ] + elif explicit_bridge_conditions is not None: + condition_rows = [ + block.main_conditions, + explicit_bridge_conditions, + reset_conditions, + ] + else: + condition_rows = [ + _blank_condition_row(), + block.main_conditions, + reset_conditions, + ] + + return condition_rows, [counter, "NOP", ""] + + +def _finalize_counter_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one counter block back to its decoded 3-row span.""" + counter = cast(Counter, block.token) + if counter.counter_type == "count_up": + return _finalize_count_up_block(block) + return _finalize_count_down_block(block) + + +def _finalize_shift_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one shift block back to its decoded 3-row span.""" + shift = cast(Shift, block.token) + clock_conditions: list[ConditionToken] | None = None + reset_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "nop": + raise ConvertError("Shift continuation rows cannot use NOP") + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".clock": + clock_conditions = row.conditions + continue + if pin_name == ".reset": + reset_conditions = row.conditions + continue + + if clock_conditions is None: + clock_conditions = row.conditions + continue + if reset_conditions is None: + reset_conditions = row.conditions + continue + raise ConvertError("Shift block uses more than two continuation rows") + + if clock_conditions is None: + clock_conditions = _blank_condition_row() + if reset_conditions is None: + reset_conditions = _blank_condition_row() + + return [block.main_conditions, clock_conditions, reset_conditions], [shift, "", ""] + + +def _finalize_drum_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one drum block back to its decoded 4-row span.""" + drum = cast(Drum, block.token) + reset_conditions: list[ConditionToken] | None = None + jump_conditions: list[ConditionToken] | None = None + jog_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "nop": + raise ConvertError("Drum continuation rows cannot use NOP") + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".reset": + reset_conditions = row.conditions + continue + if pin_name == ".jump": + jump_target = row.pin_args[0] if row.pin_args else "" + drum = replace(drum, jump_enabled=True, jump_target=jump_target) + jump_conditions = row.conditions + continue + if pin_name == ".jog": + drum = replace(drum, jog_enabled=True) + jog_conditions = row.conditions + continue + + if reset_conditions is None: + reset_conditions = row.conditions + continue + if jump_conditions is None: + jump_conditions = row.conditions + continue + if jog_conditions is None: + jog_conditions = row.conditions + continue + raise ConvertError("Drum block uses more than three continuation rows") + + if reset_conditions is None: + raise ConvertError("Drum requires a .reset() pin row") + + if jump_conditions is None: + jump_conditions = _blank_condition_row() + if jog_conditions is None: + jog_conditions = _blank_condition_row() + + return [ + block.main_conditions, + reset_conditions, + jump_conditions, + jog_conditions, + ], [drum, "", "", ""] + + +def _finalize_active_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand the current active block into decode-ready rows/tokens.""" + if isinstance(block.token, Timer): + return _finalize_timer_block(block) + if isinstance(block.token, Counter): + return _finalize_counter_block(block) + if isinstance(block.token, Shift): + return _finalize_shift_block(block) + if isinstance(block.token, Drum): + return _finalize_drum_block(block) + raise AssertionError(f"Unsupported active block token: {type(block.token).__name__}") def condition_node_to_token(node: object) -> ConditionToken: @@ -167,234 +533,46 @@ def convert_rung( lines = [r.canonical.conditions[0] for r in rung.comment_rows] comment = "\n".join(lines) - # --- Classify rows: instruction rows vs pin rows --- condition_rows: list[list[ConditionToken]] = [] af_tokens: list[AfToken] = [] - parent_timer: Timer | None = None - parent_counter: Counter | None = None - parent_shift: Shift | None = None - parent_drum: Drum | None = None - parent_af_idx: int | None = None - counter_up_conditions: list[ConditionToken] | None = None - counter_top_conditions: list[ConditionToken] | None = None - counter_bridge_conditions: list[ConditionToken] | None = None - counter_down_conditions: list[ConditionToken] | None = None - counter_reset_conditions: list[ConditionToken] | None = None - shift_data_conditions: list[ConditionToken] | None = None - shift_clock_conditions: list[ConditionToken] | None = None - shift_reset_conditions: list[ConditionToken] | None = None - drum_reset_conditions: list[ConditionToken] | None = None - drum_jump_conditions: list[ConditionToken] | None = None - drum_jog_conditions: list[ConditionToken] | None = None - - for row_idx, row in enumerate(rung.rows): - af_node = row.af_node - - # Check for pin row (dot-prefixed AF token). - if isinstance(af_node, AfCall) and af_node.name.startswith("."): - pin_name = af_node.name - if pin_name not in KNOWN_PIN_NAMES: - raise ConvertError(f"Unknown pin token: {pin_name!r}") - - conds = [condition_node_to_token(n) for n in row.condition_nodes] - - if parent_counter is not None: - if pin_name == ".down": - if parent_counter.counter_type != "count_up": - raise ConvertError(".down() is only valid for count_up()") - counter_down_conditions = conds - parent_counter = replace(parent_counter, down_enabled=True) - af_tokens[parent_af_idx] = parent_counter - continue - - if pin_name == ".reset": - counter_reset_conditions = conds - parent_counter = replace(parent_counter, reset_enabled=True) - af_tokens[parent_af_idx] = parent_counter - continue - - raise ConvertError( - f"Pin token {pin_name!r} is not supported for {parent_counter.counter_type}()" - ) - - if parent_drum is not None: - if pin_name == ".reset": - drum_reset_conditions = conds - continue - if pin_name == ".jump": - jump_target = af_node.args[0] if af_node.args else "" - parent_drum = replace(parent_drum, jump_enabled=True, jump_target=jump_target) - af_tokens[parent_af_idx] = parent_drum - drum_jump_conditions = conds - continue - if pin_name == ".jog": - parent_drum = replace(parent_drum, jog_enabled=True) - af_tokens[parent_af_idx] = parent_drum - drum_jog_conditions = conds - continue - raise ConvertError( - f"Pin token {pin_name!r} is not supported for {parent_drum.drum_kind}_drum()" - ) - - if parent_shift is not None: - if pin_name == ".clock": - shift_clock_conditions = conds - continue - if pin_name == ".reset": - shift_reset_conditions = conds - continue - raise ConvertError(f"Pin token {pin_name!r} is not supported for shift()") - - if pin_name in _RETENTIVE_PINS and parent_timer is not None: - # .reset() makes the parent timer retentive. - parent_timer = replace(parent_timer, retained=True) - af_tokens[parent_af_idx] = parent_timer - - # Pin row contributes its conditions/wires as a normal data row, - # but the AF token becomes blank (no separate instruction). - condition_rows.append(conds) - af_tokens.append("") - continue - - # Normal row — convert conditions. - conds = [condition_node_to_token(n) for n in row.condition_nodes] - - # Convert AF token. - token = af_node_to_token(af_node, strict=strict) - if ( - parent_counter is None - and isinstance(token, Counter) - and token.counter_type == "count_down" - and row_idx > 0 - ): - if len(condition_rows) != 1 or af_tokens != [""]: - raise ConvertError("count_down visual layout requires exactly one blank-AF top row") - counter_top_conditions = condition_rows.pop() - af_tokens.pop() - condition_rows.append(conds) - af_tokens.append(token) - parent_counter = token - parent_af_idx = len(af_tokens) - 1 - counter_up_conditions = conds + plain_rows: list[_ParsedRow] = [] + active_block: _ActiveBlock | None = None + + for row in rung.rows: + parsed = _parse_rung_row(row, strict=strict) + + if active_block is not None: + if _consume_active_row(active_block, parsed): + continue + block_conditions, block_afs = _finalize_active_block(active_block) + condition_rows.extend(block_conditions) + af_tokens.extend(block_afs) + active_block = None + + if parsed.kind == "block_main": + leading_blank_conditions: list[ConditionToken] | None = None + if isinstance(parsed.token, Counter) and parsed.token.counter_type == "count_down": + if plain_rows and plain_rows[-1].kind == "blank": + leading_blank_conditions = plain_rows.pop().conditions + _flush_plain_rows(plain_rows, condition_rows, af_tokens) + active_block = _start_active_block( + parsed, + leading_blank_conditions=leading_blank_conditions, + ) continue - if ( - parent_counter is not None - and parent_counter.counter_type == "count_down" - and row_idx > 0 - and counter_bridge_conditions is None - and token in ("", "NOP") - ): - counter_bridge_conditions = conds + if parsed.kind == "pin": + plain_rows.append(_ParsedRow(conditions=parsed.conditions, kind="blank")) continue - condition_rows.append(conds) - af_tokens.append(token) - if isinstance(af_node, AfCall) and af_node.name.upper() != "NOP": - if isinstance(token, Timer) and parent_timer is None: - parent_timer = token - parent_af_idx = len(af_tokens) - 1 - if isinstance(token, Counter) and parent_counter is None: - parent_counter = token - parent_af_idx = len(af_tokens) - 1 - counter_up_conditions = conds - if isinstance(token, Shift) and parent_shift is None: - parent_shift = token - parent_af_idx = len(af_tokens) - 1 - shift_data_conditions = conds - if isinstance(token, Drum) and parent_drum is None: - parent_drum = token - parent_af_idx = len(af_tokens) - 1 - - # --- Drum row shaping --- - if parent_drum is not None: - if drum_reset_conditions is None: - raise ConvertError("Drum requires a .reset() pin row") - - if len(condition_rows) != 1: - raise ConvertError( - "Drum pin rows must use .reset()/.jump()/.jog() tokens " - "(blank-AF continuation rows are unsupported)" - ) + plain_rows.append(parsed) - main_conds = condition_rows[0] - blank_row = cast(list[ConditionToken], [""] * CONDITION_COLUMNS) + if active_block is not None: + block_conditions, block_afs = _finalize_active_block(active_block) + condition_rows.extend(block_conditions) + af_tokens.extend(block_afs) - condition_rows = [ - main_conds, - drum_reset_conditions, - drum_jump_conditions if drum_jump_conditions is not None else blank_row, - drum_jog_conditions if drum_jog_conditions is not None else blank_row, - ] - af_tokens = [parent_drum, "", "", ""] - - # --- Counter row shaping --- - if parent_counter is not None: - if counter_up_conditions is None: - raise ConvertError("Counter rung is missing a primary row") - - if len(condition_rows) != 1: - raise ConvertError( - "Counter pin rows must use .down()/.reset() tokens (blank-AF continuation rows are unsupported)" - ) - - if counter_reset_conditions is None: - raise ConvertError(f"{parent_counter.counter_type} requires a .reset() pin row") - - assert counter_up_conditions is not None - assert counter_reset_conditions is not None - blank_row = cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - - if parent_counter.counter_type == "count_up": - if parent_counter.down_enabled: - if counter_down_conditions is None: - raise ConvertError("count_up with .down() is missing down conditions") - middle_row = counter_down_conditions - else: - middle_row = blank_row - condition_rows = [counter_up_conditions, middle_row, counter_reset_conditions] - af_tokens = [parent_counter, "", ""] - else: - if counter_down_conditions is not None: - raise ConvertError("count_down does not support .down()") - if counter_top_conditions is not None: - condition_rows = [ - counter_top_conditions, - counter_up_conditions, - counter_reset_conditions, - ] - elif counter_bridge_conditions is not None: - condition_rows = [ - counter_up_conditions, - counter_bridge_conditions, - counter_reset_conditions, - ] - else: - condition_rows = [blank_row, counter_up_conditions, counter_reset_conditions] - af_tokens = [parent_counter, "NOP", ""] - - # --- Shift row shaping --- - if parent_shift is not None: - if shift_data_conditions is None: - raise ConvertError("Shift rung is missing a primary row") - - if len(condition_rows) != 1: - raise ConvertError( - "Shift pin rows must use .clock()/.reset() tokens (blank-AF continuation rows are unsupported)" - ) - - clock_conditions = ( - shift_clock_conditions - if shift_clock_conditions is not None - else cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - ) - reset_conditions = ( - shift_reset_conditions - if shift_reset_conditions is not None - else cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - ) - condition_rows = [shift_data_conditions, clock_conditions, reset_conditions] - af_tokens = [parent_shift, "", ""] + _flush_plain_rows(plain_rows, condition_rows, af_tokens) # --- Auto-pad for tall instructions --- af0 = af_tokens[0] if af_tokens else "" diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index d290efe..c88d2bf 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -29,7 +29,6 @@ import csv as csv_mod from collections.abc import Sequence from pathlib import Path -from typing import cast from ..decode import Rung from ..instructions import ( @@ -86,6 +85,234 @@ def _conditions_are_blank(conditions: Sequence[object]) -> bool: return all(c == "" for c in conditions) +def _append_data_row( + rows: list[list[str]], + data_row_count: int, + conditions: Sequence[object], + af: object, +) -> int: + """Append one CSV data row and return the new emitted-data-row count.""" + marker = "R" if data_row_count == 0 else "" + rows.append([marker] + [_token_to_csv(c) for c in conditions] + [_token_to_csv(af)]) + return data_row_count + 1 + + +def _emit_blank_continuation( + rows: list[list[str]], + data_row_count: int, + conditions: Sequence[object], +) -> int: + """Emit a blank-AF continuation row only when it carries geometry.""" + if _conditions_are_blank(conditions): + return data_row_count + return _append_data_row(rows, data_row_count, conditions, "") + + +def _require_blank_af(af: object, *, message: str) -> None: + """Ensure a consumed continuation row does not hide another AF token.""" + if af != "": + raise WriterError(message) + + +def _emit_timer_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + timer: Timer, +) -> tuple[int, int]: + """Emit one timer block and return ``(new_count, consumed_rows)``.""" + needed = start + 2 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"timer requires {needed} decoded rows; got {len(condition_rows)}") + + _require_blank_af( + af_tokens[start + 1], + message="timer continuation row must be blank AF", + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], timer) + if timer.retained: + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".reset()") + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 1] + ) + return data_row_count, 2 + + +def _emit_counter_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + counter: Counter, +) -> tuple[int, int]: + """Emit one counter block and return ``(new_count, consumed_rows)``.""" + needed = start + 3 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError( + f"{counter.counter_type} requires {needed} decoded rows; got {len(condition_rows)}" + ) + + if counter.counter_type == "count_up": + _require_blank_af( + af_tokens[start + 1], + message="count_up continuation row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 2], + message="count_up reset row must be blank AF", + ) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], counter) + if counter.down_enabled: + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".down()") + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 1] + ) + if counter.reset_enabled: + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 2], ".reset()" + ) + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 2] + ) + return data_row_count, 3 + + if af_tokens[start + 1] != "NOP": + raise WriterError("count_down requires a NOP bridge row after the counter") + _require_blank_af( + af_tokens[start + 2], + message="count_down reset row must be blank AF", + ) + + counter_conditions = condition_rows[start] + bridge_conditions = condition_rows[start + 1] + if _conditions_are_blank(counter_conditions): + data_row_count = _append_data_row(rows, data_row_count, bridge_conditions, counter) + else: + data_row_count = _append_data_row(rows, data_row_count, counter_conditions, "") + data_row_count = _append_data_row(rows, data_row_count, bridge_conditions, counter) + + if counter.reset_enabled: + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 2], ".reset()" + ) + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 2] + ) + return data_row_count, 3 + + +def _emit_shift_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + shift: Shift, +) -> tuple[int, int]: + """Emit one shift block and return ``(new_count, consumed_rows)``.""" + needed = start + 3 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"shift requires {needed} decoded rows; got {len(condition_rows)}") + + _require_blank_af( + af_tokens[start + 1], + message="shift .clock() row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 2], + message="shift .reset() row must be blank AF", + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], shift) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".clock()") + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 2], ".reset()") + return data_row_count, 3 + + +def _emit_drum_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + drum: Drum, +) -> tuple[int, int]: + """Emit one drum block and return ``(new_count, consumed_rows)``.""" + needed = start + 4 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"Drum requires {needed} decoded rows; got {len(condition_rows)}") + + _require_blank_af( + af_tokens[start + 1], + message="drum .reset() row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 2], + message="drum .jump() row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 3], + message="drum .jog() row must be blank AF", + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], drum) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".reset()") + if drum.jump_enabled: + data_row_count = _append_data_row( + rows, + data_row_count, + condition_rows[start + 2], + f".jump({drum.jump_target})", + ) + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 2] + ) + if drum.jog_enabled: + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 3], ".jog()") + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 3] + ) + return data_row_count, 4 + + +def _emit_generic_tall_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + af: AfInstruction, +) -> tuple[int, int]: + """Emit a non-pinned tall AF block, preserving only nonblank continuation rows.""" + visual_rows = int(af.cell_params().get("visual_rows", 1)) + needed = start + visual_rows + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError( + f"{type(af).__name__} requires {needed} decoded rows; got {len(condition_rows)}" + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], af) + for offset in range(1, visual_rows): + _require_blank_af( + af_tokens[start + offset], + message=f"{type(af).__name__} continuation row must be blank AF", + ) + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + offset] + ) + return data_row_count, visual_rows + + def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: """Convert a ``Rung`` to a list of CSV row lists. @@ -94,7 +321,8 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: Data rows have marker ``"R"`` (first) or ``""`` (continuation). Retained timers produce a ``.reset()`` pin row from the second - grid row. Non-retained timers strip trailing blank padding. + grid row. Multi-row AF families are streamed in order so multiple + pinned/tall blocks survive CSV round-trip without truncation. """ rows: list[list[str]] = [] @@ -103,153 +331,57 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: for line in rung.comment.split("\n"): rows.append(["#", line]) - # Build working copies for potential stripping. condition_rows = list(rung.conditions) af_tokens = list(rung.instructions) + data_row_count = 0 + row_idx = 0 + + while row_idx < len(condition_rows): + af = af_tokens[row_idx] + consumed = 1 - # --- Find pinned AF instruction (counter/timer/shift/drum) at any row --- - pinned_row: int | None = None - pinned_af: AfInstruction | None = None - pinned_family: str | None = None - for idx, af in enumerate(af_tokens): if isinstance(af, AfInstruction): - fam = get_af_family_for_token(af) - if fam is not None and fam.family_name in ("counter", "timer", "shift", "drum"): - pinned_row = idx - pinned_af = af - pinned_family = fam.family_name - break - - is_retained_timer = ( - pinned_family == "timer" and isinstance(pinned_af, Timer) and pinned_af.retained - ) - is_tall = any( - isinstance(af, AfInstruction) and af.cell_params().get("visual_rows", 1) > 1 - for af in af_tokens - ) + family = get_af_family_for_token(af) + family_name = family.family_name if family is not None else None - # Helper: emit regular data rows for indices [0, up_to). - def _emit_prefix(up_to: int) -> None: - for i in range(up_to): - marker = "R" if i == 0 else "" - rows.append( - [marker] - + [_token_to_csv(c) for c in condition_rows[i]] - + [_token_to_csv(af_tokens[i])] - ) + if family_name == "timer" and isinstance(af, Timer): + data_row_count, consumed = _emit_timer_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue - if pinned_family == "counter": - assert pinned_row is not None - counter = cast(Counter, pinned_af) - needed = pinned_row + 3 - if len(condition_rows) < needed or len(af_tokens) < needed: - raise WriterError( - f"{counter.counter_type} requires {needed} decoded rows; got {len(condition_rows)}" - ) - _emit_prefix(pinned_row) - first_marker = "R" if pinned_row == 0 else "" - - if counter.counter_type == "count_up": - rows.append( - [first_marker] - + [_token_to_csv(c) for c in condition_rows[pinned_row]] - + [counter.to_csv()] - ) - if counter.down_enabled: - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 1]] + [".down()"] + if family_name == "counter" and isinstance(af, Counter): + data_row_count, consumed = _emit_counter_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue + + if family_name == "shift" and isinstance(af, Shift): + data_row_count, consumed = _emit_shift_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af ) - if counter.reset_enabled: - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 2]] + [".reset()"] + row_idx += consumed + continue + + if family_name == "drum" and isinstance(af, Drum): + data_row_count, consumed = _emit_drum_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af ) - return rows - - # count_down: NOP bridge row follows the counter. - bridge_row = pinned_row + 1 - if af_tokens[bridge_row] != "NOP": - raise WriterError("count_down requires a NOP bridge row after the counter") - counter_conditions = condition_rows[pinned_row] - bridge_conditions = condition_rows[bridge_row] - if _conditions_are_blank(counter_conditions): - rows.append( - [first_marker] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()] - ) - else: - rows.append([first_marker] + [_token_to_csv(c) for c in counter_conditions] + [""]) - rows.append([""] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()]) - if counter.reset_enabled: - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[bridge_row + 1]] + [".reset()"] - ) - return rows - - if pinned_family == "shift": - assert pinned_row is not None - shift = cast(Shift, pinned_af) - needed = pinned_row + 3 - if len(condition_rows) < needed or len(af_tokens) < needed: - raise WriterError(f"shift requires {needed} decoded rows; got {len(condition_rows)}") - _emit_prefix(pinned_row) - marker = "R" if pinned_row == 0 else "" - rows.append( - [marker] + [_token_to_csv(c) for c in condition_rows[pinned_row]] + [shift.to_csv()] - ) - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 1]] + [".clock()"] - ) - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 2]] + [".reset()"] - ) - return rows - - if pinned_family == "drum": - assert pinned_row is not None - drum = cast(Drum, pinned_af) - needed = pinned_row + 4 - if len(condition_rows) < needed or len(af_tokens) < needed: - raise WriterError(f"Drum requires {needed} decoded rows; got {len(condition_rows)}") - _emit_prefix(pinned_row) - marker = "R" if pinned_row == 0 else "" - rows.append( - [marker] + [_token_to_csv(c) for c in condition_rows[pinned_row]] + [drum.to_csv()] - ) - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 1]] + [".reset()"] - ) - if drum.jump_enabled: - rows.append( - [""] - + [_token_to_csv(c) for c in condition_rows[pinned_row + 2]] - + [f".jump({drum.jump_target})"] - ) - if drum.jog_enabled: - rows.append( - [""] + [_token_to_csv(c) for c in condition_rows[pinned_row + 3]] + [".jog()"] - ) - return rows - - if is_retained_timer: - # Retained timer: keep all rows — the row after the timer is a .reset() pin. - pass - elif is_tall: - # Tall instruction (timer/copy/search): strip trailing blank padding rows. - while len(condition_rows) > 1 and _is_blank_row(condition_rows[-1], af_tokens[-1]): - condition_rows = condition_rows[:-1] - af_tokens = af_tokens[:-1] - - # --- Data rows --- - for i, (conditions, af) in enumerate(zip(condition_rows, af_tokens, strict=True)): - marker = "R" if i == 0 else "" - - # Pin row: retained timer's row after the timer gets .reset() as AF. - if is_retained_timer and pinned_row is not None and i == pinned_row + 1: - af_str = ".reset()" - else: - af_str = _token_to_csv(af) + row_idx += consumed + continue + + visual_rows = int(af.cell_params().get("visual_rows", 1)) + if visual_rows > 1: + data_row_count, consumed = _emit_generic_tall_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue - cond_strs = [_token_to_csv(c) for c in conditions] - rows.append([marker] + cond_strs + [af_str]) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[row_idx], af) + row_idx += consumed return rows diff --git a/tests/csv/test_writer.py b/tests/csv/test_writer.py index 2226b4e..aebc621 100644 --- a/tests/csv/test_writer.py +++ b/tests/csv/test_writer.py @@ -40,6 +40,21 @@ def _golden_paths() -> list[Path]: return [b for b in bins if b.with_suffix(".csv").exists()] +def _blank_conditions() -> list[object]: + """Return one all-blank decoded condition row.""" + return [""] * 31 + + +def _contact_row(operand: str) -> list[object]: + """Return a simple NO-contact row with trailing wires.""" + return [Contact(InstructionType.CONTACT_NO, operand)] + ["-"] * 30 + + +def _edge_row(operand: str) -> list[object]: + """Return a simple rising-edge contact row with trailing wires.""" + return [Contact(InstructionType.CONTACT_EDGE, operand, edge_kind="rise")] + ["-"] * 30 + + # --------------------------------------------------------------------------- # Round-trip: golden.bin → decode → CSV → read_csv → compare # --------------------------------------------------------------------------- @@ -306,6 +321,78 @@ def test_count_down_preserves_wire_only_top_row(self) -> None: assert rows[1][1] == "rise(C78)" assert rows[2][32] == ".reset()" + def test_count_up_then_retained_timer_emits_both_blocks(self) -> None: + counter = Counter( + counter_type="count_up", + done_bit="CT5", + current="CTD5", + preset="100", + down_enabled=False, + reset_enabled=True, + ) + timer = Timer("on_delay", "T5", "TD5", "10", "Tm", retained=True) + rung = Rung( + logical_rows=5, + conditions=[ + _edge_row("C81"), + _blank_conditions(), + _contact_row("C82"), + _contact_row("C83"), + _contact_row("C84"), + ], + instructions=[counter, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 4 + assert [row[32] for row in rows] == [ + "count_up(CT5,CTD5,preset=100)", + ".reset()", + "on_delay(T5,TD5,preset=10,unit=Tm)", + ".reset()", + ] + assert rows[1][1] == "C82" + assert rows[3][1] == "C84" + + def test_count_down_then_retained_timer_preserves_bridge_shape(self) -> None: + counter = Counter( + counter_type="count_down", + done_bit="CT6", + current="CTD6", + preset="25", + down_enabled=False, + reset_enabled=True, + ) + timer = Timer("on_delay", "T6", "TD6", "20", "Ts", retained=True) + rung = Rung( + logical_rows=5, + conditions=[ + _contact_row("C85"), + _edge_row("C86"), + _contact_row("C87"), + _contact_row("C88"), + _contact_row("C89"), + ], + instructions=[counter, "NOP", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 5 + assert [row[32] for row in rows] == [ + "", + "count_down(CT6,CTD6,preset=25)", + ".reset()", + "on_delay(T6,TD6,preset=20,unit=Ts)", + ".reset()", + ] + assert rows[0][1] == "C85" + assert rows[1][1] == "rise(C86)" + assert rows[4][1] == "C89" + def test_shift_emits_clock_and_reset_pins(self) -> None: shift = Shift("C99", "C106") rung = Rung( @@ -326,6 +413,34 @@ def test_shift_emits_clock_and_reset_pins(self) -> None: assert rows[1][32] == ".clock()" assert rows[2][32] == ".reset()" + def test_shift_then_timer_preserves_nonblank_timer_continuation(self) -> None: + shift = Shift("C99", "C106") + timer = Timer("on_delay", "T7", "TD7", "30", "Tms", retained=False) + rung = Rung( + logical_rows=5, + conditions=[ + _contact_row("C90"), + _contact_row("C91"), + _contact_row("C92"), + _contact_row("C93"), + _contact_row("C94"), + ], + instructions=[shift, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 5 + assert [row[32] for row in rows] == [ + "shift(C99..C106)", + ".clock()", + ".reset()", + "on_delay(T7,TD7,preset=30,unit=Tms)", + "", + ] + assert rows[4][1] == "C94" + def test_shift_requires_three_rows(self) -> None: shift = Shift("C99", "C106") rung = Rung( @@ -389,3 +504,71 @@ def test_unknown_instruction_raises(self) -> None: ) with pytest.raises(WriterError): decoded_rung_to_rows(rung) + + +class TestMultiPinnedRoundTrip: + def test_count_up_then_retained_timer_roundtrip(self, tmp_path: Path) -> None: + counter = Counter( + counter_type="count_up", + done_bit="CT8", + current="CTD8", + preset="100", + down_enabled=False, + reset_enabled=True, + ) + timer = Timer("on_delay", "T8", "TD8", "10", "Tm", retained=True) + rung = Rung( + logical_rows=5, + conditions=[ + _edge_row("C95"), + _blank_conditions(), + _contact_row("C96"), + _contact_row("C97"), + _contact_row("C98"), + ], + instructions=[counter, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + out_csv = tmp_path / "count-up-timer.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == 5 + assert isinstance(round_tripped.instructions[0], Counter) + assert isinstance(round_tripped.instructions[3], Timer) + assert round_tripped.instructions[3].retained is True + assert isinstance(round_tripped.conditions[2][0], Contact) + assert round_tripped.conditions[2][0].operand == "C96" + assert isinstance(round_tripped.conditions[4][0], Contact) + assert round_tripped.conditions[4][0].operand == "C98" + + def test_shift_then_timer_roundtrip_preserves_blank_timer_row(self, tmp_path: Path) -> None: + shift = Shift("C120", "C127") + timer = Timer("on_delay", "T9", "TD9", "50", "Tms", retained=False) + rung = Rung( + logical_rows=5, + conditions=[ + _contact_row("C99"), + _contact_row("C100"), + _contact_row("C101"), + _contact_row("C102"), + _contact_row("C103"), + ], + instructions=[shift, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + out_csv = tmp_path / "shift-timer.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == 5 + assert isinstance(round_tripped.instructions[0], Shift) + assert isinstance(round_tripped.instructions[3], Timer) + assert round_tripped.instructions[3].retained is False + assert round_tripped.instructions[4] == "" + assert isinstance(round_tripped.conditions[4][0], Contact) + assert round_tripped.conditions[4][0].operand == "C103" From 6156bd3693a8180c93bfb8652248bebef1621849 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:22:30 -0400 Subject: [PATCH 05/15] fix(types): resolve ty unresolved-attribute errors in converter and _grid Use RowAst type for _parse_rung_row() parameter instead of object, and use walrus operator in _grid.py generator to let ty narrow the type across the for/if split. Also adds verified instr-6row-multi-output golden fixture. Co-Authored-By: Claude Opus 4.6 --- src/laddercodec/_grid.py | 4 ++-- src/laddercodec/csv/converter.py | 3 ++- src/laddercodec/csv/writer.py | 24 ++++++++----------- .../golden/verify_progress.log | 1 + 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/laddercodec/_grid.py b/src/laddercodec/_grid.py index 096bf3d..456f41e 100644 --- a/src/laddercodec/_grid.py +++ b/src/laddercodec/_grid.py @@ -143,9 +143,9 @@ def _compute_rung_metadata( af_summary_block = b"" af_rows = sorted(af_instr_indices.keys()) any_multi_row_af = any( - af_tokens[r].cell_params().get("visual_rows", 1) > 1 + tok.cell_params().get("visual_rows", 1) > 1 for r in af_rows - if isinstance(af_tokens[r], AfInstruction) + if isinstance((tok := af_tokens[r]), AfInstruction) ) if single_rung and len(af_rows) >= 2 and not any_multi_row_af: af_entries: list[tuple[int, int, bool]] = [] diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index a904436..300e58b 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -61,6 +61,7 @@ GenericCondition, HorizontalWire, JunctionDownWire, + RowAst, RungAst, VerticalPassThroughWire, ) @@ -109,7 +110,7 @@ def _blank_condition_row() -> list[ConditionToken]: return cast(list[ConditionToken], [""] * CONDITION_COLUMNS) -def _parse_rung_row(row: object, *, strict: bool) -> _ParsedRow: +def _parse_rung_row(row: RowAst, *, strict: bool) -> _ParsedRow: """Convert one parsed CSV row into tokens plus AF-row classification.""" conds = [condition_node_to_token(n) for n in row.condition_nodes] af_node = row.af_node diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index c88d2bf..be8b3a2 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -134,11 +134,11 @@ def _emit_timer_block( data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], timer) if timer.retained: - data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".reset()") - else: - data_row_count = _emit_blank_continuation( - rows, data_row_count, condition_rows[start + 1] + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 1], ".reset()" ) + else: + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 1]) return data_row_count, 2 @@ -168,7 +168,9 @@ def _emit_counter_block( ) data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], counter) if counter.down_enabled: - data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".down()") + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 1], ".down()" + ) else: data_row_count = _emit_blank_continuation( rows, data_row_count, condition_rows[start + 1] @@ -203,9 +205,7 @@ def _emit_counter_block( rows, data_row_count, condition_rows[start + 2], ".reset()" ) else: - data_row_count = _emit_blank_continuation( - rows, data_row_count, condition_rows[start + 2] - ) + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 2]) return data_row_count, 3 @@ -273,15 +273,11 @@ def _emit_drum_block( f".jump({drum.jump_target})", ) else: - data_row_count = _emit_blank_continuation( - rows, data_row_count, condition_rows[start + 2] - ) + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 2]) if drum.jog_enabled: data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 3], ".jog()") else: - data_row_count = _emit_blank_continuation( - rows, data_row_count, condition_rows[start + 3] - ) + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 3]) return data_row_count, 4 diff --git a/tests/fixtures/ladder_captures/golden/verify_progress.log b/tests/fixtures/ladder_captures/golden/verify_progress.log index 26413d1..57e7468 100644 --- a/tests/fixtures/ladder_captures/golden/verify_progress.log +++ b/tests/fixtures/ladder_captures/golden/verify_progress.log @@ -41,3 +41,4 @@ instr-3row-or: worked instr-3row-scatter: worked instr-timer-ret: worked instr-coil-timer-ret: worked +instr-6row-multi-output: worked From 2907896a500ff0b44c3a4681cb78544ddafe8d22 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:32:23 -0400 Subject: [PATCH 06/15] fix(csv): stop greedy row consumption in multi-row AF blocks _consume_active_row unconditionally absorbed all blank rows following a block, even after the block was full. This caused timer/counter/shift/drum blocks to reject rungs with extra condition rows below the block. Add _max_continuations() gate so blocks stop consuming once full (timer=1, counter=2, shift=2, drum=3) and remaining rows flow through as normal rung rows. Co-Authored-By: Claude Opus 4.6 --- src/laddercodec/csv/converter.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 300e58b..4bbc0ff 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -171,8 +171,24 @@ def _start_active_block( ) +def _max_continuations(block: _ActiveBlock) -> int: + """Maximum number of continuation rows a block type can absorb.""" + if isinstance(block.token, Timer): + return 1 + if isinstance(block.token, Counter): + return 2 + if isinstance(block.token, Shift): + return 2 + if isinstance(block.token, Drum): + return 3 + return 0 + + def _consume_active_row(block: _ActiveBlock, row: _ParsedRow) -> bool: """Consume a continuation row for the active block when it belongs there.""" + if len(block.continuations) >= _max_continuations(block): + return False + if row.kind == "blank": block.continuations.append(row) return True @@ -217,9 +233,6 @@ def _consume_active_row(block: _ActiveBlock, row: _ParsedRow) -> bool: def _finalize_timer_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: """Expand one timer block back to its decoded 2-row span.""" timer = cast(Timer, block.token) - if len(block.continuations) > 1: - raise ConvertError("Timer block uses more than one continuation row") - continuation_conditions = _blank_condition_row() if block.continuations: continuation = block.continuations[0] @@ -261,7 +274,6 @@ def _finalize_count_up_block( if reset_conditions is None: reset_conditions = row.conditions continue - raise ConvertError("count_up block uses more than two continuation rows") if not counter.reset_enabled: raise ConvertError(f"{counter.counter_type} requires a .reset() pin row") @@ -306,7 +318,6 @@ def _finalize_count_down_block( if reset_conditions is None: reset_conditions = row.conditions continue - raise ConvertError("count_down block uses more than two continuation rows") if not counter.reset_enabled: raise ConvertError(f"{counter.counter_type} requires a .reset() pin row") @@ -371,7 +382,6 @@ def _finalize_shift_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken if reset_conditions is None: reset_conditions = row.conditions continue - raise ConvertError("Shift block uses more than two continuation rows") if clock_conditions is None: clock_conditions = _blank_condition_row() @@ -416,7 +426,6 @@ def _finalize_drum_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken] if jog_conditions is None: jog_conditions = row.conditions continue - raise ConvertError("Drum block uses more than three continuation rows") if reset_conditions is None: raise ConvertError("Drum requires a .reset() pin row") From 98fb8d09adaa9b5a4cfa5272b10c224b42aea8ff Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:47:24 -0400 Subject: [PATCH 07/15] fix(csv): preserve generic multi-row AF continuation rows --- src/laddercodec/csv/converter.py | 74 ++++++++++++++++++++++---------- tests/csv/test_converter.py | 43 +++++++++++++++++++ 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 4bbc0ff..02e9dd7 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -7,8 +7,11 @@ - AF token conversion (coils, timers, NOP) - Pin continuation rows (``.reset()``, ``.down()``, etc.) — absorbed into the parent instruction rather than emitted as separate rows -- Auto-padding: tall AF instructions (timers, counters) get a blank - continuation row appended when the user provides only one row +- Multi-row AF instructions: pinned families (timers, counters, shift, + drum) absorb their continuation rows; other tall AF instructions keep + blank-AF continuation rows as part of the same visual block +- Auto-padding: tall AF instructions get blank continuation rows + appended when the user omits them Pin rows -------- @@ -27,9 +30,10 @@ Tall instructions ----------------- -Some AF instructions occupy more than one grid row visually (timers = 2). -If the user CSV has only the instruction row, a blank padding row is -auto-appended so ``encode_rung()`` receives the correct ``logical_rows``. +Some AF instructions occupy more than one grid row visually (timers = 2, +search/copy = 2, send/receive = 3, etc.). If the user CSV omits blank +continuation rows, they are auto-appended so ``encode_rung()`` receives +the correct ``logical_rows``. """ from __future__ import annotations @@ -79,7 +83,7 @@ # --------------------------------------------------------------------------- CONDITION_COLUMNS = 31 -_BLOCK_FAMILIES = frozenset({"timer", "counter", "shift", "drum"}) +_PINNED_BLOCK_FAMILIES = frozenset({"timer", "counter", "shift", "drum"}) @dataclass @@ -98,8 +102,8 @@ class _ParsedRow: class _ActiveBlock: """A multi-row AF block that is still gathering continuation rows.""" - family_name: str - token: Timer | Counter | Shift | Drum + family_name: str | None + token: AfInstruction main_conditions: list[ConditionToken] leading_blank_conditions: list[ConditionToken] | None = None continuations: list[_ParsedRow] = field(default_factory=list) @@ -110,6 +114,13 @@ def _blank_condition_row() -> list[ConditionToken]: return cast(list[ConditionToken], [""] * CONDITION_COLUMNS) +def _af_visual_rows(token: AfToken) -> int: + """Return the AF token's visual row count.""" + if isinstance(token, AfInstruction): + return max(1, int(token.cell_params().get("visual_rows", 1))) + return 1 + + def _parse_rung_row(row: RowAst, *, strict: bool) -> _ParsedRow: """Convert one parsed CSV row into tokens plus AF-row classification.""" conds = [condition_node_to_token(n) for n in row.condition_nodes] @@ -133,13 +144,22 @@ def _parse_rung_row(row: RowAst, *, strict: bool) -> _ParsedRow: return _ParsedRow(conditions=conds, kind="nop", token="NOP") family = get_af_family_for_token(token) - if family is not None and family.family_name in _BLOCK_FAMILIES: + visual_rows = _af_visual_rows(token) + if family is not None and ( + family.family_name in _PINNED_BLOCK_FAMILIES or visual_rows > 1 + ): return _ParsedRow( conditions=conds, kind="block_main", token=token, family_name=family.family_name, ) + if visual_rows > 1: + return _ParsedRow( + conditions=conds, + kind="block_main", + token=token, + ) return _ParsedRow(conditions=conds, kind="main", token=token) @@ -161,10 +181,10 @@ def _start_active_block( leading_blank_conditions: list[ConditionToken] | None = None, ) -> _ActiveBlock: """Create a new active multi-row AF block from its main AF row.""" - if not isinstance(row.token, (Timer, Counter, Shift, Drum)): - raise AssertionError("block_main row must carry a timer/counter/shift/drum token") + if not isinstance(row.token, AfInstruction): + raise AssertionError("block_main row must carry an AF instruction token") return _ActiveBlock( - family_name=cast(str, row.family_name), + family_name=row.family_name, token=row.token, main_conditions=row.conditions, leading_blank_conditions=leading_blank_conditions, @@ -173,15 +193,7 @@ def _start_active_block( def _max_continuations(block: _ActiveBlock) -> int: """Maximum number of continuation rows a block type can absorb.""" - if isinstance(block.token, Timer): - return 1 - if isinstance(block.token, Counter): - return 2 - if isinstance(block.token, Shift): - return 2 - if isinstance(block.token, Drum): - return 3 - return 0 + return max(0, _af_visual_rows(block.token) - 1) def _consume_active_row(block: _ActiveBlock, row: _ParsedRow) -> bool: @@ -443,6 +455,24 @@ def _finalize_drum_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken] ], [drum, "", "", ""] +def _finalize_generic_tall_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand a non-pinned tall AF block to its full visual row span.""" + visual_rows = _af_visual_rows(block.token) + condition_rows = [block.main_conditions] + af_tokens: list[AfToken] = [block.token] + + for offset in range(visual_rows - 1): + if offset < len(block.continuations): + condition_rows.append(block.continuations[offset].conditions) + else: + condition_rows.append(_blank_condition_row()) + af_tokens.append("") + + return condition_rows, af_tokens + + def _finalize_active_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: """Expand the current active block into decode-ready rows/tokens.""" if isinstance(block.token, Timer): @@ -453,7 +483,7 @@ def _finalize_active_block(block: _ActiveBlock) -> tuple[list[list[ConditionToke return _finalize_shift_block(block) if isinstance(block.token, Drum): return _finalize_drum_block(block) - raise AssertionError(f"Unsupported active block token: {type(block.token).__name__}") + return _finalize_generic_tall_block(block) def condition_node_to_token(node: object) -> ConditionToken: diff --git a/tests/csv/test_converter.py b/tests/csv/test_converter.py index 9e02846..ea7a842 100644 --- a/tests/csv/test_converter.py +++ b/tests/csv/test_converter.py @@ -34,6 +34,7 @@ Next, RawInstruction, Return, + Search, Shift, Timer, ) @@ -134,6 +135,48 @@ def test_no_autopad_when_user_provides_two_rows(self, tmp_path: Path) -> None: assert all(c == "-" for c in conds[1]) # user wires preserved +class TestGenericTallRows: + def test_search_inserts_blank_continuation_before_following_af(self, tmp_path: Path) -> None: + csv_path = tmp_path / "main.csv" + _write_csv( + csv_path, + [ + ("R", _wire_row("C10"), "search(DS72..DS81 == DS71,result=DS82,found=C81)"), + ("", _wire_row("C11"), "out(Y001)"), + ], + ) + + rung = parse_csv_file(csv_path).rungs[0] + lr, conds, afs, _ = convert_rung(rung) + + assert lr == 3 + assert isinstance(afs[0], Search) + assert afs[1] == "" + assert isinstance(afs[2], Coil) + assert all(c == "" for c in conds[1]) + assert isinstance(conds[2][0], Contact) + assert conds[2][0].operand == "C11" + + def test_search_later_in_rung_auto_adds_trailing_blank_row(self, tmp_path: Path) -> None: + csv_path = tmp_path / "main.csv" + _write_csv( + csv_path, + [ + ("R", _wire_row("C12"), "out(Y002)"), + ("", _wire_row("C13"), "search(DS90..DS99 == DS89,result=DS100,found=C14)"), + ], + ) + + rung = parse_csv_file(csv_path).rungs[0] + lr, conds, afs, _ = convert_rung(rung) + + assert lr == 3 + assert isinstance(afs[0], Coil) + assert isinstance(afs[1], Search) + assert afs[2] == "" + assert all(c == "" for c in conds[2]) + + class TestPinRows: def test_reset_makes_timer_retentive(self, tmp_path: Path) -> None: csv_path = tmp_path / "main.csv" From 16b7f3c48dbb54067381a14182b8974916c94bb7 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:52:25 -0400 Subject: [PATCH 08/15] test(fixtures): add multi-row copy golden capture --- .../golden/instr-copy-multirow-2af.bin | Bin 0 -> 12288 bytes .../golden/instr-copy-multirow-2af.csv | 3 +++ .../ladder_captures/golden/verify_progress.log | 1 + 3 files changed, 4 insertions(+) create mode 100644 tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.bin create mode 100644 tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv diff --git a/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.bin b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.bin new file mode 100644 index 0000000000000000000000000000000000000000..3e1476ce5613aa817c254ed4f8b6cdc1f5d2f01a GIT binary patch literal 12288 zcmeI%OHUI~7=Yogpm@K4ig+)&U}umRZ!41$6Qe;B!;Xc-umKe!DDmI;Cr#Yx_a1si zijAp#SHw=j+@E%)RnP0tdHKBaqTOysJR9%ClURx6*p9XMl=GE%6RYtdmy5Y)J2qk` zHe)NdmvVbIUguexv7YPi^2oioo6C(nKAH~qC$N;?YP0VpOk(|`U}!ul>U2C?&jNSPft?mYk z{Xe{XCxhc-Cgf@f$)$Le`zGf&kI{4OE#1(`z>N4l$@+Y*%`C8XKj!0cJk0F~w#!PH zmABjfMqB`=lmmH6J@%XQtd-N45fJ6voDJg%2;=z(j3*$BCm@U`AdDv3kk+v@H_OC+CIVn9zUJC&8TR=%Yo`qR4EmqTTEv&KT+F9Z?A`6| zow&`YOcMNP;^0oRZ!Z%bWh6YvREU^sj?7L~H(gSKqpfCN-<+t=63T7lweQb?)PI}G zoz~ko9&0=~VmvuwJUL=KIbu9HVmvuwJUL=KIbu9HVmvuwJb$%~Cr6AYM~o*&j3-Bo zCr6AYM~o*&j3-BoCr6AYM~o*&j3-BoCr6AYM~o*&j3-BoCr6AYM~3zFcLC+U^f{t4 zsF#5~mrQkiu;&>IDgNp7NaNK-9&k2Kgtnmbd@dSkN1cdPf zgz*G~@dSkN1cdPfgz*G~@dSkN1cdPfgz*G~@dSkN1cdPfgz*G~@dSkN1cdPfgz*G~ z@dSkN1cdPfgz*G~@dSkN1cdPf1fJ&^<#lqsPIde|B+oM#ikZr<81mW==EUC7zVmXY z#Ys<&nD^%^Fdl0>Ibu9HVmvuwJUL=KIbu9HVmvuwJUL=KIbu9HVmvuwJUL=KIbu9H zVmvuwJUL=KIbu9HVmvuwJUL=KIbu9HVmvuwJUL=KIbu9HVmvuAgilo%%h5FY1V*30 I|M?000(m<{!2kdN literal 0 HcmV?d00001 diff --git a/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv new file mode 100644 index 0000000..c0c9783 --- /dev/null +++ b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv @@ -0,0 +1,3 @@ +marker,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,AA,AB,AC,AD,AE,AF +R,C1,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,"copy(1,DS1)" +,C2,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,"copy(1,DS1)" diff --git a/tests/fixtures/ladder_captures/golden/verify_progress.log b/tests/fixtures/ladder_captures/golden/verify_progress.log index 57e7468..10e4753 100644 --- a/tests/fixtures/ladder_captures/golden/verify_progress.log +++ b/tests/fixtures/ladder_captures/golden/verify_progress.log @@ -42,3 +42,4 @@ instr-3row-scatter: worked instr-timer-ret: worked instr-coil-timer-ret: worked instr-6row-multi-output: worked +instr-copy-multirow-2af: worked From 8bd54a12d1e8258ccb309e04c35ccf3add2c9c07 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:08:50 -0400 Subject: [PATCH 09/15] refactor(csv): remove unused strip_tall_padding helper --- src/laddercodec/csv/converter.py | 49 +------------ tests/csv/test_converter.py | 52 -------------- tests/csv/test_writer.py | 119 ++++++++++++++++++++++++++++++- 3 files changed, 119 insertions(+), 101 deletions(-) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 02e9dd7..b49a8bc 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -145,9 +145,7 @@ def _parse_rung_row(row: RowAst, *, strict: bool) -> _ParsedRow: family = get_af_family_for_token(token) visual_rows = _af_visual_rows(token) - if family is not None and ( - family.family_name in _PINNED_BLOCK_FAMILIES or visual_rows > 1 - ): + if family is not None and (family.family_name in _PINNED_BLOCK_FAMILIES or visual_rows > 1): return _ParsedRow( conditions=conds, kind="block_main", @@ -625,48 +623,3 @@ def convert_rung( logical_rows = len(condition_rows) return logical_rows, condition_rows, af_tokens, comment - - -# --------------------------------------------------------------------------- -# Decode-side: strip blank padding rows from tall instructions -# --------------------------------------------------------------------------- - - -def _is_blank_row( - conditions: list[object], - af: object, -) -> bool: - """Return True if all conditions are blank and AF is blank.""" - if af != "": - return False - return all(c == "" for c in conditions) - - -def strip_tall_padding( - logical_rows: int, - condition_rows: list[list[object]], - af_tokens: list[object], -) -> tuple[int, list[list[object]], list[object]]: - """Remove trailing blank rows that are just visual padding for tall instructions. - - After decoding, a timer rung may have a trailing all-blank row that - exists only because the timer cell is visually tall. This function - strips such rows so the decoded output matches the user's CSV input. - - Rows with any content (wires, contacts, etc.) are kept. - """ - if logical_rows < 2: - return logical_rows, condition_rows, af_tokens - - # Check if row 0 has a tall AF instruction. - af0 = af_tokens[0] - is_tall = isinstance(af0, AfInstruction) and af0.cell_params().get("visual_rows", 1) > 1 - if not is_tall: - return logical_rows, condition_rows, af_tokens - - # Strip trailing blank rows (from the end, in case of future >2-row tall). - while len(condition_rows) > 1 and _is_blank_row(condition_rows[-1], af_tokens[-1]): - condition_rows = condition_rows[:-1] - af_tokens = af_tokens[:-1] - - return len(condition_rows), condition_rows, af_tokens diff --git a/tests/csv/test_converter.py b/tests/csv/test_converter.py index ea7a842..c46d77b 100644 --- a/tests/csv/test_converter.py +++ b/tests/csv/test_converter.py @@ -22,7 +22,6 @@ af_node_to_token, condition_node_to_token, convert_rung, - strip_tall_padding, ) from laddercodec.csv.parser import parse_csv_file from laddercodec.instructions import ( @@ -250,57 +249,6 @@ def test_unknown_pin_raises(self, tmp_path: Path) -> None: convert_rung(rung) -class TestStripTallPadding: - def test_strips_blank_trailing_row(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms") - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - [""] * 31, - ] - afs: list[object] = [timer, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 1 - assert len(new_conds) == 1 - assert len(new_afs) == 1 - - def test_keeps_row_with_wires(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms") - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - ["-"] * 31, - ] - afs: list[object] = [timer, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 2 # kept because row has wires - - def test_keeps_row_with_contacts(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms", retained=True) - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - [Contact(InstructionType.CONTACT_NO, "X002")] + ["-"] * 30, - ] - afs: list[object] = [timer, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 2 - - def test_no_strip_for_non_timer(self) -> None: - coil = Coil(InstructionType.COIL_OUT, "Y001") - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - [""] * 31, - ] - afs: list[object] = [coil, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 2 # not a tall instruction, no strip - - def test_single_row_passthrough(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms") - conds = [[Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30] - afs: list[object] = [timer] - lr, new_conds, new_afs = strip_tall_padding(1, conds, afs) - assert lr == 1 - - class TestConditionNodeToToken: def test_blank(self) -> None: assert condition_node_to_token(BlankCondition()) == "" diff --git a/tests/csv/test_writer.py b/tests/csv/test_writer.py index aebc621..16bbd0b 100644 --- a/tests/csv/test_writer.py +++ b/tests/csv/test_writer.py @@ -23,7 +23,15 @@ WriterError, decoded_rung_to_rows, ) -from laddercodec.instructions import UnknownInstruction +from laddercodec.instructions import ( + Copy, + ModbusRtuTarget, + RawInstruction, + Receive, + Search, + Send, + UnknownInstruction, +) from laddercodec.model import InstructionType GOLDEN_DIR = Path(__file__).resolve().parent.parent / "fixtures" / "ladder_captures" / "golden" @@ -55,6 +63,47 @@ def _edge_row(operand: str) -> list[object]: return [Contact(InstructionType.CONTACT_EDGE, operand, edge_kind="rise")] + ["-"] * 30 +def _generic_tall_af_cases() -> list[pytest.ParameterSet]: + """Return representative non-pinned tall AF instructions.""" + return [ + pytest.param(Copy("DS7", "DS8"), 2, id="copy"), + pytest.param(Search("DS72", "DS81", "DS71", "DS82", "C81", "=="), 2, id="search"), + pytest.param( + Send( + ModbusRtuTarget("rtu", "cpu2", 5), + "DS1", + "DS1", + 1, + "C1", + "C2", + "C3", + "DS100", + ), + 3, + id="send", + ), + pytest.param( + Receive( + ModbusRtuTarget("rtu", "cpu2", 5), + "DS1", + "DS1", + 1, + "C1", + "C2", + "C3", + "DS100", + ), + 3, + id="receive", + ), + pytest.param( + RawInstruction.from_csv_token("raw(X,0x2711,3,0000=)"), + 3, + id="raw", + ), + ] + + # --------------------------------------------------------------------------- # Round-trip: golden.bin → decode → CSV → read_csv → compare # --------------------------------------------------------------------------- @@ -506,6 +555,74 @@ def test_unknown_instruction_raises(self) -> None: decoded_rung_to_rows(rung) +class TestGenericTallRoundTrip: + @pytest.mark.parametrize(("af", "visual_rows"), _generic_tall_af_cases()) + def test_later_tall_block_at_end_roundtrips( + self, + tmp_path: Path, + af: object, + visual_rows: int, + ) -> None: + lead = Coil(InstructionType.COIL_OUT, "Y900") + rung = Rung( + logical_rows=visual_rows + 1, + conditions=[ + _contact_row("C200"), + _contact_row("C201"), + *[_blank_conditions() for _ in range(visual_rows - 1)], + ], + instructions=[lead, af, *([""] * (visual_rows - 1))], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 2 + assert [row[32] for row in rows] == ["out(Y900)", af.to_csv()] + + out_csv = tmp_path / f"later-tall-end-{type(af).__name__}.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == rung.logical_rows + assert round_tripped.conditions == rung.conditions + assert round_tripped.instructions == rung.instructions + + @pytest.mark.parametrize(("af", "visual_rows"), _generic_tall_af_cases()) + def test_later_tall_block_before_following_af_roundtrips( + self, + tmp_path: Path, + af: object, + visual_rows: int, + ) -> None: + lead = Coil(InstructionType.COIL_OUT, "Y901") + tail = Coil(InstructionType.COIL_OUT, "Y902") + rung = Rung( + logical_rows=visual_rows + 2, + conditions=[ + _contact_row("C210"), + _contact_row("C211"), + *[_blank_conditions() for _ in range(visual_rows - 1)], + _contact_row("C212"), + ], + instructions=[lead, af, *([""] * (visual_rows - 1)), tail], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 3 + assert [row[32] for row in rows] == ["out(Y901)", af.to_csv(), "out(Y902)"] + + out_csv = tmp_path / f"later-tall-mid-{type(af).__name__}.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == rung.logical_rows + assert round_tripped.conditions == rung.conditions + assert round_tripped.instructions == rung.instructions + + class TestMultiPinnedRoundTrip: def test_count_up_then_retained_timer_roundtrip(self, tmp_path: Path) -> None: counter = Counter( From c471661785049a4978d89804ad8ecc6acef5e3eb Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:10:22 -0400 Subject: [PATCH 10/15] fix(decode): restore omitted wiring for generic tall AF continuation rows --- src/laddercodec/decode_program.py | 42 +++++---- tests/ladder/test_decode_program.py | 134 ++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 15 deletions(-) diff --git a/src/laddercodec/decode_program.py b/src/laddercodec/decode_program.py index fa0dcb3..3fbb5c4 100644 --- a/src/laddercodec/decode_program.py +++ b/src/laddercodec/decode_program.py @@ -26,6 +26,7 @@ RawInstruction, from_tags_af, from_tags_condition, + get_af_family_for_token, ) from .instructions.comparison import CompareContact from .instructions.contact import Contact @@ -926,12 +927,15 @@ def _build_topology_backed_rung( def _implied_modifier_row_offsets(af: object) -> set[int]: - """Return AF-relative row offsets whose logic path may be omitted in SCR. + """Return AF-relative row offsets whose horizontal path may be omitted in SCR. Click can store ``count=0`` or omit continuation-row topology blocks for - some AF modifier rows even when the visible rung still carries logic across - that row. The AF blob does still tell us which visual sub-rows are actual - logic inputs, so we restrict implied dash fill to those known pin rows. + some tall-AF continuation rows even when the visible rung still carries + logic across that row. Pinned families still use hand-tuned offsets so + optional pin rows only opt in when the AF state says that pin is active. + Non-pinned tall families (copy/search/send_receive/raw) expose plain + continuation rows instead, so every visual continuation row can carry logic + when Click suppresses its explicit topology block. """ if isinstance(af, Shift): return {1, 2} @@ -959,6 +963,12 @@ def _implied_modifier_row_offsets(af: object) -> set[int]: rows.add(3) return rows + family = get_af_family_for_token(af) + if family is not None and not family.pin_names and hasattr(af, "cell_params"): + visual_rows = max(1, int(af.cell_params().get("visual_rows", 1))) + if visual_rows > 1: + return set(range(1, visual_rows)) + return set() @@ -1104,8 +1114,9 @@ def _build_rung( if 0 < row < logical_rows: implied_modifier_rows.add(row) - # 4. Targeted fallback for modifier rows whose topology Click omits from - # SCR even though the AF blob says that visual sub-row accepts logic. + # 4. Fallback for logic-carrying continuation rows whose topology Click + # omits from SCR. Rebuild the implied horizontal path and merge any + # pre-parsed wire_down markers into T-junctions. for row in sorted(implied_modifier_rows): explicit_right_wires = row - 1 < len(extra_rows_right_wires) and bool( extra_rows_right_wires[row - 1] @@ -1113,19 +1124,20 @@ def _build_rung( if explicit_right_wires: continue - rightmost = -1 + leftmost = None for col in range(_CONDITION_COLUMNS): if conditions[row][col] != "": - rightmost = col - if rightmost < 0: + leftmost = col + break + if leftmost is None: continue - for col in range(rightmost + 1, _CONDITION_COLUMNS): - if conditions[row][col] != "": - continue - if row > 0 and conditions[row - 1][col] == "|": - continue - conditions[row][col] = "-" + for col in range(leftmost, _CONDITION_COLUMNS): + cell = conditions[row][col] + if cell == "": + conditions[row][col] = "-" + elif cell == "|": + conditions[row][col] = "T" return Rung( logical_rows=logical_rows, diff --git a/tests/ladder/test_decode_program.py b/tests/ladder/test_decode_program.py index ee594c0..7e93d87 100644 --- a/tests/ladder/test_decode_program.py +++ b/tests/ladder/test_decode_program.py @@ -10,6 +10,7 @@ from laddercodec.decode_program import ( _find_row_topology_block, _find_sections, + _implied_modifier_row_offsets, _parse_extra_row_right_wires, _parse_header, _parse_scr_tags, @@ -18,10 +19,15 @@ decode_program, ) from laddercodec.instructions import from_tags_af +from laddercodec.instructions.contact import Contact from laddercodec.instructions.home import from_tags as home_from_tags from laddercodec.instructions.math import Math from laddercodec.instructions.position import from_tags as position_from_tags +from laddercodec.instructions.raw import RawInstruction, _decompose_blob, _fields_to_tag_dicts +from laddercodec.instructions.search import Search +from laddercodec.instructions.send_receive import ModbusRtuTarget, Receive from laddercodec.instructions.timer import Timer +from laddercodec.model import InstructionType decode_program_module = importlib.import_module("laddercodec.decode_program") _SCR_FIXTURE_DIR = Path(__file__).resolve().parents[1] / "fixtures" / "scr_captures" @@ -165,6 +171,23 @@ def _compact_scr_variant_u16_field(tag: int, entries: dict[int, int]) -> bytes: return bytes(out) +def _section_instruction_from_token(row: int, col: int, token) -> tuple: + blob = token.build_blob() + class_name, type_code, part_count, _extra_bytes, fields = _decompose_blob(blob) + tags, tag_byte_lens, variant_u16_tags, variant_string_tags = _fields_to_tag_dicts(fields) + return ( + row, + col, + class_name, + type_code, + tags, + part_count, + tag_byte_lens, + variant_u16_tags, + variant_string_tags, + ) + + def test_decode_program_matches_or_topology_fixture(): clip_rungs, scr_rungs = _load_fixture_pair("or_topology") @@ -277,6 +300,117 @@ def test_parse_wiredown_uses_explicit_row_indices(): assert _parse_wiredown(data, 0, len(data)) == {1: (1, 2, 3, 4, 5)} +def test_implied_modifier_row_offsets_include_generic_tall_af_continuations(): + receive = Receive( + target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), + remote_start="DS1", + dest="DS10", + quantity=1, + receiving="", + success="", + error="", + exception_response="", + ) + + assert _implied_modifier_row_offsets(Search("DS2", "DS11", "DS1", "DS12", "C1", "==")) == {1} + assert _implied_modifier_row_offsets(receive) == {1, 2} + assert _implied_modifier_row_offsets( + RawInstruction(class_name="Mystery", blob=b"", part_count=4) + ) == { + 1, + 2, + 3, + } + + +def test_build_topology_backed_rung_reconstructs_omitted_generic_tall_row_with_junction(): + topology_block = decode_program_module._ScrRowTopologyBlock( + start=0, + row_word=3, + prelude=b"", + leading_rows_right_wires=[], + row0_flag_count=0, + row0_flags={}, + flags_start=0, + continuation_start=0, + ) + section_instructions = [ + _section_instruction_from_token( + 0, + 31, + Search("DS2", "DS11", "DS1", "DS12", "C1", "=="), + ), + _section_instruction_from_token( + 1, + 0, + Contact(InstructionType.CONTACT_NO, "C10"), + ), + ] + data = bytes([0x20, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02]) + + rung = decode_program_module._build_topology_backed_rung( + data=data, + topology_block=topology_block, + rung_end=len(data), + logical_rows=2, + section_instructions=section_instructions, + comment=None, + comment_rtf=None, + ) + + assert isinstance(rung.conditions[1][0], Contact) + assert rung.conditions[1][0].operand == "C10" + assert rung.conditions[1][1] == "T" + assert all(cell == "-" for cell in rung.conditions[1][2:]) + + +def test_build_topology_backed_rung_reconstructs_all_omitted_receive_rows(): + topology_block = decode_program_module._ScrRowTopologyBlock( + start=0, + row_word=4, + prelude=b"", + leading_rows_right_wires=[], + row0_flag_count=0, + row0_flags={}, + flags_start=0, + continuation_start=0, + ) + receive = Receive( + target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), + remote_start="DS1", + dest="DS10", + quantity=1, + receiving="", + success="", + error="", + exception_response="", + ) + section_instructions = [ + _section_instruction_from_token(0, 31, receive), + _section_instruction_from_token(1, 0, Contact(InstructionType.CONTACT_NO, "C11")), + _section_instruction_from_token(2, 2, Contact(InstructionType.CONTACT_NO, "C12")), + ] + data = b"\x20\x00" + + rung = decode_program_module._build_topology_backed_rung( + data=data, + topology_block=topology_block, + rung_end=len(data), + logical_rows=3, + section_instructions=section_instructions, + comment=None, + comment_rtf=None, + ) + + assert isinstance(rung.instructions[0], Receive) + assert isinstance(rung.conditions[1][0], Contact) + assert all(cell == "-" for cell in rung.conditions[1][1:]) + assert rung.conditions[2][0] == "" + assert rung.conditions[2][1] == "" + assert isinstance(rung.conditions[2][2], Contact) + assert all(cell == "-" for cell in rung.conditions[2][3:]) + + def test_parse_scr_tags_handles_compact_home_raw_fields(): raw = _compact_scr_blob( "Home", From 993a17c4b42712ffb1f6bde932cfaf122bc42d0e Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:13:49 -0400 Subject: [PATCH 11/15] fix: narrow modifier-row offset typing --- src/laddercodec/decode_program.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/laddercodec/decode_program.py b/src/laddercodec/decode_program.py index 3fbb5c4..107c57f 100644 --- a/src/laddercodec/decode_program.py +++ b/src/laddercodec/decode_program.py @@ -24,6 +24,7 @@ from .instructions import ( INSTRUCTION_MODULES, RawInstruction, + UnknownInstruction, from_tags_af, from_tags_condition, get_af_family_for_token, @@ -34,7 +35,7 @@ from .instructions.drum import Drum from .instructions.shift import Shift from .instructions.timer import Timer -from .model import Program +from .model import AfInstruction, Program from .topology import CONDITION_COLUMNS as _CONDITION_COLUMNS # --------------------------------------------------------------------------- @@ -926,7 +927,7 @@ def _build_topology_backed_rung( # --------------------------------------------------------------------------- -def _implied_modifier_row_offsets(af: object) -> set[int]: +def _implied_modifier_row_offsets(af: AfInstruction | UnknownInstruction) -> set[int]: """Return AF-relative row offsets whose horizontal path may be omitted in SCR. Click can store ``count=0`` or omit continuation-row topology blocks for @@ -963,8 +964,11 @@ def _implied_modifier_row_offsets(af: object) -> set[int]: rows.add(3) return rows + if isinstance(af, UnknownInstruction): + return set() + family = get_af_family_for_token(af) - if family is not None and not family.pin_names and hasattr(af, "cell_params"): + if family is not None and not family.pin_names: visual_rows = max(1, int(af.cell_params().get("visual_rows", 1))) if visual_rows > 1: return set(range(1, visual_rows)) From 9c33b8a3b6f0f7a801f3d7d0bc069bd772bf4a47 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:34:32 -0400 Subject: [PATCH 12/15] fix(csv): fail loudly on lossy csv writes --- CHANGELOG.md | 7 ++ src/laddercodec/csv/parser.py | 12 +++ src/laddercodec/csv/writer.py | 72 +++++++++++++- tests/csv/test_writer.py | 181 ++++++++++++++++++++++++++++++++++ 4 files changed, 271 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c76ed28..a84cdd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## Unreleased + +### Fixed +- **CSV writer**: fail loudly when emitted CSV rows lose decoded rung + semantics by reparsing writer output and checking for row, condition, or AF + mismatches before writing to disk + ## 0.1.3 ### Fixed diff --git a/src/laddercodec/csv/parser.py b/src/laddercodec/csv/parser.py index 41e08d0..09b4790 100644 --- a/src/laddercodec/csv/parser.py +++ b/src/laddercodec/csv/parser.py @@ -3,6 +3,7 @@ from __future__ import annotations import csv +from collections.abc import Sequence from pathlib import Path from typing import Literal @@ -88,6 +89,17 @@ def _segment_rungs(rows: tuple[RowAst, ...]) -> tuple[RungAst, ...]: return tuple(rungs) +def _parse_single_rung_rows(row_fields: Sequence[Sequence[str]]) -> RungAst: + """Parse in-memory canonical row fields into exactly one ``RungAst``.""" + rows = tuple(_row_ast(_canonical_row_from_fields(list(fields))) for fields in row_fields) + rungs = _segment_rungs(rows) + if not rungs: + raise ValueError("Expected one rung in emitted CSV rows; got none") + if len(rungs) != 1: + raise ValueError(f"Expected one rung in emitted CSV rows; got {len(rungs)}") + return rungs[0] + + def _load_canonical_rows(path: Path) -> tuple[CanonicalRow, ...]: rows: list[CanonicalRow] = [] with path.open("r", encoding="utf-8-sig", newline="") as handle: diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index be8b3a2..f272932 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -42,7 +42,9 @@ UnknownInstruction, get_af_family_for_token, ) -from .contract import CSV_HEADER +from .contract import CONDITION_COLUMNS, CSV_HEADER +from .converter import convert_rung +from .parser import _parse_single_rung_rows class WriterError(ValueError): @@ -309,6 +311,73 @@ def _emit_generic_tall_block( return data_row_count, visual_rows +def _display_token(token: object) -> str: + """Return a compact user-facing representation for mismatch errors.""" + try: + return repr(_token_to_csv(token)) + except WriterError: + return repr(token) + + +def _rebuild_rung_from_rows(rows: Sequence[Sequence[str]]) -> Rung: + """Round-trip emitted CSV rows back into a decoded-style ``Rung``.""" + try: + rung_ast = _parse_single_rung_rows(rows) + logical_rows, conditions, instructions, comment = convert_rung(rung_ast) + except (TypeError, ValueError) as exc: + raise WriterError( + f"CSV round-trip validation failed: emitted rows do not reparse: {exc}" + ) from exc + + return Rung( + logical_rows=logical_rows, + conditions=conditions, + instructions=instructions, + comment=comment, + comment_rtf=None, + ) + + +def _validate_roundtrip(rung: Rung, rows: Sequence[Sequence[str]]) -> None: + """Fail loudly when emitted CSV rows lose information from the decoded rung.""" + rebuilt = _rebuild_rung_from_rows(rows) + + if rebuilt.logical_rows != rung.logical_rows: + raise WriterError( + "CSV round-trip validation failed: " + f"logical row count mismatch: expected {rung.logical_rows}, got {rebuilt.logical_rows}" + ) + + if rebuilt.comment != rung.comment: + raise WriterError( + "CSV round-trip validation failed: " + f"comment mismatch: expected {rung.comment!r}, got {rebuilt.comment!r}" + ) + + for row_idx, (expected_row, actual_row) in enumerate( + zip(rung.conditions, rebuilt.conditions, strict=True), + start=1, + ): + for col_idx, (expected, actual) in enumerate(zip(expected_row, actual_row, strict=True)): + if expected != actual: + raise WriterError( + "CSV round-trip validation failed: " + f"condition mismatch at row {row_idx} col {CONDITION_COLUMNS[col_idx]}: " + f"expected {_display_token(expected)}, got {_display_token(actual)}" + ) + + for row_idx, (expected, actual) in enumerate( + zip(rung.instructions, rebuilt.instructions, strict=True), + start=1, + ): + if expected != actual: + raise WriterError( + "CSV round-trip validation failed: " + f"AF mismatch at row {row_idx}: " + f"expected {_display_token(expected)}, got {_display_token(actual)}" + ) + + def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: """Convert a ``Rung`` to a list of CSV row lists. @@ -379,6 +448,7 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: data_row_count = _append_data_row(rows, data_row_count, condition_rows[row_idx], af) row_idx += consumed + _validate_roundtrip(rung, rows) return rows diff --git a/tests/csv/test_writer.py b/tests/csv/test_writer.py index 16bbd0b..cb39a67 100644 --- a/tests/csv/test_writer.py +++ b/tests/csv/test_writer.py @@ -8,8 +8,10 @@ from laddercodec import ( Coil, + CompareContact, Contact, Counter, + Drum, ForLoop, Next, Rung, @@ -21,6 +23,7 @@ ) from laddercodec.csv.writer import ( WriterError, + _validate_roundtrip, decoded_rung_to_rows, ) from laddercodec.instructions import ( @@ -104,6 +107,26 @@ def _generic_tall_af_cases() -> list[pytest.ParameterSet]: ] +def _search_continuation_row() -> list[object]: + """Return a nonblank continuation row with comparison and wire geometry.""" + return [CompareContact("==", "DS300", "1", wire_down=True), "T", "|"] + [""] * 28 + + +def _event_drum() -> Drum: + """Return a compact event drum that exercises reset/jump/jog pin rows.""" + return Drum( + drum_kind="event", + outputs=["Y10", "Y11"], + events_or_presets=["C10", "C11"], + pattern=[[1, 0], [0, 1]], + current_step="DS10", + completion_flag="C12", + jog_enabled=True, + jump_enabled=True, + jump_target="DS11", + ) + + # --------------------------------------------------------------------------- # Round-trip: golden.bin → decode → CSV → read_csv → compare # --------------------------------------------------------------------------- @@ -689,3 +712,161 @@ def test_shift_then_timer_roundtrip_preserves_blank_timer_row(self, tmp_path: Pa assert round_tripped.instructions[4] == "" assert isinstance(round_tripped.conditions[4][0], Contact) assert round_tripped.conditions[4][0].operand == "C103" + + +class TestRoundTripValidator: + def test_accepts_retained_timer(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C300"), + _contact_row("C301"), + ], + instructions=[Timer("on_delay", "T30", "TD30", "10", "Tm", retained=True), ""], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_accepts_count_down_bridge_shape(self) -> None: + rung = Rung( + logical_rows=3, + conditions=[ + _contact_row("C302"), + _edge_row("C303"), + _contact_row("C304"), + ], + instructions=[ + Counter( + counter_type="count_down", + done_bit="CT30", + current="CTD30", + preset="50", + down_enabled=False, + reset_enabled=True, + ), + "NOP", + "", + ], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_accepts_generic_tall_continuation_with_comparison_and_t(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C305"), + _search_continuation_row(), + ], + instructions=[Search("DS72", "DS81", "DS71", "DS82", "C81", "=="), ""], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_accepts_drum_pin_rows(self) -> None: + rung = Rung( + logical_rows=4, + conditions=[ + _contact_row("C306"), + _contact_row("C307"), + _contact_row("C308"), + _contact_row("C309"), + ], + instructions=[_event_drum(), "", "", ""], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_raises_on_missing_retained_timer_reset_pin(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C310"), + _contact_row("C311"), + ], + instructions=[Timer("on_delay", "T31", "TD31", "20", "Tm", retained=True), ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[1][32] = "" + + with pytest.raises(WriterError, match=r"AF mismatch at row 1"): + _validate_roundtrip(rung, rows) + + def test_raises_on_missing_count_down_top_row_contact(self) -> None: + rung = Rung( + logical_rows=3, + conditions=[ + _contact_row("C312"), + _edge_row("C313"), + _contact_row("C314"), + ], + instructions=[ + Counter( + counter_type="count_down", + done_bit="CT31", + current="CTD31", + preset="75", + down_enabled=False, + reset_enabled=True, + ), + "NOP", + "", + ], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[0][1] = "" + + with pytest.raises(WriterError, match=r"condition mismatch at row 1 col A"): + _validate_roundtrip(rung, rows) + + def test_raises_on_missing_generic_tall_continuation_comparison(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C315"), + _search_continuation_row(), + ], + instructions=[Search("DS90", "DS99", "DS89", "DS100", "C316", "=="), ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[1][1] = "" + + with pytest.raises(WriterError, match=r"condition mismatch at row 2 col A"): + _validate_roundtrip(rung, rows) + + def test_raises_on_missing_drum_jump_pin(self) -> None: + rung = Rung( + logical_rows=4, + conditions=[ + _contact_row("C317"), + _contact_row("C318"), + _contact_row("C319"), + _contact_row("C320"), + ], + instructions=[_event_drum(), "", "", ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[2][32] = "" + + with pytest.raises(WriterError, match=r"AF mismatch at row 1"): + _validate_roundtrip(rung, rows) From 0589b50b4282d9e715a9f1f84273ac80add5d9cc Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:55:39 -0400 Subject: [PATCH 13/15] docs(changelog): finalize v0.1.4 release notes --- CHANGELOG.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a84cdd5..1e390d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,14 @@ # Changelog -## Unreleased +## 0.1.4 ### Fixed +- **CSV**: preserve multi-row AF round-trips, including pinned/tall AF blocks + away from row 0, multiple tall blocks in one rung, and generic tall + continuation rows +- **Encode**: suppress AF summary data when a rung contains multi-row AFs, + avoiding grid misalignment and Click paste crashes +- **SCR decode**: restore omitted wiring for generic tall AF continuation rows - **CSV writer**: fail loudly when emitted CSV rows lose decoded rung semantics by reparsing writer output and checking for row, condition, or AF mismatches before writing to disk From 0d8559dd363c5ff369f13496b2a710f9e7e6b55b Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:31:22 -0400 Subject: [PATCH 14/15] fix(csv): cast round-trip writer tokens to Rung types --- src/laddercodec/csv/writer.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index f272932..9e4d20f 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -29,11 +29,14 @@ import csv as csv_mod from collections.abc import Sequence from pathlib import Path +from typing import cast from ..decode import Rung from ..instructions import ( AfInstruction, + AfToken, ConditionInstruction, + ConditionToken, Counter, Drum, Shift, @@ -331,8 +334,8 @@ def _rebuild_rung_from_rows(rows: Sequence[Sequence[str]]) -> Rung: return Rung( logical_rows=logical_rows, - conditions=conditions, - instructions=instructions, + conditions=cast(list[list[ConditionToken]], conditions), + instructions=cast(list[AfToken], instructions), comment=comment, comment_rtf=None, ) From c904337124b1be37874a2afdfe42f13a85cff7ef Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:42:05 -0400 Subject: [PATCH 15/15] fix(csv): allow raw AF fallback in roundtrip validation --- src/laddercodec/csv/writer.py | 20 ++++++- src/laddercodec/instructions/home.py | 2 +- src/laddercodec/instructions/position.py | 2 +- tests/csv/test_writer_raw_validator.py | 68 ++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 tests/csv/test_writer_raw_validator.py diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index 9e4d20f..d627e6b 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -39,6 +39,7 @@ ConditionToken, Counter, Drum, + RawInstruction, Shift, Timer, UnknownCondition, @@ -322,6 +323,23 @@ def _display_token(token: object) -> str: return repr(token) +def _af_tokens_match(expected: object, actual: object) -> bool: + """Return True when AF tokens are semantically equivalent for CSV replay.""" + if expected == actual: + return True + + if not isinstance(expected, AfInstruction) or not isinstance(actual, AfInstruction): + return False + + if not isinstance(expected, RawInstruction) and not isinstance(actual, RawInstruction): + return False + + return ( + expected.build_blob() == actual.build_blob() + and expected.cell_params() == actual.cell_params() + ) + + def _rebuild_rung_from_rows(rows: Sequence[Sequence[str]]) -> Rung: """Round-trip emitted CSV rows back into a decoded-style ``Rung``.""" try: @@ -373,7 +391,7 @@ def _validate_roundtrip(rung: Rung, rows: Sequence[Sequence[str]]) -> None: zip(rung.instructions, rebuilt.instructions, strict=True), start=1, ): - if expected != actual: + if not _af_tokens_match(expected, actual): raise WriterError( "CSV round-trip validation failed: " f"AF mismatch at row {row_idx}: " diff --git a/src/laddercodec/instructions/home.py b/src/laddercodec/instructions/home.py index d3ee9b4..738936f 100644 --- a/src/laddercodec/instructions/home.py +++ b/src/laddercodec/instructions/home.py @@ -22,7 +22,7 @@ _TYPE_CODE = 0x2734 _CLASS_NAME = "Home" -_FIELD_COUNT = 21 +_FIELD_COUNT = 22 # --------------------------------------------------------------------------- # Shared field extraction diff --git a/src/laddercodec/instructions/position.py b/src/laddercodec/instructions/position.py index ef7385e..21e9a67 100644 --- a/src/laddercodec/instructions/position.py +++ b/src/laddercodec/instructions/position.py @@ -22,7 +22,7 @@ _TYPE_CODE = 0x2736 _CLASS_NAME = "Position" -_FIELD_COUNT = 18 +_FIELD_COUNT = 19 # --------------------------------------------------------------------------- # Shared field extraction diff --git a/tests/csv/test_writer_raw_validator.py b/tests/csv/test_writer_raw_validator.py new file mode 100644 index 0000000..ac75261 --- /dev/null +++ b/tests/csv/test_writer_raw_validator.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import pytest + +from laddercodec import Rung +from laddercodec.csv.writer import _validate_roundtrip, decoded_rung_to_rows +from laddercodec.instructions import Email, Home, Position, Velocity + + +@pytest.mark.parametrize( + "instruction", + [ + pytest.param( + Email( + tag_60a5="1", + tag_6235="TEST", + tag_6236="TEST", + tag_6217="TEST", + tag_622a="TEST", + tag_607c="C1", + tag_607b="C2", + tag_607d="C3", + tag_6083="DS1", + ), + id="email", + ), + pytest.param( + Home( + tag_6096="1", + tag_6097="1", + tag_609f="X004", + tag_60a0="X005", + tag_609c="1", + tag_609d="1", + ), + id="home", + ), + pytest.param( + Position( + tag_6098="1", + tag_609b="1", + tag_609c="1", + tag_609d="1", + ), + id="position", + ), + pytest.param( + Velocity( + tag_609b="1", + tag_609c="1", + tag_609d="1", + ), + id="velocity", + ), + ], +) +def test_validate_roundtrip_accepts_equivalent_raw_instruction( + instruction: Email | Home | Position | Velocity, +) -> None: + rung = Rung( + logical_rows=1, + conditions=[["-"] * 31], + instructions=[instruction], + comment=None, + comment_rtf=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung))