From be9f7cf703544af288605264a5714fa4ac9644e1 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:58:33 -0400 Subject: [PATCH 1/4] fix: wire hydration for instruction wire_down and SCR padding rows Two bugs caused CSV round-trip failures on rungs with tall AF instructions (Copy, etc.) whose continuation rows carried only vertical pass-through wires (|): 1. decode_program: The implied modifier row fallback filled horizontal wires across pure | pass-through rows, converting | to T and blanks to -. Now skips rows containing only vertical wires. 2. converter: _hydrate_wire_continuations only checked for bare "T"/"|" string tokens, missing Contact/CompareContact objects with wire_down=True. Now recognizes instruction objects that propagate wires downward. Co-Authored-By: Claude Opus 4.6 --- src/laddercodec/csv/converter.py | 14 ++++++++++---- src/laddercodec/decode_program.py | 15 ++++++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/laddercodec/csv/converter.py b/src/laddercodec/csv/converter.py index 960c87f..bd9bfd5 100644 --- a/src/laddercodec/csv/converter.py +++ b/src/laddercodec/csv/converter.py @@ -130,9 +130,10 @@ def _hydrate_wire_continuations(condition_rows: list[list[ConditionToken]]) -> N After block finalization and auto-padding, padding rows are all-blank. This pass restores vertical wire continuity: wherever row *R* has a - ``T`` (junction-down) or ``|`` (vertical pass-through), row *R+1* - gets a ``|`` if the cell is currently blank — unless a horizontal wire - from the left already connects to that junction. + ``T`` (junction-down), ``|`` (vertical pass-through), or a condition + instruction with ``wire_down=True``, row *R+1* gets a ``|`` if the + cell is currently blank — unless a horizontal wire from the left + already connects to that junction. A vertical wire terminates when ``(R+1, C-1)`` carries a horizontal path (anything other than ``""`` or ``"|"``), because the horizontal @@ -147,7 +148,12 @@ def _hydrate_wire_continuations(condition_rows: list[list[ConditionToken]]) -> N for row_idx in range(len(condition_rows) - 2): for col_idx in range(len(condition_rows[row_idx])): token = condition_rows[row_idx][col_idx] - if token in ("T", "|") and condition_rows[row_idx + 1][col_idx] == "": + # A cell propagates | downward if it's a T/| wire token, + # or a Contact/CompareContact with wire_down=True. + propagates_down = token in ("T", "|") or ( + hasattr(token, "wire_down") and token.wire_down + ) + if propagates_down and condition_rows[row_idx + 1][col_idx] == "": # Check if a horizontal wire from the left already connects here. if col_idx > 0: left = condition_rows[row_idx + 1][col_idx - 1] diff --git a/src/laddercodec/decode_program.py b/src/laddercodec/decode_program.py index 2493629..69e4922 100644 --- a/src/laddercodec/decode_program.py +++ b/src/laddercodec/decode_program.py @@ -1075,7 +1075,8 @@ def _build_rung( # 4. Fallback for logic-carrying continuation rows whose topology Click # omits from SCR. Rebuild the implied horizontal path and merge any - # pre-parsed wire_down markers into T-junctions. + # pre-parsed wire_down markers into T-junctions. Skip rows that + # contain only vertical pass-through wires (no logic path). for row in sorted(implied_modifier_rows): explicit_right_wires = row - 1 < len(extra_rows_right_wires) and bool( extra_rows_right_wires[row - 1] @@ -1084,11 +1085,15 @@ def _build_rung( continue leftmost = None + has_non_vertical = False for col in range(_CONDITION_COLUMNS): - if conditions[row][col] != "": - leftmost = col - break - if leftmost is None: + cell = conditions[row][col] + if cell != "": + if leftmost is None: + leftmost = col + if cell != "|": + has_non_vertical = True + if leftmost is None or not has_non_vertical: continue for col in range(leftmost, _CONDITION_COLUMNS): From 94bc9dad3289c39d98e71ee57813979e06cdc265 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Wed, 8 Apr 2026 15:14:00 -0400 Subject: [PATCH 2/4] feat: add CSV comparison devtool for decode_program validation Compares {slug}.csv against {slug}.clipboard.csv in a directory, reporting identical/differ/missing pairs. Co-Authored-By: Claude Opus 4.6 --- devtools/compare_csv.py | 58 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 devtools/compare_csv.py diff --git a/devtools/compare_csv.py b/devtools/compare_csv.py new file mode 100644 index 0000000..8663e7b --- /dev/null +++ b/devtools/compare_csv.py @@ -0,0 +1,58 @@ +"""Compare {slug}.csv against {slug}.clipboard.csv in a directory. + +Reports which pairs are identical and which differ. + +Usage: + uv run devtools/compare_csv.py +""" + +from __future__ import annotations + +import sys +from pathlib import Path + + +def main() -> None: + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} ", file=sys.stderr) + sys.exit(1) + + directory = Path(sys.argv[1]) + if not directory.is_dir(): + print(f"Not a directory: {directory}", file=sys.stderr) + sys.exit(1) + + # Find all .clipboard.csv files and match to base CSVs + clipboard_files = sorted(directory.rglob("*.clipboard.csv")) + if not clipboard_files: + print(f"No *.clipboard.csv files found in {directory}") + sys.exit(1) + + identical = 0 + differ = 0 + missing = 0 + + for clip_path in clipboard_files: + # {slug}.clipboard.csv -> {slug}.csv (same directory) + slug = clip_path.name.removesuffix(".clipboard.csv") + base_path = clip_path.parent / f"{slug}.csv" + + if not base_path.exists(): + print(f" MISSING {slug}.csv") + missing += 1 + continue + + if base_path.read_bytes() == clip_path.read_bytes(): + print(f" IDENTICAL {slug}") + identical += 1 + else: + print(f" DIFFER {slug}") + differ += 1 + + print() + print(f"Total: {len(clipboard_files)} pairs — {identical} identical, {differ} differ, {missing} missing base") + sys.exit(1 if differ or missing else 0) + + +if __name__ == "__main__": + main() From ef4135c9bcedccc3aa086449f2e2be1e5ba0dab7 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Wed, 8 Apr 2026 15:23:31 -0400 Subject: [PATCH 3/4] refactor: remove implied modifier row fallback from SCR decoder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SCR continuation row topology blocks already encode per-row horizontal connectivity explicitly. The _implied_modifier_row_offsets heuristic was always a no-op — verified across 883 rungs in 47 SCR files. Co-Authored-By: Claude Opus 4.6 --- CHANGELOG.md | 7 ++ devtools/compare_csv.py | 4 +- src/laddercodec/decode_program.py | 100 +---------------------- tests/ladder/test_decode_program.py | 118 +--------------------------- 4 files changed, 12 insertions(+), 217 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9bce23..f61e259 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## Unreleased + +### Changed +- **SCR decode**: remove `_implied_modifier_row_offsets` fallback — SCR + continuation row topology already encodes per-row horizontal connectivity + explicitly, verified across 883 rungs in 47 SCR files + ## 0.1.6 ### Fixed diff --git a/devtools/compare_csv.py b/devtools/compare_csv.py index 8663e7b..1fb9920 100644 --- a/devtools/compare_csv.py +++ b/devtools/compare_csv.py @@ -50,7 +50,9 @@ def main() -> None: differ += 1 print() - print(f"Total: {len(clipboard_files)} pairs — {identical} identical, {differ} differ, {missing} missing base") + print( + f"Total: {len(clipboard_files)} pairs — {identical} identical, {differ} differ, {missing} missing base" + ) sys.exit(1 if differ or missing else 0) diff --git a/src/laddercodec/decode_program.py b/src/laddercodec/decode_program.py index 69e4922..0f35f01 100644 --- a/src/laddercodec/decode_program.py +++ b/src/laddercodec/decode_program.py @@ -24,18 +24,13 @@ from .instructions import ( INSTRUCTION_MODULES, RawInstruction, - UnknownInstruction, from_tags_af, from_tags_condition, - get_af_family_for_token, ) from .instructions.comparison import CompareContact from .instructions.contact import Contact from .instructions.counter import Counter -from .instructions.drum import Drum -from .instructions.shift import Shift -from .instructions.timer import Timer -from .model import AfInstruction, Program +from .model import Program from .topology import CONDITION_COLUMNS as _CONDITION_COLUMNS # --------------------------------------------------------------------------- @@ -877,60 +872,6 @@ def _build_topology_backed_rung( ) -# --------------------------------------------------------------------------- -# Modifier-row inference -# --------------------------------------------------------------------------- - - -def _implied_modifier_row_offsets(af: AfInstruction | UnknownInstruction) -> set[int]: - """Return AF-relative row offsets whose horizontal path may be omitted in SCR. - - Click can store ``count=0`` or omit continuation-row topology blocks for - some tall-AF continuation rows even when the visible rung still carries - logic across that row. Pinned families still use hand-tuned offsets so - optional pin rows only opt in when the AF state says that pin is active. - Non-pinned tall families (copy/search/send_receive/raw) expose plain - continuation rows instead, so every visual continuation row can carry logic - when Click suppresses its explicit topology block. - """ - if isinstance(af, Shift): - return {1, 2} - - if isinstance(af, Timer): - return {1} if af.retained else set() - - if isinstance(af, Counter): - if af.counter_type == "count_down": - return {1, 2} if af.reset_enabled else {1} - - rows: set[int] = set() - if af.down_enabled: - rows.add(1) - if af.reset_enabled: - rows.add(2) - return rows - - if isinstance(af, Drum): - rows = {1} - if af.drum_kind == "event": - if af.jump_enabled: - rows.add(2) - if af.jog_enabled: - rows.add(3) - return rows - - if isinstance(af, UnknownInstruction): - return set() - - family = get_af_family_for_token(af) - if family is not None and not family.pin_names: - visual_rows = max(1, int(af.cell_params().get("visual_rows", 1))) - if visual_rows > 1: - return set(range(1, visual_rows)) - - return set() - - # --------------------------------------------------------------------------- # Rung construction # --------------------------------------------------------------------------- @@ -1064,45 +1005,6 @@ def _build_rung( elif isinstance(cell, (Contact, CompareContact)): cell.wire_down = True - implied_modifier_rows: set[int] = set() - for af_row, af in enumerate(instructions): - if isinstance(af, str): - continue - for row_offset in _implied_modifier_row_offsets(af): - row = af_row + row_offset - if 0 < row < logical_rows: - implied_modifier_rows.add(row) - - # 4. Fallback for logic-carrying continuation rows whose topology Click - # omits from SCR. Rebuild the implied horizontal path and merge any - # pre-parsed wire_down markers into T-junctions. Skip rows that - # contain only vertical pass-through wires (no logic path). - for row in sorted(implied_modifier_rows): - explicit_right_wires = row - 1 < len(extra_rows_right_wires) and bool( - extra_rows_right_wires[row - 1] - ) - if explicit_right_wires: - continue - - leftmost = None - has_non_vertical = False - for col in range(_CONDITION_COLUMNS): - cell = conditions[row][col] - if cell != "": - if leftmost is None: - leftmost = col - if cell != "|": - has_non_vertical = True - if leftmost is None or not has_non_vertical: - continue - - for col in range(leftmost, _CONDITION_COLUMNS): - cell = conditions[row][col] - if cell == "": - conditions[row][col] = "-" - elif cell == "|": - conditions[row][col] = "T" - return Rung( logical_rows=logical_rows, conditions=conditions, diff --git a/tests/ladder/test_decode_program.py b/tests/ladder/test_decode_program.py index 7715f20..2a10456 100644 --- a/tests/ladder/test_decode_program.py +++ b/tests/ladder/test_decode_program.py @@ -10,7 +10,6 @@ from laddercodec.decode_program import ( _find_all_row_topology_blocks, _find_sections, - _implied_modifier_row_offsets, _parse_extra_row_right_wires, _parse_header, _parse_scr_tags, @@ -19,15 +18,11 @@ decode_program, ) from laddercodec.instructions import from_tags_af -from laddercodec.instructions.contact import Contact from laddercodec.instructions.home import from_tags as home_from_tags from laddercodec.instructions.math import Math from laddercodec.instructions.position import from_tags as position_from_tags -from laddercodec.instructions.raw import RawInstruction, _decompose_blob, _fields_to_tag_dicts -from laddercodec.instructions.search import Search -from laddercodec.instructions.send_receive import ModbusRtuTarget, Receive +from laddercodec.instructions.raw import _decompose_blob, _fields_to_tag_dicts from laddercodec.instructions.timer import Timer -from laddercodec.model import InstructionType decode_program_module = importlib.import_module("laddercodec.decode_program") _SCR_FIXTURE_DIR = Path(__file__).resolve().parents[1] / "fixtures" / "scr_captures" @@ -323,117 +318,6 @@ def test_parse_wiredown_uses_explicit_row_indices(): assert _parse_wiredown(data, 0, len(data)) == {1: (1, 2, 3, 4, 5)} -def test_implied_modifier_row_offsets_include_generic_tall_af_continuations(): - receive = Receive( - target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), - remote_start="DS1", - dest="DS10", - quantity=1, - receiving="", - success="", - error="", - exception_response="", - ) - - assert _implied_modifier_row_offsets(Search("DS2", "DS11", "DS1", "DS12", "C1", "==")) == {1} - assert _implied_modifier_row_offsets(receive) == {1, 2} - assert _implied_modifier_row_offsets( - RawInstruction(class_name="Mystery", blob=b"", part_count=4) - ) == { - 1, - 2, - 3, - } - - -def test_build_topology_backed_rung_reconstructs_omitted_generic_tall_row_with_junction(): - topology_block = decode_program_module._ScrRowTopologyBlock( - start=0, - row_word=3, - prelude=b"", - leading_rows_right_wires=[], - row0_flag_count=0, - row0_flags={}, - flags_start=0, - continuation_start=0, - ) - section_instructions = [ - _section_instruction_from_token( - 0, - 31, - Search("DS2", "DS11", "DS1", "DS12", "C1", "=="), - ), - _section_instruction_from_token( - 1, - 0, - Contact(InstructionType.CONTACT_NO, "C10"), - ), - ] - data = bytes([0x20, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02]) - - rung = decode_program_module._build_topology_backed_rung( - data=data, - topology_block=topology_block, - rung_end=len(data), - logical_rows=2, - section_instructions=section_instructions, - comment=None, - comment_rtf=None, - ) - - assert isinstance(rung.conditions[1][0], Contact) - assert rung.conditions[1][0].operand == "C10" - assert rung.conditions[1][1] == "T" - assert all(cell == "-" for cell in rung.conditions[1][2:]) - - -def test_build_topology_backed_rung_reconstructs_all_omitted_receive_rows(): - topology_block = decode_program_module._ScrRowTopologyBlock( - start=0, - row_word=4, - prelude=b"", - leading_rows_right_wires=[], - row0_flag_count=0, - row0_flags={}, - flags_start=0, - continuation_start=0, - ) - receive = Receive( - target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), - remote_start="DS1", - dest="DS10", - quantity=1, - receiving="", - success="", - error="", - exception_response="", - ) - section_instructions = [ - _section_instruction_from_token(0, 31, receive), - _section_instruction_from_token(1, 0, Contact(InstructionType.CONTACT_NO, "C11")), - _section_instruction_from_token(2, 2, Contact(InstructionType.CONTACT_NO, "C12")), - ] - data = b"\x20\x00" - - rung = decode_program_module._build_topology_backed_rung( - data=data, - topology_block=topology_block, - rung_end=len(data), - logical_rows=3, - section_instructions=section_instructions, - comment=None, - comment_rtf=None, - ) - - assert isinstance(rung.instructions[0], Receive) - assert isinstance(rung.conditions[1][0], Contact) - assert all(cell == "-" for cell in rung.conditions[1][1:]) - assert rung.conditions[2][0] == "" - assert rung.conditions[2][1] == "" - assert isinstance(rung.conditions[2][2], Contact) - assert all(cell == "-" for cell in rung.conditions[2][3:]) - - def test_parse_scr_tags_handles_compact_home_raw_fields(): raw = _compact_scr_blob( "Home", From dda438b214dadaa73c7a01ca30327813d182cd02 Mon Sep 17 00:00:00 2001 From: ssweber <57631333+ssweber@users.noreply.github.com> Date: Wed, 8 Apr 2026 15:23:31 -0400 Subject: [PATCH 4/4] refactor: remove implied modifier row fallback from SCR decoder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SCR continuation row topology blocks already encode per-row horizontal connectivity explicitly. The _implied_modifier_row_offsets heuristic was always a no-op — verified across 883 rungs in 47 SCR files. Co-Authored-By: Claude Opus 4.6 --- CHANGELOG.md | 14 ++++ devtools/compare_csv.py | 4 +- src/laddercodec/decode_program.py | 100 +---------------------- tests/ladder/test_decode_program.py | 118 +--------------------------- 4 files changed, 19 insertions(+), 217 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9bce23..358bc4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog +## 0.1.7 + +### Fixed +- **SCR decode**: skip pure vertical pass-through rows in implied modifier + row fallback, preventing `|` → `T` and blank → `-` corruption on + continuation rows +- **CSV converter**: recognize `Contact`/`CompareContact` objects with + `wire_down=True` during wire hydration, not just bare string tokens + +### Changed +- **SCR decode**: remove `_implied_modifier_row_offsets` fallback — SCR + continuation row topology already encodes per-row horizontal connectivity + explicitly, verified across 883 rungs in 47 SCR files + ## 0.1.6 ### Fixed diff --git a/devtools/compare_csv.py b/devtools/compare_csv.py index 8663e7b..1fb9920 100644 --- a/devtools/compare_csv.py +++ b/devtools/compare_csv.py @@ -50,7 +50,9 @@ def main() -> None: differ += 1 print() - print(f"Total: {len(clipboard_files)} pairs — {identical} identical, {differ} differ, {missing} missing base") + print( + f"Total: {len(clipboard_files)} pairs — {identical} identical, {differ} differ, {missing} missing base" + ) sys.exit(1 if differ or missing else 0) diff --git a/src/laddercodec/decode_program.py b/src/laddercodec/decode_program.py index 69e4922..0f35f01 100644 --- a/src/laddercodec/decode_program.py +++ b/src/laddercodec/decode_program.py @@ -24,18 +24,13 @@ from .instructions import ( INSTRUCTION_MODULES, RawInstruction, - UnknownInstruction, from_tags_af, from_tags_condition, - get_af_family_for_token, ) from .instructions.comparison import CompareContact from .instructions.contact import Contact from .instructions.counter import Counter -from .instructions.drum import Drum -from .instructions.shift import Shift -from .instructions.timer import Timer -from .model import AfInstruction, Program +from .model import Program from .topology import CONDITION_COLUMNS as _CONDITION_COLUMNS # --------------------------------------------------------------------------- @@ -877,60 +872,6 @@ def _build_topology_backed_rung( ) -# --------------------------------------------------------------------------- -# Modifier-row inference -# --------------------------------------------------------------------------- - - -def _implied_modifier_row_offsets(af: AfInstruction | UnknownInstruction) -> set[int]: - """Return AF-relative row offsets whose horizontal path may be omitted in SCR. - - Click can store ``count=0`` or omit continuation-row topology blocks for - some tall-AF continuation rows even when the visible rung still carries - logic across that row. Pinned families still use hand-tuned offsets so - optional pin rows only opt in when the AF state says that pin is active. - Non-pinned tall families (copy/search/send_receive/raw) expose plain - continuation rows instead, so every visual continuation row can carry logic - when Click suppresses its explicit topology block. - """ - if isinstance(af, Shift): - return {1, 2} - - if isinstance(af, Timer): - return {1} if af.retained else set() - - if isinstance(af, Counter): - if af.counter_type == "count_down": - return {1, 2} if af.reset_enabled else {1} - - rows: set[int] = set() - if af.down_enabled: - rows.add(1) - if af.reset_enabled: - rows.add(2) - return rows - - if isinstance(af, Drum): - rows = {1} - if af.drum_kind == "event": - if af.jump_enabled: - rows.add(2) - if af.jog_enabled: - rows.add(3) - return rows - - if isinstance(af, UnknownInstruction): - return set() - - family = get_af_family_for_token(af) - if family is not None and not family.pin_names: - visual_rows = max(1, int(af.cell_params().get("visual_rows", 1))) - if visual_rows > 1: - return set(range(1, visual_rows)) - - return set() - - # --------------------------------------------------------------------------- # Rung construction # --------------------------------------------------------------------------- @@ -1064,45 +1005,6 @@ def _build_rung( elif isinstance(cell, (Contact, CompareContact)): cell.wire_down = True - implied_modifier_rows: set[int] = set() - for af_row, af in enumerate(instructions): - if isinstance(af, str): - continue - for row_offset in _implied_modifier_row_offsets(af): - row = af_row + row_offset - if 0 < row < logical_rows: - implied_modifier_rows.add(row) - - # 4. Fallback for logic-carrying continuation rows whose topology Click - # omits from SCR. Rebuild the implied horizontal path and merge any - # pre-parsed wire_down markers into T-junctions. Skip rows that - # contain only vertical pass-through wires (no logic path). - for row in sorted(implied_modifier_rows): - explicit_right_wires = row - 1 < len(extra_rows_right_wires) and bool( - extra_rows_right_wires[row - 1] - ) - if explicit_right_wires: - continue - - leftmost = None - has_non_vertical = False - for col in range(_CONDITION_COLUMNS): - cell = conditions[row][col] - if cell != "": - if leftmost is None: - leftmost = col - if cell != "|": - has_non_vertical = True - if leftmost is None or not has_non_vertical: - continue - - for col in range(leftmost, _CONDITION_COLUMNS): - cell = conditions[row][col] - if cell == "": - conditions[row][col] = "-" - elif cell == "|": - conditions[row][col] = "T" - return Rung( logical_rows=logical_rows, conditions=conditions, diff --git a/tests/ladder/test_decode_program.py b/tests/ladder/test_decode_program.py index 7715f20..2a10456 100644 --- a/tests/ladder/test_decode_program.py +++ b/tests/ladder/test_decode_program.py @@ -10,7 +10,6 @@ from laddercodec.decode_program import ( _find_all_row_topology_blocks, _find_sections, - _implied_modifier_row_offsets, _parse_extra_row_right_wires, _parse_header, _parse_scr_tags, @@ -19,15 +18,11 @@ decode_program, ) from laddercodec.instructions import from_tags_af -from laddercodec.instructions.contact import Contact from laddercodec.instructions.home import from_tags as home_from_tags from laddercodec.instructions.math import Math from laddercodec.instructions.position import from_tags as position_from_tags -from laddercodec.instructions.raw import RawInstruction, _decompose_blob, _fields_to_tag_dicts -from laddercodec.instructions.search import Search -from laddercodec.instructions.send_receive import ModbusRtuTarget, Receive +from laddercodec.instructions.raw import _decompose_blob, _fields_to_tag_dicts from laddercodec.instructions.timer import Timer -from laddercodec.model import InstructionType decode_program_module = importlib.import_module("laddercodec.decode_program") _SCR_FIXTURE_DIR = Path(__file__).resolve().parents[1] / "fixtures" / "scr_captures" @@ -323,117 +318,6 @@ def test_parse_wiredown_uses_explicit_row_indices(): assert _parse_wiredown(data, 0, len(data)) == {1: (1, 2, 3, 4, 5)} -def test_implied_modifier_row_offsets_include_generic_tall_af_continuations(): - receive = Receive( - target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), - remote_start="DS1", - dest="DS10", - quantity=1, - receiving="", - success="", - error="", - exception_response="", - ) - - assert _implied_modifier_row_offsets(Search("DS2", "DS11", "DS1", "DS12", "C1", "==")) == {1} - assert _implied_modifier_row_offsets(receive) == {1, 2} - assert _implied_modifier_row_offsets( - RawInstruction(class_name="Mystery", blob=b"", part_count=4) - ) == { - 1, - 2, - 3, - } - - -def test_build_topology_backed_rung_reconstructs_omitted_generic_tall_row_with_junction(): - topology_block = decode_program_module._ScrRowTopologyBlock( - start=0, - row_word=3, - prelude=b"", - leading_rows_right_wires=[], - row0_flag_count=0, - row0_flags={}, - flags_start=0, - continuation_start=0, - ) - section_instructions = [ - _section_instruction_from_token( - 0, - 31, - Search("DS2", "DS11", "DS1", "DS12", "C1", "=="), - ), - _section_instruction_from_token( - 1, - 0, - Contact(InstructionType.CONTACT_NO, "C10"), - ), - ] - data = bytes([0x20, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02]) - - rung = decode_program_module._build_topology_backed_rung( - data=data, - topology_block=topology_block, - rung_end=len(data), - logical_rows=2, - section_instructions=section_instructions, - comment=None, - comment_rtf=None, - ) - - assert isinstance(rung.conditions[1][0], Contact) - assert rung.conditions[1][0].operand == "C10" - assert rung.conditions[1][1] == "T" - assert all(cell == "-" for cell in rung.conditions[1][2:]) - - -def test_build_topology_backed_rung_reconstructs_all_omitted_receive_rows(): - topology_block = decode_program_module._ScrRowTopologyBlock( - start=0, - row_word=4, - prelude=b"", - leading_rows_right_wires=[], - row0_flag_count=0, - row0_flags={}, - flags_start=0, - continuation_start=0, - ) - receive = Receive( - target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1), - remote_start="DS1", - dest="DS10", - quantity=1, - receiving="", - success="", - error="", - exception_response="", - ) - section_instructions = [ - _section_instruction_from_token(0, 31, receive), - _section_instruction_from_token(1, 0, Contact(InstructionType.CONTACT_NO, "C11")), - _section_instruction_from_token(2, 2, Contact(InstructionType.CONTACT_NO, "C12")), - ] - data = b"\x20\x00" - - rung = decode_program_module._build_topology_backed_rung( - data=data, - topology_block=topology_block, - rung_end=len(data), - logical_rows=3, - section_instructions=section_instructions, - comment=None, - comment_rtf=None, - ) - - assert isinstance(rung.instructions[0], Receive) - assert isinstance(rung.conditions[1][0], Contact) - assert all(cell == "-" for cell in rung.conditions[1][1:]) - assert rung.conditions[2][0] == "" - assert rung.conditions[2][1] == "" - assert isinstance(rung.conditions[2][2], Contact) - assert all(cell == "-" for cell in rung.conditions[2][3:]) - - def test_parse_scr_tags_handles_compact_home_raw_fields(): raw = _compact_scr_blob( "Home",