Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## 0.1.7

### Fixed
- **SCR decode**: skip pure vertical pass-through rows in implied modifier
row fallback, preventing `|` → `T` and blank → `-` corruption on
continuation rows
- **CSV converter**: recognize `Contact`/`CompareContact` objects with
`wire_down=True` during wire hydration, not just bare string tokens

### Changed
- **SCR decode**: remove `_implied_modifier_row_offsets` fallback — SCR
continuation row topology already encodes per-row horizontal connectivity
explicitly, verified across 883 rungs in 47 SCR files

## 0.1.6

### Fixed
Expand Down
60 changes: 60 additions & 0 deletions devtools/compare_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Compare {slug}.csv against {slug}.clipboard.csv in a directory.

Reports which pairs are identical and which differ.

Usage:
uv run devtools/compare_csv.py <directory>
"""

from __future__ import annotations

import sys
from pathlib import Path


def main() -> None:
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <directory>", file=sys.stderr)
sys.exit(1)

directory = Path(sys.argv[1])
if not directory.is_dir():
print(f"Not a directory: {directory}", file=sys.stderr)
sys.exit(1)

# Find all .clipboard.csv files and match to base CSVs
clipboard_files = sorted(directory.rglob("*.clipboard.csv"))
if not clipboard_files:
print(f"No *.clipboard.csv files found in {directory}")
sys.exit(1)

identical = 0
differ = 0
missing = 0

for clip_path in clipboard_files:
# {slug}.clipboard.csv -> {slug}.csv (same directory)
slug = clip_path.name.removesuffix(".clipboard.csv")
base_path = clip_path.parent / f"{slug}.csv"

if not base_path.exists():
print(f" MISSING {slug}.csv")
missing += 1
continue

if base_path.read_bytes() == clip_path.read_bytes():
print(f" IDENTICAL {slug}")
identical += 1
else:
print(f" DIFFER {slug}")
differ += 1

print()
print(
f"Total: {len(clipboard_files)} pairs — {identical} identical, {differ} differ, {missing} missing base"
)
sys.exit(1 if differ or missing else 0)


if __name__ == "__main__":
main()
14 changes: 10 additions & 4 deletions src/laddercodec/csv/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ def _hydrate_wire_continuations(condition_rows: list[list[ConditionToken]]) -> N

After block finalization and auto-padding, padding rows are all-blank.
This pass restores vertical wire continuity: wherever row *R* has a
``T`` (junction-down) or ``|`` (vertical pass-through), row *R+1*
gets a ``|`` if the cell is currently blank — unless a horizontal wire
from the left already connects to that junction.
``T`` (junction-down), ``|`` (vertical pass-through), or a condition
instruction with ``wire_down=True``, row *R+1* gets a ``|`` if the
cell is currently blank — unless a horizontal wire from the left
already connects to that junction.

A vertical wire terminates when ``(R+1, C-1)`` carries a horizontal
path (anything other than ``""`` or ``"|"``), because the horizontal
Expand All @@ -147,7 +148,12 @@ def _hydrate_wire_continuations(condition_rows: list[list[ConditionToken]]) -> N
for row_idx in range(len(condition_rows) - 2):
for col_idx in range(len(condition_rows[row_idx])):
token = condition_rows[row_idx][col_idx]
if token in ("T", "|") and condition_rows[row_idx + 1][col_idx] == "":
# A cell propagates | downward if it's a T/| wire token,
# or a Contact/CompareContact with wire_down=True.
propagates_down = token in ("T", "|") or (
hasattr(token, "wire_down") and token.wire_down
)
if propagates_down and condition_rows[row_idx + 1][col_idx] == "":
# Check if a horizontal wire from the left already connects here.
if col_idx > 0:
left = condition_rows[row_idx + 1][col_idx - 1]
Expand Down
95 changes: 1 addition & 94 deletions src/laddercodec/decode_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,13 @@
from .instructions import (
INSTRUCTION_MODULES,
RawInstruction,
UnknownInstruction,
from_tags_af,
from_tags_condition,
get_af_family_for_token,
)
from .instructions.comparison import CompareContact
from .instructions.contact import Contact
from .instructions.counter import Counter
from .instructions.drum import Drum
from .instructions.shift import Shift
from .instructions.timer import Timer
from .model import AfInstruction, Program
from .model import Program
from .topology import CONDITION_COLUMNS as _CONDITION_COLUMNS

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -877,60 +872,6 @@ def _build_topology_backed_rung(
)


# ---------------------------------------------------------------------------
# Modifier-row inference
# ---------------------------------------------------------------------------


def _implied_modifier_row_offsets(af: AfInstruction | UnknownInstruction) -> set[int]:
"""Return AF-relative row offsets whose horizontal path may be omitted in SCR.

