Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 76 additions & 7 deletions claude_code_log/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,46 @@ def get_library_version() -> str:
# ========== Cache Path Configuration ==========


def subagents_fingerprint(jsonl_path: Path) -> str:
"""Fingerprint of the sidecar inputs that feed spawn discovery (#213).

Parsing a transcript also reads ``agent-*.meta.json`` sidecars (see
``converter._subagent_meta_map``), so they must take part in cache
invalidation: a sidecar appearing after the transcript was cached
(e.g. a still-running agent spawning a child without touching the
trunk file) would otherwise go unnoticed by the mtime-only check.

The fingerprint is ``"<count>:<newest int mtime>"`` over the sidecars
of the dirs where this transcript's children can live — the sibling
``<stem>/subagents/`` dir, plus the containing dir for agent files
(the flat layout puts children next to their parent). Sidecars are
written once at spawn time, so count+newest-mtime captures every
addition and removal. Empty string when there are none.

Deliberately narrower than ``converter._subagent_meta_map``'s scan
for TRUNK files: the parse also defensively checks the project dir
itself, but Claude Code never writes sidecars there, and
fingerprinting it would rescan a potentially large dir on every
cache read for a theoretical-only input.
"""
candidates = [jsonl_path.parent / jsonl_path.stem / "subagents"]
if jsonl_path.parent.name == "subagents":
candidates.append(jsonl_path.parent)
metas: list[Path] = []
for directory in candidates:
if directory.is_dir():
metas.extend(directory.glob("agent-*.meta.json"))
if not metas:
return ""
try:
newest = max(int(p.stat().st_mtime) for p in metas)
except OSError:
# A sidecar vanished mid-scan; treat as unstable so the next
# check re-reads (an always-mismatching fingerprint is safe).
return f"{len(metas)}:unstable"
return f"{len(metas)}:{newest}"


def get_cache_db_path(projects_dir: Path) -> Path:
"""Get cache database path, respecting CLAUDE_CODE_LOG_CACHE_PATH env var.

Expand Down Expand Up @@ -471,7 +511,8 @@ def is_file_cached(self, jsonl_path: Path) -> bool:

with self._get_connection() as conn:
row = conn.execute(
"SELECT source_mtime FROM cached_files WHERE project_id = ? AND file_name = ?",
"SELECT source_mtime, subagents_fingerprint FROM cached_files"
" WHERE project_id = ? AND file_name = ?",
(self._project_id, jsonl_path.name),
).fetchone()

Expand All @@ -482,7 +523,19 @@ def is_file_cached(self, jsonl_path: Path) -> bool:
cached_mtime = row["source_mtime"]

# Cache is valid if modification times match (within 1 second tolerance)
return abs(source_mtime - cached_mtime) < 1.0
if abs(source_mtime - cached_mtime) >= 1.0:
return False

# The sidecar inputs of spawn discovery (#213) must match too —
# new agent-*.meta.json files appear without touching the source
# jsonl. Pre-007 rows carry NULL: accept those only when the file
# has no sidecars today (nothing to miss), so legacy caches don't
# mass-invalidate while sessions WITH sidecars reparse once.
cached_fp = row["subagents_fingerprint"]
current_fp = subagents_fingerprint(jsonl_path)
if cached_fp is None:
return current_fp == ""
return cached_fp == current_fp

def load_cached_entries(self, jsonl_path: Path) -> Optional[List[TranscriptEntry]]:
"""Load cached transcript entries for a JSONL file."""
Expand Down Expand Up @@ -564,28 +617,43 @@ def load_cached_entries_filtered(
return [self._deserialize_entry(row) for row in rows]

def save_cached_entries(
self, jsonl_path: Path, entries: List[TranscriptEntry]
self,
jsonl_path: Path,
entries: List[TranscriptEntry],
subagents_fp: Optional[str] = None,
) -> None:
"""Save parsed transcript entries to cache."""
"""Save parsed transcript entries to cache.

