diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b040985..924c3b7 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -98,14 +98,14 @@ jobs: - name: Install runtime + test dependencies # Install from the pinned lock file for deterministic dependency - # resolution (closes #47). pytest is added on top — it is not in - # requirements-lock.txt because it is a dev-only dep. pywebview is + # resolution (closes #47). pytest and hypothesis are added on top — not in + # requirements-lock.txt (dev-only). pywebview is # the desktop-launcher dep and pulls GTK / Qt system libraries on # Linux — intentionally excluded from the CI unittest matrix. run: | python -m pip install --upgrade pip python -m pip install -r requirements-lock.txt - python -m pip install 'pytest>=8,<9' + python -m pip install 'pytest>=8,<9' 'hypothesis>=6.100,<7' - name: Run unittest suite run: python -m unittest discover tests -v diff --git a/.gitignore b/.gitignore index 685a7ae..5fd078f 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ Thumbs.db .coverage htmlcov/ coverage.xml +.hypothesis/ diff --git a/README.md b/README.md index 4ca8e78..007802e 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,12 @@ source venv/bin/activate pip install -r requirements.txt ``` +For development (pytest, mypy, Hypothesis property tests): + +```bash +pip install -e ".[dev]" +``` + For reproducible installs (same versions as CI), use the pinned lock file: ```bash diff --git a/pyproject.toml b/pyproject.toml index 2c4226b..ea79f67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ desktop = ["pywebview>=5.0,<6"] dev = [ "pytest>=8,<9", "mypy>=1.10,<2", + "hypothesis>=6.100,<7", ] [project.scripts] diff --git a/tests/test_blob_parsing_fuzz.py b/tests/test_blob_parsing_fuzz.py new file mode 100644 index 0000000..36fd753 --- /dev/null +++ b/tests/test_blob_parsing_fuzz.py @@ -0,0 +1,369 @@ +"""Property-based fuzz tests for blob / bubble parsing (issue #71). + +Run: + python -m unittest tests.test_blob_parsing_fuzz -v + python -m pytest tests/test_blob_parsing_fuzz.py -v +""" + +from __future__ import annotations + +import json +import os +import sqlite3 +import sys +import tempfile +import unittest + +from hypothesis import HealthCheck, given, settings +from hypothesis import strategies as st + +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if REPO_ROOT not in sys.path: + sys.path.insert(0, REPO_ROOT) + +from models import Bubble, SchemaError +from utils.cli_chat_reader import ( + classify_blob_data, + extract_blob_refs, + messages_to_bubbles, + traverse_blobs, +) +from utils.text_extract import extract_text_from_bubble + +# Bounded strategies: fast enough for CI (<30s total with default example counts). +_JSON_VALUES = st.one_of( + st.none(), + st.booleans(), + st.integers(), + st.floats(allow_nan=False, allow_infinity=False), + st.text(max_size=200), + st.lists(st.text(max_size=80), max_size=8), +) + +_BUBBLE_RAW = st.dictionaries( + st.text(min_size=0, max_size=40), + _JSON_VALUES, + max_size=12, +) + +_BUBBLE_RAW_ANY = st.one_of( + _BUBBLE_RAW, + st.none(), + st.integers(), + st.lists(st.text(max_size=40), max_size=5), + st.text(max_size=200), +) + +_BUBBLE_ID = st.text( + alphabet=st.characters(blacklist_categories=("Cs",), blacklist_characters="\x00"), + min_size=1, + max_size=80, +) + +_BUBBLE_ID_ANY = st.one_of( + _BUBBLE_ID, + st.just(""), + st.none(), + st.integers(min_value=0, max_value=9999), + st.binary(min_size=0, max_size=8), +) + +_BLOB_ID_HEX = st.text( + alphabet="abcdef0123456789", + min_size=64, + max_size=64, +) + + +@st.composite +def _cli_message(draw): + # Empty role is intentional adversarial input (unknown / missing role). + role = draw(st.sampled_from(["user", "assistant", "system", "tool", ""])) + content = draw( + st.one_of( + st.text(max_size=500), + st.lists( + st.dictionaries( + st.sampled_from( + ["type", "text", "toolName", "args", "toolCallId", "result"] + ), + st.one_of(st.text(max_size=120), st.integers(), st.none()), + max_size=6, + ), + max_size=8, + ), + st.none(), + ) + ) + return {"role": role, "content": content} + + +_BUBBLE_LIKE = st.dictionaries( + st.sampled_from(["text", "richText", "codeBlocks", "type", "metadata"]), + st.one_of( + st.text(max_size=300), + st.integers(), + st.booleans(), + st.none(), + st.lists( + st.dictionaries( + st.text(max_size=20), + st.one_of(st.text(max_size=100), st.integers()), + max_size=5, + ), + max_size=4, + ), + st.dictionaries(st.text(max_size=20), _JSON_VALUES, max_size=5), + ), + max_size=6, +) + +_KV_VALUE = st.one_of( + st.none(), + _BUBBLE_RAW, + st.text(max_size=400), + st.binary(max_size=256), + st.integers(), +) + + +def _make_meta_value(meta: dict) -> str: + return json.dumps(meta).encode("utf-8").hex() + + +def _build_store_db_raw(path: str, meta: dict, blobs: dict[str, bytes]) -> None: + """Minimal store.db with well-formed meta dict and arbitrary blob payloads.""" + _build_store_db_meta_row(path, _make_meta_value(meta), blobs) + + +def _build_store_db_meta_row( + path: str, meta_row: str | None, blobs: dict[str, bytes] +) -> None: + """Minimal store.db; *meta_row* is the raw ``meta.value`` (hex JSON or adversarial).""" + conn = sqlite3.connect(path) + conn.execute("CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT)") + conn.execute("CREATE TABLE blobs (id TEXT PRIMARY KEY, data BLOB)") + if meta_row is not None: + conn.execute("INSERT INTO meta VALUES ('0', ?)", (meta_row,)) + for blob_id, data in blobs.items(): + conn.execute("INSERT INTO blobs VALUES (?, ?)", (blob_id, data)) + conn.commit() + conn.close() + + +_FUZZ_META_ROW = st.one_of( + st.none(), + st.just(""), + st.text(min_size=0, max_size=200), + st.dictionaries(st.text(max_size=20), _JSON_VALUES, max_size=6).map( + lambda d: json.dumps(d).encode("utf-8").hex() + ), + st.builds( + lambda root: _make_meta_value({"latestRootBlobId": root, "createdAt": 1}), + _BLOB_ID_HEX, + ), +) + + +def _assemble_workspace_bubble(bubble_id: object, value: object) -> dict | None: + """Mirror workspace_tabs KV bubble load (json.loads → Bubble.from_dict). + + Matches ``services/workspace_tabs.py`` (bubbleId loop): ``json.loads(row["value"])`` + with no type branching — same exceptions as production. Rows with ``value IS NULL`` + are not selected in production; ``None`` here returns ``None`` for fuzz only. + + Intentionally omits ``_loads_kv_value_logged`` (logging / payload hashing). + """ + if value is None: + return None + try: + parsed = json.loads(value) # type: ignore[arg-type] + except (json.JSONDecodeError, TypeError, ValueError): + return None + try: + return Bubble.from_dict(parsed, bubble_id=bubble_id).raw # type: ignore[arg-type] + except SchemaError: + return None + + +def _parse_bubble_from_dict(raw: object, bubble_id: object) -> Bubble | None: + """Call Bubble.from_dict; return None on SchemaError, propagate nothing else.""" + try: + return Bubble.from_dict(raw, bubble_id=bubble_id) # type: ignore[arg-type] + except SchemaError: + return None + + +class TestBubbleFromDictFuzz(unittest.TestCase): + @given(raw=_BUBBLE_RAW, bubble_id=_BUBBLE_ID) + @settings(max_examples=80, deadline=None) + def test_never_raises_unhandled(self, raw: dict, bubble_id: str) -> None: + bubble = _parse_bubble_from_dict(raw, bubble_id) + if bubble is None: + return + self.assertEqual(bubble.bubble_id, bubble_id) + self.assertIs(bubble.raw, raw) + + @given(raw=_BUBBLE_RAW_ANY, bubble_id=_BUBBLE_ID_ANY) + @settings(max_examples=80, deadline=None) + def test_adversarial_inputs_only_schema_error_or_success( + self, raw: object, bubble_id: object + ) -> None: + try: + _parse_bubble_from_dict(raw, bubble_id) + except Exception as exc: + self.fail(f"unexpected {type(exc).__name__}: {exc}") + + @given(raw=_BUBBLE_RAW, bubble_id=_BUBBLE_ID) + @settings(max_examples=80, deadline=None) + def test_parsing_is_idempotent(self, raw: dict, bubble_id: str) -> None: + first = _parse_bubble_from_dict(raw, bubble_id) + second = _parse_bubble_from_dict(raw, bubble_id) + self.assertEqual(first, second) + + +class TestWorkspaceTabsAssemblyFuzz(unittest.TestCase): + @given(bubble_id=_BUBBLE_ID_ANY, value=_KV_VALUE) + @settings(max_examples=100, deadline=None) + def test_assemble_workspace_bubble_never_raises( + self, bubble_id: object, value: object + ) -> None: + try: + result = _assemble_workspace_bubble(bubble_id, value) + except Exception as exc: + self.fail(f"unexpected {type(exc).__name__}: {exc}") + if result is not None: + self.assertIsInstance(result, dict) + + +class TestBlobChainParsingFuzz(unittest.TestCase): + @given(data=st.binary(max_size=4096)) + @settings(max_examples=120, deadline=None) + def test_extract_blob_refs_never_raises(self, data: bytes) -> None: + try: + refs = extract_blob_refs(data) + except Exception as exc: + self.fail(f"unexpected {type(exc).__name__}: {exc}") + self.assertIsInstance(refs, list) + for ref in refs: + self.assertIsInstance(ref, str) + self.assertEqual(len(ref), 64) + + @given(data=st.binary(max_size=4096)) + @settings(max_examples=80, deadline=None) + def test_extract_blob_refs_is_idempotent(self, data: bytes) -> None: + self.assertEqual(extract_blob_refs(data), extract_blob_refs(data)) + + @given(data=st.binary(max_size=4096)) + @settings(max_examples=80, deadline=None) + def test_classify_blob_data_never_raises(self, data: bytes) -> None: + try: + msg, refs = classify_blob_data(data) + except Exception as exc: + self.fail(f"unexpected {type(exc).__name__}: {exc}") + if msg is not None: + self.assertIsInstance(msg, dict) + self.assertEqual(refs, []) + else: + self.assertIsInstance(refs, list) + + @given( + root_id=_BLOB_ID_HEX, + extra_ids=st.lists(_BLOB_ID_HEX, max_size=6, unique=True), + payloads=st.lists(st.binary(max_size=1024), min_size=1, max_size=8), + ) + @settings( + max_examples=40, + deadline=None, + suppress_health_check=[HealthCheck.too_slow], + ) + def test_traverse_blobs_never_raises( + self, root_id: str, extra_ids: list[str], payloads: list[bytes] + ) -> None: + # CliSessionMeta only requires latestRootBlobId (str); BFS runs after meta parse. + meta = {"latestRootBlobId": root_id, "createdAt": 1_700_000_000_000} + blobs: dict[str, bytes] = {root_id: payloads[0]} + for i, bid in enumerate(extra_ids): + if bid not in blobs: + blobs[bid] = payloads[(i + 1) % len(payloads)] + with tempfile.TemporaryDirectory() as td: + db_path = os.path.join(td, "store.db") + _build_store_db_raw(db_path, meta, blobs) + try: + messages = traverse_blobs(db_path) + except Exception as exc: + self.fail(f"traverse_blobs raised {type(exc).__name__}: {exc}") + self.assertIsInstance(messages, list) + + @given(meta_row=_FUZZ_META_ROW) + @settings( + max_examples=30, + deadline=None, + suppress_health_check=[HealthCheck.too_slow], + ) + def test_traverse_blobs_meta_parse_never_raises(self, meta_row: str | None) -> None: + """Covers meta decode / CliSessionMeta.from_dict failure → return [] (no crash).""" + with tempfile.TemporaryDirectory() as td: + db_path = os.path.join(td, "store.db") + _build_store_db_meta_row(db_path, meta_row, {}) + try: + messages = traverse_blobs(db_path) + except Exception as exc: + self.fail(f"traverse_blobs raised {type(exc).__name__}: {exc}") + self.assertIsInstance(messages, list) + + +class TestTextExtractionFuzz(unittest.TestCase): + @given(bubble=_BUBBLE_LIKE) + @settings(max_examples=100, deadline=None) + def test_extract_text_from_bubble_never_raises(self, bubble: dict) -> None: + try: + text = extract_text_from_bubble(bubble) + except Exception as exc: + self.fail(f"unexpected {type(exc).__name__}: {exc}") + self.assertIsInstance(text, str) + + @given(bubble=_BUBBLE_LIKE) + @settings(max_examples=80, deadline=None) + def test_extract_text_is_idempotent(self, bubble: dict) -> None: + self.assertEqual( + extract_text_from_bubble(bubble), + extract_text_from_bubble(bubble), + ) + + @given( + messages=st.lists(_cli_message(), max_size=12), + created_at=st.integers(min_value=0, max_value=2_000_000_000_000), + ) + @settings(max_examples=80, deadline=None) + def test_messages_to_bubbles_then_extract_never_raises( + self, messages: list[dict], created_at: int + ) -> None: + try: + bubbles = messages_to_bubbles(messages, created_at) + except Exception as exc: + self.fail(f"messages_to_bubbles raised {type(exc).__name__}: {exc}") + self.assertIsInstance(bubbles, list) + for bubble in bubbles: + try: + text = extract_text_from_bubble(bubble) + except Exception as exc: + self.fail(f"extract_text_from_bubble raised {type(exc).__name__}: {exc}") + self.assertIsInstance(text, str) + + @given( + messages=st.lists(_cli_message(), max_size=12), + created_at=st.integers(min_value=0, max_value=2_000_000_000_000), + ) + @settings(max_examples=80, deadline=None) + def test_messages_to_bubbles_is_idempotent( + self, messages: list[dict], created_at: int + ) -> None: + self.assertEqual( + messages_to_bubbles(messages, created_at), + messages_to_bubbles(messages, created_at), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_cli_chat_reader.py b/tests/test_cli_chat_reader.py index afc182c..ce07d42 100644 --- a/tests/test_cli_chat_reader.py +++ b/tests/test_cli_chat_reader.py @@ -20,7 +20,7 @@ from utils.cli_chat_reader import ( _content_to_text, - _extract_blob_refs, + extract_blob_refs, _extract_tool_calls, _strip_user_info, aggregate_session_stats, @@ -75,34 +75,34 @@ def _build_store_db(path: str, meta: dict, json_blobs: dict[str, dict], chain: d # --------------------------------------------------------------------------- -# _extract_blob_refs +# extract_blob_refs # --------------------------------------------------------------------------- class TestExtractBlobRefs(unittest.TestCase): def test_empty_bytes_returns_empty(self): - self.assertEqual(_extract_blob_refs(b""), []) + self.assertEqual(extract_blob_refs(b""), []) def test_single_ref(self): ref = "a" * 64 # 32 bytes as hex raw = b"\x0a\x20" + bytes.fromhex(ref) - self.assertEqual(_extract_blob_refs(raw), [ref]) + self.assertEqual(extract_blob_refs(raw), [ref]) def test_two_refs(self): ref1 = "a" * 64 ref2 = "b" * 64 raw = b"\x0a\x20" + bytes.fromhex(ref1) + b"\x0a\x20" + bytes.fromhex(ref2) - self.assertEqual(_extract_blob_refs(raw), [ref1, ref2]) + self.assertEqual(extract_blob_refs(raw), [ref1, ref2]) def test_noise_bytes_ignored(self): ref = "c" * 64 noise = b"\x00\xff\x01\x02\x03\x04" raw = noise + b"\x0a\x20" + bytes.fromhex(ref) + b"\xde\xad" - self.assertIn(ref, _extract_blob_refs(raw)) + self.assertIn(ref, extract_blob_refs(raw)) def test_partial_tag_at_end_ignored(self): # Only 0x0a without 0x20 immediately following should not produce a ref. raw = b"\x0a" + b"\x00" * 32 - self.assertEqual(_extract_blob_refs(raw), []) + self.assertEqual(extract_blob_refs(raw), []) # --------------------------------------------------------------------------- diff --git a/tests/test_models.py b/tests/test_models.py index a15a68e..04a8b84 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -17,7 +17,7 @@ Workspace, WorkspaceLocalComposer, ) -from utils.cli_chat_reader import _extract_blob_refs +from utils.cli_chat_reader import extract_blob_refs GOOD_COMPOSER_RAW: dict = { @@ -252,7 +252,7 @@ def test_meta_parses_then_blob_chain_extracts_refs(self) -> None: self.assertEqual(meta.latest_root_blob_id, ref1) chain_blob = _make_blob_chain(ref1, ref2, ref3) - refs = _extract_blob_refs(chain_blob) + refs = extract_blob_refs(chain_blob) self.assertEqual(refs, [ref1, ref2, ref3]) def test_blob_chain_skips_non_marker_bytes(self) -> None: @@ -261,10 +261,10 @@ def test_blob_chain_skips_non_marker_bytes(self) -> None: garbage_after = b"\xff\xfe" raw = garbage_before + bytes([0x0A, 0x20]) + bytes.fromhex(ref) + garbage_after - self.assertEqual(_extract_blob_refs(raw), [ref]) + self.assertEqual(extract_blob_refs(raw), [ref]) def test_blob_chain_empty_returns_empty_list(self) -> None: - self.assertEqual(_extract_blob_refs(b""), []) + self.assertEqual(extract_blob_refs(b""), []) if __name__ == "__main__": diff --git a/utils/cli_chat_reader.py b/utils/cli_chat_reader.py index 14dbd0c..5c744c4 100644 --- a/utils/cli_chat_reader.py +++ b/utils/cli_chat_reader.py @@ -62,7 +62,7 @@ def _read_meta(db_path: str) -> dict: return {} -def _extract_blob_refs(data: bytes) -> list[str]: +def extract_blob_refs(data: bytes) -> list[str]: """Extract all 32-byte (SHA-256) blob references from a binary chain node. The encoding is: tag ``0x0a`` (field 1, length-delimited) followed by @@ -79,6 +79,23 @@ def _extract_blob_refs(data: bytes) -> list[str]: return refs +def classify_blob_data(data: bytes) -> tuple[dict | None, list[str]]: + """Classify a blob payload as a JSON message or a binary chain node. + + Returns ``(message_dict, [])`` when *data* decodes to a dict with a + ``role`` field; otherwise ``(None, refs)`` where *refs* are SHA-256 hex + ids from :func:`extract_blob_refs`. Used by :func:`traverse_blobs` and + property tests — keep in sync when the load loop changes. + """ + try: + msg = json.loads(data.decode("utf-8")) + if isinstance(msg, dict) and "role" in msg: + return msg, [] + except (UnicodeDecodeError, json.JSONDecodeError): + pass + return None, extract_blob_refs(data) + + def traverse_blobs(db_path: str) -> list[dict]: """Reconstruct the conversation from a ``store.db`` blob graph. @@ -118,15 +135,11 @@ def traverse_blobs(db_path: str) -> list[dict]: for blob_id, data in conn.execute("SELECT id, data FROM blobs"): if not isinstance(data, bytes): continue - try: - msg = json.loads(data.decode("utf-8")) - if isinstance(msg, dict) and "role" in msg: - json_blobs[blob_id] = msg - continue - except (UnicodeDecodeError, json.JSONDecodeError): - pass - refs = _extract_blob_refs(data) - chain_blobs[blob_id] = refs + msg, refs = classify_blob_data(data) + if msg is not None: + json_blobs[blob_id] = msg + else: + chain_blobs[blob_id] = refs # BFS from root (newest-first by nature of the linked-list structure); # reverse at the end to restore chronological (oldest→newest) order. diff --git a/utils/text_extract.py b/utils/text_extract.py index d0b179c..644ec10 100644 --- a/utils/text_extract.py +++ b/utils/text_extract.py @@ -28,9 +28,9 @@ def extract_text_from_bubble(bubble: dict) -> str: text = "" - # Try text field first + # Try text field first (coerce non-str values — Cursor payloads can drift) if bubble.get("text") and str(bubble["text"]).strip(): - text = bubble["text"] + text = str(bubble["text"]) # Fall back to richText if not text and bubble.get("richText"): @@ -49,7 +49,7 @@ def extract_text_from_bubble(bubble: dict) -> str: lang = cb.get("language", "") text += f"\n\n```{lang}\n{cb['content']}\n```" - return text + return text if isinstance(text, str) else "" def slug(s: str) -> str: