diff --git a/bridge/audit_log.py b/bridge/audit_log.py new file mode 100644 index 0000000..730ae48 --- /dev/null +++ b/bridge/audit_log.py @@ -0,0 +1,101 @@ +"""Audit log — structured, human-readable per-event log on disk. + +PRD §BR-9: "All events and turn outputs written to a structured, +human-readable audit log." + +Format: JSONL — one JSON object per line, append-only, easy to grep. +Each entry has a stable `kind` plus event-specific fields: + + {"kind": "state_change", "ts": "...", "session_id": "...", + "field": "momentum", "before": 5, "after": 0, "note": "..."} + {"kind": "turn_complete", "ts": "...", "turn": 12, + "rewards": {...}, "per_city": {...}} + +The audit log is independent of the save exchange — saves are +checkpointed snapshots; the audit log is the running history. They +serve different audiences (mod readers vs. forensic debugging). +""" + +from __future__ import annotations + +import json +import os +import threading +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional + + +DEFAULT_AUDIT_FILENAME = "audit.jsonl" + + +class AuditLog: + """Append-only JSONL audit log.""" + + def __init__(self, path: str | Path): + p = Path(path) + if p.suffix == "": + p.mkdir(parents=True, exist_ok=True) + p = p / DEFAULT_AUDIT_FILENAME + else: + p.parent.mkdir(parents=True, exist_ok=True) + self.path: Path = p + self._lock = threading.Lock() + + def log(self, kind: str, **fields: Any) -> Dict[str, Any]: + """Append one entry. Returns the entry dict for caller convenience.""" + entry: Dict[str, Any] = { + "kind": kind, + "ts": datetime.now().astimezone().isoformat(), + } + # Sanitize unserializable values rather than crashing the daemon. + for k, v in fields.items(): + try: + json.dumps(v) + entry[k] = v + except (TypeError, ValueError): + entry[k] = repr(v) + + line = json.dumps(entry) + "\n" + with self._lock: + with open(self.path, "a", encoding="utf-8") as f: + f.write(line) + return entry + + def log_state_change(self, change: Any) -> Dict[str, Any]: + """Convenience for HookListener.StateChange records.""" + return self.log( + "state_change", + session_id=change.session_id, + event_type=change.event_type, + field=change.field, + before=change.before, + after=change.after, + note=change.note, + ) + + def log_turn_complete(self, turn: int, rewards: Dict[str, int], + per_city: Optional[Dict[str, Dict[str, int]]] = None) -> Dict[str, Any]: + return self.log("turn_complete", turn=turn, rewards=rewards, + per_city=per_city or {}) + + def log_session_added(self, session_id: str, name: str, city_id: str = "") -> Dict[str, Any]: + return self.log("session_added", session_id=session_id, name=name, city_id=city_id) + + def read_lines(self, limit: Optional[int] = None) -> list[Dict[str, Any]]: + """Read entries back (for inspection / tests). Skips malformed lines.""" + if not self.path.exists(): + return [] + out: list[Dict[str, Any]] = [] + with open(self.path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + out.append(json.loads(line)) + except json.JSONDecodeError: + continue + if limit is not None: + return out[-limit:] + return out diff --git a/bridge/bridge_loop.py b/bridge/bridge_loop.py index 96cefb5..c2eb21a 100644 --- a/bridge/bridge_loop.py +++ b/bridge/bridge_loop.py @@ -19,6 +19,7 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Optional, TextIO +from bridge.audit_log import AuditLog from bridge.hook_listener import HookListener from bridge.save_exchange import SaveExchange from bridge.scoring import ScoringEngine @@ -39,15 +40,25 @@ def __init__( scoring_engine: ScoringEngine, hook_listener: Optional[HookListener] = None, save_exchange: Optional[SaveExchange] = None, + audit_log: Optional[AuditLog] = None, turn_interval_sec: float = 1.0, ): self.state = state self.scoring = scoring_engine self.hook_listener = hook_listener self.save_exchange = save_exchange + self.audit_log = audit_log self.turn_interval_sec = turn_interval_sec self.callbacks: Dict[str, Callable] = {} self.running = False + # Wire the listener's on_change to the audit log if both are present. + if self.hook_listener is not None and self.audit_log is not None: + existing_callback = self.hook_listener.on_change + def _audit_then_existing(change): + self.audit_log.log_state_change(change) + if existing_callback is not None: + existing_callback(change) + self.hook_listener.on_change = _audit_then_existing # ────────────────────────────────────────────────────────────────── # Lifecycle @@ -107,6 +118,9 @@ async def process_turn(self) -> Optional[Dict[str, Any]]: if self.save_exchange is not None: self.save_exchange.write_payload(self.state) + if self.audit_log is not None: + self.audit_log.log_turn_complete(self.state.current_turn, rewards, per_city) + self._invoke_callbacks("turn_complete", rewards) if per_city: self._invoke_callbacks("per_city_rewards", per_city) diff --git a/bridge/daemon.py b/bridge/daemon.py index 4b9c7e6..91159d2 100644 --- a/bridge/daemon.py +++ b/bridge/daemon.py @@ -17,6 +17,7 @@ import sys from pathlib import Path +from bridge.audit_log import AuditLog from bridge.bridge_loop import BridgeLoop from bridge.hook_listener import HookListener from bridge.save_exchange import SaveExchange @@ -38,17 +39,21 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace: help="Seconds after Stop before WAITING (default: 60)") parser.add_argument("--no-hooks", action="store_true", help="Run the turn loop only — don't read hook events") + parser.add_argument("--audit-dir", type=Path, default=Path.cwd() / "logs", + help="Directory for audit.jsonl (default: ./logs)") return parser.parse_args(argv) def build_components(args: argparse.Namespace): save_exchange = SaveExchange(args.saves_dir) + audit_log = AuditLog(args.audit_dir) state = save_exchange.read_payload() or SessionState() scoring = ScoringEngine(SimpleScoringStrategy()) listener = HookListener(state=state, idle_threshold_sec=args.idle_threshold) loop = BridgeLoop( state=state, scoring_engine=scoring, hook_listener=listener, - save_exchange=save_exchange, turn_interval_sec=args.turn_interval, + save_exchange=save_exchange, audit_log=audit_log, + turn_interval_sec=args.turn_interval, ) def _on_turn(rewards): diff --git a/bridge/tests/test_audit_log.py b/bridge/tests/test_audit_log.py new file mode 100644 index 0000000..18c894b --- /dev/null +++ b/bridge/tests/test_audit_log.py @@ -0,0 +1,155 @@ +"""Tests for the audit log (PRD §BR-9).""" + +import json +from datetime import datetime + +import pytest + +from bridge.audit_log import AuditLog, DEFAULT_AUDIT_FILENAME +from bridge.bridge_loop import BridgeLoop +from bridge.hook_listener import HookListener, StateChange +from bridge.save_exchange import SaveExchange +from bridge.scoring import ScoringEngine, SimpleScoringStrategy +from bridge.session_state import SessionState, SessionStatus + + +def test_audit_log_directory_path_appends_default_filename(tmp_path): + log = AuditLog(tmp_path / "logs") + assert log.path.name == DEFAULT_AUDIT_FILENAME + assert log.path.parent == tmp_path / "logs" + + +def test_audit_log_explicit_filename_respected(tmp_path): + custom = tmp_path / "logs" / "session.jsonl" + log = AuditLog(custom) + assert log.path == custom + + +def test_log_writes_jsonl_with_timestamp(tmp_path): + log = AuditLog(tmp_path / "logs") + log.log("test", foo="bar") + lines = log.path.read_text(encoding="utf-8").splitlines() + assert len(lines) == 1 + entry = json.loads(lines[0]) + assert entry["kind"] == "test" + assert entry["foo"] == "bar" + assert "ts" in entry # ISO-8601 timestamp + + +def test_log_appends_not_overwrites(tmp_path): + log = AuditLog(tmp_path / "logs") + log.log("a", n=1) + log.log("b", n=2) + lines = log.path.read_text(encoding="utf-8").splitlines() + assert len(lines) == 2 + + +def test_log_state_change_records_all_fields(tmp_path): + log = AuditLog(tmp_path / "logs") + change = StateChange( + event_type="goal_completed", session_id="s1", + field="momentum", before=3, after=4, note="task_completed", + ) + log.log_state_change(change) + entries = log.read_lines() + assert len(entries) == 1 + e = entries[0] + assert e["kind"] == "state_change" + assert e["session_id"] == "s1" + assert e["event_type"] == "goal_completed" + assert e["field"] == "momentum" + assert e["before"] == 3 + assert e["after"] == 4 + assert e["note"] == "task_completed" + + +def test_log_turn_complete(tmp_path): + log = AuditLog(tmp_path / "logs") + log.log_turn_complete(turn=5, rewards={"gold": 100}, + per_city={"camelot": {"gold": 100}}) + entries = log.read_lines() + assert entries[0]["kind"] == "turn_complete" + assert entries[0]["turn"] == 5 + assert entries[0]["rewards"]["gold"] == 100 + assert entries[0]["per_city"]["camelot"]["gold"] == 100 + + +def test_read_lines_with_limit(tmp_path): + log = AuditLog(tmp_path / "logs") + for i in range(20): + log.log("entry", n=i) + last_5 = log.read_lines(limit=5) + assert len(last_5) == 5 + # Last entry should be n=19 + assert last_5[-1]["n"] == 19 + + +def test_read_lines_skips_malformed(tmp_path): + log = AuditLog(tmp_path / "logs") + log.log("good", n=1) + # Manually append a malformed line. + with open(log.path, "a", encoding="utf-8") as f: + f.write("not json\n") + log.log("good", n=2) + entries = log.read_lines() + # Only the two valid entries. + assert len(entries) == 2 + assert [e["n"] for e in entries] == [1, 2] + + +def test_unserializable_value_is_repr_safe(tmp_path): + log = AuditLog(tmp_path / "logs") + + class NotJsonable: + def __repr__(self): + return "" + + log.log("test", obj=NotJsonable()) + entry = log.read_lines()[0] + assert entry["obj"] == "" + + +# ──────────────────────────────────────────────────────────────────────── +# BridgeLoop integration +# ──────────────────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_bridge_loop_logs_state_changes_and_turns(tmp_path): + state = SessionState(kingdom_name="Camelot") + s = state.add_session("Royal Court") + s.session_id = "sess-1" + s.status = SessionStatus.ACTIVE + + listener = HookListener(state=state) + audit = AuditLog(tmp_path / "logs") + save = SaveExchange(tmp_path / "saves") + loop = BridgeLoop( + state=state, + scoring_engine=ScoringEngine(SimpleScoringStrategy()), + hook_listener=listener, + save_exchange=save, + audit_log=audit, + ) + # Inject a hook event — listener should call audit via on_change. + loop.feed_event({"event_type": "TaskCompleted", + "payload": {"session_id": "sess-1"}}) + # Then advance a turn. + await loop.process_turn() + + entries = audit.read_lines() + kinds = [e["kind"] for e in entries] + assert "state_change" in kinds # at least one state change from TaskCompleted + assert "turn_complete" in kinds # the turn completion entry + turn_entry = next(e for e in entries if e["kind"] == "turn_complete") + assert turn_entry["turn"] == 1 + + +@pytest.mark.asyncio +async def test_bridge_loop_audit_log_optional(tmp_path): + """Loop without audit log should still work — backward compat.""" + state = SessionState() + s = state.add_session("X") + s.status = SessionStatus.ACTIVE + loop = BridgeLoop(state=state, scoring_engine=ScoringEngine(SimpleScoringStrategy())) + rewards = await loop.process_turn() + assert rewards is not None