``subagents_fp`` is the sidecar fingerprint AS OF THE PARSE —
callers that scanned sidecars should pass the value captured
before the scan, so a sidecar landing mid-parse mismatches on the
next read and forces a reparse (over-invalidation; computing it
here at save time would instead validate a parse that never saw
the late sidecar). Falls back to computing now when omitted.
"""
if self._project_id is None:
return

source_mtime = jsonl_path.stat().st_mtime
cached_mtime = datetime.now().timestamp()
if subagents_fp is None:
subagents_fp = subagents_fingerprint(jsonl_path)

with self._get_connection() as conn:
# Insert or update file record
# Use ON CONFLICT to preserve file ID and avoid cascade deletes on messages
conn.execute(
"""
INSERT INTO cached_files
(project_id, file_name, file_path, source_mtime, cached_mtime, message_count)
VALUES (?, ?, ?, ?, ?, ?)
(project_id, file_name, file_path, source_mtime, cached_mtime,
message_count, subagents_fingerprint)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(project_id, file_name) DO UPDATE SET
file_path = excluded.file_path,
source_mtime = excluded.source_mtime,
cached_mtime = excluded.cached_mtime,
message_count = excluded.message_count
message_count = excluded.message_count,
subagents_fingerprint = excluded.subagents_fingerprint
""",
(
self._project_id,
Expand All @@ -594,6 +662,7 @@ def save_cached_entries(
source_mtime,
cached_mtime,
len(entries),
subagents_fp,
),
)

Expand Down
171 changes: 160 additions & 11 deletions claude_code_log/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,21 @@ def load_transcript(
to_date: Optional[str] = None,
silent: bool = False,
_loaded_files: Optional[set[Path]] = None,
_meta_maps: Optional[dict[Path, dict[str, str]]] = None,
) -> list[TranscriptEntry]:
"""Load and parse JSONL transcript file, using cache if available.

Args:
_loaded_files: Internal parameter to track loaded files and prevent infinite recursion.
_meta_maps: Internal per-load memo of ``{dir: {toolUseId: agentId}}``
sidecar maps, so one flat ``subagents/`` family is scanned once
per top-level load instead of once per recursively loaded file.
"""
# Initialize loaded files set on first call
if _loaded_files is None:
_loaded_files = set()
if _meta_maps is None:
_meta_maps = {}

# Prevent infinite recursion by checking if this file is already being loaded
if jsonl_path in _loaded_files:
Expand All @@ -259,7 +265,14 @@ def load_transcript(
print(f"Loading {jsonl_path} from cache...")
return cached_entries

# Parse from source file
# Parse from source file. Capture the sidecar fingerprint FIRST — it
# must describe the world as-of-the-parse (or older), so a sidecar
# landing mid-parse mismatches on the next cache read and forces a
# reparse, instead of being fingerprinted-as-covered without having
# been scanned (review advisory on PR #218).
from .cache import subagents_fingerprint

subagents_fp = subagents_fingerprint(jsonl_path)
messages: list[TranscriptEntry] = []
agent_ids: set[str] = set() # Collect agentId references while parsing
# Track unrecognized message types already warned about so we emit at
Expand Down Expand Up @@ -383,6 +396,16 @@ def load_transcript(
f"\n{traceback.format_exc()}"
)

# Sidecar-driven spawn linking (issue #213): resolve each spawning
# tool_use to its sub-agent via the agent-<id>.meta.json files. This is
# what makes NESTED spawns discoverable — a sub-agent's own spawn
# tool_results carry no ``toolUseResult.agentId`` (trunk-only
# enrichment), and an interrupted spawn has no usable tool_result at
# all; the sidecar's ``toolUseId`` covers both.
_apply_subagent_meta_links(
messages, _subagent_meta_map(jsonl_path, _meta_maps), agent_ids, jsonl_path
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Prompt-hash fallback: link Task tool_results that lack a structured
# agentId (common for true teammate subagents) by matching the
# tool_use's prompt input against the <teammate-message
Expand Down Expand Up @@ -425,33 +448,149 @@ def load_transcript(
to_date,
silent=True,
_loaded_files=_loaded_files,
_meta_maps=_meta_maps,
)
agent_messages_map[agent_id] = agent_messages

# Insert agent messages at their point of use (only once per agent)
if agent_messages_map:
# Iterate through messages and insert agent messages after the FIRST message
# that references them (via UserTranscriptEntry.agentId)
# Iterate through messages and insert agent messages after the FIRST
# message that references them. References come in two flavors:
# ``spawnedAgentId`` (sidecar-linked spawn entries — the only flavor
# available inside agent transcripts, where ``agentId`` means
# membership) and bare ``agentId`` on non-sidechain entries (the
# legacy trunk backpatch from ``toolUseResult``). Because a nested
# agent's spawn entry sits inside its parent agent's (recursively
# loaded) message block, this single pass nests sub-sub-agents at
# the right flat-order position too.
result_messages: list[TranscriptEntry] = []
for message in messages:
result_messages.append(message)

# Check if this is a UserTranscriptEntry with agentId
if isinstance(message, UserTranscriptEntry) and message.agentId:
agent_id = message.agentId
if agent_id in agent_messages_map:
# Insert agent messages right after this message (pop to insert only once)
result_messages.extend(agent_messages_map.pop(agent_id))
if not isinstance(message, (UserTranscriptEntry, AssistantTranscriptEntry)):
continue
agent_id = message.spawnedAgentId or (
message.agentId if not message.isSidechain else None
)
if agent_id and agent_id in agent_messages_map:
# Insert agent messages right after this message (pop to insert only once)
result_messages.extend(agent_messages_map.pop(agent_id))

messages = result_messages

# Save to cache if cache manager is available
if cache_manager is not None:
cache_manager.save_cached_entries(jsonl_path, messages)
cache_manager.save_cached_entries(
jsonl_path, messages, subagents_fp=subagents_fp
)

return messages


def _subagent_meta_map(
jsonl_path: Path, memo: dict[Path, dict[str, str]]
) -> dict[str, str]:
"""``{spawning toolUseId: agentId}`` from ``agent-<id>.meta.json`` sidecars.

