Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions bridge/audit_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Audit log — structured, human-readable per-event log on disk.
PRD §BR-9: "All events and turn outputs written to a structured,
human-readable audit log."
Format: JSONL — one JSON object per line, append-only, easy to grep.
Each entry has a stable `kind` plus event-specific fields:
{"kind": "state_change", "ts": "...", "session_id": "...",
"field": "momentum", "before": 5, "after": 0, "note": "..."}
{"kind": "turn_complete", "ts": "...", "turn": 12,
"rewards": {...}, "per_city": {...}}
The audit log is independent of the save exchange — saves are
checkpointed snapshots; the audit log is the running history. They
serve different audiences (mod readers vs. forensic debugging).
"""

from __future__ import annotations

import json
import os
import threading
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional


DEFAULT_AUDIT_FILENAME = "audit.jsonl"


class AuditLog:
"""Append-only JSONL audit log."""

def __init__(self, path: str | Path):
    """Resolve the log file location and make sure its directory exists.

    Args:
        path: Either a directory (the log is then written to
            ``DEFAULT_AUDIT_FILENAME`` inside it) or the log file itself.
            A path is treated as a directory when it already exists as
            one, or when its final component has no suffix.
    """
    p = Path(path)
    # An existing directory takes precedence over the suffix heuristic, so
    # a directory named e.g. "logs.d" is not mistaken for a file name.
    if p.is_dir() or p.suffix == "":
        p.mkdir(parents=True, exist_ok=True)
        p = p / DEFAULT_AUDIT_FILENAME
    else:
        p.parent.mkdir(parents=True, exist_ok=True)
    self.path: Path = p
    # Held by log() while a line is appended to the file.
    self._lock = threading.Lock()

def log(self, kind: str, **fields: Any) -> Dict[str, Any]:
"""Append one entry. Returns the entry dict for caller convenience."""
entry: Dict[str, Any] = {
"kind": kind,
"ts": datetime.now().astimezone().isoformat(),
}
# Sanitize unserializable values rather than crashing the daemon.
for k, v in fields.items():
try:
json.dumps(v)
entry[k] = v
except (TypeError, ValueError):
entry[k] = repr(v)

line = json.dumps(entry) + "\n"
Comment on lines +52 to +59
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current sanitization logic is inefficient because it calls json.dumps(v) for every field to check serializability, and then calls json.dumps(entry) again for the entire record. This results in redundant processing, especially for larger objects.

Using the default parameter in json.dumps is a more idiomatic and performant way to handle unserializable values. It also allows for a more 'structured' log by preserving the container (e.g., a list or dict) and only stringifying the specific unserializable leaf nodes, rather than replacing the entire top-level field with a repr string.

        entry.update(fields)
        # Use default=repr to handle unserializable values efficiently and maintain structure.
        line = json.dumps(entry, default=repr) + "\n"

with self._lock:
with open(self.path, "a", encoding="utf-8") as f:
f.write(line)
return entry

def log_state_change(self, change: Any) -> Dict[str, Any]:
"""Convenience for HookListener.StateChange records."""
return self.log(
"state_change",
session_id=change.session_id,
event_type=change.event_type,
field=change.field,
before=change.before,
after=change.after,
note=change.note,
)

def log_turn_complete(self, turn: int, rewards: Dict[str, int],
per_city: Optional[Dict[str, Dict[str, int]]] = None) -> Dict[str, Any]:
return self.log("turn_complete", turn=turn, rewards=rewards,
per_city=per_city or {})

def log_session_added(self, session_id: str, name: str, city_id: str = "") -> Dict[str, Any]:
return self.log("session_added", session_id=session_id, name=name, city_id=city_id)

def read_lines(self, limit: Optional[int] = None) -> list[Dict[str, Any]]:
"""Read entries back (for inspection / tests). Skips malformed lines."""
if not self.path.exists():
return []
out: list[Dict[str, Any]] = []
with open(self.path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
out.append(json.loads(line))
except json.JSONDecodeError:
continue
if limit is not None:
return out[-limit:]
return out
Comment on lines +89 to +101
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The read_lines method reads the entire audit log into memory before applying the limit. As the audit log is append-only and intended to grow over time, this will eventually lead to significant memory consumption and performance degradation.

Consider using collections.deque with a maxlen to efficiently keep only the last N lines in memory while iterating through the file. This also correctly handles the case where limit=0 (which currently returns the entire list due to Python's slice behavior [-0:]).

Suggested change
out: list[Dict[str, Any]] = []
with open(self.path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
out.append(json.loads(line))
except json.JSONDecodeError:
continue
if limit is not None:
return out[-limit:]
return out
from collections import deque
def _iter_entries():
with open(self.path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line: continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue
if limit is not None:
return list(deque(_iter_entries(), maxlen=limit))
return list(_iter_entries())

14 changes: 14 additions & 0 deletions bridge/bridge_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TextIO

from bridge.audit_log import AuditLog
from bridge.hook_listener import HookListener
from bridge.save_exchange import SaveExchange
from bridge.scoring import ScoringEngine
Expand All @@ -39,15 +40,25 @@ def __init__(
scoring_engine: ScoringEngine,
hook_listener: Optional[HookListener] = None,
save_exchange: Optional[SaveExchange] = None,
audit_log: Optional[AuditLog] = None,
turn_interval_sec: float = 1.0,
):
self.state = state
self.scoring = scoring_engine
self.hook_listener = hook_listener
self.save_exchange = save_exchange
self.audit_log = audit_log
self.turn_interval_sec = turn_interval_sec
self.callbacks: Dict[str, Callable] = {}
self.running = False
# Wire the listener's on_change to the audit log if both are present.
if self.hook_listener is not None and self.audit_log is not None:
existing_callback = self.hook_listener.on_change
def _audit_then_existing(change):
self.audit_log.log_state_change(change)
if existing_callback is not None:
existing_callback(change)
self.hook_listener.on_change = _audit_then_existing
Comment on lines +55 to +61
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Wiring the audit log by wrapping hook_listener.on_change in the BridgeLoop constructor introduces a side effect on the hook_listener object. If the same HookListener instance is passed to multiple BridgeLoop instances (e.g., in tests or during a component reload), it will result in multiple layers of wrapping and duplicate audit log entries.

It would be safer to check if the listener is already wrapped or to modify HookListener to support multiple subscribers.


# ──────────────────────────────────────────────────────────────────
# Lifecycle
Expand Down Expand Up @@ -107,6 +118,9 @@ async def process_turn(self) -> Optional[Dict[str, Any]]:
if self.save_exchange is not None:
self.save_exchange.write_payload(self.state)

if self.audit_log is not None:
self.audit_log.log_turn_complete(self.state.current_turn, rewards, per_city)

self._invoke_callbacks("turn_complete", rewards)
if per_city:
self._invoke_callbacks("per_city_rewards", per_city)
Expand Down
7 changes: 6 additions & 1 deletion bridge/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import sys
from pathlib import Path

from bridge.audit_log import AuditLog
from bridge.bridge_loop import BridgeLoop
from bridge.hook_listener import HookListener
from bridge.save_exchange import SaveExchange
Expand All @@ -38,17 +39,21 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
help="Seconds after Stop before WAITING (default: 60)")
parser.add_argument("--no-hooks", action="store_true",
help="Run the turn loop only — don't read hook events")
parser.add_argument("--audit-dir", type=Path, default=Path.cwd() / "logs",
help="Directory for audit.jsonl (default: ./logs)")
return parser.parse_args(argv)


def build_components(args: argparse.Namespace):
save_exchange = SaveExchange(args.saves_dir)
audit_log = AuditLog(args.audit_dir)
state = save_exchange.read_payload() or SessionState()
scoring = ScoringEngine(SimpleScoringStrategy())
listener = HookListener(state=state, idle_threshold_sec=args.idle_threshold)
loop = BridgeLoop(
state=state, scoring_engine=scoring, hook_listener=listener,
save_exchange=save_exchange, turn_interval_sec=args.turn_interval,
save_exchange=save_exchange, audit_log=audit_log,
turn_interval_sec=args.turn_interval,
)

def _on_turn(rewards):
Expand Down
155 changes: 155 additions & 0 deletions bridge/tests/test_audit_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Tests for the audit log (PRD §BR-9)."""

import json
from datetime import datetime

import pytest

from bridge.audit_log import AuditLog, DEFAULT_AUDIT_FILENAME
from bridge.bridge_loop import BridgeLoop
from bridge.hook_listener import HookListener, StateChange
from bridge.save_exchange import SaveExchange
from bridge.scoring import ScoringEngine, SimpleScoringStrategy
from bridge.session_state import SessionState, SessionStatus


def test_audit_log_directory_path_appends_default_filename(tmp_path):
    """A suffix-less path is used as a directory holding the default file."""
    log_dir = tmp_path / "logs"
    audit = AuditLog(log_dir)
    assert audit.path.name == DEFAULT_AUDIT_FILENAME
    assert audit.path.parent == log_dir


def test_audit_log_explicit_filename_respected(tmp_path):
    """A path that names a file is kept exactly as given."""
    target = tmp_path.joinpath("logs", "session.jsonl")
    audit = AuditLog(target)
    assert audit.path == target


def test_log_writes_jsonl_with_timestamp(tmp_path):
    """log() writes one JSON line carrying kind, fields and an ISO timestamp."""
    log = AuditLog(tmp_path / "logs")
    log.log("test", foo="bar")
    lines = log.path.read_text(encoding="utf-8").splitlines()
    assert len(lines) == 1
    entry = json.loads(lines[0])
    assert entry["kind"] == "test"
    assert entry["foo"] == "bar"
    # Parse the value rather than only checking the key exists, so a
    # non-ISO or naive timestamp is caught.
    ts = datetime.fromisoformat(entry["ts"])
    assert ts.tzinfo is not None


