diff --git a/CHANGELOG.md b/CHANGELOG.md index c76ed28..1e390d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## 0.1.4 + +### Fixed +- **CSV**: preserve multi-row AF round-trips, including pinned/tall AF blocks + away from row 0, multiple tall blocks in one rung, and generic tall + continuation rows +- **Encode**: suppress AF summary data when a rung contains multi-row AFs, + avoiding grid misalignment and Click paste crashes +- **SCR decode**: restore omitted wiring for generic tall AF continuation rows +- **CSV writer**: fail loudly when emitted CSV rows lose decoded rung + semantics by reparsing writer output and checking for row, condition, or AF + mismatches before writing to disk + ## 0.1.3 ### Fixed diff --git a/docs/guides/troubleshooting.md b/docs/guides/troubleshooting.md new file mode 100644 index 0000000..19e60f9 --- /dev/null +++ b/docs/guides/troubleshooting.md @@ -0,0 +1,114 @@ +# Troubleshooting Encode Failures + +When an encoded binary crashes Click on paste (or renders incorrectly), the problem is usually a structural byte difference in the cell grid — not the instruction blob content. This guide covers how to find it. + +## Tools + +| Tool | What it does | +|---|---| +| `devtools/inspect_bin.py` | Decodes a `.bin` and prints all instructions with `to_csv()` output. Good for checking logical correctness, but doesn't show structural bytes. | +| `inspect_cells()` | Dumps raw cell bytes, header fields, flags, and decoded tokens for specific cells. This is the primary debugging tool for encode failures. | + +`inspect_cells` lives in `laddercodec.decode` (not yet in the public API — import it directly): + +```python +from laddercodec.decode import inspect_cells + +data = open("capture.bin", "rb").read() +dumps = inspect_cells(data, [(0, 0, "AF"), (0, 1, "AF"), (0, 2, "AF")]) +for d in dumps: + print(d) +``` + +The second argument is a list of `(rung_index, visual_row, column_letter)` tuples. Column is `"A"` through `"AE"` for conditions, `"AF"` for the output column. + +Each `CellDump` has: + +- `offset` — absolute byte position in the buffer +- `size` — cell size (0x40 for wire cells, larger for instruction cells) +- `flags` — `(segment, right, down)` from `+0x19/+0x1D/+0x21` +- `token` — decoded instruction or wire token +- `raw` — full cell bytes +- `hex()` — formatted hex dump + +## Workflow + +### 1. Get a native capture + +Paste the same rung shape manually in Click, then copy it back to get a native `.bin`. This is your ground truth. The native capture can come from `clicknick-rung guided` or by manually copying from Click's clipboard (format 522). + +### 2. Compare with inspect_cells + +Run `inspect_cells` on both the native capture and your encoded output. Focus on the AF column and any instruction cells: + +```python +from laddercodec.decode import inspect_cells + +native = open("native.bin", "rb").read() +ours = open("encoded.bin", "rb").read() + +cells = [(0, r, "AF") for r in range(3)] + +for label, data in [("NATIVE", native), ("OURS", ours)]: + print(f"\n=== {label} ===") + for d in inspect_cells(data, cells): + hdr = d.raw[:0x25] + print(f"[{d.row}][{d.col}] size={d.size} flags={d.flags}") + print(f" row_span={hdr[0x09]} vis_rows={hdr[0x0A]}") + print(f" instr_idx={int.from_bytes(hdr[0x0D:0x11], 'little', signed=True)}") + print(f" tail: {d.raw[-16:].hex(' ')}") +``` + +### 3. What to look for + +The cell header is 0x25 (37) bytes. Key fields: + +| Offset | Field | Notes | +|---|---|---| +| +0x01 | column | 4-byte LE, should match the column index | +| +0x05 | global_row | 4-byte LE, `row + 1` (varies by rung position — ok to differ) | +| +0x09 | row_span | How many grid rows this cell occupies | +| +0x0A | visual_rows | Visual sub-row count (1 = normal, 2 = timer, 3 = retained timer) | +| +0x0D | instr_index | 4-byte LE signed. `-1` (0xFFFFFFFF) for data cells | +| +0x15 | contact_flag | 4-byte LE | +| +0x19 | segment | 4-byte LE — load-bearing flag | +| +0x1D | wire_right | 4-byte LE | +| +0x21 | wire_down | 4-byte LE | + +After the header: instruction blob (variable length), then a 16-byte tail. + +**Size differences** are the most important signal. If a cell is 32 bytes larger in your output, you're probably emitting an AF summary block that shouldn't be there — or vice versa. + +**Flag differences** (segment, wire_right, wire_down) can cause visual corruption but usually don't crash Click. + +**Tail differences** are usually cosmetic (rung index encoding, row hints). + +### 4. Don't bother with hex diffs + +Raw `xxd` / hex diffs of the two binaries are nearly useless because: + +- The global header (0x0000–0x0253) contains file paths, font tables, and MDB data that differ between every capture. +- Instruction cells are variable-length, so a single size difference shifts every subsequent byte. +- Comment RTF formatting varies slightly (e.g. `\par ` vs `\r\n\par `), shifting the entire grid region. + +`inspect_cells` handles all of this — it walks the variable-length grid correctly and gives you per-cell structural data. + +## Known pitfalls + +### AF summary block + +When a single-rung buffer has 2+ AF instructions, the encoder appends a 32-byte summary block to the last AF cell. **This summary must be suppressed when any AF is multi-row** (e.g. a retained timer with a `.reset()` pin row). + +The summary is built by `_build_af_summary()` in `encode.py` and gated by `_compute_rung_metadata()` in `_grid.py`. If your rung has a mix of single-row AFs (coils, copies) and multi-row AFs (timers), check that the summary isn't being emitted. + +Symptom: the last AF instruction cell is ~32 bytes larger than in the native capture. Click crashes on paste because the extra bytes misalign the rest of the grid. + +### Tall instruction visual_rows + +The byte at cell offset +0x0A (`visual_rows`) is not always the same as the instruction's `cell_params()["visual_rows"]`. For instructions with pin rows (retained timers, counters, shifts, drums), the native binary may use a different value that accounts for the pin rows. + +If your instruction cell is the right size but Click still renders it wrong, compare +0x09 and +0x0A against the native capture. + +### Payload space slots + +The 31 slots between the rung preamble (0x0260) and the cell grid (0x0A60) contain structural bytes that differ between our encoder and native Click. These differences are tolerated — Click reads them but doesn't crash on mismatches. Don't chase these. diff --git a/mkdocs.yml b/mkdocs.yml index 36447cf..b24149c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -13,6 +13,7 @@ nav: - Decoding: guides/decoding.md - CSV Format: guides/csv-format.md - Adding Instructions: guides/adding-instructions.md + - Troubleshooting: guides/troubleshooting.md - Internals: - Binary Format: internals/binary-format.md - Wire Rendering: internals/wire-rendering.md diff --git a/src/laddercodec/_grid.py b/src/laddercodec/_grid.py index 363de74..456f41e 100644 --- a/src/laddercodec/_grid.py +++ b/src/laddercodec/_grid.py @@ -137,10 +137,17 @@ def _compute_rung_metadata( idx += 1 # AF summary block — needed on the last AF instruction cell when 2+ AFs - # (single-rung only; multi-rung does not use af_summary). + # are all single-row (single-rung only; multi-rung does not use + # af_summary). Suppressed when any AF is multi-row (e.g. retained + # timer with reset pin) — native captures omit it in that case. af_summary_block = b"" af_rows = sorted(af_instr_indices.keys()) - if single_rung and len(af_rows) >= 2: + any_multi_row_af = any( + tok.cell_params().get("visual_rows", 1) > 1 + for r in af_rows + if isinstance((tok := af_tokens[r]), AfInstruction) + ) + if single_rung and len(af_rows) >= 2 and not any_multi_row_af: af_entries: list[tuple[int, int, bool]] = [] for r in af_rows: cond_count = sum(1 for t in condition_rows[r] if isinstance(t, ConditionInstruction)) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 5396fa0..b49a8bc 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -7,8 +7,11 @@ - AF token conversion (coils, timers, NOP) - Pin continuation rows (``.reset()``, ``.down()``, etc.) — absorbed into the parent instruction rather than emitted as separate rows -- Auto-padding: tall AF instructions (timers, counters) get a blank - continuation row appended when the user provides only one row +- Multi-row AF instructions: pinned families (timers, counters, shift, + drum) absorb their continuation rows; other tall AF instructions keep + blank-AF continuation rows as part of the same visual block +- Auto-padding: tall AF instructions get blank continuation rows + appended when the user omits them Pin rows -------- @@ -27,14 +30,15 @@ Tall instructions ----------------- -Some AF instructions occupy more than one grid row visually (timers = 2). -If the user CSV has only the instruction row, a blank padding row is -auto-appended so ``encode_rung()`` receives the correct ``logical_rows``. +Some AF instructions occupy more than one grid row visually (timers = 2, +search/copy = 2, send/receive = 3, etc.). If the user CSV omits blank +continuation rows, they are auto-appended so ``encode_rung()`` receives +the correct ``logical_rows``. """ from __future__ import annotations -from dataclasses import replace +from dataclasses import dataclass, field, replace from typing import cast from ..encode import AfToken, ConditionToken @@ -61,6 +65,7 @@ GenericCondition, HorizontalWire, JunctionDownWire, + RowAst, RungAst, VerticalPassThroughWire, ) @@ -78,6 +83,405 @@ # --------------------------------------------------------------------------- CONDITION_COLUMNS = 31 +_PINNED_BLOCK_FAMILIES = frozenset({"timer", "counter", "shift", "drum"}) + + +@dataclass +class _ParsedRow: + """One CSV data row converted into condition tokens plus AF classification.""" + + conditions: list[ConditionToken] + kind: str + token: AfToken = "" + family_name: str | None = None + pin_name: str | None = None + pin_args: tuple[str, ...] = () + + +@dataclass +class _ActiveBlock: + """A multi-row AF block that is still gathering continuation rows.""" + + family_name: str | None + token: AfInstruction + main_conditions: list[ConditionToken] + leading_blank_conditions: list[ConditionToken] | None = None + continuations: list[_ParsedRow] = field(default_factory=list) + + +def _blank_condition_row() -> list[ConditionToken]: + """Return one all-blank condition row.""" + return cast(list[ConditionToken], [""] * CONDITION_COLUMNS) + + +def _af_visual_rows(token: AfToken) -> int: + """Return the AF token's visual row count.""" + if isinstance(token, AfInstruction): + return max(1, int(token.cell_params().get("visual_rows", 1))) + return 1 + + +def _parse_rung_row(row: RowAst, *, strict: bool) -> _ParsedRow: + """Convert one parsed CSV row into tokens plus AF-row classification.""" + conds = [condition_node_to_token(n) for n in row.condition_nodes] + af_node = row.af_node + + if isinstance(af_node, AfCall) and af_node.name.startswith("."): + pin_name = af_node.name + if pin_name not in KNOWN_PIN_NAMES: + raise ConvertError(f"Unknown pin token: {pin_name!r}") + return _ParsedRow( + conditions=conds, + kind="pin", + pin_name=pin_name, + pin_args=af_node.args, + ) + + token = af_node_to_token(af_node, strict=strict) + if token == "": + return _ParsedRow(conditions=conds, kind="blank") + if token == "NOP": + return _ParsedRow(conditions=conds, kind="nop", token="NOP") + + family = get_af_family_for_token(token) + visual_rows = _af_visual_rows(token) + if family is not None and (family.family_name in _PINNED_BLOCK_FAMILIES or visual_rows > 1): + return _ParsedRow( + conditions=conds, + kind="block_main", + token=token, + family_name=family.family_name, + ) + if visual_rows > 1: + return _ParsedRow( + conditions=conds, + kind="block_main", + token=token, + ) + return _ParsedRow(conditions=conds, kind="main", token=token) + + +def _flush_plain_rows( + plain_rows: list[_ParsedRow], + condition_rows: list[list[ConditionToken]], + af_tokens: list[AfToken], +) -> None: + """Append buffered non-block rows to the final decode-ready output.""" + for row in plain_rows: + condition_rows.append(row.conditions) + af_tokens.append(row.token) + plain_rows.clear() + + +def _start_active_block( + row: _ParsedRow, + *, + leading_blank_conditions: list[ConditionToken] | None = None, +) -> _ActiveBlock: + """Create a new active multi-row AF block from its main AF row.""" + if not isinstance(row.token, AfInstruction): + raise AssertionError("block_main row must carry an AF instruction token") + return _ActiveBlock( + family_name=row.family_name, + token=row.token, + main_conditions=row.conditions, + leading_blank_conditions=leading_blank_conditions, + ) + + +def _max_continuations(block: _ActiveBlock) -> int: + """Maximum number of continuation rows a block type can absorb.""" + return max(0, _af_visual_rows(block.token) - 1) + + +def _consume_active_row(block: _ActiveBlock, row: _ParsedRow) -> bool: + """Consume a continuation row for the active block when it belongs there.""" + if len(block.continuations) >= _max_continuations(block): + return False + + if row.kind == "blank": + block.continuations.append(row) + return True + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if isinstance(block.token, Counter): + if pin_name not in {".down", ".reset"}: + raise ConvertError( + f"Pin token {pin_name!r} is not supported for {block.token.counter_type}()" + ) + block.continuations.append(row) + return True + + if isinstance(block.token, Drum): + if pin_name not in {".reset", ".jump", ".jog"}: + raise ConvertError( + f"Pin token {pin_name!r} is not supported for {block.token.drum_kind}_drum()" + ) + block.continuations.append(row) + return True + + if isinstance(block.token, Shift): + if pin_name not in {".clock", ".reset"}: + raise ConvertError(f"Pin token {pin_name!r} is not supported for shift()") + block.continuations.append(row) + return True + + # Timers remain lenient for non-.reset() pin rows: absorb them as blank rows. + if isinstance(block.token, Timer): + block.continuations.append(row) + return True + + if isinstance(block.token, Counter) and block.token.counter_type == "count_down": + if row.kind == "nop": + block.continuations.append(row) + return True + + return False + + +def _finalize_timer_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one timer block back to its decoded 2-row span.""" + timer = cast(Timer, block.token) + continuation_conditions = _blank_condition_row() + if block.continuations: + continuation = block.continuations[0] + if continuation.kind == "nop": + raise ConvertError("Timer continuation rows cannot use NOP") + continuation_conditions = continuation.conditions + if continuation.kind == "pin" and continuation.pin_name in _RETENTIVE_PINS: + timer = replace(timer, retained=True) + + return [block.main_conditions, continuation_conditions], [timer, ""] + + +def _finalize_count_up_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one count_up block back to its decoded 3-row span.""" + counter = cast(Counter, block.token) + middle_conditions: list[ConditionToken] | None = None + reset_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "nop": + raise ConvertError("count_up does not support NOP bridge rows") + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".down": + counter = replace(counter, down_enabled=True) + middle_conditions = row.conditions + continue + if pin_name == ".reset": + counter = replace(counter, reset_enabled=True) + reset_conditions = row.conditions + continue + + if middle_conditions is None: + middle_conditions = row.conditions + continue + if reset_conditions is None: + reset_conditions = row.conditions + continue + + if not counter.reset_enabled: + raise ConvertError(f"{counter.counter_type} requires a .reset() pin row") + + if middle_conditions is None: + middle_conditions = _blank_condition_row() + if reset_conditions is None: + reset_conditions = _blank_condition_row() + + return [block.main_conditions, middle_conditions, reset_conditions], [counter, "", ""] + + +def _finalize_count_down_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one count_down block back to its decoded 3-row span.""" + counter = cast(Counter, block.token) + explicit_bridge_conditions: list[ConditionToken] | None = None + reset_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".down": + raise ConvertError("count_down does not support .down()") + if pin_name == ".reset": + counter = replace(counter, reset_enabled=True) + reset_conditions = row.conditions + continue + + if row.kind == "nop": + if block.leading_blank_conditions is not None: + raise ConvertError("count_down block cannot mix a leading blank top row with NOP") + if explicit_bridge_conditions is not None: + raise ConvertError("count_down block uses more than one bridge row") + explicit_bridge_conditions = row.conditions + continue + + if block.leading_blank_conditions is None and explicit_bridge_conditions is None: + explicit_bridge_conditions = row.conditions + continue + if reset_conditions is None: + reset_conditions = row.conditions + continue + + if not counter.reset_enabled: + raise ConvertError(f"{counter.counter_type} requires a .reset() pin row") + + if reset_conditions is None: + reset_conditions = _blank_condition_row() + + if block.leading_blank_conditions is not None: + condition_rows = [ + block.leading_blank_conditions, + block.main_conditions, + reset_conditions, + ] + elif explicit_bridge_conditions is not None: + condition_rows = [ + block.main_conditions, + explicit_bridge_conditions, + reset_conditions, + ] + else: + condition_rows = [ + _blank_condition_row(), + block.main_conditions, + reset_conditions, + ] + + return condition_rows, [counter, "NOP", ""] + + +def _finalize_counter_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one counter block back to its decoded 3-row span.""" + counter = cast(Counter, block.token) + if counter.counter_type == "count_up": + return _finalize_count_up_block(block) + return _finalize_count_down_block(block) + + +def _finalize_shift_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one shift block back to its decoded 3-row span.""" + shift = cast(Shift, block.token) + clock_conditions: list[ConditionToken] | None = None + reset_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "nop": + raise ConvertError("Shift continuation rows cannot use NOP") + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".clock": + clock_conditions = row.conditions + continue + if pin_name == ".reset": + reset_conditions = row.conditions + continue + + if clock_conditions is None: + clock_conditions = row.conditions + continue + if reset_conditions is None: + reset_conditions = row.conditions + continue + + if clock_conditions is None: + clock_conditions = _blank_condition_row() + if reset_conditions is None: + reset_conditions = _blank_condition_row() + + return [block.main_conditions, clock_conditions, reset_conditions], [shift, "", ""] + + +def _finalize_drum_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand one drum block back to its decoded 4-row span.""" + drum = cast(Drum, block.token) + reset_conditions: list[ConditionToken] | None = None + jump_conditions: list[ConditionToken] | None = None + jog_conditions: list[ConditionToken] | None = None + + for row in block.continuations: + if row.kind == "nop": + raise ConvertError("Drum continuation rows cannot use NOP") + + if row.kind == "pin": + pin_name = cast(str, row.pin_name) + if pin_name == ".reset": + reset_conditions = row.conditions + continue + if pin_name == ".jump": + jump_target = row.pin_args[0] if row.pin_args else "" + drum = replace(drum, jump_enabled=True, jump_target=jump_target) + jump_conditions = row.conditions + continue + if pin_name == ".jog": + drum = replace(drum, jog_enabled=True) + jog_conditions = row.conditions + continue + + if reset_conditions is None: + reset_conditions = row.conditions + continue + if jump_conditions is None: + jump_conditions = row.conditions + continue + if jog_conditions is None: + jog_conditions = row.conditions + continue + + if reset_conditions is None: + raise ConvertError("Drum requires a .reset() pin row") + + if jump_conditions is None: + jump_conditions = _blank_condition_row() + if jog_conditions is None: + jog_conditions = _blank_condition_row() + + return [ + block.main_conditions, + reset_conditions, + jump_conditions, + jog_conditions, + ], [drum, "", "", ""] + + +def _finalize_generic_tall_block( + block: _ActiveBlock, +) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand a non-pinned tall AF block to its full visual row span.""" + visual_rows = _af_visual_rows(block.token) + condition_rows = [block.main_conditions] + af_tokens: list[AfToken] = [block.token] + + for offset in range(visual_rows - 1): + if offset < len(block.continuations): + condition_rows.append(block.continuations[offset].conditions) + else: + condition_rows.append(_blank_condition_row()) + af_tokens.append("") + + return condition_rows, af_tokens + + +def _finalize_active_block(block: _ActiveBlock) -> tuple[list[list[ConditionToken]], list[AfToken]]: + """Expand the current active block into decode-ready rows/tokens.""" + if isinstance(block.token, Timer): + return _finalize_timer_block(block) + if isinstance(block.token, Counter): + return _finalize_counter_block(block) + if isinstance(block.token, Shift): + return _finalize_shift_block(block) + if isinstance(block.token, Drum): + return _finalize_drum_block(block) + return _finalize_generic_tall_block(block) def condition_node_to_token(node: object) -> ConditionToken: @@ -167,229 +571,46 @@ def convert_rung( lines = [r.canonical.conditions[0] for r in rung.comment_rows] comment = "\n".join(lines) - # --- Classify rows: instruction rows vs pin rows --- condition_rows: list[list[ConditionToken]] = [] af_tokens: list[AfToken] = [] - parent_timer: Timer | None = None - parent_counter: Counter | None = None - parent_shift: Shift | None = None - parent_drum: Drum | None = None - counter_up_conditions: list[ConditionToken] | None = None - counter_top_conditions: list[ConditionToken] | None = None - counter_bridge_conditions: list[ConditionToken] | None = None - counter_down_conditions: list[ConditionToken] | None = None - counter_reset_conditions: list[ConditionToken] | None = None - shift_data_conditions: list[ConditionToken] | None = None - shift_clock_conditions: list[ConditionToken] | None = None - shift_reset_conditions: list[ConditionToken] | None = None - drum_reset_conditions: list[ConditionToken] | None = None - drum_jump_conditions: list[ConditionToken] | None = None - drum_jog_conditions: list[ConditionToken] | None = None - - for row_idx, row in enumerate(rung.rows): - af_node = row.af_node - - # Check for pin row (dot-prefixed AF token). - if isinstance(af_node, AfCall) and af_node.name.startswith("."): - pin_name = af_node.name - if pin_name not in KNOWN_PIN_NAMES: - raise ConvertError(f"Unknown pin token: {pin_name!r}") - - conds = [condition_node_to_token(n) for n in row.condition_nodes] - - if parent_counter is not None: - if pin_name == ".down": - if parent_counter.counter_type != "count_up": - raise ConvertError(".down() is only valid for count_up()") - counter_down_conditions = conds - parent_counter = replace(parent_counter, down_enabled=True) - af_tokens[0] = parent_counter - continue - - if pin_name == ".reset": - counter_reset_conditions = conds - parent_counter = replace(parent_counter, reset_enabled=True) - af_tokens[0] = parent_counter - continue - - raise ConvertError( - f"Pin token {pin_name!r} is not supported for {parent_counter.counter_type}()" - ) - - if parent_drum is not None: - if pin_name == ".reset": - drum_reset_conditions = conds - continue - if pin_name == ".jump": - jump_target = af_node.args[0] if af_node.args else "" - parent_drum = replace(parent_drum, jump_enabled=True, jump_target=jump_target) - af_tokens[0] = parent_drum - drum_jump_conditions = conds - continue - if pin_name == ".jog": - parent_drum = replace(parent_drum, jog_enabled=True) - af_tokens[0] = parent_drum - drum_jog_conditions = conds - continue - raise ConvertError( - f"Pin token {pin_name!r} is not supported for {parent_drum.drum_kind}_drum()" - ) - - if parent_shift is not None: - if pin_name == ".clock": - shift_clock_conditions = conds - continue - if pin_name == ".reset": - shift_reset_conditions = conds - continue - raise ConvertError(f"Pin token {pin_name!r} is not supported for shift()") - - if pin_name in _RETENTIVE_PINS and parent_timer is not None: - # .reset() makes the parent timer retentive. - parent_timer = replace(parent_timer, retained=True) - # Update af_tokens[0] with the modified timer. - af_tokens[0] = parent_timer - - # Pin row contributes its conditions/wires as a normal data row, - # but the AF token becomes blank (no separate instruction). - condition_rows.append(conds) - af_tokens.append("") + plain_rows: list[_ParsedRow] = [] + active_block: _ActiveBlock | None = None + + for row in rung.rows: + parsed = _parse_rung_row(row, strict=strict) + + if active_block is not None: + if _consume_active_row(active_block, parsed): + continue + block_conditions, block_afs = _finalize_active_block(active_block) + condition_rows.extend(block_conditions) + af_tokens.extend(block_afs) + active_block = None + + if parsed.kind == "block_main": + leading_blank_conditions: list[ConditionToken] | None = None + if isinstance(parsed.token, Counter) and parsed.token.counter_type == "count_down": + if plain_rows and plain_rows[-1].kind == "blank": + leading_blank_conditions = plain_rows.pop().conditions + _flush_plain_rows(plain_rows, condition_rows, af_tokens) + active_block = _start_active_block( + parsed, + leading_blank_conditions=leading_blank_conditions, + ) continue - # Normal row — convert conditions. - conds = [condition_node_to_token(n) for n in row.condition_nodes] - - # Convert AF token. - token = af_node_to_token(af_node, strict=strict) - if ( - parent_counter is None - and isinstance(token, Counter) - and token.counter_type == "count_down" - and row_idx > 0 - ): - if len(condition_rows) != 1 or af_tokens != [""]: - raise ConvertError("count_down visual layout requires exactly one blank-AF top row") - counter_top_conditions = condition_rows.pop() - af_tokens.pop() - condition_rows.append(conds) - af_tokens.append(token) - parent_counter = token - counter_up_conditions = conds + if parsed.kind == "pin": + plain_rows.append(_ParsedRow(conditions=parsed.conditions, kind="blank")) continue - if ( - parent_counter is not None - and parent_counter.counter_type == "count_down" - and row_idx > 0 - and counter_bridge_conditions is None - and token in ("", "NOP") - ): - counter_bridge_conditions = conds - continue + plain_rows.append(parsed) - condition_rows.append(conds) - af_tokens.append(token) - if row_idx == 0 and isinstance(af_node, AfCall) and af_node.name.upper() != "NOP": - if isinstance(token, Timer): - parent_timer = token - if isinstance(token, Counter): - parent_counter = token - counter_up_conditions = conds - if isinstance(token, Shift): - parent_shift = token - shift_data_conditions = conds - if isinstance(token, Drum): - parent_drum = token - - # --- Drum row shaping --- - if parent_drum is not None: - if drum_reset_conditions is None: - raise ConvertError("Drum requires a .reset() pin row") - - if len(condition_rows) != 1: - raise ConvertError( - "Drum pin rows must use .reset()/.jump()/.jog() tokens " - "(blank-AF continuation rows are unsupported)" - ) + if active_block is not None: + block_conditions, block_afs = _finalize_active_block(active_block) + condition_rows.extend(block_conditions) + af_tokens.extend(block_afs) - main_conds = condition_rows[0] - blank_row = cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - - condition_rows = [ - main_conds, - drum_reset_conditions, - drum_jump_conditions if drum_jump_conditions is not None else blank_row, - drum_jog_conditions if drum_jog_conditions is not None else blank_row, - ] - af_tokens = [parent_drum, "", "", ""] - - # --- Counter row shaping --- - if parent_counter is not None: - if counter_up_conditions is None: - raise ConvertError("Counter rung is missing a primary row") - - if len(condition_rows) != 1: - raise ConvertError( - "Counter pin rows must use .down()/.reset() tokens (blank-AF continuation rows are unsupported)" - ) - - if counter_reset_conditions is None: - raise ConvertError(f"{parent_counter.counter_type} requires a .reset() pin row") - - assert counter_up_conditions is not None - assert counter_reset_conditions is not None - blank_row = cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - - if parent_counter.counter_type == "count_up": - if parent_counter.down_enabled: - if counter_down_conditions is None: - raise ConvertError("count_up with .down() is missing down conditions") - middle_row = counter_down_conditions - else: - middle_row = blank_row - condition_rows = [counter_up_conditions, middle_row, counter_reset_conditions] - af_tokens = [parent_counter, "", ""] - else: - if counter_down_conditions is not None: - raise ConvertError("count_down does not support .down()") - if counter_top_conditions is not None: - condition_rows = [ - counter_top_conditions, - counter_up_conditions, - counter_reset_conditions, - ] - elif counter_bridge_conditions is not None: - condition_rows = [ - counter_up_conditions, - counter_bridge_conditions, - counter_reset_conditions, - ] - else: - condition_rows = [blank_row, counter_up_conditions, counter_reset_conditions] - af_tokens = [parent_counter, "NOP", ""] - - # --- Shift row shaping --- - if parent_shift is not None: - if shift_data_conditions is None: - raise ConvertError("Shift rung is missing a primary row") - - if len(condition_rows) != 1: - raise ConvertError( - "Shift pin rows must use .clock()/.reset() tokens (blank-AF continuation rows are unsupported)" - ) - - clock_conditions = ( - shift_clock_conditions - if shift_clock_conditions is not None - else cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - ) - reset_conditions = ( - shift_reset_conditions - if shift_reset_conditions is not None - else cast(list[ConditionToken], [""] * CONDITION_COLUMNS) - ) - condition_rows = [shift_data_conditions, clock_conditions, reset_conditions] - af_tokens = [parent_shift, "", ""] + _flush_plain_rows(plain_rows, condition_rows, af_tokens) # --- Auto-pad for tall instructions --- af0 = af_tokens[0] if af_tokens else "" @@ -402,48 +623,3 @@ def convert_rung( logical_rows = len(condition_rows) return logical_rows, condition_rows, af_tokens, comment - - -# --------------------------------------------------------------------------- -# Decode-side: strip blank padding rows from tall instructions -# --------------------------------------------------------------------------- - - -def _is_blank_row( - conditions: list[object], - af: object, -) -> bool: - """Return True if all conditions are blank and AF is blank.""" - if af != "": - return False - return all(c == "" for c in conditions) - - -def strip_tall_padding( - logical_rows: int, - condition_rows: list[list[object]], - af_tokens: list[object], -) -> tuple[int, list[list[object]], list[object]]: - """Remove trailing blank rows that are just visual padding for tall instructions. - - After decoding, a timer rung may have a trailing all-blank row that - exists only because the timer cell is visually tall. This function - strips such rows so the decoded output matches the user's CSV input. - - Rows with any content (wires, contacts, etc.) are kept. - """ - if logical_rows < 2: - return logical_rows, condition_rows, af_tokens - - # Check if row 0 has a tall AF instruction. - af0 = af_tokens[0] - is_tall = isinstance(af0, AfInstruction) and af0.cell_params().get("visual_rows", 1) > 1 - if not is_tall: - return logical_rows, condition_rows, af_tokens - - # Strip trailing blank rows (from the end, in case of future >2-row tall). - while len(condition_rows) > 1 and _is_blank_row(condition_rows[-1], af_tokens[-1]): - condition_rows = condition_rows[:-1] - af_tokens = af_tokens[:-1] - - return len(condition_rows), condition_rows, af_tokens diff --git a/src/laddercodec/csv/parser.py b/src/laddercodec/csv/parser.py index 41e08d0..09b4790 100644 --- a/src/laddercodec/csv/parser.py +++ b/src/laddercodec/csv/parser.py @@ -3,6 +3,7 @@ from __future__ import annotations import csv +from collections.abc import Sequence from pathlib import Path from typing import Literal @@ -88,6 +89,17 @@ def _segment_rungs(rows: tuple[RowAst, ...]) -> tuple[RungAst, ...]: return tuple(rungs) +def _parse_single_rung_rows(row_fields: Sequence[Sequence[str]]) -> RungAst: + """Parse in-memory canonical row fields into exactly one ``RungAst``.""" + rows = tuple(_row_ast(_canonical_row_from_fields(list(fields))) for fields in row_fields) + rungs = _segment_rungs(rows) + if not rungs: + raise ValueError("Expected one rung in emitted CSV rows; got none") + if len(rungs) != 1: + raise ValueError(f"Expected one rung in emitted CSV rows; got {len(rungs)}") + return rungs[0] + + def _load_canonical_rows(path: Path) -> tuple[CanonicalRow, ...]: rows: list[CanonicalRow] = [] with path.open("r", encoding="utf-8-sig", newline="") as handle: diff --git a/src/laddercodec/csv/writer.py b/src/laddercodec/csv/writer.py index 9b3e6c9..d627e6b 100644 --- a/src/laddercodec/csv/writer.py +++ b/src/laddercodec/csv/writer.py @@ -34,16 +34,21 @@ from ..decode import Rung from ..instructions import ( AfInstruction, + AfToken, ConditionInstruction, + ConditionToken, Counter, Drum, + RawInstruction, Shift, Timer, UnknownCondition, UnknownInstruction, get_af_family_for_token, ) -from .contract import CSV_HEADER +from .contract import CONDITION_COLUMNS, CSV_HEADER +from .converter import convert_rung +from .parser import _parse_single_rung_rows class WriterError(ValueError): @@ -86,6 +91,314 @@ def _conditions_are_blank(conditions: Sequence[object]) -> bool: return all(c == "" for c in conditions) +def _append_data_row( + rows: list[list[str]], + data_row_count: int, + conditions: Sequence[object], + af: object, +) -> int: + """Append one CSV data row and return the new emitted-data-row count.""" + marker = "R" if data_row_count == 0 else "" + rows.append([marker] + [_token_to_csv(c) for c in conditions] + [_token_to_csv(af)]) + return data_row_count + 1 + + +def _emit_blank_continuation( + rows: list[list[str]], + data_row_count: int, + conditions: Sequence[object], +) -> int: + """Emit a blank-AF continuation row only when it carries geometry.""" + if _conditions_are_blank(conditions): + return data_row_count + return _append_data_row(rows, data_row_count, conditions, "") + + +def _require_blank_af(af: object, *, message: str) -> None: + """Ensure a consumed continuation row does not hide another AF token.""" + if af != "": + raise WriterError(message) + + +def _emit_timer_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + timer: Timer, +) -> tuple[int, int]: + """Emit one timer block and return ``(new_count, consumed_rows)``.""" + needed = start + 2 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"timer requires {needed} decoded rows; got {len(condition_rows)}") + + _require_blank_af( + af_tokens[start + 1], + message="timer continuation row must be blank AF", + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], timer) + if timer.retained: + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 1], ".reset()" + ) + else: + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 1]) + return data_row_count, 2 + + +def _emit_counter_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + counter: Counter, +) -> tuple[int, int]: + """Emit one counter block and return ``(new_count, consumed_rows)``.""" + needed = start + 3 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError( + f"{counter.counter_type} requires {needed} decoded rows; got {len(condition_rows)}" + ) + + if counter.counter_type == "count_up": + _require_blank_af( + af_tokens[start + 1], + message="count_up continuation row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 2], + message="count_up reset row must be blank AF", + ) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], counter) + if counter.down_enabled: + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 1], ".down()" + ) + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 1] + ) + if counter.reset_enabled: + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 2], ".reset()" + ) + else: + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + 2] + ) + return data_row_count, 3 + + if af_tokens[start + 1] != "NOP": + raise WriterError("count_down requires a NOP bridge row after the counter") + _require_blank_af( + af_tokens[start + 2], + message="count_down reset row must be blank AF", + ) + + counter_conditions = condition_rows[start] + bridge_conditions = condition_rows[start + 1] + if _conditions_are_blank(counter_conditions): + data_row_count = _append_data_row(rows, data_row_count, bridge_conditions, counter) + else: + data_row_count = _append_data_row(rows, data_row_count, counter_conditions, "") + data_row_count = _append_data_row(rows, data_row_count, bridge_conditions, counter) + + if counter.reset_enabled: + data_row_count = _append_data_row( + rows, data_row_count, condition_rows[start + 2], ".reset()" + ) + else: + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 2]) + return data_row_count, 3 + + +def _emit_shift_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + shift: Shift, +) -> tuple[int, int]: + """Emit one shift block and return ``(new_count, consumed_rows)``.""" + needed = start + 3 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"shift requires {needed} decoded rows; got {len(condition_rows)}") + + _require_blank_af( + af_tokens[start + 1], + message="shift .clock() row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 2], + message="shift .reset() row must be blank AF", + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], shift) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".clock()") + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 2], ".reset()") + return data_row_count, 3 + + +def _emit_drum_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + drum: Drum, +) -> tuple[int, int]: + """Emit one drum block and return ``(new_count, consumed_rows)``.""" + needed = start + 4 + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError(f"Drum requires {needed} decoded rows; got {len(condition_rows)}") + + _require_blank_af( + af_tokens[start + 1], + message="drum .reset() row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 2], + message="drum .jump() row must be blank AF", + ) + _require_blank_af( + af_tokens[start + 3], + message="drum .jog() row must be blank AF", + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], drum) + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 1], ".reset()") + if drum.jump_enabled: + data_row_count = _append_data_row( + rows, + data_row_count, + condition_rows[start + 2], + f".jump({drum.jump_target})", + ) + else: + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 2]) + if drum.jog_enabled: + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start + 3], ".jog()") + else: + data_row_count = _emit_blank_continuation(rows, data_row_count, condition_rows[start + 3]) + return data_row_count, 4 + + +def _emit_generic_tall_block( + rows: list[list[str]], + data_row_count: int, + condition_rows: Sequence[Sequence[object]], + af_tokens: Sequence[object], + start: int, + af: AfInstruction, +) -> tuple[int, int]: + """Emit a non-pinned tall AF block, preserving only nonblank continuation rows.""" + visual_rows = int(af.cell_params().get("visual_rows", 1)) + needed = start + visual_rows + if len(condition_rows) < needed or len(af_tokens) < needed: + raise WriterError( + f"{type(af).__name__} requires {needed} decoded rows; got {len(condition_rows)}" + ) + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[start], af) + for offset in range(1, visual_rows): + _require_blank_af( + af_tokens[start + offset], + message=f"{type(af).__name__} continuation row must be blank AF", + ) + data_row_count = _emit_blank_continuation( + rows, data_row_count, condition_rows[start + offset] + ) + return data_row_count, visual_rows + + +def _display_token(token: object) -> str: + """Return a compact user-facing representation for mismatch errors.""" + try: + return repr(_token_to_csv(token)) + except WriterError: + return repr(token) + + +def _af_tokens_match(expected: object, actual: object) -> bool: + """Return True when AF tokens are semantically equivalent for CSV replay.""" + if expected == actual: + return True + + if not isinstance(expected, AfInstruction) or not isinstance(actual, AfInstruction): + return False + + if not isinstance(expected, RawInstruction) and not isinstance(actual, RawInstruction): + return False + + return ( + expected.build_blob() == actual.build_blob() + and expected.cell_params() == actual.cell_params() + ) + + +def _rebuild_rung_from_rows(rows: Sequence[Sequence[str]]) -> Rung: + """Round-trip emitted CSV rows back into a decoded-style ``Rung``.""" + try: + rung_ast = _parse_single_rung_rows(rows) + logical_rows, conditions, instructions, comment = convert_rung(rung_ast) + except (TypeError, ValueError) as exc: + raise WriterError( + f"CSV round-trip validation failed: emitted rows do not reparse: {exc}" + ) from exc + + return Rung( + logical_rows=logical_rows, + conditions=cast(list[list[ConditionToken]], conditions), + instructions=cast(list[AfToken], instructions), + comment=comment, + comment_rtf=None, + ) + + +def _validate_roundtrip(rung: Rung, rows: Sequence[Sequence[str]]) -> None: + """Fail loudly when emitted CSV rows lose information from the decoded rung.""" + rebuilt = _rebuild_rung_from_rows(rows) + + if rebuilt.logical_rows != rung.logical_rows: + raise WriterError( + "CSV round-trip validation failed: " + f"logical row count mismatch: expected {rung.logical_rows}, got {rebuilt.logical_rows}" + ) + + if rebuilt.comment != rung.comment: + raise WriterError( + "CSV round-trip validation failed: " + f"comment mismatch: expected {rung.comment!r}, got {rebuilt.comment!r}" + ) + + for row_idx, (expected_row, actual_row) in enumerate( + zip(rung.conditions, rebuilt.conditions, strict=True), + start=1, + ): + for col_idx, (expected, actual) in enumerate(zip(expected_row, actual_row, strict=True)): + if expected != actual: + raise WriterError( + "CSV round-trip validation failed: " + f"condition mismatch at row {row_idx} col {CONDITION_COLUMNS[col_idx]}: " + f"expected {_display_token(expected)}, got {_display_token(actual)}" + ) + + for row_idx, (expected, actual) in enumerate( + zip(rung.instructions, rebuilt.instructions, strict=True), + start=1, + ): + if not _af_tokens_match(expected, actual): + raise WriterError( + "CSV round-trip validation failed: " + f"AF mismatch at row {row_idx}: " + f"expected {_display_token(expected)}, got {_display_token(actual)}" + ) + + def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: """Convert a ``Rung`` to a list of CSV row lists. @@ -94,7 +407,8 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: Data rows have marker ``"R"`` (first) or ``""`` (continuation). Retained timers produce a ``.reset()`` pin row from the second - grid row. Non-retained timers strip trailing blank padding. + grid row. Multi-row AF families are streamed in order so multiple + pinned/tall blocks survive CSV round-trip without truncation. """ rows: list[list[str]] = [] @@ -103,119 +417,59 @@ def decoded_rung_to_rows(rung: Rung) -> list[list[str]]: for line in rung.comment.split("\n"): rows.append(["#", line]) - # Build working copies for potential stripping. condition_rows = list(rung.conditions) af_tokens = list(rung.instructions) - - # Counters need custom CSV shaping. Native count_down decodes as AF row 0 - # plus a NOP bridge row; when row 0 is truly empty we collapse that spacer - # row in CSV, but preserve any real row-above content. - counter_row = next((idx for idx, af in enumerate(af_tokens) if isinstance(af, Counter)), None) - if counter_row is not None: - counter = cast(Counter, af_tokens[counter_row]) - if len(condition_rows) < 3 or len(af_tokens) < 3: - raise WriterError( - f"{counter.counter_type} requires 3 decoded rows; got {len(condition_rows)}" - ) - - if counter.counter_type == "count_up": - if counter_row != 0: - raise WriterError("count_up must appear on decoded row 0") - - top_conditions = condition_rows[0] - rows.append(["R"] + [_token_to_csv(c) for c in top_conditions] + [counter.to_csv()]) - - if counter.down_enabled: - down_conditions = condition_rows[1] - rows.append([""] + [_token_to_csv(c) for c in down_conditions] + [".down()"]) - - if counter.reset_enabled: - reset_conditions = condition_rows[2] - rows.append([""] + [_token_to_csv(c) for c in reset_conditions] + [".reset()"]) - - return rows - - if counter_row == 0: - if af_tokens[1] != "NOP": - raise WriterError("count_down requires a NOP bridge row in the decoded rung") - elif counter_row != 1: - raise WriterError("count_down must appear on decoded row 0 or row 1") - - top_conditions = condition_rows[0] - bridge_conditions = condition_rows[1] - if _conditions_are_blank(top_conditions): - rows.append(["R"] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()]) - else: - rows.append(["R"] + [_token_to_csv(c) for c in top_conditions] + [""]) - rows.append([""] + [_token_to_csv(c) for c in bridge_conditions] + [counter.to_csv()]) - - if counter.reset_enabled: - reset_conditions = condition_rows[2] - rows.append([""] + [_token_to_csv(c) for c in reset_conditions] + [".reset()"]) - - return rows - - # --- Determine timer retention / tall padding --- - af0 = rung.instructions[0] if rung.instructions else None - af0_family = get_af_family_for_token(af0) if isinstance(af0, AfInstruction) else None - family_name = af0_family.family_name if af0_family is not None else None - is_retained_timer = family_name == "timer" and isinstance(af0, Timer) and af0.retained - is_tall = isinstance(af0, AfInstruction) and af0.cell_params().get("visual_rows", 1) > 1 - - if family_name == "shift": - shift = cast(Shift, af0) - if len(condition_rows) != 3 or len(af_tokens) != 3: - raise WriterError(f"shift requires 3 decoded rows; got {len(condition_rows)}") - - rows.append(["R"] + [_token_to_csv(c) for c in condition_rows[0]] + [shift.to_csv()]) - rows.append([""] + [_token_to_csv(c) for c in condition_rows[1]] + [".clock()"]) - rows.append([""] + [_token_to_csv(c) for c in condition_rows[2]] + [".reset()"]) - return rows - - if family_name == "drum": - drum = cast(Drum, af0) - if len(condition_rows) < 4 or len(af_tokens) < 4: - raise WriterError(f"Drum requires 4 decoded rows; got {len(condition_rows)}") - - # Row 0: main + drum instruction - rows.append(["R"] + [_token_to_csv(c) for c in condition_rows[0]] + [drum.to_csv()]) - # Row 1: .reset() — always present - rows.append([""] + [_token_to_csv(c) for c in condition_rows[1]] + [".reset()"]) - # Row 2: .jump(target) if enabled - if drum.jump_enabled: - rows.append( - [""] - + [_token_to_csv(c) for c in condition_rows[2]] - + [f".jump({drum.jump_target})"] - ) - # Row 3: .jog() if enabled - if drum.jog_enabled: - rows.append([""] + [_token_to_csv(c) for c in condition_rows[3]] + [".jog()"]) - return rows - - if is_retained_timer: - # Retained timer: the second row becomes a .reset() pin row. - # Keep all rows — the second row carries reset-enable conditions. - pass - elif is_tall: - # Tall instruction (timer/copy): strip trailing blank padding rows. - while len(condition_rows) > 1 and _is_blank_row(condition_rows[-1], af_tokens[-1]): - condition_rows = condition_rows[:-1] - af_tokens = af_tokens[:-1] - - # --- Data rows --- - for i, (conditions, af) in enumerate(zip(condition_rows, af_tokens, strict=True)): - marker = "R" if i == 0 else "" - - # Pin row: retained timer's second row gets .reset() as AF. - if is_retained_timer and i == 1: - af_str = ".reset()" - else: - af_str = _token_to_csv(af) - - cond_strs = [_token_to_csv(c) for c in conditions] - rows.append([marker] + cond_strs + [af_str]) - + data_row_count = 0 + row_idx = 0 + + while row_idx < len(condition_rows): + af = af_tokens[row_idx] + consumed = 1 + + if isinstance(af, AfInstruction): + family = get_af_family_for_token(af) + family_name = family.family_name if family is not None else None + + if family_name == "timer" and isinstance(af, Timer): + data_row_count, consumed = _emit_timer_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue + + if family_name == "counter" and isinstance(af, Counter): + data_row_count, consumed = _emit_counter_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue + + if family_name == "shift" and isinstance(af, Shift): + data_row_count, consumed = _emit_shift_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue + + if family_name == "drum" and isinstance(af, Drum): + data_row_count, consumed = _emit_drum_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue + + visual_rows = int(af.cell_params().get("visual_rows", 1)) + if visual_rows > 1: + data_row_count, consumed = _emit_generic_tall_block( + rows, data_row_count, condition_rows, af_tokens, row_idx, af + ) + row_idx += consumed + continue + + data_row_count = _append_data_row(rows, data_row_count, condition_rows[row_idx], af) + row_idx += consumed + + _validate_roundtrip(rung, rows) return rows diff --git a/src/laddercodec/decode_program.py b/src/laddercodec/decode_program.py index fa0dcb3..107c57f 100644 --- a/src/laddercodec/decode_program.py +++ b/src/laddercodec/decode_program.py @@ -24,8 +24,10 @@ from .instructions import ( INSTRUCTION_MODULES, RawInstruction, + UnknownInstruction, from_tags_af, from_tags_condition, + get_af_family_for_token, ) from .instructions.comparison import CompareContact from .instructions.contact import Contact @@ -33,7 +35,7 @@ from .instructions.drum import Drum from .instructions.shift import Shift from .instructions.timer import Timer -from .model import Program +from .model import AfInstruction, Program from .topology import CONDITION_COLUMNS as _CONDITION_COLUMNS # --------------------------------------------------------------------------- @@ -925,13 +927,16 @@ def _build_topology_backed_rung( # --------------------------------------------------------------------------- -def _implied_modifier_row_offsets(af: object) -> set[int]: - """Return AF-relative row offsets whose logic path may be omitted in SCR. +def _implied_modifier_row_offsets(af: AfInstruction | UnknownInstruction) -> set[int]: + """Return AF-relative row offsets whose horizontal path may be omitted in SCR. Click can store ``count=0`` or omit continuation-row topology blocks for - some AF modifier rows even when the visible rung still carries logic across - that row. The AF blob does still tell us which visual sub-rows are actual - logic inputs, so we restrict implied dash fill to those known pin rows. + some tall-AF continuation rows even when the visible rung still carries + logic across that row. Pinned families still use hand-tuned offsets so + optional pin rows only opt in when the AF state says that pin is active. + Non-pinned tall families (copy/search/send_receive/raw) expose plain + continuation rows instead, so every visual continuation row can carry logic + when Click suppresses its explicit topology block. """ if isinstance(af, Shift): return {1, 2} @@ -959,6 +964,15 @@ def _implied_modifier_row_offsets(af: object) -> set[int]: rows.add(3) return rows + if isinstance(af, UnknownInstruction): + return set() + + family = get_af_family_for_token(af) + if family is not None and not family.pin_names: + visual_rows = max(1, int(af.cell_params().get("visual_rows", 1))) + if visual_rows > 1: + return set(range(1, visual_rows)) + return set() @@ -1104,8 +1118,9 @@ def _build_rung( if 0 < row < logical_rows: implied_modifier_rows.add(row) - # 4. Targeted fallback for modifier rows whose topology Click omits from - # SCR even though the AF blob says that visual sub-row accepts logic. + # 4. Fallback for logic-carrying continuation rows whose topology Click + # omits from SCR. Rebuild the implied horizontal path and merge any + # pre-parsed wire_down markers into T-junctions. for row in sorted(implied_modifier_rows): explicit_right_wires = row - 1 < len(extra_rows_right_wires) and bool( extra_rows_right_wires[row - 1] @@ -1113,19 +1128,20 @@ def _build_rung( if explicit_right_wires: continue - rightmost = -1 + leftmost = None for col in range(_CONDITION_COLUMNS): if conditions[row][col] != "": - rightmost = col - if rightmost < 0: + leftmost = col + break + if leftmost is None: continue - for col in range(rightmost + 1, _CONDITION_COLUMNS): - if conditions[row][col] != "": - continue - if row > 0 and conditions[row - 1][col] == "|": - continue - conditions[row][col] = "-" + for col in range(leftmost, _CONDITION_COLUMNS): + cell = conditions[row][col] + if cell == "": + conditions[row][col] = "-" + elif cell == "|": + conditions[row][col] = "T" return Rung( logical_rows=logical_rows, diff --git a/src/laddercodec/instructions/home.py b/src/laddercodec/instructions/home.py index d3ee9b4..738936f 100644 --- a/src/laddercodec/instructions/home.py +++ b/src/laddercodec/instructions/home.py @@ -22,7 +22,7 @@ _TYPE_CODE = 0x2734 _CLASS_NAME = "Home" -_FIELD_COUNT = 21 +_FIELD_COUNT = 22 # --------------------------------------------------------------------------- # Shared field extraction diff --git a/src/laddercodec/instructions/position.py b/src/laddercodec/instructions/position.py index ef7385e..21e9a67 100644 --- a/src/laddercodec/instructions/position.py +++ b/src/laddercodec/instructions/position.py @@ -22,7 +22,7 @@ _TYPE_CODE = 0x2736 _CLASS_NAME = "Position" -_FIELD_COUNT = 18 +_FIELD_COUNT = 19 # --------------------------------------------------------------------------- # Shared field extraction diff --git a/tests/csv/test_converter.py b/tests/csv/test_converter.py index 9e02846..c46d77b 100644 --- a/tests/csv/test_converter.py +++ b/tests/csv/test_converter.py @@ -22,7 +22,6 @@ af_node_to_token, condition_node_to_token, convert_rung, - strip_tall_padding, ) from laddercodec.csv.parser import parse_csv_file from laddercodec.instructions import ( @@ -34,6 +33,7 @@ Next, RawInstruction, Return, + Search, Shift, Timer, ) @@ -134,6 +134,48 @@ def test_no_autopad_when_user_provides_two_rows(self, tmp_path: Path) -> None: assert all(c == "-" for c in conds[1]) # user wires preserved +class TestGenericTallRows: + def test_search_inserts_blank_continuation_before_following_af(self, tmp_path: Path) -> None: + csv_path = tmp_path / "main.csv" + _write_csv( + csv_path, + [ + ("R", _wire_row("C10"), "search(DS72..DS81 == DS71,result=DS82,found=C81)"), + ("", _wire_row("C11"), "out(Y001)"), + ], + ) + + rung = parse_csv_file(csv_path).rungs[0] + lr, conds, afs, _ = convert_rung(rung) + + assert lr == 3 + assert isinstance(afs[0], Search) + assert afs[1] == "" + assert isinstance(afs[2], Coil) + assert all(c == "" for c in conds[1]) + assert isinstance(conds[2][0], Contact) + assert conds[2][0].operand == "C11" + + def test_search_later_in_rung_auto_adds_trailing_blank_row(self, tmp_path: Path) -> None: + csv_path = tmp_path / "main.csv" + _write_csv( + csv_path, + [ + ("R", _wire_row("C12"), "out(Y002)"), + ("", _wire_row("C13"), "search(DS90..DS99 == DS89,result=DS100,found=C14)"), + ], + ) + + rung = parse_csv_file(csv_path).rungs[0] + lr, conds, afs, _ = convert_rung(rung) + + assert lr == 3 + assert isinstance(afs[0], Coil) + assert isinstance(afs[1], Search) + assert afs[2] == "" + assert all(c == "" for c in conds[2]) + + class TestPinRows: def test_reset_makes_timer_retentive(self, tmp_path: Path) -> None: csv_path = tmp_path / "main.csv" @@ -207,57 +249,6 @@ def test_unknown_pin_raises(self, tmp_path: Path) -> None: convert_rung(rung) -class TestStripTallPadding: - def test_strips_blank_trailing_row(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms") - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - [""] * 31, - ] - afs: list[object] = [timer, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 1 - assert len(new_conds) == 1 - assert len(new_afs) == 1 - - def test_keeps_row_with_wires(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms") - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - ["-"] * 31, - ] - afs: list[object] = [timer, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 2 # kept because row has wires - - def test_keeps_row_with_contacts(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms", retained=True) - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - [Contact(InstructionType.CONTACT_NO, "X002")] + ["-"] * 30, - ] - afs: list[object] = [timer, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 2 - - def test_no_strip_for_non_timer(self) -> None: - coil = Coil(InstructionType.COIL_OUT, "Y001") - conds = [ - [Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30, - [""] * 31, - ] - afs: list[object] = [coil, ""] - lr, new_conds, new_afs = strip_tall_padding(2, conds, afs) - assert lr == 2 # not a tall instruction, no strip - - def test_single_row_passthrough(self) -> None: - timer = Timer("on_delay", "T1", "TD1", "1000", "Tms") - conds = [[Contact(InstructionType.CONTACT_NO, "X001")] + ["-"] * 30] - afs: list[object] = [timer] - lr, new_conds, new_afs = strip_tall_padding(1, conds, afs) - assert lr == 1 - - class TestConditionNodeToToken: def test_blank(self) -> None: assert condition_node_to_token(BlankCondition()) == "" diff --git a/tests/csv/test_writer.py b/tests/csv/test_writer.py index 2226b4e..cb39a67 100644 --- a/tests/csv/test_writer.py +++ b/tests/csv/test_writer.py @@ -8,8 +8,10 @@ from laddercodec import ( Coil, + CompareContact, Contact, Counter, + Drum, ForLoop, Next, Rung, @@ -21,9 +23,18 @@ ) from laddercodec.csv.writer import ( WriterError, + _validate_roundtrip, decoded_rung_to_rows, ) -from laddercodec.instructions import UnknownInstruction +from laddercodec.instructions import ( + Copy, + ModbusRtuTarget, + RawInstruction, + Receive, + Search, + Send, + UnknownInstruction, +) from laddercodec.model import InstructionType GOLDEN_DIR = Path(__file__).resolve().parent.parent / "fixtures" / "ladder_captures" / "golden" @@ -40,6 +51,82 @@ def _golden_paths() -> list[Path]: return [b for b in bins if b.with_suffix(".csv").exists()] +def _blank_conditions() -> list[object]: + """Return one all-blank decoded condition row.""" + return [""] * 31 + + +def _contact_row(operand: str) -> list[object]: + """Return a simple NO-contact row with trailing wires.""" + return [Contact(InstructionType.CONTACT_NO, operand)] + ["-"] * 30 + + +def _edge_row(operand: str) -> list[object]: + """Return a simple rising-edge contact row with trailing wires.""" + return [Contact(InstructionType.CONTACT_EDGE, operand, edge_kind="rise")] + ["-"] * 30 + + +def _generic_tall_af_cases() -> list[pytest.ParameterSet]: + """Return representative non-pinned tall AF instructions.""" + return [ + pytest.param(Copy("DS7", "DS8"), 2, id="copy"), + pytest.param(Search("DS72", "DS81", "DS71", "DS82", "C81", "=="), 2, id="search"), + pytest.param( + Send( + ModbusRtuTarget("rtu", "cpu2", 5), + "DS1", + "DS1", + 1, + "C1", + "C2", + "C3", + "DS100", + ), + 3, + id="send", + ), + pytest.param( + Receive( + ModbusRtuTarget("rtu", "cpu2", 5), + "DS1", + "DS1", + 1, + "C1", + "C2", + "C3", + "DS100", + ), + 3, + id="receive", + ), + pytest.param( + RawInstruction.from_csv_token("raw(X,0x2711,3,0000=)"), + 3, + id="raw", + ), + ] + + +def _search_continuation_row() -> list[object]: + """Return a nonblank continuation row with comparison and wire geometry.""" + return [CompareContact("==", "DS300", "1", wire_down=True), "T", "|"] + [""] * 28 + + +def _event_drum() -> Drum: + """Return a compact event drum that exercises reset/jump/jog pin rows.""" + return Drum( + drum_kind="event", + outputs=["Y10", "Y11"], + events_or_presets=["C10", "C11"], + pattern=[[1, 0], [0, 1]], + current_step="DS10", + completion_flag="C12", + jog_enabled=True, + jump_enabled=True, + jump_target="DS11", + ) + + # --------------------------------------------------------------------------- # Round-trip: golden.bin → decode → CSV → read_csv → compare # --------------------------------------------------------------------------- @@ -306,6 +393,78 @@ def test_count_down_preserves_wire_only_top_row(self) -> None: assert rows[1][1] == "rise(C78)" assert rows[2][32] == ".reset()" + def test_count_up_then_retained_timer_emits_both_blocks(self) -> None: + counter = Counter( + counter_type="count_up", + done_bit="CT5", + current="CTD5", + preset="100", + down_enabled=False, + reset_enabled=True, + ) + timer = Timer("on_delay", "T5", "TD5", "10", "Tm", retained=True) + rung = Rung( + logical_rows=5, + conditions=[ + _edge_row("C81"), + _blank_conditions(), + _contact_row("C82"), + _contact_row("C83"), + _contact_row("C84"), + ], + instructions=[counter, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 4 + assert [row[32] for row in rows] == [ + "count_up(CT5,CTD5,preset=100)", + ".reset()", + "on_delay(T5,TD5,preset=10,unit=Tm)", + ".reset()", + ] + assert rows[1][1] == "C82" + assert rows[3][1] == "C84" + + def test_count_down_then_retained_timer_preserves_bridge_shape(self) -> None: + counter = Counter( + counter_type="count_down", + done_bit="CT6", + current="CTD6", + preset="25", + down_enabled=False, + reset_enabled=True, + ) + timer = Timer("on_delay", "T6", "TD6", "20", "Ts", retained=True) + rung = Rung( + logical_rows=5, + conditions=[ + _contact_row("C85"), + _edge_row("C86"), + _contact_row("C87"), + _contact_row("C88"), + _contact_row("C89"), + ], + instructions=[counter, "NOP", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 5 + assert [row[32] for row in rows] == [ + "", + "count_down(CT6,CTD6,preset=25)", + ".reset()", + "on_delay(T6,TD6,preset=20,unit=Ts)", + ".reset()", + ] + assert rows[0][1] == "C85" + assert rows[1][1] == "rise(C86)" + assert rows[4][1] == "C89" + def test_shift_emits_clock_and_reset_pins(self) -> None: shift = Shift("C99", "C106") rung = Rung( @@ -326,6 +485,34 @@ def test_shift_emits_clock_and_reset_pins(self) -> None: assert rows[1][32] == ".clock()" assert rows[2][32] == ".reset()" + def test_shift_then_timer_preserves_nonblank_timer_continuation(self) -> None: + shift = Shift("C99", "C106") + timer = Timer("on_delay", "T7", "TD7", "30", "Tms", retained=False) + rung = Rung( + logical_rows=5, + conditions=[ + _contact_row("C90"), + _contact_row("C91"), + _contact_row("C92"), + _contact_row("C93"), + _contact_row("C94"), + ], + instructions=[shift, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 5 + assert [row[32] for row in rows] == [ + "shift(C99..C106)", + ".clock()", + ".reset()", + "on_delay(T7,TD7,preset=30,unit=Tms)", + "", + ] + assert rows[4][1] == "C94" + def test_shift_requires_three_rows(self) -> None: shift = Shift("C99", "C106") rung = Rung( @@ -389,3 +576,297 @@ def test_unknown_instruction_raises(self) -> None: ) with pytest.raises(WriterError): decoded_rung_to_rows(rung) + + +class TestGenericTallRoundTrip: + @pytest.mark.parametrize(("af", "visual_rows"), _generic_tall_af_cases()) + def test_later_tall_block_at_end_roundtrips( + self, + tmp_path: Path, + af: object, + visual_rows: int, + ) -> None: + lead = Coil(InstructionType.COIL_OUT, "Y900") + rung = Rung( + logical_rows=visual_rows + 1, + conditions=[ + _contact_row("C200"), + _contact_row("C201"), + *[_blank_conditions() for _ in range(visual_rows - 1)], + ], + instructions=[lead, af, *([""] * (visual_rows - 1))], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 2 + assert [row[32] for row in rows] == ["out(Y900)", af.to_csv()] + + out_csv = tmp_path / f"later-tall-end-{type(af).__name__}.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == rung.logical_rows + assert round_tripped.conditions == rung.conditions + assert round_tripped.instructions == rung.instructions + + @pytest.mark.parametrize(("af", "visual_rows"), _generic_tall_af_cases()) + def test_later_tall_block_before_following_af_roundtrips( + self, + tmp_path: Path, + af: object, + visual_rows: int, + ) -> None: + lead = Coil(InstructionType.COIL_OUT, "Y901") + tail = Coil(InstructionType.COIL_OUT, "Y902") + rung = Rung( + logical_rows=visual_rows + 2, + conditions=[ + _contact_row("C210"), + _contact_row("C211"), + *[_blank_conditions() for _ in range(visual_rows - 1)], + _contact_row("C212"), + ], + instructions=[lead, af, *([""] * (visual_rows - 1)), tail], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + assert len(rows) == 3 + assert [row[32] for row in rows] == ["out(Y901)", af.to_csv(), "out(Y902)"] + + out_csv = tmp_path / f"later-tall-mid-{type(af).__name__}.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == rung.logical_rows + assert round_tripped.conditions == rung.conditions + assert round_tripped.instructions == rung.instructions + + +class TestMultiPinnedRoundTrip: + def test_count_up_then_retained_timer_roundtrip(self, tmp_path: Path) -> None: + counter = Counter( + counter_type="count_up", + done_bit="CT8", + current="CTD8", + preset="100", + down_enabled=False, + reset_enabled=True, + ) + timer = Timer("on_delay", "T8", "TD8", "10", "Tm", retained=True) + rung = Rung( + logical_rows=5, + conditions=[ + _edge_row("C95"), + _blank_conditions(), + _contact_row("C96"), + _contact_row("C97"), + _contact_row("C98"), + ], + instructions=[counter, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + out_csv = tmp_path / "count-up-timer.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == 5 + assert isinstance(round_tripped.instructions[0], Counter) + assert isinstance(round_tripped.instructions[3], Timer) + assert round_tripped.instructions[3].retained is True + assert isinstance(round_tripped.conditions[2][0], Contact) + assert round_tripped.conditions[2][0].operand == "C96" + assert isinstance(round_tripped.conditions[4][0], Contact) + assert round_tripped.conditions[4][0].operand == "C98" + + def test_shift_then_timer_roundtrip_preserves_blank_timer_row(self, tmp_path: Path) -> None: + shift = Shift("C120", "C127") + timer = Timer("on_delay", "T9", "TD9", "50", "Tms", retained=False) + rung = Rung( + logical_rows=5, + conditions=[ + _contact_row("C99"), + _contact_row("C100"), + _contact_row("C101"), + _contact_row("C102"), + _contact_row("C103"), + ], + instructions=[shift, "", "", timer, ""], + comment_rtf=None, + comment=None, + ) + + out_csv = tmp_path / "shift-timer.csv" + write_csv(out_csv, [rung]) + [round_tripped] = read_csv(out_csv) + + assert round_tripped.logical_rows == 5 + assert isinstance(round_tripped.instructions[0], Shift) + assert isinstance(round_tripped.instructions[3], Timer) + assert round_tripped.instructions[3].retained is False + assert round_tripped.instructions[4] == "" + assert isinstance(round_tripped.conditions[4][0], Contact) + assert round_tripped.conditions[4][0].operand == "C103" + + +class TestRoundTripValidator: + def test_accepts_retained_timer(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C300"), + _contact_row("C301"), + ], + instructions=[Timer("on_delay", "T30", "TD30", "10", "Tm", retained=True), ""], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_accepts_count_down_bridge_shape(self) -> None: + rung = Rung( + logical_rows=3, + conditions=[ + _contact_row("C302"), + _edge_row("C303"), + _contact_row("C304"), + ], + instructions=[ + Counter( + counter_type="count_down", + done_bit="CT30", + current="CTD30", + preset="50", + down_enabled=False, + reset_enabled=True, + ), + "NOP", + "", + ], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_accepts_generic_tall_continuation_with_comparison_and_t(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C305"), + _search_continuation_row(), + ], + instructions=[Search("DS72", "DS81", "DS71", "DS82", "C81", "=="), ""], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_accepts_drum_pin_rows(self) -> None: + rung = Rung( + logical_rows=4, + conditions=[ + _contact_row("C306"), + _contact_row("C307"), + _contact_row("C308"), + _contact_row("C309"), + ], + instructions=[_event_drum(), "", "", ""], + comment_rtf=None, + comment=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) + + def test_raises_on_missing_retained_timer_reset_pin(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C310"), + _contact_row("C311"), + ], + instructions=[Timer("on_delay", "T31", "TD31", "20", "Tm", retained=True), ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[1][32] = "" + + with pytest.raises(WriterError, match=r"AF mismatch at row 1"): + _validate_roundtrip(rung, rows) + + def test_raises_on_missing_count_down_top_row_contact(self) -> None: + rung = Rung( + logical_rows=3, + conditions=[ + _contact_row("C312"), + _edge_row("C313"), + _contact_row("C314"), + ], + instructions=[ + Counter( + counter_type="count_down", + done_bit="CT31", + current="CTD31", + preset="75", + down_enabled=False, + reset_enabled=True, + ), + "NOP", + "", + ], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[0][1] = "" + + with pytest.raises(WriterError, match=r"condition mismatch at row 1 col A"): + _validate_roundtrip(rung, rows) + + def test_raises_on_missing_generic_tall_continuation_comparison(self) -> None: + rung = Rung( + logical_rows=2, + conditions=[ + _contact_row("C315"), + _search_continuation_row(), + ], + instructions=[Search("DS90", "DS99", "DS89", "DS100", "C316", "=="), ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[1][1] = "" + + with pytest.raises(WriterError, match=r"condition mismatch at row 2 col A"): + _validate_roundtrip(rung, rows) + + def test_raises_on_missing_drum_jump_pin(self) -> None: + rung = Rung( + logical_rows=4, + conditions=[ + _contact_row("C317"), + _contact_row("C318"), + _contact_row("C319"), + _contact_row("C320"), + ], + instructions=[_event_drum(), "", "", ""], + comment_rtf=None, + comment=None, + ) + + rows = decoded_rung_to_rows(rung) + rows[2][32] = "" + + with pytest.raises(WriterError, match=r"AF mismatch at row 1"): + _validate_roundtrip(rung, rows) diff --git a/tests/csv/test_writer_raw_validator.py b/tests/csv/test_writer_raw_validator.py new file mode 100644 index 0000000..ac75261 --- /dev/null +++ b/tests/csv/test_writer_raw_validator.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import pytest + +from laddercodec import Rung +from laddercodec.csv.writer import _validate_roundtrip, decoded_rung_to_rows +from laddercodec.instructions import Email, Home, Position, Velocity + + +@pytest.mark.parametrize( + "instruction", + [ + pytest.param( + Email( + tag_60a5="1", + tag_6235="TEST", + tag_6236="TEST", + tag_6217="TEST", + tag_622a="TEST", + tag_607c="C1", + tag_607b="C2", + tag_607d="C3", + tag_6083="DS1", + ), + id="email", + ), + pytest.param( + Home( + tag_6096="1", + tag_6097="1", + tag_609f="X004", + tag_60a0="X005", + tag_609c="1", + tag_609d="1", + ), + id="home", + ), + pytest.param( + Position( + tag_6098="1", + tag_609b="1", + tag_609c="1", + tag_609d="1", + ), + id="position", + ), + pytest.param( + Velocity( + tag_609b="1", + tag_609c="1", + tag_609d="1", + ), + id="velocity", + ), + ], +) +def test_validate_roundtrip_accepts_equivalent_raw_instruction( + instruction: Email | Home | Position | Velocity, +) -> None: + rung = Rung( + logical_rows=1, + conditions=[["-"] * 31], + instructions=[instruction], + comment=None, + comment_rtf=None, + ) + + _validate_roundtrip(rung, decoded_rung_to_rows(rung)) diff --git a/tests/fixtures/ladder_captures/golden/instr-6row-multi-output.bin b/tests/fixtures/ladder_captures/golden/instr-6row-multi-output.bin index ec296d3..7069255 100644 Binary files a/tests/fixtures/ladder_captures/golden/instr-6row-multi-output.bin and b/tests/fixtures/ladder_captures/golden/instr-6row-multi-output.bin differ diff --git a/tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.bin b/tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.bin new file mode 100644 index 0000000..811c149 Binary files /dev/null and b/tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.bin differ diff --git a/tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.csv b/tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.csv new file mode 100644 index 0000000..e018a69 --- /dev/null +++ b/tests/fixtures/ladder_captures/golden/instr-coil-timer-ret.csv @@ -0,0 +1,7 @@ +marker,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,AA,AB,AC,AD,AE,AF +#,"with Rung(any_of(all_of(X005, X001 | X002, X003 | X004), all_of(X001, X003), all_of(X002, X004))):" +#, out(Y001) +#," on_delay(T1, TD1, preset=1, unit=Tms).reset(any_of(all_of(X005, X001 | X002, X003 | X004), all_of(X001, X003), all_of(X002, X004)))" +R,X001,T,X003,T,T,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,out(Y001) +,,X005,|,|,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,"on_delay(T1,TD1,preset=1,unit=Tms)" +,X002,-,X004,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,.reset() diff --git a/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.bin b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.bin new file mode 100644 index 0000000..3e1476c Binary files /dev/null and b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.bin differ diff --git a/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv new file mode 100644 index 0000000..c0c9783 --- /dev/null +++ b/tests/fixtures/ladder_captures/golden/instr-copy-multirow-2af.csv @@ -0,0 +1,3 @@ +marker,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,AA,AB,AC,AD,AE,AF +R,C1,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,"copy(1,DS1)" +,C2,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,-,"copy(1,DS1)" diff --git a/tests/fixtures/ladder_captures/golden/verify_progress.log b/tests/fixtures/ladder_captures/golden/verify_progress.log index 8ded309..10e4753 100644 --- a/tests/fixtures/ladder_captures/golden/verify_progress.log +++ b/tests/fixtures/ladder_captures/golden/verify_progress.log @@ -8,9 +8,6 @@ cmt-32row-sparse: worked cmt-3row-mixed: worked instr-2row-cond__and: worked instr-2row-r0only: worked -instr-2row-spread: worked -instr-3row-branch: worked -instr-3row-scatter: worked instr-compare-ge: worked instr-compare-series: worked instr-compare-timer: worked @@ -31,13 +28,18 @@ nc-1row-fullwire-nop: worked nc-32row-empty: worked nc-3row-t-junction: worked nc-4row-vertical: worked -instr-3row-or: worked -instr-3row-or-colA: worked mr-cmt-3rung-multirow: worked mr-cmt-3rung-multirow-multiinstructions: worked mr-cmt-3rung-multirow-nopaddedrows: worked mr-instr-2rung: worked mr-instr-cond__and: worked -instr-timer-ret: worked instr-count_down-row0-data: worked +instr-2row-spread: worked +instr-3row-branch: worked +instr-3row-or-colA: worked +instr-3row-or: worked +instr-3row-scatter: worked +instr-timer-ret: worked +instr-coil-timer-ret: worked instr-6row-multi-output: worked +instr-copy-multirow-2af: worked diff --git a/tests/ladder/test_decode_program.py b/tests/ladder/test_decode_program.py index ee594c0..7e93d87 100644 --- a/tests/ladder/test_decode_program.py +++ b/tests/ladder/test_decode_program.py @@ -10,6 +10,7 @@ from laddercodec.decode_program import ( _find_row_topology_block, _find_sections, + _implied_modifier_row_offsets, _parse_extra_row_right_wires, _parse_header, _parse_scr_tags, @@ -18,10 +19,15 @@ decode_program, ) from laddercodec.instructions import from_tags_af +from laddercodec.instructions.contact import Contact from laddercodec.instructions.home import from_tags as home_from_tags from laddercodec.instructions.math import Math from laddercodec.instructions.position import from_tags as position_from_tags +from laddercodec.instructions.raw import RawInstruction, _decompose_blob, _fields_to_tag_dicts +from laddercodec.instructions.search import Search +from laddercodec.instructions.send_receive import ModbusRtuTarget, Receive from laddercodec.instructions.timer import Timer +from laddercodec.model import InstructionType decode_program_module = importlib.import_module("laddercodec.decode_program") _SCR_FIXTURE_DIR = Path(__file__).resolve().parents[1] / "fixtures" / "scr_captures" @@ -165,6 +171,23 @@ def _compact_scr_variant_u16_field(tag: int, entries: dict[int, int]) -> bytes: return bytes(out) +def _section_instruction_from_token(row: int, col: int, token) -> tuple: + blob = token.build_blob() + class_name, type_code, part_count, _extra_bytes, fields = _decompose_blob(blob) + tags, tag_byte_lens, variant_u16_tags, variant_string_tags = _fields_to_tag_dicts(fields) + return ( + row, + col, + class_name, + type_code, + tags, + part_count, + tag_byte_lens, + variant_u16_tags, + variant_string_tags, + ) + + def test_decode_program_matches_or_topology_fixture(): clip_rungs, scr_rungs = _load_fixture_pair("or_topology") @@ -277,6 +300,117 @@ def test_parse_wiredown_uses_explicit_row_indices(): assert _parse_wiredown(data, 0, len(data)) == {1: (1, 2, 3, 4, 5)} +def test_implied_modifier_row_offsets_include_generic_tall_af_continuations(): + receive = Receive( + target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), + remote_start="DS1", + dest="DS10", + quantity=1, + receiving="", + success="", + error="", + exception_response="", + ) + + assert _implied_modifier_row_offsets(Search("DS2", "DS11", "DS1", "DS12", "C1", "==")) == {1} + assert _implied_modifier_row_offsets(receive) == {1, 2} + assert _implied_modifier_row_offsets( + RawInstruction(class_name="Mystery", blob=b"", part_count=4) + ) == { + 1, + 2, + 3, + } + + +def test_build_topology_backed_rung_reconstructs_omitted_generic_tall_row_with_junction(): + topology_block = decode_program_module._ScrRowTopologyBlock( + start=0, + row_word=3, + prelude=b"", + leading_rows_right_wires=[], + row0_flag_count=0, + row0_flags={}, + flags_start=0, + continuation_start=0, + ) + section_instructions = [ + _section_instruction_from_token( + 0, + 31, + Search("DS2", "DS11", "DS1", "DS12", "C1", "=="), + ), + _section_instruction_from_token( + 1, + 0, + Contact(InstructionType.CONTACT_NO, "C10"), + ), + ] + data = bytes([0x20, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02]) + + rung = decode_program_module._build_topology_backed_rung( + data=data, + topology_block=topology_block, + rung_end=len(data), + logical_rows=2, + section_instructions=section_instructions, + comment=None, + comment_rtf=None, + ) + + assert isinstance(rung.conditions[1][0], Contact) + assert rung.conditions[1][0].operand == "C10" + assert rung.conditions[1][1] == "T" + assert all(cell == "-" for cell in rung.conditions[1][2:]) + + +def test_build_topology_backed_rung_reconstructs_all_omitted_receive_rows(): + topology_block = decode_program_module._ScrRowTopologyBlock( + start=0, + row_word=4, + prelude=b"", + leading_rows_right_wires=[], + row0_flag_count=0, + row0_flags={}, + flags_start=0, + continuation_start=0, + ) + receive = Receive( + target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), + remote_start="DS1", + dest="DS10", + quantity=1, + receiving="", + success="", + error="", + exception_response="", + ) + section_instructions = [ + _section_instruction_from_token(0, 31, receive), + _section_instruction_from_token(1, 0, Contact(InstructionType.CONTACT_NO, "C11")), + _section_instruction_from_token(2, 2, Contact(InstructionType.CONTACT_NO, "C12")), + ] + data = b"\x20\x00" + + rung = decode_program_module._build_topology_backed_rung( + data=data, + topology_block=topology_block, + rung_end=len(data), + logical_rows=3, + section_instructions=section_instructions, + comment=None, + comment_rtf=None, + ) + + assert isinstance(rung.instructions[0], Receive) + assert isinstance(rung.conditions[1][0], Contact) + assert all(cell == "-" for cell in rung.conditions[1][1:]) + assert rung.conditions[2][0] == "" + assert rung.conditions[2][1] == "" + assert isinstance(rung.conditions[2][2], Contact) + assert all(cell == "-" for cell in rung.conditions[2][3:]) + + def test_parse_scr_tags_handles_compact_home_raw_fields(): raw = _compact_scr_blob( "Home",