Claude Code (2.1.172+) writes every sub-agent transcript flat into the
trunk session's ``<sid>/subagents/`` dir — at ANY nesting depth — with a
sidecar ``agent-<id>.meta.json`` carrying the spawning ``toolUseId``.
The map covers the whole flat family; callers match it against the
tool_use ids actually present in their own entries.

Children of *this* transcript can live in two places: the sibling
``<stem>/subagents/`` dir (trunk file) or the transcript's own
containing dir (agent file — the flat layout puts children next to
their parent). Scans both, memoized per directory in ``memo`` (scoped
to one top-level load, so there's no cross-load staleness).
"""
result: dict[str, str] = {}
candidates = [jsonl_path.parent / jsonl_path.stem / "subagents", jsonl_path.parent]
for directory in candidates:
if directory in memo:
result.update(memo[directory])
continue
dir_map: dict[str, str] = {}
if directory.is_dir():
for meta_path in directory.glob("agent-*.meta.json"):
agent_id = meta_path.name[len("agent-") : -len(".meta.json")]
if not agent_id:
continue
try:
raw: Any = json.loads(meta_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
if not isinstance(raw, dict):
continue
tool_use_id = cast("dict[str, Any]", raw).get("toolUseId")
if isinstance(tool_use_id, str) and tool_use_id:
dir_map[tool_use_id] = agent_id
memo[directory] = dir_map
result.update(dir_map)
return result


def _apply_subagent_meta_links(
messages: list[TranscriptEntry],
meta_map: dict[str, str],
agent_ids: set[str],
jsonl_path: Path,
) -> None:
"""Stamp ``spawnedAgentId`` on spawn entries resolved via sidecar files.

For every sidecar ``toolUseId`` whose tool_use appears in *messages*,
the spawned agent id lands on the spawning tool_result entry (or the
tool_use's assistant entry when no result exists — interrupted or
still-running spawns) and joins *agent_ids* so the file loader picks
the transcript up. On non-sidechain (trunk) entries the legacy
``agentId`` reference is backpatched too, so everything keyed on it
(relocation anchors, task metadata) keeps working; sidechain entries
keep ``agentId`` as pure membership.
"""
if not meta_map:
return
own_agent_id = (
jsonl_path.stem[len("agent-") :]
if jsonl_path.stem.startswith("agent-")
else None
)
use_entries: dict[str, BaseTranscriptEntry] = {}
result_entries: dict[str, BaseTranscriptEntry] = {}
for msg in messages:
if not isinstance(msg, (UserTranscriptEntry, AssistantTranscriptEntry)):
continue
for item in msg.message.content:
if isinstance(item, ToolUseContent) and item.id in meta_map:
use_entries.setdefault(item.id, msg)
elif isinstance(item, ToolResultContent) and item.tool_use_id in meta_map:
result_entries.setdefault(item.tool_use_id, msg)

for tool_use_id, spawned in sorted(meta_map.items()):
# Self-guard: an agent can't spawn itself; skip a sidecar that
# (through data corruption) would claim otherwise.
if spawned == own_agent_id:
continue
anchor = result_entries.get(tool_use_id) or use_entries.get(tool_use_id)
if anchor is None:
# Sidecar belongs to another transcript of the flat family.
continue
if anchor.spawnedAgentId and anchor.spawnedAgentId != spawned:
# One anchor entry, two spawns: only reachable when a single
# entry carries several spawn tool_uses AND more than one of
# them lacks a tool_result (results anchor 1:1 on their own
# entries). Claude Code streams one content block per
# assistant entry, so this doesn't occur in real transcripts
# — but never silently overwrite: keep the first link and
# let the extra transcript load anyway (it relocates via the
# defensive tail-append instead of a spawn anchor).
agent_ids.add(spawned)
continue
anchor.spawnedAgentId = spawned
if not anchor.isSidechain and not anchor.agentId:
anchor.agentId = spawned
agent_ids.add(spawned)


def _link_subagents_by_prompt_hash(
messages: list[TranscriptEntry],
jsonl_path: Path,
Expand Down Expand Up @@ -632,9 +771,17 @@ def _integrate_agent_entries(messages: list[TranscriptEntry]) -> None:

agent_anchors: dict[str, str] = {}
agent_anchors_from_sidechain: dict[str, str] = {}
spawn_anchors: dict[str, str] = {}
for msg in messages:
if not isinstance(msg, (BaseTranscriptEntry, PassthroughTranscriptEntry)):
continue
# Sidecar-resolved spawn links (issue #213) are the most precise
# anchors — they work at any nesting depth and need no boundary
# heuristics (an entry's own membership can never equal the agent
# it spawned).
spawned = getattr(msg, "spawnedAgentId", None)
if spawned and spawned != msg.agentId:
spawn_anchors.setdefault(spawned, msg.uuid)
if not msg.agentId:
continue
if msg.isSidechain:
Expand All @@ -648,9 +795,11 @@ def _integrate_agent_entries(messages: list[TranscriptEntry]) -> None:
agent_anchors_from_sidechain.setdefault(msg.agentId, msg.uuid)
else:
agent_anchors[msg.agentId] = msg.uuid
# Merge: non-sidechain anchors take priority
# Merge precedence: sidecar spawn links > non-sidechain agentId
# references > sidechain cross-boundary heuristics.
for agent_id, uuid in agent_anchors_from_sidechain.items():
agent_anchors.setdefault(agent_id, uuid)
agent_anchors.update(spawn_anchors)

if not agent_anchors:
return
Expand Down
1 change: 1 addition & 0 deletions claude_code_log/factories/meta_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def create_meta(transcript: BaseTranscriptEntry) -> MessageMeta:
is_meta=getattr(transcript, "isMeta", False) or False,
source_tool_use_id=getattr(transcript, "sourceToolUseID", None),
agent_id=transcript.agentId,
spawned_agent_id=transcript.spawnedAgentId,
cwd=transcript.cwd,
git_branch=transcript.gitBranch,
team_name=getattr(transcript, "teamName", None),
Expand Down
Loading
Loading