def test_log_appends_not_overwrites(tmp_path):
    """Two log() calls leave two lines in the file."""
    audit = AuditLog(tmp_path / "logs")
    for kind, n in (("a", 1), ("b", 2)):
        audit.log(kind, n=n)
    written = audit.path.read_text(encoding="utf-8").splitlines()
    assert len(written) == 2


def test_log_state_change_records_all_fields(tmp_path):
    """Every StateChange attribute is present in the written entry."""
    audit = AuditLog(tmp_path / "logs")
    audit.log_state_change(
        StateChange(
            event_type="goal_completed",
            session_id="s1",
            field="momentum",
            before=3,
            after=4,
            note="task_completed",
        )
    )
    recorded = audit.read_lines()
    assert len(recorded) == 1
    expected = {
        "kind": "state_change",
        "session_id": "s1",
        "event_type": "goal_completed",
        "field": "momentum",
        "before": 3,
        "after": 4,
        "note": "task_completed",
    }
    for key, value in expected.items():
        assert recorded[0][key] == value


def test_log_turn_complete(tmp_path):
    """log_turn_complete() stores the turn number, rewards and per-city data."""
    audit = AuditLog(tmp_path / "logs")
    audit.log_turn_complete(
        turn=5,
        rewards={"gold": 100},
        per_city={"camelot": {"gold": 100}},
    )
    first = audit.read_lines()[0]
    assert first["kind"] == "turn_complete"
    assert first["turn"] == 5
    assert first["rewards"]["gold"] == 100
    assert first["per_city"]["camelot"]["gold"] == 100


def test_read_lines_with_limit(tmp_path):
    """read_lines(limit=N) returns N entries ending with the newest one."""
    audit = AuditLog(tmp_path / "logs")
    for index in range(20):
        audit.log("entry", n=index)
    tail = audit.read_lines(limit=5)
    assert len(tail) == 5
    # The newest entry written was n=19.
    assert tail[-1]["n"] == 19


def test_read_lines_skips_malformed(tmp_path):
    """A non-JSON line between valid entries is ignored on read-back."""
    audit = AuditLog(tmp_path / "logs")
    audit.log("good", n=1)
    # Corrupt the file by hand with a line that is not JSON.
    with audit.path.open("a", encoding="utf-8") as handle:
        handle.write("not json\n")
    audit.log("good", n=2)
    parsed = audit.read_lines()
    # Only the two valid entries survive.
    assert len(parsed) == 2
    assert [item["n"] for item in parsed] == [1, 2]


def test_unserializable_value_is_repr_safe(tmp_path):
    """A value json cannot encode is stored as its repr() string."""

    class NotJsonable:
        def __repr__(self):
            return "<weird>"

    audit = AuditLog(tmp_path / "logs")
    audit.log("test", obj=NotJsonable())
    stored = audit.read_lines()[0]
    assert stored["obj"] == "<weird>"


# ────────────────────────────────────────────────────────────────────────
# BridgeLoop integration
# ────────────────────────────────────────────────────────────────────────

@pytest.mark.asyncio
async def test_bridge_loop_logs_state_changes_and_turns(tmp_path):
    """A fed hook event and a processed turn both show up in the audit log."""
    state = SessionState(kingdom_name="Camelot")
    session = state.add_session("Royal Court")
    session.session_id = "sess-1"
    session.status = SessionStatus.ACTIVE

    audit = AuditLog(tmp_path / "logs")
    loop = BridgeLoop(
        state=state,
        scoring_engine=ScoringEngine(SimpleScoringStrategy()),
        hook_listener=HookListener(state=state),
        save_exchange=SaveExchange(tmp_path / "saves"),
        audit_log=audit,
    )

    # Feed a hook event; the listener is expected to reach the audit log
    # through its on_change callback.
    event = {"event_type": "TaskCompleted", "payload": {"session_id": "sess-1"}}
    loop.feed_event(event)
    # Then advance one turn.
    await loop.process_turn()

    entries = audit.read_lines()
    kinds = [item["kind"] for item in entries]
    assert "state_change" in kinds  # at least one state change from TaskCompleted
    assert "turn_complete" in kinds  # the turn completion entry
    turn_entry = next(item for item in entries if item["kind"] == "turn_complete")
    assert turn_entry["turn"] == 1


@pytest.mark.asyncio
async def test_bridge_loop_audit_log_optional(tmp_path):
    """Loop without audit log should still work — backward compat."""
    state = SessionState()
    session = state.add_session("X")
    session.status = SessionStatus.ACTIVE
    engine = ScoringEngine(SimpleScoringStrategy())
    loop = BridgeLoop(state=state, scoring_engine=engine)
    result = await loop.process_turn()
    assert result is not None
Loading