Click can store ``count=0`` or omit continuation-row topology blocks for
some tall-AF continuation rows even when the visible rung still carries
logic across that row. Pinned families still use hand-tuned offsets so
optional pin rows only opt in when the AF state says that pin is active.
Non-pinned tall families (copy/search/send_receive/raw) expose plain
continuation rows instead, so every visual continuation row can carry logic
when Click suppresses its explicit topology block.
"""
if isinstance(af, Shift):
return {1, 2}

if isinstance(af, Timer):
return {1} if af.retained else set()

if isinstance(af, Counter):
if af.counter_type == "count_down":
return {1, 2} if af.reset_enabled else {1}

rows: set[int] = set()
if af.down_enabled:
rows.add(1)
if af.reset_enabled:
rows.add(2)
return rows

if isinstance(af, Drum):
rows = {1}
if af.drum_kind == "event":
if af.jump_enabled:
rows.add(2)
if af.jog_enabled:
rows.add(3)
return rows

if isinstance(af, UnknownInstruction):
return set()

family = get_af_family_for_token(af)
if family is not None and not family.pin_names:
visual_rows = max(1, int(af.cell_params().get("visual_rows", 1)))
if visual_rows > 1:
return set(range(1, visual_rows))

return set()


# ---------------------------------------------------------------------------
# Rung construction
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1064,40 +1005,6 @@ def _build_rung(
elif isinstance(cell, (Contact, CompareContact)):
cell.wire_down = True

implied_modifier_rows: set[int] = set()
for af_row, af in enumerate(instructions):
if isinstance(af, str):
continue
for row_offset in _implied_modifier_row_offsets(af):
row = af_row + row_offset
if 0 < row < logical_rows:
implied_modifier_rows.add(row)

# 4. Fallback for logic-carrying continuation rows whose topology Click
# omits from SCR. Rebuild the implied horizontal path and merge any
# pre-parsed wire_down markers into T-junctions.
for row in sorted(implied_modifier_rows):
explicit_right_wires = row - 1 < len(extra_rows_right_wires) and bool(
extra_rows_right_wires[row - 1]
)
if explicit_right_wires:
continue

leftmost = None
for col in range(_CONDITION_COLUMNS):
if conditions[row][col] != "":
leftmost = col
break
if leftmost is None:
continue

for col in range(leftmost, _CONDITION_COLUMNS):
cell = conditions[row][col]
if cell == "":
conditions[row][col] = "-"
elif cell == "|":
conditions[row][col] = "T"

return Rung(
logical_rows=logical_rows,
conditions=conditions,
Expand Down
118 changes: 1 addition & 117 deletions tests/ladder/test_decode_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from laddercodec.decode_program import (
_find_all_row_topology_blocks,
_find_sections,
_implied_modifier_row_offsets,
_parse_extra_row_right_wires,
_parse_header,
_parse_scr_tags,
Expand All @@ -19,15 +18,11 @@
decode_program,
)
from laddercodec.instructions import from_tags_af
from laddercodec.instructions.contact import Contact
from laddercodec.instructions.home import from_tags as home_from_tags
from laddercodec.instructions.math import Math
from laddercodec.instructions.position import from_tags as position_from_tags
from laddercodec.instructions.raw import RawInstruction, _decompose_blob, _fields_to_tag_dicts
from laddercodec.instructions.search import Search
from laddercodec.instructions.send_receive import ModbusRtuTarget, Receive
from laddercodec.instructions.raw import _decompose_blob, _fields_to_tag_dicts
from laddercodec.instructions.timer import Timer
from laddercodec.model import InstructionType

decode_program_module = importlib.import_module("laddercodec.decode_program")
_SCR_FIXTURE_DIR = Path(__file__).resolve().parents[1] / "fixtures" / "scr_captures"
Expand Down Expand Up @@ -323,117 +318,6 @@ def test_parse_wiredown_uses_explicit_row_indices():
assert _parse_wiredown(data, 0, len(data)) == {1: (1, 2, 3, 4, 5)}


def test_implied_modifier_row_offsets_include_generic_tall_af_continuations():
receive = Receive(
target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1),
remote_start="DS1",
dest="DS10",
quantity=1,
receiving="",
success="",
error="",
exception_response="",
)

assert _implied_modifier_row_offsets(Search("DS2", "DS11", "DS1", "DS12", "C1", "==")) == {1}
assert _implied_modifier_row_offsets(receive) == {1, 2}
assert _implied_modifier_row_offsets(
RawInstruction(class_name="Mystery", blob=b"", part_count=4)
) == {
1,
2,
3,
}


def test_build_topology_backed_rung_reconstructs_omitted_generic_tall_row_with_junction():
topology_block = decode_program_module._ScrRowTopologyBlock(
start=0,
row_word=3,
prelude=b"",
leading_rows_right_wires=[],
row0_flag_count=0,
row0_flags={},
flags_start=0,
continuation_start=0,
)
section_instructions = [
_section_instruction_from_token(
0,
31,
Search("DS2", "DS11", "DS1", "DS12", "C1", "=="),
),
_section_instruction_from_token(
1,
0,
Contact(InstructionType.CONTACT_NO, "C10"),
),
]
data = bytes([0x20, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02])

rung = decode_program_module._build_topology_backed_rung(
data=data,
topology_block=topology_block,
rung_end=len(data),
logical_rows=2,
section_instructions=section_instructions,
comment=None,
comment_rtf=None,
)

assert isinstance(rung.conditions[1][0], Contact)
assert rung.conditions[1][0].operand == "C10"
assert rung.conditions[1][1] == "T"
assert all(cell == "-" for cell in rung.conditions[1][2:])


def test_build_topology_backed_rung_reconstructs_all_omitted_receive_rows():
topology_block = decode_program_module._ScrRowTopologyBlock(
start=0,
row_word=4,
prelude=b"",
leading_rows_right_wires=[],
row0_flag_count=0,
row0_flags={},
flags_start=0,
continuation_start=0,
)
receive = Receive(
target=ModbusRtuTarget(name="rtu", com_port="cpu2", device_id=1),
remote_start="DS1",
dest="DS10",
quantity=1,
receiving="",
success="",
error="",
exception_response="",
)
section_instructions = [
_section_instruction_from_token(0, 31, receive),
_section_instruction_from_token(1, 0, Contact(InstructionType.CONTACT_NO, "C11")),
_section_instruction_from_token(2, 2, Contact(InstructionType.CONTACT_NO, "C12")),
]
data = b"\x20\x00"

rung = decode_program_module._build_topology_backed_rung(
data=data,
topology_block=topology_block,
rung_end=len(data),
logical_rows=3,
section_instructions=section_instructions,
comment=None,
comment_rtf=None,
)

assert isinstance(rung.instructions[0], Receive)
assert isinstance(rung.conditions[1][0], Contact)
assert all(cell == "-" for cell in rung.conditions[1][1:])
assert rung.conditions[2][0] == ""
assert rung.conditions[2][1] == ""
assert isinstance(rung.conditions[2][2], Contact)
assert all(cell == "-" for cell in rung.conditions[2][3:])


def test_parse_scr_tags_handles_compact_home_raw_fields():
raw = _compact_scr_blob(
"Home",
Expand Down