diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..6131575 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,7 @@ +* text=auto eol=lf +*.jsonl text eol=lf +*.py text eol=lf +*.md text eol=lf +*.toml text eol=lf +*.lock text eol=lf +justfile text eol=lf diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..7bbfbef --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,24 @@ +name: CI + +on: + push: + pull_request: + +jobs: + test: + name: test (${{ matrix.os }}, py${{ matrix.python-version }}) + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.10", "3.13"] + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v3 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - uses: extractions/setup-just@v2 + - run: just install + - run: just check diff --git a/cchat/__init__.py b/cchat/__init__.py index aa01369..9d45c11 100644 --- a/cchat/__init__.py +++ b/cchat/__init__.py @@ -1 +1 @@ -"""Claude Code Chat Browser CLI.""" +"""Claude Code Chat Browser CLI.""" diff --git a/cchat/cli.py b/cchat/cli.py index ae17be8..5341971 100644 --- a/cchat/cli.py +++ b/cchat/cli.py @@ -1,68 +1,68 @@ -"""Argparse entry point for the cchat CLI tool.""" - -import argparse -import sys - -from cchat import formatters -from cchat.commands import agents_cmd, files_cmd, line_cmd, lines_cmd, list_cmd, search_cmd, serve_cmd, spending_cmd, tokens_cmd, view_cmd - - -class _FullHelpParser(argparse.ArgumentParser): - """ArgumentParser that prints full --help on errors instead of short usage.""" - - def error(self, message): - sys.stderr.write(f"error: {message}\n\n") - # If a subcommand was given, show its help instead of top-level - if self._subparsers is not None: - for action in self._subparsers._actions: - if isinstance(action, argparse._SubParsersAction): - for arg in sys.argv[1:]: - if 
arg in action.choices: - action.choices[arg].print_help(sys.stderr) # type: ignore[union-attr] - sys.exit(2) - self.print_help(sys.stderr) - sys.exit(2) - - -class _SubcommandHelpParser(argparse.ArgumentParser): - """Subcommand parser that prints its own full --help on errors.""" - - def error(self, message): - sys.stderr.write(f"error: {message}\n\n") - self.print_help(sys.stderr) - sys.exit(2) - - -def main(): - parser = _FullHelpParser(description="Claude Code Chat Browser") - parser.add_argument("--no-color", action="store_true", help="Disable colored output") - - subparsers = parser.add_subparsers(dest="command", parser_class=_SubcommandHelpParser) - subparsers.required = True - - list_cmd.register(subparsers) - view_cmd.register(subparsers) - line_cmd.register(subparsers) - lines_cmd.register(subparsers) - files_cmd.register(subparsers) - search_cmd.register(subparsers) - tokens_cmd.register(subparsers) - spending_cmd.register(subparsers) - agents_cmd.register(subparsers) - serve_cmd.register(subparsers) - - args = parser.parse_args() - - if args.no_color: - formatters.set_no_color(True) - - try: - args.func(args) - except KeyboardInterrupt: - sys.exit(0) - except BrokenPipeError: - sys.exit(0) - - -if __name__ == "__main__": - main() +"""Argparse entry point for the cchat CLI tool.""" + +import argparse +import sys + +from cchat import formatters +from cchat.commands import agents_cmd, files_cmd, line_cmd, lines_cmd, list_cmd, search_cmd, serve_cmd, spending_cmd, tokens_cmd, view_cmd + + +class _FullHelpParser(argparse.ArgumentParser): + """ArgumentParser that prints full --help on errors instead of short usage.""" + + def error(self, message): + sys.stderr.write(f"error: {message}\n\n") + # If a subcommand was given, show its help instead of top-level + if self._subparsers is not None: + for action in self._subparsers._actions: + if isinstance(action, argparse._SubParsersAction): + for arg in sys.argv[1:]: + if arg in action.choices: + 
action.choices[arg].print_help(sys.stderr) # type: ignore[union-attr] + sys.exit(2) + self.print_help(sys.stderr) + sys.exit(2) + + +class _SubcommandHelpParser(argparse.ArgumentParser): + """Subcommand parser that prints its own full --help on errors.""" + + def error(self, message): + sys.stderr.write(f"error: {message}\n\n") + self.print_help(sys.stderr) + sys.exit(2) + + +def main(): + parser = _FullHelpParser(description="Claude Code Chat Browser") + parser.add_argument("--no-color", action="store_true", help="Disable colored output") + + subparsers = parser.add_subparsers(dest="command", parser_class=_SubcommandHelpParser) + subparsers.required = True + + list_cmd.register(subparsers) + view_cmd.register(subparsers) + line_cmd.register(subparsers) + lines_cmd.register(subparsers) + files_cmd.register(subparsers) + search_cmd.register(subparsers) + tokens_cmd.register(subparsers) + spending_cmd.register(subparsers) + agents_cmd.register(subparsers) + serve_cmd.register(subparsers) + + args = parser.parse_args() + + if args.no_color: + formatters.set_no_color(True) + + try: + args.func(args) + except KeyboardInterrupt: + sys.exit(0) + except BrokenPipeError: + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/cchat/commands/files_cmd.py b/cchat/commands/files_cmd.py index ac6e6a0..8604f12 100644 --- a/cchat/commands/files_cmd.py +++ b/cchat/commands/files_cmd.py @@ -1,81 +1,81 @@ -"""List files modified in a conversation.""" - -from __future__ import annotations - -import argparse -from collections import defaultdict - -from cchat import formatters, parser, store - - -def register(subparsers: argparse._SubParsersAction) -> None: - p = subparsers.add_parser("files", help="List files modified in a conversation") - p.add_argument("conv", help="Conversation identifier (path, UUID, prefix, or slug)") - p.add_argument( - "--no-subagents", - action="store_true", - default=False, - help="Exclude subagent conversations", - ) - p.add_argument("--json", 
action="store_true", default=False, help="Output as JSON") - p.add_argument("--no-color", action="store_true", default=False, help="Disable colored output") - p.set_defaults(func=run) - - -def _scan_file(path, file_counts, file_tools): - """Scan a single JSONL file for file modifications.""" - for _line_num, data in parser.parse_lines(path): - if data.get("type") != "assistant": - continue - mods = parser.extract_file_modifications(data) - if not mods: - continue - for mod in mods: - fp = mod["file_path"] - file_counts[fp] += 1 - file_tools[fp].add(mod["tool"]) - - -def run(args: argparse.Namespace) -> None: - if args.no_color: - formatters.set_no_color(True) - - conv_path = store.resolve_conversation(args.conv) - - file_counts: dict[str, int] = defaultdict(int) - file_tools: dict[str, set[str]] = defaultdict(set) - - # Scan main conversation - _scan_file(conv_path, file_counts, file_tools) - - # Scan subagents unless disabled - include_subagents = not args.no_subagents - if include_subagents: - for sa_path in store.get_subagent_paths(conv_path): - _scan_file(sa_path, file_counts, file_tools) - - if not file_counts: - print("No file modifications found.") - return - - # Sort by modification count descending - sorted_files = sorted(file_counts.items(), key=lambda x: x[1], reverse=True) - - if args.json: - data = [ - { - "file_path": fp, - "modifications": count, - "tools": sorted(file_tools[fp]), - } - for fp, count in sorted_files - ] - print(formatters.format_json(data)) - return - - rows = [ - [fp, str(count), ", ".join(sorted(file_tools[fp]))] - for fp, count in sorted_files - ] - headers = ["FILE", "MODIFICATIONS", "TOOLS"] - print(formatters.format_table(rows, headers, no_color=args.no_color)) +"""List files modified in a conversation.""" + +from __future__ import annotations + +import argparse +from collections import defaultdict + +from cchat import formatters, parser, store + + +def register(subparsers: argparse._SubParsersAction) -> None: + p = 
subparsers.add_parser("files", help="List files modified in a conversation") + p.add_argument("conv", help="Conversation identifier (path, UUID, prefix, or slug)") + p.add_argument( + "--no-subagents", + action="store_true", + default=False, + help="Exclude subagent conversations", + ) + p.add_argument("--json", action="store_true", default=False, help="Output as JSON") + p.add_argument("--no-color", action="store_true", default=False, help="Disable colored output") + p.set_defaults(func=run) + + +def _scan_file(path, file_counts, file_tools): + """Scan a single JSONL file for file modifications.""" + for _line_num, data in parser.parse_lines(path): + if data.get("type") != "assistant": + continue + mods = parser.extract_file_modifications(data) + if not mods: + continue + for mod in mods: + fp = mod["file_path"] + file_counts[fp] += 1 + file_tools[fp].add(mod["tool"]) + + +def run(args: argparse.Namespace) -> None: + if args.no_color: + formatters.set_no_color(True) + + conv_path = store.resolve_conversation(args.conv) + + file_counts: dict[str, int] = defaultdict(int) + file_tools: dict[str, set[str]] = defaultdict(set) + + # Scan main conversation + _scan_file(conv_path, file_counts, file_tools) + + # Scan subagents unless disabled + include_subagents = not args.no_subagents + if include_subagents: + for sa_path in store.get_subagent_paths(conv_path): + _scan_file(sa_path, file_counts, file_tools) + + if not file_counts: + print("No file modifications found.") + return + + # Sort by modification count descending + sorted_files = sorted(file_counts.items(), key=lambda x: x[1], reverse=True) + + if args.json: + data = [ + { + "file_path": fp, + "modifications": count, + "tools": sorted(file_tools[fp]), + } + for fp, count in sorted_files + ] + print(formatters.format_json(data)) + return + + rows = [ + [fp, str(count), ", ".join(sorted(file_tools[fp]))] + for fp, count in sorted_files + ] + headers = ["FILE", "MODIFICATIONS", "TOOLS"] + 
print(formatters.format_table(rows, headers, no_color=args.no_color)) diff --git a/cchat/commands/lines_cmd.py b/cchat/commands/lines_cmd.py index 99c489d..2f3c5a3 100644 --- a/cchat/commands/lines_cmd.py +++ b/cchat/commands/lines_cmd.py @@ -1,280 +1,280 @@ -"""Show parsed lines from a conversation.""" - -from __future__ import annotations - -import argparse -import json -import sys - -from cchat import formatters, parser, store - - -def _safe_print(text=""): - """Print with encoding-safe fallback for Windows consoles.""" - encoding = sys.stdout.encoding or 'utf-8' - print(text.encode(encoding, errors='replace').decode(encoding)) - - -def register(subparsers: argparse._SubParsersAction) -> None: - p = subparsers.add_parser("lines", help="List lines in a conversation") - p.add_argument("conv", nargs="?", default=None, - help="Conversation identifier (path, UUID, prefix, or slug)") - p.add_argument("--agent", dest="agent_id", default=None, metavar="AGENT_ID", - help="Resolve a subagent JSONL by agent ID (mutually exclusive with conv)") - p.add_argument("--type", dest="line_type", default=None, - help="Only show lines of this type (e.g. user, assistant, system)") - p.add_argument("--subtype", dest="line_subtype", default=None, - help="Only show lines of this subtype (e.g. 
message, tool_result, tool_use, text)") - p.add_argument("--head", type=int, nargs="?", const=50, default=None, metavar="N", - help="Show first N lines (default N=50; default mode if nothing specified)") - p.add_argument("--tail", type=int, nargs="?", const=50, default=None, metavar="N", - help="Show last N lines (default N=50)") - p.add_argument("--from", type=int, default=None, dest="from_line", metavar="N", - help="Start from line number N (inclusive, overrides --head/--tail)") - p.add_argument("--to", type=int, default=None, dest="to_line", metavar="N", - help="End at line number N (inclusive, overrides --head/--tail)") - p.add_argument("--full", action="store_true", - help="Show full content of each line instead of compact table") - p.add_argument("--max-chars", type=int, default=None, metavar="N", - help="Limit content to N characters per line (implies --full)") - p.add_argument("--middle-out", action="store_true", - help="When truncating with --max-chars, cut the middle and keep start+end") - p.add_argument("--json", action="store_true", dest="json_output", - help="Output as JSON") - p.add_argument("--no-color", action="store_true", - help="Disable colored output") - p.set_defaults(func=run) - - -def _format_tokens(count: int | None) -> str: - """Format a token count for display: ``1234`` -> ``1.2k``, *None* -> ``-``.""" - if count is None: - return "-" - if count >= 1_000_000: - return f"{count / 1_000_000:.1f}M" - if count >= 1_000: - return f"{count / 1_000:.1f}k" - return str(count) - - -def _trunc(text: str, max_len: int, full: bool) -> str: - """Truncate text unless full mode is set.""" - if full: - return text - return formatters.truncate(text, max_len) - - -def _cap(text: str, budget: list[int] | None, middle_out: bool = False) -> str: - """Return *text* capped to the remaining character budget. - - *budget* is a one-element list ``[remaining]`` mutated in-place so that - successive calls within the same line share the allowance. 
When *budget* - is ``None`` no truncation is applied. - - If *middle_out* is True the first and last portions are kept with a - ``[...N chars...]`` marker in the middle. - """ - if budget is None: - return text - if budget[0] <= 0: - return "..." - if len(text) <= budget[0]: - budget[0] -= len(text) - return text - limit = budget[0] - budget[0] = 0 - if not middle_out: - return text[:limit] + "..." - # Keep start and end, redact the middle - redacted = len(text) - limit - marker = f"\n[...{redacted} chars...]\n" - half = (limit - len(marker)) // 2 - if half < 20: - # Not enough room for a meaningful split -- fall back to end-truncation - return text[:limit] + "..." - end_len = limit - len(marker) - half - return text[:half] + marker + text[-end_len:] - - -def _render_line(line_num: int, data: dict, max_chars: int | None = None, - middle_out: bool = False) -> None: - """Render a single conversation line with full detail (replicates view_cmd pattern).""" - line_type = data.get("type", "") - ts = formatters.format_timestamp(parser.extract_timestamp(data)) - subtype = parser.classify_line_subtype(data) - tokens = parser.extract_token_count(data) - msg = data.get("message") or {} - - # Budget tracker: mutable single-element list shared across _cap calls - budget = [max_chars] if max_chars is not None else None - mo = middle_out - - # Header - tok_str = f" {_format_tokens(tokens)} tok" if tokens is not None else "" - header = formatters.colored( - f"--- L{line_num} {line_type}/{subtype}{tok_str} {ts} ---", - formatters.BOLD, - ) - _safe_print(header) - - if line_type == "user": - content = msg.get("content") if isinstance(msg, dict) else None - - if isinstance(content, str): - _safe_print(formatters.colored("USER:", formatters.GREEN)) - _safe_print(_cap(content, budget, mo)) - - elif isinstance(content, list): - _safe_print(formatters.colored("TOOL RESULT:", formatters.GREEN)) - for item in content: - if not isinstance(item, dict): - continue - if item.get("type") != 
"tool_result": - continue - tool_use_id = item.get("tool_use_id", "") - _safe_print(f" tool_use_id: {tool_use_id}") - if budget is not None and budget[0] <= 0: - _safe_print(" ...") - break - sub_content = item.get("content", []) - if isinstance(sub_content, list): - for sub in sub_content: - if isinstance(sub, dict) and sub.get("type") == "text": - _safe_print(f" {_cap(sub.get('text', ''), budget, mo)}") - elif isinstance(sub_content, str): - _safe_print(f" {_cap(sub_content, budget, mo)}") - - else: - _safe_print(formatters.colored("USER:", formatters.GREEN)) - _safe_print(_cap(str(content), budget, mo)) - - elif line_type == "assistant": - _safe_print(formatters.colored("ASSISTANT:", formatters.BLUE)) - content = msg.get("content") if isinstance(msg, dict) else None - if isinstance(content, list): - for item in content: - if not isinstance(item, dict): - continue - if budget is not None and budget[0] <= 0: - _safe_print("...") - break - item_type = item.get("type", "") - - if item_type == "text": - _safe_print(_cap(item.get("text", ""), budget, mo)) - - elif item_type == "thinking": - _safe_print(formatters.colored("[THINKING]", formatters.DIM)) - _safe_print(_cap(item.get("thinking", ""), budget, mo)) - - elif item_type == "tool_use": - name = item.get("name", "tool") - _safe_print(formatters.colored(f"[TOOL: {name}]", formatters.MAGENTA)) - inp = item.get("input", {}) - _safe_print(_cap(json.dumps(inp, indent=2, default=str), budget, mo)) - - elif line_type == "system": - _safe_print(formatters.colored("SYSTEM:", formatters.BOLD)) - subtype_val = data.get("subtype", "") - if subtype_val: - _safe_print(f" subtype: {subtype_val}") - for key in ("url", "durationMs", "sessionId"): - val = data.get(key) - if val is not None: - _safe_print(f" {key}: {val}") - - else: - _safe_print(f"{line_type.upper() or 'UNKNOWN'}:") - _safe_print(_cap(json.dumps(data, indent=2, default=str), budget, mo)) - - _safe_print() # blank line separator - - -def run(args: 
argparse.Namespace) -> None: - if args.no_color: - formatters.set_no_color(True) - - if args.agent_id and args.conv: - print("Error: --agent and conv are mutually exclusive", file=sys.stderr) - sys.exit(1) - if not args.agent_id and not args.conv: - print("Error: either conv or --agent is required", file=sys.stderr) - sys.exit(1) - - if args.agent_id: - path = store.resolve_agent(args.agent_id) - else: - path = store.resolve_conversation(args.conv) - lines = parser.deduplicate_assistant_lines(parser.parse_lines(path)) - - # Collect into list, applying type filter - entries: list[tuple[int, dict]] = [] - for line_num, data in lines: - if args.line_type and data.get("type") != args.line_type: - continue - if args.line_subtype and parser.classify_line_subtype(data) != args.line_subtype: - continue - entries.append((line_num, data)) - - # Validate mutually exclusive --head / --tail - if args.head is not None and args.tail is not None: - print("Error: --head and --tail are mutually exclusive", file=sys.stderr) - sys.exit(1) - - # Apply navigation: --from/--to override --head/--tail - if args.from_line is not None or args.to_line is not None: - entries = [ - (ln, d) for ln, d in entries - if (args.from_line is None or ln >= args.from_line) - and (args.to_line is None or ln <= args.to_line) - ] - elif args.tail is not None: - entries = entries[-args.tail:] - else: - # --head N, or default to first 50 (but --full shows all by default) - head_n = args.head if args.head is not None else (None if args.full else 50) - if head_n is not None: - entries = entries[:head_n] - - if args.json_output: - records = [] - for line_num, data in entries: - uuid_val = data.get("uuid", "") - records.append({ - "line_number": line_num, - "type": data.get("type", ""), - "subtype": parser.classify_line_subtype(data), - "tokens": parser.extract_token_count(data), - "uuid": uuid_val[:8] if uuid_val else "", - "timestamp": parser.extract_timestamp(data), - "snippet": 
parser.extract_content_summary(data), - }) - _safe_print(formatters.format_json(records)) - return - - # --max-chars implies --full - if args.max_chars is not None: - args.full = True - - if args.full: - for line_num, data in entries: - _render_line(line_num, data, max_chars=args.max_chars, - middle_out=args.middle_out) - return - - headers = ["LINE#", "TYPE", "SUBTYPE", "TOKENS", "UUID", "TIMESTAMP", "SNIPPET"] - rows: list[list[str]] = [] - for line_num, data in entries: - uuid_val = data.get("uuid", "") - tokens = parser.extract_token_count(data) - rows.append([ - str(line_num), - data.get("type", ""), - parser.classify_line_subtype(data), - _format_tokens(tokens), - uuid_val[:8] if uuid_val else "", - formatters.format_timestamp(parser.extract_timestamp(data)), - parser.extract_content_summary(data), - ]) - - _safe_print(formatters.format_table(rows, headers, no_color=args.no_color)) +"""Show parsed lines from a conversation.""" + +from __future__ import annotations + +import argparse +import json +import sys + +from cchat import formatters, parser, store + + +def _safe_print(text=""): + """Print with encoding-safe fallback for Windows consoles.""" + encoding = sys.stdout.encoding or 'utf-8' + print(text.encode(encoding, errors='replace').decode(encoding)) + + +def register(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser("lines", help="List lines in a conversation") + p.add_argument("conv", nargs="?", default=None, + help="Conversation identifier (path, UUID, prefix, or slug)") + p.add_argument("--agent", dest="agent_id", default=None, metavar="AGENT_ID", + help="Resolve a subagent JSONL by agent ID (mutually exclusive with conv)") + p.add_argument("--type", dest="line_type", default=None, + help="Only show lines of this type (e.g. user, assistant, system)") + p.add_argument("--subtype", dest="line_subtype", default=None, + help="Only show lines of this subtype (e.g. 
message, tool_result, tool_use, text)") + p.add_argument("--head", type=int, nargs="?", const=50, default=None, metavar="N", + help="Show first N lines (default N=50; default mode if nothing specified)") + p.add_argument("--tail", type=int, nargs="?", const=50, default=None, metavar="N", + help="Show last N lines (default N=50)") + p.add_argument("--from", type=int, default=None, dest="from_line", metavar="N", + help="Start from line number N (inclusive, overrides --head/--tail)") + p.add_argument("--to", type=int, default=None, dest="to_line", metavar="N", + help="End at line number N (inclusive, overrides --head/--tail)") + p.add_argument("--full", action="store_true", + help="Show full content of each line instead of compact table") + p.add_argument("--max-chars", type=int, default=None, metavar="N", + help="Limit content to N characters per line (implies --full)") + p.add_argument("--middle-out", action="store_true", + help="When truncating with --max-chars, cut the middle and keep start+end") + p.add_argument("--json", action="store_true", dest="json_output", + help="Output as JSON") + p.add_argument("--no-color", action="store_true", + help="Disable colored output") + p.set_defaults(func=run) + + +def _format_tokens(count: int | None) -> str: + """Format a token count for display: ``1234`` -> ``1.2k``, *None* -> ``-``.""" + if count is None: + return "-" + if count >= 1_000_000: + return f"{count / 1_000_000:.1f}M" + if count >= 1_000: + return f"{count / 1_000:.1f}k" + return str(count) + + +def _trunc(text: str, max_len: int, full: bool) -> str: + """Truncate text unless full mode is set.""" + if full: + return text + return formatters.truncate(text, max_len) + + +def _cap(text: str, budget: list[int] | None, middle_out: bool = False) -> str: + """Return *text* capped to the remaining character budget. + + *budget* is a one-element list ``[remaining]`` mutated in-place so that + successive calls within the same line share the allowance. 
When *budget* + is ``None`` no truncation is applied. + + If *middle_out* is True the first and last portions are kept with a + ``[...N chars...]`` marker in the middle. + """ + if budget is None: + return text + if budget[0] <= 0: + return "..." + if len(text) <= budget[0]: + budget[0] -= len(text) + return text + limit = budget[0] + budget[0] = 0 + if not middle_out: + return text[:limit] + "..." + # Keep start and end, redact the middle + redacted = len(text) - limit + marker = f"\n[...{redacted} chars...]\n" + half = (limit - len(marker)) // 2 + if half < 20: + # Not enough room for a meaningful split -- fall back to end-truncation + return text[:limit] + "..." + end_len = limit - len(marker) - half + return text[:half] + marker + text[-end_len:] + + +def _render_line(line_num: int, data: dict, max_chars: int | None = None, + middle_out: bool = False) -> None: + """Render a single conversation line with full detail (replicates view_cmd pattern).""" + line_type = data.get("type", "") + ts = formatters.format_timestamp(parser.extract_timestamp(data)) + subtype = parser.classify_line_subtype(data) + tokens = parser.extract_token_count(data) + msg = data.get("message") or {} + + # Budget tracker: mutable single-element list shared across _cap calls + budget = [max_chars] if max_chars is not None else None + mo = middle_out + + # Header + tok_str = f" {_format_tokens(tokens)} tok" if tokens is not None else "" + header = formatters.colored( + f"--- L{line_num} {line_type}/{subtype}{tok_str} {ts} ---", + formatters.BOLD, + ) + _safe_print(header) + + if line_type == "user": + content = msg.get("content") if isinstance(msg, dict) else None + + if isinstance(content, str): + _safe_print(formatters.colored("USER:", formatters.GREEN)) + _safe_print(_cap(content, budget, mo)) + + elif isinstance(content, list): + _safe_print(formatters.colored("TOOL RESULT:", formatters.GREEN)) + for item in content: + if not isinstance(item, dict): + continue + if item.get("type") != 
"tool_result": + continue + tool_use_id = item.get("tool_use_id", "") + _safe_print(f" tool_use_id: {tool_use_id}") + if budget is not None and budget[0] <= 0: + _safe_print(" ...") + break + sub_content = item.get("content", []) + if isinstance(sub_content, list): + for sub in sub_content: + if isinstance(sub, dict) and sub.get("type") == "text": + _safe_print(f" {_cap(sub.get('text', ''), budget, mo)}") + elif isinstance(sub_content, str): + _safe_print(f" {_cap(sub_content, budget, mo)}") + + else: + _safe_print(formatters.colored("USER:", formatters.GREEN)) + _safe_print(_cap(str(content), budget, mo)) + + elif line_type == "assistant": + _safe_print(formatters.colored("ASSISTANT:", formatters.BLUE)) + content = msg.get("content") if isinstance(msg, dict) else None + if isinstance(content, list): + for item in content: + if not isinstance(item, dict): + continue + if budget is not None and budget[0] <= 0: + _safe_print("...") + break + item_type = item.get("type", "") + + if item_type == "text": + _safe_print(_cap(item.get("text", ""), budget, mo)) + + elif item_type == "thinking": + _safe_print(formatters.colored("[THINKING]", formatters.DIM)) + _safe_print(_cap(item.get("thinking", ""), budget, mo)) + + elif item_type == "tool_use": + name = item.get("name", "tool") + _safe_print(formatters.colored(f"[TOOL: {name}]", formatters.MAGENTA)) + inp = item.get("input", {}) + _safe_print(_cap(json.dumps(inp, indent=2, default=str), budget, mo)) + + elif line_type == "system": + _safe_print(formatters.colored("SYSTEM:", formatters.BOLD)) + subtype_val = data.get("subtype", "") + if subtype_val: + _safe_print(f" subtype: {subtype_val}") + for key in ("url", "durationMs", "sessionId"): + val = data.get(key) + if val is not None: + _safe_print(f" {key}: {val}") + + else: + _safe_print(f"{line_type.upper() or 'UNKNOWN'}:") + _safe_print(_cap(json.dumps(data, indent=2, default=str), budget, mo)) + + _safe_print() # blank line separator + + +def run(args: 
argparse.Namespace) -> None: + if args.no_color: + formatters.set_no_color(True) + + if args.agent_id and args.conv: + print("Error: --agent and conv are mutually exclusive", file=sys.stderr) + sys.exit(1) + if not args.agent_id and not args.conv: + print("Error: either conv or --agent is required", file=sys.stderr) + sys.exit(1) + + if args.agent_id: + path = store.resolve_agent(args.agent_id) + else: + path = store.resolve_conversation(args.conv) + lines = parser.deduplicate_assistant_lines(parser.parse_lines(path)) + + # Collect into list, applying type filter + entries: list[tuple[int, dict]] = [] + for line_num, data in lines: + if args.line_type and data.get("type") != args.line_type: + continue + if args.line_subtype and parser.classify_line_subtype(data) != args.line_subtype: + continue + entries.append((line_num, data)) + + # Validate mutually exclusive --head / --tail + if args.head is not None and args.tail is not None: + print("Error: --head and --tail are mutually exclusive", file=sys.stderr) + sys.exit(1) + + # Apply navigation: --from/--to override --head/--tail + if args.from_line is not None or args.to_line is not None: + entries = [ + (ln, d) for ln, d in entries + if (args.from_line is None or ln >= args.from_line) + and (args.to_line is None or ln <= args.to_line) + ] + elif args.tail is not None: + entries = entries[-args.tail:] + else: + # --head N, or default to first 50 (but --full shows all by default) + head_n = args.head if args.head is not None else (None if args.full else 50) + if head_n is not None: + entries = entries[:head_n] + + if args.json_output: + records = [] + for line_num, data in entries: + uuid_val = data.get("uuid", "") + records.append({ + "line_number": line_num, + "type": data.get("type", ""), + "subtype": parser.classify_line_subtype(data), + "tokens": parser.extract_token_count(data), + "uuid": uuid_val[:8] if uuid_val else "", + "timestamp": parser.extract_timestamp(data), + "snippet": 
parser.extract_content_summary(data), + }) + _safe_print(formatters.format_json(records)) + return + + # --max-chars implies --full + if args.max_chars is not None: + args.full = True + + if args.full: + for line_num, data in entries: + _render_line(line_num, data, max_chars=args.max_chars, + middle_out=args.middle_out) + return + + headers = ["LINE#", "TYPE", "SUBTYPE", "TOKENS", "UUID", "TIMESTAMP", "SNIPPET"] + rows: list[list[str]] = [] + for line_num, data in entries: + uuid_val = data.get("uuid", "") + tokens = parser.extract_token_count(data) + rows.append([ + str(line_num), + data.get("type", ""), + parser.classify_line_subtype(data), + _format_tokens(tokens), + uuid_val[:8] if uuid_val else "", + formatters.format_timestamp(parser.extract_timestamp(data)), + parser.extract_content_summary(data), + ]) + + _safe_print(formatters.format_table(rows, headers, no_color=args.no_color)) diff --git a/cchat/commands/search_cmd.py b/cchat/commands/search_cmd.py index c4769b2..4f2ecf7 100644 --- a/cchat/commands/search_cmd.py +++ b/cchat/commands/search_cmd.py @@ -1,153 +1,153 @@ -"""Search conversation transcripts for a query string.""" - -from __future__ import annotations - -import argparse -import json -import sys - -from cchat import formatters, parser, store - -HARD_CAP = 1000 - - -def register(subparsers: argparse._SubParsersAction) -> None: - p = subparsers.add_parser("search", help="Search conversation transcripts") - p.add_argument("query", help="Substring to search for (case-insensitive)") - p.add_argument("--project", default=None, help="Restrict search to a project key") - p.add_argument("--limit", type=int, default=20, help="Maximum number of matches (default: 20)") - p.add_argument("--sort", choices=["newest", "oldest"], default="newest", help="Sort order (default: newest)") - p.add_argument("--type", dest="type_filter", default=None, help="Only show lines of this type (e.g. 
user, assistant, system)") - p.add_argument("--first-per-conv", action="store_true", default=False, help="Keep only the first match per conversation") - p.add_argument("--json", action="store_true", default=False, help="Output as JSON") - p.add_argument("--no-color", action="store_true", default=False, help="Disable colored output") - p.set_defaults(func=run) - - -def _snippet(line: str, query_lower: str, context: int = 30) -> str: - """Extract a snippet around the first occurrence of query in line.""" - line_lower = line.lower() - idx = line_lower.find(query_lower) - if idx == -1: - return "" - start = max(0, idx - context) - end = min(len(line), idx + len(query_lower) + context) - snippet = line[start:end].replace("\n", " ").replace("\r", "") - prefix = "..." if start > 0 else "" - suffix = "..." if end < len(line) else "" - result = prefix + snippet + suffix - # Replace characters that can't be encoded in the console's encoding (e.g. - # cp1252 on Windows) to avoid UnicodeEncodeError when printing. - enc = sys.stdout.encoding or "utf-8" - return result.encode(enc, errors="replace").decode(enc) - - -def run(args: argparse.Namespace) -> None: - if args.no_color: - formatters.set_no_color(True) - - query_lower = args.query.lower() - - conversations = store.discover_conversations(project_key=args.project) - if not conversations: - print("No conversations found.") - return - - # Build lookup for slug and project_key by UUID. - conv_meta: dict[str, store.ConversationInfo] = {c.uuid: c for c in conversations} - - matches: list[dict] = [] - capped = False - - for conv in conversations: - conv_uuid = conv.uuid - try: - with open(conv.path, "r", encoding="utf-8") as fh: - for line_num, raw_line in enumerate(fh, start=1): - if query_lower not in raw_line.lower(): - continue - try: - data = json.loads(raw_line) - except (json.JSONDecodeError, ValueError): - data = {} - - line_type = data.get("type", "?") - - # --type filter: skip non-matching lines early. 
- if args.type_filter and line_type != args.type_filter: - continue - - snip = _snippet(raw_line.strip(), query_lower) - timestamp = parser.extract_timestamp(data) - info = conv_meta.get(conv_uuid) - - matches.append({ - "conversation": conv_uuid, - "line": line_num, - "type": line_type, - "snippet": snip, - "timestamp": timestamp, - "slug": info.slug if info else None, - "project_key": info.project_key if info else None, - }) - - if len(matches) >= HARD_CAP: - capped = True - break - except OSError: - continue - - if capped: - break - - if not matches: - print(f"No matches found for '{args.query}'.") - return - - # Sort all matches. - reverse = args.sort == "newest" - matches.sort(key=lambda m: m["timestamp"] or "", reverse=reverse) - - total_found = len(matches) - - # --first-per-conv: keep only the first match per conversation. - if args.first_per_conv: - seen: set[str] = set() - deduped: list[dict] = [] - for m in matches: - if m["conversation"] not in seen: - seen.add(m["conversation"]) - deduped.append(m) - matches = deduped - - # Apply --limit after sorting and dedup. - matches = matches[: args.limit] - - if args.json: - envelope = { - "matches": matches, - "total": total_found, - "capped": capped, - } - print(formatters.format_json(envelope)) - return - - # Truncation notice to stderr. 
- if len(matches) < total_found or capped: - displayed = len(matches) - total_label = f"{total_found}+" if capped else str(total_found) - print(f"(showing {displayed} of {total_label} matches)", file=sys.stderr) - - rows = [ - [ - formatters.format_timestamp(m["timestamp"]), - m["slug"] or m["conversation"][:8], - m["conversation"][:8], - str(m["line"]), - m["type"], - m["snippet"], - ] - for m in matches - ] - headers = ["DATE", "SLUG", "CONVERSATION", "LINE#", "TYPE", "MATCH"] - print(formatters.format_table(rows, headers, no_color=args.no_color)) +"""Search conversation transcripts for a query string.""" + +from __future__ import annotations + +import argparse +import json +import sys + +from cchat import formatters, parser, store + +HARD_CAP = 1000 + + +def register(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser("search", help="Search conversation transcripts") + p.add_argument("query", help="Substring to search for (case-insensitive)") + p.add_argument("--project", default=None, help="Restrict search to a project key") + p.add_argument("--limit", type=int, default=20, help="Maximum number of matches (default: 20)") + p.add_argument("--sort", choices=["newest", "oldest"], default="newest", help="Sort order (default: newest)") + p.add_argument("--type", dest="type_filter", default=None, help="Only show lines of this type (e.g. 
user, assistant, system)") + p.add_argument("--first-per-conv", action="store_true", default=False, help="Keep only the first match per conversation") + p.add_argument("--json", action="store_true", default=False, help="Output as JSON") + p.add_argument("--no-color", action="store_true", default=False, help="Disable colored output") + p.set_defaults(func=run) + + +def _snippet(line: str, query_lower: str, context: int = 30) -> str: + """Extract a snippet around the first occurrence of query in line.""" + line_lower = line.lower() + idx = line_lower.find(query_lower) + if idx == -1: + return "" + start = max(0, idx - context) + end = min(len(line), idx + len(query_lower) + context) + snippet = line[start:end].replace("\n", " ").replace("\r", "") + prefix = "..." if start > 0 else "" + suffix = "..." if end < len(line) else "" + result = prefix + snippet + suffix + # Replace characters that can't be encoded in the console's encoding (e.g. + # cp1252 on Windows) to avoid UnicodeEncodeError when printing. + enc = sys.stdout.encoding or "utf-8" + return result.encode(enc, errors="replace").decode(enc) + + +def run(args: argparse.Namespace) -> None: + if args.no_color: + formatters.set_no_color(True) + + query_lower = args.query.lower() + + conversations = store.discover_conversations(project_key=args.project) + if not conversations: + print("No conversations found.") + return + + # Build lookup for slug and project_key by UUID. + conv_meta: dict[str, store.ConversationInfo] = {c.uuid: c for c in conversations} + + matches: list[dict] = [] + capped = False + + for conv in conversations: + conv_uuid = conv.uuid + try: + with open(conv.path, "r", encoding="utf-8") as fh: + for line_num, raw_line in enumerate(fh, start=1): + if query_lower not in raw_line.lower(): + continue + try: + data = json.loads(raw_line) + except (json.JSONDecodeError, ValueError): + data = {} + + line_type = data.get("type", "?") + + # --type filter: skip non-matching lines early. 
+ if args.type_filter and line_type != args.type_filter: + continue + + snip = _snippet(raw_line.strip(), query_lower) + timestamp = parser.extract_timestamp(data) + info = conv_meta.get(conv_uuid) + + matches.append({ + "conversation": conv_uuid, + "line": line_num, + "type": line_type, + "snippet": snip, + "timestamp": timestamp, + "slug": info.slug if info else None, + "project_key": info.project_key if info else None, + }) + + if len(matches) >= HARD_CAP: + capped = True + break + except OSError: + continue + + if capped: + break + + if not matches: + print(f"No matches found for '{args.query}'.") + return + + # Sort all matches. + reverse = args.sort == "newest" + matches.sort(key=lambda m: m["timestamp"] or "", reverse=reverse) + + total_found = len(matches) + + # --first-per-conv: keep only the first match per conversation. + if args.first_per_conv: + seen: set[str] = set() + deduped: list[dict] = [] + for m in matches: + if m["conversation"] not in seen: + seen.add(m["conversation"]) + deduped.append(m) + matches = deduped + + # Apply --limit after sorting and dedup. + matches = matches[: args.limit] + + if args.json: + envelope = { + "matches": matches, + "total": total_found, + "capped": capped, + } + print(formatters.format_json(envelope)) + return + + # Truncation notice to stderr. 
+ if len(matches) < total_found or capped: + displayed = len(matches) + total_label = f"{total_found}+" if capped else str(total_found) + print(f"(showing {displayed} of {total_label} matches)", file=sys.stderr) + + rows = [ + [ + formatters.format_timestamp(m["timestamp"]), + m["slug"] or m["conversation"][:8], + m["conversation"][:8], + str(m["line"]), + m["type"], + m["snippet"], + ] + for m in matches + ] + headers = ["DATE", "SLUG", "CONVERSATION", "LINE#", "TYPE", "MATCH"] + print(formatters.format_table(rows, headers, no_color=args.no_color)) diff --git a/cchat/commands/tokens_cmd.py b/cchat/commands/tokens_cmd.py index 38d1795..667d0ea 100644 --- a/cchat/commands/tokens_cmd.py +++ b/cchat/commands/tokens_cmd.py @@ -1,119 +1,119 @@ -"""Show per-turn token usage and estimated cost for a conversation.""" - -from __future__ import annotations - -import argparse - -from cchat import costs, formatters, parser, store - - -def register(subparsers: argparse._SubParsersAction) -> None: - p = subparsers.add_parser("tokens", help="Show token usage and estimated cost") - p.add_argument("conv", help="Conversation identifier (path, UUID, prefix, or slug)") - p.add_argument("--json", action="store_true", default=False, help="Output as JSON") - p.add_argument("--no-color", action="store_true", default=False, help="Disable colored output") - p.set_defaults(func=run) - - -def _fmt(n: int) -> str: - """Format an integer with comma separators.""" - return f"{n:,}" - - -def run(args: argparse.Namespace) -> None: - if args.no_color: - formatters.set_no_color(True) - - conv_path = store.resolve_conversation(args.conv) - - lines = parser.parse_lines(conv_path) - deduped = parser.deduplicate_assistant_lines(lines) - - turns: list[dict] = [] - turn_num = 0 - total_cost = 0.0 - - for _line_num, data in deduped: - if data.get("type") != "assistant": - continue - model = parser.extract_model(data) - usage = parser.extract_usage(data) - if usage is None: - continue - - turn_num += 1 - 
rates = costs.get_rates(model) - turn_cost = costs.cost_for_usage(usage, rates) - total_cost += turn_cost - - # Extract model name (last component for display) - model_display = "unknown" - if model: - parts = model.split("-") - if len(parts) > 1: - # e.g., "claude-opus-4-6" -> "opus-4-6" - model_display = "-".join(parts[1:]) - else: - model_display = model - - turns.append({ - "turn": turn_num, - "model": model_display, - "input_tokens": usage["input_tokens"], - "output_tokens": usage["output_tokens"], - "cache_read_input_tokens": usage["cache_read_input_tokens"], - "cache_creation_input_tokens": usage["cache_creation_input_tokens"], - "cost_usd": turn_cost, - }) - - if not turns: - print("No token usage data found.") - return - - # Compute totals - total_input = sum(t["input_tokens"] for t in turns) - total_output = sum(t["output_tokens"] for t in turns) - total_cache_read = sum(t["cache_read_input_tokens"] for t in turns) - total_cache_create = sum(t["cache_creation_input_tokens"] for t in turns) - - if args.json: - data = { - "turns": turns, - "totals": { - "input_tokens": total_input, - "output_tokens": total_output, - "cache_read_input_tokens": total_cache_read, - "cache_creation_input_tokens": total_cache_create, - }, - "estimated_cost_usd": round(total_cost, 2), - } - print(formatters.format_json(data)) - return - - # Table output - headers = ["TURN", "MODEL", "INPUT", "OUTPUT", "CACHE_READ", "CACHE_CREATE", "COST"] - rows = [ - [ - str(t["turn"]), - t["model"], - _fmt(t["input_tokens"]), - _fmt(t["output_tokens"]), - _fmt(t["cache_read_input_tokens"]), - _fmt(t["cache_creation_input_tokens"]), - formatters.format_cost(t["cost_usd"]), - ] - for t in turns - ] - - # Totals row - rows.append([ - "TOTAL", - "", - _fmt(total_input), - _fmt(total_output), - _fmt(total_cache_read), - _fmt(total_cache_create), - formatters.format_cost(total_cost), - ]) - - print(formatters.format_table(rows, headers, no_color=args.no_color)) +"""Show per-turn token usage and 
estimated cost for a conversation.""" + +from __future__ import annotations + +import argparse + +from cchat import costs, formatters, parser, store + + +def register(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser("tokens", help="Show token usage and estimated cost") + p.add_argument("conv", help="Conversation identifier (path, UUID, prefix, or slug)") + p.add_argument("--json", action="store_true", default=False, help="Output as JSON") + p.add_argument("--no-color", action="store_true", default=False, help="Disable colored output") + p.set_defaults(func=run) + + +def _fmt(n: int) -> str: + """Format an integer with comma separators.""" + return f"{n:,}" + + +def run(args: argparse.Namespace) -> None: + if args.no_color: + formatters.set_no_color(True) + + conv_path = store.resolve_conversation(args.conv) + + lines = parser.parse_lines(conv_path) + deduped = parser.deduplicate_assistant_lines(lines) + + turns: list[dict] = [] + turn_num = 0 + total_cost = 0.0 + + for _line_num, data in deduped: + if data.get("type") != "assistant": + continue + model = parser.extract_model(data) + usage = parser.extract_usage(data) + if usage is None: + continue + + turn_num += 1 + rates = costs.get_rates(model) + turn_cost = costs.cost_for_usage(usage, rates) + total_cost += turn_cost + + # Extract model name (last component for display) + model_display = "unknown" + if model: + parts = model.split("-") + if len(parts) > 1: + # e.g., "claude-opus-4-6" -> "opus-4-6" + model_display = "-".join(parts[1:]) + else: + model_display = model + + turns.append({ + "turn": turn_num, + "model": model_display, + "input_tokens": usage["input_tokens"], + "output_tokens": usage["output_tokens"], + "cache_read_input_tokens": usage["cache_read_input_tokens"], + "cache_creation_input_tokens": usage["cache_creation_input_tokens"], + "cost_usd": turn_cost, + }) + + if not turns: + print("No token usage data found.") + return + + # Compute totals + total_input = 
sum(t["input_tokens"] for t in turns) + total_output = sum(t["output_tokens"] for t in turns) + total_cache_read = sum(t["cache_read_input_tokens"] for t in turns) + total_cache_create = sum(t["cache_creation_input_tokens"] for t in turns) + + if args.json: + data = { + "turns": turns, + "totals": { + "input_tokens": total_input, + "output_tokens": total_output, + "cache_read_input_tokens": total_cache_read, + "cache_creation_input_tokens": total_cache_create, + }, + "estimated_cost_usd": round(total_cost, 2), + } + print(formatters.format_json(data)) + return + + # Table output + headers = ["TURN", "MODEL", "INPUT", "OUTPUT", "CACHE_READ", "CACHE_CREATE", "COST"] + rows = [ + [ + str(t["turn"]), + t["model"], + _fmt(t["input_tokens"]), + _fmt(t["output_tokens"]), + _fmt(t["cache_read_input_tokens"]), + _fmt(t["cache_creation_input_tokens"]), + formatters.format_cost(t["cost_usd"]), + ] + for t in turns + ] + + # Totals row + rows.append([ + "TOTAL", + "", + _fmt(total_input), + _fmt(total_output), + _fmt(total_cache_read), + _fmt(total_cache_create), + formatters.format_cost(total_cost), + ]) + + print(formatters.format_table(rows, headers, no_color=args.no_color)) diff --git a/cchat/formatters.py b/cchat/formatters.py index 86f096d..d061484 100644 --- a/cchat/formatters.py +++ b/cchat/formatters.py @@ -1,212 +1,224 @@ -"""Output formatting utilities for the cchat CLI tool.""" - -from __future__ import annotations - -import json -import os -import sys -from datetime import datetime - -# --------------------------------------------------------------------------- -# Global state -# --------------------------------------------------------------------------- - -_no_color: bool = False - - -def set_no_color(val: bool) -> None: - """Globally force color off (or back on).""" - global _no_color - _no_color = val - - -def supports_color() -> bool: - """Return *True* if the terminal likely supports ANSI color codes.""" - if _no_color: - return False - if 
os.environ.get("NO_COLOR") is not None: - return False - if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty(): - return False - return True - - -# --------------------------------------------------------------------------- -# Common ANSI codes -# --------------------------------------------------------------------------- - -RED = 31 -GREEN = 32 -YELLOW = 33 -BLUE = 34 -MAGENTA = 35 -CYAN = 36 -BOLD = 1 -DIM = 2 - - -# --------------------------------------------------------------------------- -# Text helpers -# --------------------------------------------------------------------------- - - -def colored(text: str, color_code: int) -> str: - """Wrap *text* in ANSI escape sequences if color is supported.""" - if not supports_color(): - return text - return f"\033[{color_code}m{text}\033[0m" - - -def truncate_middle(text: str, max_len: int) -> str: - """Truncate *text* by replacing the middle with an ellipsis.""" - if len(text) <= max_len: - return text - if max_len < 3: - return text[:max_len] - half = (max_len - 1) // 2 - end_len = max_len - 1 - half - return text[:half] + "\u2026" + text[-end_len:] - - -def truncate(text: str, max_len: int) -> str: - """Simple end-truncation with an ellipsis.""" - if len(text) <= max_len: - return text - if max_len < 2: - return text[:max_len] - return text[: max_len - 1] + "\u2026" - - -# --------------------------------------------------------------------------- -# Table formatting -# --------------------------------------------------------------------------- - - -def format_table( - rows: list[list[str]], - headers: list[str], - no_color: bool = False, -) -> str: - """Format *rows* as a simple text table with *headers*. - - Columns are left-aligned and separated by two spaces. - """ - num_cols = len(headers) - - # Ensure every row has the right number of columns. 
- normalised: list[list[str]] = [] - for row in rows: - padded = list(row) + [""] * (num_cols - len(row)) - normalised.append(padded[:num_cols]) - - # Column widths. - widths = [len(h) for h in headers] - for row in normalised: - for i, cell in enumerate(row): - widths[i] = max(widths[i], len(cell)) - - sep = " " - - # Header row. - use_color = supports_color() and not no_color - header_cells = [h.ljust(widths[i]) for i, h in enumerate(headers)] - header_line = sep.join(header_cells) - if use_color: - header_line = f"\033[{BOLD}m{header_line}\033[0m" - - # Separator line. - dash_line = sep.join("-" * w for w in widths) - - # Data rows. - data_lines: list[str] = [] - for row in normalised: - cells = [cell.ljust(widths[i]) for i, cell in enumerate(row)] - data_lines.append(sep.join(cells)) - - return "\n".join([header_line, dash_line] + data_lines) - - -# --------------------------------------------------------------------------- -# Formatting helpers -# --------------------------------------------------------------------------- - - -def format_size(size_bytes: int) -> str: - """Return a human-readable file size string.""" - if size_bytes < 1024: - return f"{size_bytes} B" - if size_bytes < 1024 * 1024: - return f"{size_bytes / 1024:.1f} KB" - if size_bytes < 1024 * 1024 * 1024: - return f"{size_bytes / (1024 * 1024):.1f} MB" - return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" - - -def format_timestamp(ts: str | None) -> str: - """Parse an ISO timestamp and return ``MM-DD HH:MM``, or ``\u2014``.""" - if not ts: - return "\u2014" - try: - dt = datetime.fromisoformat(str(ts)) - return dt.strftime("%m-%d %H:%M") - except (ValueError, TypeError): - return "\u2014" - - -def format_cost(cost: float | None) -> str: - """Return a formatted cost string, or '-' if None.""" - if cost is None: - return "-" - if cost < 0.01: - return f"${cost:.3f}" - return f"${cost:.2f}" - - -def format_tokens(count: int | None) -> str: - """Return a human-readable token count string.""" - if 
count is None or count == 0: - return "-" - if count < 1000: - return str(count) - if count < 1_000_000: - return f"{count / 1000:.1f}K" - return f"{count / 1_000_000:.1f}M" - - -def format_model(model: str | None) -> str: - """Return a short display name for a model ID.""" - if not model: - return "-" - if "opus-4-6" in model: - return "opus4.6" - if "opus-4" in model: - return "opus4" - if "sonnet-4-6" in model: - return "sonnet4.6" - if "sonnet-4-5" in model: - return "sonnet4.5" - if "sonnet-4" in model: - return "sonnet4" - if "haiku-4-5" in model: - return "haiku4.5" - if "haiku-4" in model: - return "haiku4" - if "3-5-sonnet" in model: - return "sonnet3.5" - if "3-5-haiku" in model: - return "haiku3.5" - # Fallback: last segment - return model.split("-")[-1] if "-" in model else model - - -def format_workspace(cwd: str | None) -> str: - """Return the basename of the conversation's working directory.""" - if cwd: - from pathlib import PurePosixPath - return PurePosixPath(cwd).name - return "" - - -def format_json(data: object) -> str: - """Pretty-print *data* as indented JSON.""" - return json.dumps(data, indent=2, default=str) +"""Output formatting utilities for the cchat CLI tool.""" + +from __future__ import annotations + +import json +import os +import sys +from datetime import datetime + +# --------------------------------------------------------------------------- +# Global state +# --------------------------------------------------------------------------- + +_no_color: bool = False + + +def set_no_color(val: bool) -> None: + """Globally force color off (or back on).""" + global _no_color + _no_color = val + + +def supports_color() -> bool: + """Return *True* if the terminal likely supports ANSI color codes.""" + if _no_color: + return False + if os.environ.get("NO_COLOR") is not None: + return False + if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty(): + return False + return True + + +# 
--------------------------------------------------------------------------- +# Common ANSI codes +# --------------------------------------------------------------------------- + +RED = 31 +GREEN = 32 +YELLOW = 33 +BLUE = 34 +MAGENTA = 35 +CYAN = 36 +BOLD = 1 +DIM = 2 + + +# --------------------------------------------------------------------------- +# Text helpers +# --------------------------------------------------------------------------- + + +def colored(text: str, color_code: int) -> str: + """Wrap *text* in ANSI escape sequences if color is supported.""" + if not supports_color(): + return text + return f"\033[{color_code}m{text}\033[0m" + + +def truncate_middle(text: str, max_len: int) -> str: + """Truncate *text* by replacing the middle with an ellipsis.""" + if len(text) <= max_len: + return text + if max_len < 3: + return text[:max_len] + half = (max_len - 1) // 2 + end_len = max_len - 1 - half + return text[:half] + "\u2026" + text[-end_len:] + + +def truncate(text: str, max_len: int) -> str: + """Simple end-truncation with an ellipsis.""" + if len(text) <= max_len: + return text + if max_len < 2: + return text[:max_len] + return text[: max_len - 1] + "\u2026" + + +# --------------------------------------------------------------------------- +# Table formatting +# --------------------------------------------------------------------------- + + +def format_table( + rows: list[list[str]], + headers: list[str], + no_color: bool = False, +) -> str: + """Format *rows* as a simple text table with *headers*. + + Columns are left-aligned and separated by two spaces. + """ + num_cols = len(headers) + + # Ensure every row has the right number of columns. + normalised: list[list[str]] = [] + for row in rows: + padded = list(row) + [""] * (num_cols - len(row)) + normalised.append(padded[:num_cols]) + + # Column widths. 
+ widths = [len(h) for h in headers] + for row in normalised: + for i, cell in enumerate(row): + widths[i] = max(widths[i], len(cell)) + + sep = " " + + # Header row. + use_color = supports_color() and not no_color + header_cells = [h.ljust(widths[i]) for i, h in enumerate(headers)] + header_line = sep.join(header_cells) + if use_color: + header_line = f"\033[{BOLD}m{header_line}\033[0m" + + # Separator line. + dash_line = sep.join("-" * w for w in widths) + + # Data rows. + data_lines: list[str] = [] + for row in normalised: + cells = [cell.ljust(widths[i]) for i, cell in enumerate(row)] + data_lines.append(sep.join(cells)) + + return "\n".join([header_line, dash_line] + data_lines) + + +# --------------------------------------------------------------------------- +# Formatting helpers +# --------------------------------------------------------------------------- + + +def format_size(size_bytes: int) -> str: + """Return a human-readable file size string.""" + if size_bytes < 1024: + return f"{size_bytes} B" + if size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + if size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MB" + return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" + + +def format_timestamp(ts: str | None) -> str: + """Parse an ISO timestamp and return ``MM-DD HH:MM``, or ``\u2014``.""" + if not ts: + return "\u2014" + try: + # Python 3.10's fromisoformat doesn't accept a trailing "Z"; + # normalize it to "+00:00" for cross-version compatibility. 
+ s = str(ts) + if s.endswith("Z"): + s = s[:-1] + "+00:00" + dt = datetime.fromisoformat(s) + return dt.strftime("%m-%d %H:%M") + except (ValueError, TypeError): + return "\u2014" + + +def format_cost(cost: float | None) -> str: + """Return a formatted cost string, or '-' if None.""" + if cost is None: + return "-" + if cost < 0.01: + return f"${cost:.3f}" + return f"${cost:.2f}" + + +def format_tokens(count: int | None) -> str: + """Return a human-readable token count string.""" + if count is None or count == 0: + return "-" + if count < 1000: + return str(count) + if count < 1_000_000: + return f"{count / 1000:.1f}K" + return f"{count / 1_000_000:.1f}M" + + +def format_model(model: str | None) -> str: + """Return a short display name for a model ID.""" + if not model: + return "-" + if "opus-4-6" in model: + return "opus4.6" + if "opus-4" in model: + return "opus4" + if "sonnet-4-6" in model: + return "sonnet4.6" + if "sonnet-4-5" in model: + return "sonnet4.5" + if "sonnet-4" in model: + return "sonnet4" + if "haiku-4-5" in model: + return "haiku4.5" + if "haiku-4" in model: + return "haiku4" + if "3-5-sonnet" in model: + return "sonnet3.5" + if "3-5-haiku" in model: + return "haiku3.5" + # Fallback: last segment + return model.split("-")[-1] if "-" in model else model + + +def format_workspace(cwd: str | None) -> str: + """Return the basename of the conversation's working directory. + + Handles both Windows-style (backslash) and POSIX-style (forward slash) + paths regardless of the host OS, since JSONL files may have been + recorded on either platform. 
+ """ + if not cwd: + return "" + normalized = cwd.replace("\\", "/").rstrip("/") + if not normalized: + return "" + return normalized.rsplit("/", 1)[-1] + + +def format_json(data: object) -> str: + """Pretty-print *data* as indented JSON.""" + return json.dumps(data, indent=2, default=str) diff --git a/cchat/parser.py b/cchat/parser.py index c8b741a..8514f29 100644 --- a/cchat/parser.py +++ b/cchat/parser.py @@ -1,282 +1,282 @@ -"""Streaming JSONL parser for Claude Code conversation transcripts.""" - -from __future__ import annotations - -import json -from pathlib import Path -from typing import Iterator - - -def parse_lines(path: Path) -> Iterator[tuple[int, dict]]: - """Open a JSONL file and yield (line_number, parsed_dict) for each line. - - Line numbers are 1-indexed. Malformed lines are silently skipped. - Never loads the full file into memory. - """ - with open(path, encoding="utf-8") as fh: - for line_num, raw in enumerate(fh, start=1): - raw = raw.strip() - if not raw: - continue - try: - data = json.loads(raw) - except (json.JSONDecodeError, ValueError): - continue - yield (line_num, data) - - -def deduplicate_assistant_lines( - lines: Iterator[tuple[int, dict]], -) -> Iterator[tuple[int, dict]]: - """Deduplicate streamed assistant lines by ``message.id``. - - Assistant messages are streamed as multiple JSONL lines sharing the same - ``message.id``. Only the final line for each id (the one with - ``stop_reason`` set, or the last one seen) is yielded. Non-assistant lines - pass through immediately. - """ - # message_id -> (line_num, data) - buffer: dict[str, tuple[int, dict]] = {} - current_id: str | None = None - - for line_num, data in lines: - if data.get("type") != "assistant": - # Flush any buffered assistant entry before yielding a - # non-assistant line, so ordering is preserved. 
- if current_id is not None and current_id in buffer: - yield buffer.pop(current_id) - current_id = None - yield (line_num, data) - continue - - msg = data.get("message") or {} - msg_id = msg.get("id") - if msg_id is None: - # No message id – pass through as-is. - yield (line_num, data) - continue - - # If we see a *new* message id, flush the previous buffered entry. - if msg_id != current_id and current_id is not None and current_id in buffer: - yield buffer.pop(current_id) - - current_id = msg_id - buffer[msg_id] = (line_num, data) - - # Flush remaining buffered entries. - for entry in buffer.values(): - yield entry - - -def extract_timestamp(line: dict) -> str | None: - """Return the ``timestamp`` field as an ISO string, or *None*.""" - ts = line.get("timestamp") - if ts is None: - return None - return str(ts) - - -def classify_line_subtype(line: dict) -> str: - """Return a short subtype label for a conversation line. - - Subtypes refine the top-level ``type`` to distinguish e.g. user messages - from tool results, or assistant text from tool calls. 
- """ - line_type = line.get("type", "") - - if line_type == "user": - msg = line.get("message") or {} - content = msg.get("content") - if isinstance(content, list): - return "tool_result" - return "message" - - if line_type == "assistant": - msg = line.get("message") or {} - content = msg.get("content") - if not isinstance(content, list): - return "response" - has_text = False - has_tool = False - has_thinking = False - for item in content: - if not isinstance(item, dict): - continue - it = item.get("type", "") - if it == "text": - has_text = True - elif it == "tool_use": - has_tool = True - elif it == "thinking": - has_thinking = True - parts = [] - if has_thinking: - parts.append("thinking") - if has_text and has_tool: - parts.append("text+tool") - elif has_text: - parts.append("text") - elif has_tool: - parts.append("tool_use") - return "+".join(parts) if parts else "response" - - if line_type == "system": - return line.get("subtype") or "system" - - if line_type == "progress": - return "agent" - - # custom-title, agent-name, file-history-snapshot, etc. - return line_type or "" - - -def extract_token_count(line: dict) -> int | None: - """Return the total token footprint of a line, or *None* if unknown. - - For assistant lines, returns ``input_tokens + output_tokens`` from the - usage block. For other lines, estimates from the serialised content - length (roughly 4 chars per token). 
- """ - line_type = line.get("type", "") - - if line_type == "assistant": - msg = line.get("message") or {} - usage = msg.get("usage") - if isinstance(usage, dict): - inp = usage.get("input_tokens", 0) - out = usage.get("output_tokens", 0) - cache_read = usage.get("cache_read_input_tokens", 0) - cache_create = usage.get("cache_creation_input_tokens", 0) - total = inp + out + cache_read + cache_create - if total > 0: - return total - return None - - if line_type == "user": - msg = line.get("message") or {} - content = msg.get("content") - if isinstance(content, str): - return max(1, len(content) // 4) - if isinstance(content, list): - # Estimate from serialised JSON length - import json as _json - try: - return max(1, len(_json.dumps(content, default=str)) // 4) - except (TypeError, ValueError): - pass - return None - - return None - - -def extract_content_summary(line: dict, max_len: int = 80) -> str: - """Return a short human-readable summary of a conversation line.""" - line_type = line.get("type", "") - - if line_type == "user": - msg = line.get("message") or {} - content = msg.get("content") - if isinstance(content, str): - text = content.replace("\n", " ").strip() - if len(text) > max_len: - return text[:max_len - 1] + "\u2026" - return text - if isinstance(content, list): - for item in content: - if isinstance(item, dict) and item.get("type") == "tool_result": - return "[tool_result]" - return "[tool_result]" - return "user" - - if line_type == "assistant": - msg = line.get("message") or {} - content = msg.get("content") - if not isinstance(content, list): - return "assistant" - parts: list[str] = [] - first_text: str | None = None - tool_names: list[str] = [] - for item in content: - if not isinstance(item, dict): - continue - if item.get("type") == "text" and first_text is None: - first_text = item.get("text", "").replace("\n", " ").strip() - elif item.get("type") == "tool_use": - name = item.get("name", "tool") - tool_names.append(name) - if first_text: - 
if len(first_text) > max_len: - first_text = first_text[:max_len - 1] + "\u2026" - parts.append(first_text) - if tool_names: - parts.append("[" + ", ".join(tool_names) + "]") - return " ".join(parts) if parts else "assistant" - - if line_type == "system": - return line.get("subtype") or "system" - - if line_type == "progress": - return "agent progress" - - return line_type or "unknown" - - -def extract_model(line: dict) -> str | None: - """Return the model ID from an assistant message, or None.""" - msg = line.get("message") or {} - return msg.get("model") - - -def extract_usage(line: dict) -> dict | None: - """Extract token usage information from an assistant message. - - Returns a dict with keys ``input_tokens``, ``output_tokens``, - ``cache_read_input_tokens``, ``cache_creation_input_tokens`` (all - defaulting to 0), and optionally ``cache_creation`` (a dict with tier - breakdown), or *None* if usage data is absent. - """ - msg = line.get("message") or {} - usage = msg.get("usage") - if not usage or not isinstance(usage, dict): - return None - result = { - "input_tokens": usage.get("input_tokens", 0), - "output_tokens": usage.get("output_tokens", 0), - "cache_read_input_tokens": usage.get("cache_read_input_tokens", 0), - "cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0), - } - # Include tier breakdown if present - cc = usage.get("cache_creation") - if isinstance(cc, dict): - result["cache_creation"] = cc - return result - - -def extract_file_modifications(line: dict) -> list[dict] | None: - """Extract file modification info from assistant tool_use calls. - - Looks for ``Write`` and ``Edit`` tool calls and returns a list of - ``{"tool": name, "file_path": path}`` dicts, or *None* if none found. 
- """ - if line.get("type") != "assistant": - return None - msg = line.get("message") or {} - content = msg.get("content") - if not isinstance(content, list): - return None - - modifications: list[dict] = [] - for item in content: - if not isinstance(item, dict): - continue - if item.get("type") != "tool_use": - continue - name = item.get("name", "") - if name not in ("Write", "Edit"): - continue - inp = item.get("input") or {} - file_path = inp.get("file_path") - if file_path: - modifications.append({"tool": name, "file_path": file_path}) - - return modifications if modifications else None +"""Streaming JSONL parser for Claude Code conversation transcripts.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Iterator + + +def parse_lines(path: Path) -> Iterator[tuple[int, dict]]: + """Open a JSONL file and yield (line_number, parsed_dict) for each line. + + Line numbers are 1-indexed. Malformed lines are silently skipped. + Never loads the full file into memory. + """ + with open(path, encoding="utf-8") as fh: + for line_num, raw in enumerate(fh, start=1): + raw = raw.strip() + if not raw: + continue + try: + data = json.loads(raw) + except (json.JSONDecodeError, ValueError): + continue + yield (line_num, data) + + +def deduplicate_assistant_lines( + lines: Iterator[tuple[int, dict]], +) -> Iterator[tuple[int, dict]]: + """Deduplicate streamed assistant lines by ``message.id``. + + Assistant messages are streamed as multiple JSONL lines sharing the same + ``message.id``. Only the final line for each id (the one with + ``stop_reason`` set, or the last one seen) is yielded. Non-assistant lines + pass through immediately. + """ + # message_id -> (line_num, data) + buffer: dict[str, tuple[int, dict]] = {} + current_id: str | None = None + + for line_num, data in lines: + if data.get("type") != "assistant": + # Flush any buffered assistant entry before yielding a + # non-assistant line, so ordering is preserved. 
+ if current_id is not None and current_id in buffer: + yield buffer.pop(current_id) + current_id = None + yield (line_num, data) + continue + + msg = data.get("message") or {} + msg_id = msg.get("id") + if msg_id is None: + # No message id – pass through as-is. + yield (line_num, data) + continue + + # If we see a *new* message id, flush the previous buffered entry. + if msg_id != current_id and current_id is not None and current_id in buffer: + yield buffer.pop(current_id) + + current_id = msg_id + buffer[msg_id] = (line_num, data) + + # Flush remaining buffered entries. + for entry in buffer.values(): + yield entry + + +def extract_timestamp(line: dict) -> str | None: + """Return the ``timestamp`` field as an ISO string, or *None*.""" + ts = line.get("timestamp") + if ts is None: + return None + return str(ts) + + +def classify_line_subtype(line: dict) -> str: + """Return a short subtype label for a conversation line. + + Subtypes refine the top-level ``type`` to distinguish e.g. user messages + from tool results, or assistant text from tool calls. 
+ """ + line_type = line.get("type", "") + + if line_type == "user": + msg = line.get("message") or {} + content = msg.get("content") + if isinstance(content, list): + return "tool_result" + return "message" + + if line_type == "assistant": + msg = line.get("message") or {} + content = msg.get("content") + if not isinstance(content, list): + return "response" + has_text = False + has_tool = False + has_thinking = False + for item in content: + if not isinstance(item, dict): + continue + it = item.get("type", "") + if it == "text": + has_text = True + elif it == "tool_use": + has_tool = True + elif it == "thinking": + has_thinking = True + parts = [] + if has_thinking: + parts.append("thinking") + if has_text and has_tool: + parts.append("text+tool") + elif has_text: + parts.append("text") + elif has_tool: + parts.append("tool_use") + return "+".join(parts) if parts else "response" + + if line_type == "system": + return line.get("subtype") or "system" + + if line_type == "progress": + return "agent" + + # custom-title, agent-name, file-history-snapshot, etc. + return line_type or "" + + +def extract_token_count(line: dict) -> int | None: + """Return the total token footprint of a line, or *None* if unknown. + + For assistant lines, returns ``input_tokens + output_tokens`` from the + usage block. For other lines, estimates from the serialised content + length (roughly 4 chars per token). 
+ """ + line_type = line.get("type", "") + + if line_type == "assistant": + msg = line.get("message") or {} + usage = msg.get("usage") + if isinstance(usage, dict): + inp = usage.get("input_tokens", 0) + out = usage.get("output_tokens", 0) + cache_read = usage.get("cache_read_input_tokens", 0) + cache_create = usage.get("cache_creation_input_tokens", 0) + total = inp + out + cache_read + cache_create + if total > 0: + return total + return None + + if line_type == "user": + msg = line.get("message") or {} + content = msg.get("content") + if isinstance(content, str): + return max(1, len(content) // 4) + if isinstance(content, list): + # Estimate from serialised JSON length + import json as _json + try: + return max(1, len(_json.dumps(content, default=str)) // 4) + except (TypeError, ValueError): + pass + return None + + return None + + +def extract_content_summary(line: dict, max_len: int = 80) -> str: + """Return a short human-readable summary of a conversation line.""" + line_type = line.get("type", "") + + if line_type == "user": + msg = line.get("message") or {} + content = msg.get("content") + if isinstance(content, str): + text = content.replace("\n", " ").strip() + if len(text) > max_len: + return text[:max_len - 1] + "\u2026" + return text + if isinstance(content, list): + for item in content: + if isinstance(item, dict) and item.get("type") == "tool_result": + return "[tool_result]" + return "[tool_result]" + return "user" + + if line_type == "assistant": + msg = line.get("message") or {} + content = msg.get("content") + if not isinstance(content, list): + return "assistant" + parts: list[str] = [] + first_text: str | None = None + tool_names: list[str] = [] + for item in content: + if not isinstance(item, dict): + continue + if item.get("type") == "text" and first_text is None: + first_text = item.get("text", "").replace("\n", " ").strip() + elif item.get("type") == "tool_use": + name = item.get("name", "tool") + tool_names.append(name) + if first_text: + 
if len(first_text) > max_len: + first_text = first_text[:max_len - 1] + "\u2026" + parts.append(first_text) + if tool_names: + parts.append("[" + ", ".join(tool_names) + "]") + return " ".join(parts) if parts else "assistant" + + if line_type == "system": + return line.get("subtype") or "system" + + if line_type == "progress": + return "agent progress" + + return line_type or "unknown" + + +def extract_model(line: dict) -> str | None: + """Return the model ID from an assistant message, or None.""" + msg = line.get("message") or {} + return msg.get("model") + + +def extract_usage(line: dict) -> dict | None: + """Extract token usage information from an assistant message. + + Returns a dict with keys ``input_tokens``, ``output_tokens``, + ``cache_read_input_tokens``, ``cache_creation_input_tokens`` (all + defaulting to 0), and optionally ``cache_creation`` (a dict with tier + breakdown), or *None* if usage data is absent. + """ + msg = line.get("message") or {} + usage = msg.get("usage") + if not usage or not isinstance(usage, dict): + return None + result = { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "cache_read_input_tokens": usage.get("cache_read_input_tokens", 0), + "cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0), + } + # Include tier breakdown if present + cc = usage.get("cache_creation") + if isinstance(cc, dict): + result["cache_creation"] = cc + return result + + +def extract_file_modifications(line: dict) -> list[dict] | None: + """Extract file modification info from assistant tool_use calls. + + Looks for ``Write`` and ``Edit`` tool calls and returns a list of + ``{"tool": name, "file_path": path}`` dicts, or *None* if none found. 
+ """ + if line.get("type") != "assistant": + return None + msg = line.get("message") or {} + content = msg.get("content") + if not isinstance(content, list): + return None + + modifications: list[dict] = [] + for item in content: + if not isinstance(item, dict): + continue + if item.get("type") != "tool_use": + continue + name = item.get("name", "") + if name not in ("Write", "Edit"): + continue + inp = item.get("input") or {} + file_path = inp.get("file_path") + if file_path: + modifications.append({"tool": name, "file_path": file_path}) + + return modifications if modifications else None diff --git a/cchat/store.py b/cchat/store.py index 3f5d97f..611ab33 100644 --- a/cchat/store.py +++ b/cchat/store.py @@ -1,582 +1,582 @@ -"""Conversation discovery and resolution module.""" - -from __future__ import annotations - -import json -import os -from dataclasses import dataclass -from pathlib import Path -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from cchat.costs import CostCache, TokenBreakdown - -CLAUDE_DIR = Path.home() / ".claude" -PROJECTS_DIR = CLAUDE_DIR / "projects" -SESSIONS_DIR = CLAUDE_DIR / "sessions" -HISTORY_FILE = CLAUDE_DIR / "history.jsonl" - - -def _safe_subdirs(parent: Path) -> list[Path]: - """List subdirectories, skipping symlinks and inaccessible entries.""" - dirs: list[Path] = [] - for entry in parent.iterdir(): - if entry.is_symlink(): - continue - try: - if entry.is_dir(): - dirs.append(entry) - except OSError: - continue - return dirs - - -@dataclass -class ConversationInfo: - path: Path - uuid: str - project_key: str - size: int - first_timestamp: str | None = None - last_timestamp: str | None = None - slug: str | None = None - snippet: str | None = None - turn_count: int = 0 - agent_count: int = 0 - total_tokens: int = 0 - model: str | None = None - estimated_cost_usd: float | None = None - session_id: str | None = None - name: str | None = None - cwd: str | None = None - - -@dataclass -class SubagentInfo: - """Metadata for a 
single subagent JSONL file.""" - path: Path - agent_id: str - conversation_uuid: str - project_key: str - size: int - prompt_snippet: str | None = None - first_timestamp: str | None = None - last_timestamp: str | None = None - line_count: int = 0 - turn_count: int = 0 - total_tokens: int = 0 - model: str | None = None - estimated_cost_usd: float | None = None - - -def _parse_line(raw: str) -> dict | None: - """Parse a single JSONL line, returning None on failure.""" - try: - return json.loads(raw) - except (json.JSONDecodeError, ValueError): - return None - - -def _is_user_turn(obj: dict) -> bool: - """Check if this line is a human user message (string content, not tool result).""" - if obj.get("type") != "user": - return False - msg = obj.get("message", {}) - content = msg.get("content") if isinstance(msg, dict) else None - return isinstance(content, str) - - -def _is_agent_call(obj: dict) -> bool: - """Check if this assistant line contains an Agent tool_use call.""" - if obj.get("type") != "assistant": - return False - msg = obj.get("message", {}) - if not isinstance(msg, dict): - return False - content = msg.get("content", []) - if not isinstance(content, list): - return False - for item in content: - if isinstance(item, dict) and item.get("type") == "tool_use" and item.get("name") == "Agent": - return True - return False - - -def _extract_snippet(obj: dict) -> str | None: - """Extract first 60 chars of user message content.""" - msg = obj.get("message", {}) - if not isinstance(msg, dict): - return None - content = msg.get("content") - if isinstance(content, str): - text = content.strip().replace("\n", " ") - return text[:60] if len(text) > 60 else text - return None - - -def _scan_conversation(path: Path) -> ConversationInfo: - """Scan a conversation JSONL file for metadata.""" - uuid = path.stem - project_key = path.parent.name - size = os.path.getsize(path) - - first_timestamp = None - last_timestamp = None - slug = None - snippet = None - session_id = None - 
cwd = None - turn_count = 0 - agent_count = 0 - model = None - - # Scan first 100 lines for metadata + start counting - with open(path, "r", encoding="utf-8") as f: - for i, raw_line in enumerate(f): - if i >= 100: - break - obj = _parse_line(raw_line) - if obj is None: - continue - - ts = obj.get("timestamp") - if ts: - if first_timestamp is None: - first_timestamp = ts - last_timestamp = ts - - if slug is None and obj.get("slug"): - slug = obj["slug"] - - if session_id is None and obj.get("sessionId"): - session_id = obj["sessionId"] - - if cwd is None and obj.get("cwd"): - cwd = obj["cwd"] - - if snippet is None and _is_user_turn(obj): - snippet = _extract_snippet(obj) - - if model is None and obj.get("type") == "assistant": - msg = obj.get("message") or {} - if isinstance(msg, dict) and msg.get("model"): - model = msg["model"] - - if _is_user_turn(obj): - turn_count += 1 - if _is_agent_call(obj): - agent_count += 1 - - # Continue scanning remaining lines for counts and last_timestamp - for raw_line in f: - obj = _parse_line(raw_line) - if obj is None: - continue - - ts = obj.get("timestamp") - if ts: - last_timestamp = ts - - if _is_user_turn(obj): - turn_count += 1 - if _is_agent_call(obj): - agent_count += 1 - - # Optimize last_timestamp: read last 4KB for final timestamp - if size > 4096: - try: - with open(path, "rb") as f: - f.seek(max(0, size - 4096)) - tail = f.read().decode("utf-8", errors="replace") - for raw_line in reversed(tail.splitlines()): - obj = _parse_line(raw_line) - if obj and obj.get("timestamp"): - last_timestamp = obj["timestamp"] - break - except OSError: - pass - - return ConversationInfo( - path=path, - uuid=uuid, - project_key=project_key, - size=size, - first_timestamp=first_timestamp, - last_timestamp=last_timestamp, - slug=slug, - snippet=snippet, - turn_count=turn_count, - agent_count=agent_count, - model=model, - session_id=session_id, - cwd=cwd, - ) - - -def _load_session_names() -> dict[str, str]: - """Load session names from 
~/.claude/sessions/*.json. - - Returns a mapping of sessionId -> name for sessions that have been renamed. - """ - names: dict[str, str] = {} - if not SESSIONS_DIR.exists(): - return names - for path in SESSIONS_DIR.glob("*.json"): - try: - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) - sid = data.get("sessionId") - name = data.get("name") - if sid and name: - names[sid] = name - except (json.JSONDecodeError, OSError): - continue - return names - - -def _resolve_project_key(project_key: str) -> str | None: - """Resolve a project key, trying exact match then substring match. - - Returns the matched directory name, or None if no match found. - """ - exact = PROJECTS_DIR / project_key - if exact.exists() and exact.is_dir(): - return project_key - - # Substring match against known project directory names - needle = project_key.lower() - candidates = [d.name for d in _safe_subdirs(PROJECTS_DIR)] - matches = [c for c in candidates if needle in c.lower()] - - if len(matches) == 1: - return matches[0] - if len(matches) > 1: - import sys - - print( - f"Ambiguous project key '{project_key}' matches:\n" - + "\n".join(f" {m}" for m in sorted(matches)), - file=sys.stderr, - ) - return None - return None - - -def discover_conversations(project_key: str | None = None) -> list[ConversationInfo]: - """Discover all conversations under ~/.claude/projects/. - - Args: - project_key: If given, only scan that project subdirectory. - Supports exact directory names and substring matching. - - Returns: - List of ConversationInfo for each conversation found. 
- """ - if not PROJECTS_DIR.exists(): - return [] - - results: list[ConversationInfo] = [] - - if project_key: - resolved = _resolve_project_key(project_key) - if resolved is None: - return [] - search_dirs = [PROJECTS_DIR / resolved] - else: - search_dirs = _safe_subdirs(PROJECTS_DIR) - - for proj_dir in search_dirs: - if not proj_dir.exists(): - continue - for jsonl_file in proj_dir.glob("*.jsonl"): - # Skip files in subagents/ subdirectories - if "subagents" in jsonl_file.parts: - continue - results.append(_scan_conversation(jsonl_file)) - - # Populate session names from ~/.claude/sessions/*.json - session_names = _load_session_names() - for conv in results: - if conv.session_id and conv.session_id in session_names: - conv.name = session_names[conv.session_id] - - return results - - -def resolve_conversation(identifier: str, project_key: str | None = None) -> Path: - """Resolve a conversation identifier to its JSONL file path. - - Resolution order: - 1. Valid file path that exists - 2. Exact UUID match - 3. UUID prefix (4+ chars) - 4. Slug match - 5. Raise SystemExit - - Args: - identifier: File path, UUID, UUID prefix, or slug. - project_key: Optional project key to narrow search. - - Returns: - Path to the conversation JSONL file. - """ - # 1. Direct file path - candidate = Path(identifier) - if candidate.exists() and candidate.is_file(): - return candidate - - if not PROJECTS_DIR.exists(): - raise SystemExit(f"No conversations found: {PROJECTS_DIR} does not exist") - - if project_key: - resolved = _resolve_project_key(project_key) - search_pattern = resolved if resolved else project_key - else: - search_pattern = "*" - - # 2. Exact UUID match - matches = list(PROJECTS_DIR.glob(f"{search_pattern}/{identifier}.jsonl")) - if len(matches) == 1: - return matches[0] - if len(matches) > 1: - projects = [m.parent.name for m in matches] - raise SystemExit( - f"UUID {identifier} found in multiple projects: {', '.join(projects)}. " - "Use --project to disambiguate." 
- ) - - # 3. UUID prefix (4+ chars) - if len(identifier) >= 4: - matches = list(PROJECTS_DIR.glob(f"{search_pattern}/{identifier}*.jsonl")) - # Filter out subagent files - matches = [m for m in matches if "subagents" not in m.parts] - if len(matches) == 1: - return matches[0] - if len(matches) > 1: - uuids = [m.stem for m in matches] - raise SystemExit( - f"Prefix '{identifier}' matches multiple conversations:\n" - + "\n".join(f" {u}" for u in uuids) - ) - - # 4. Slug match - if project_key: - search_dirs = [PROJECTS_DIR / project_key] - else: - search_dirs = _safe_subdirs(PROJECTS_DIR) - - for proj_dir in search_dirs: - if not proj_dir.exists(): - continue - for jsonl_file in proj_dir.glob("*.jsonl"): - if "subagents" in jsonl_file.parts: - continue - try: - with open(jsonl_file, "r", encoding="utf-8") as f: - for i, raw_line in enumerate(f): - if i >= 50: - break - obj = _parse_line(raw_line) - if obj and obj.get("slug") == identifier: - return jsonl_file - except OSError: - continue - - raise SystemExit(f"Could not resolve conversation: '{identifier}'") - - -def resolve_agent(agent_id: str) -> Path: - """Resolve a subagent ID to its JSONL file path. - - Globs for ``~/.claude/projects/**/agent-.jsonl``. - - Args: - agent_id: Hex agent identifier (e.g. ``ad37c9994945aa7c4``). - - Returns: - Path to the agent JSONL file. - """ - if not PROJECTS_DIR.exists(): - raise SystemExit(f"No conversations found: {PROJECTS_DIR} does not exist") - - matches = sorted( - PROJECTS_DIR.glob(f"**/agent-{agent_id}.jsonl"), - key=lambda p: p.stat().st_mtime, - reverse=True, - ) - - if not matches: - raise SystemExit(f"No subagent found with ID '{agent_id}'") - - return matches[0] - - -def list_projects() -> list[str]: - """Return directory names under the projects dir.""" - if not PROJECTS_DIR.exists(): - return [] - return sorted(d.name for d in _safe_subdirs(PROJECTS_DIR)) - - -def get_subagent_paths(conv_path: Path) -> list[Path]: - """Find subagent JSONL files for a conversation. 
- - Looks for /subagents/agent-*.jsonl relative to the conversation file. - """ - uuid = conv_path.stem - subagent_dir = conv_path.parent / uuid / "subagents" - if not subagent_dir.exists(): - return [] - return sorted(subagent_dir.glob("agent-*.jsonl")) - - -def _scan_subagent(path: Path, conv_uuid: str, project_key: str) -> SubagentInfo: - """Scan a subagent JSONL file for metadata.""" - # Extract agent ID from filename: agent-.jsonl - agent_id = path.stem - if agent_id.startswith("agent-"): - agent_id = agent_id[len("agent-"):] - - size = os.path.getsize(path) - first_timestamp = None - last_timestamp = None - prompt_snippet = None - line_count = 0 - turn_count = 0 - model = None - - with open(path, "r", encoding="utf-8") as f: - for raw_line in f: - obj = _parse_line(raw_line) - if obj is None: - continue - line_count += 1 - - ts = obj.get("timestamp") - if ts: - if first_timestamp is None: - first_timestamp = ts - last_timestamp = ts - - # First user message with string content is the prompt - if prompt_snippet is None and _is_user_turn(obj): - prompt_snippet = _extract_snippet(obj) - - if model is None and obj.get("type") == "assistant": - msg = obj.get("message") or {} - if isinstance(msg, dict) and msg.get("model"): - model = msg["model"] - - if _is_user_turn(obj): - turn_count += 1 - - return SubagentInfo( - path=path, - agent_id=agent_id, - conversation_uuid=conv_uuid, - project_key=project_key, - size=size, - prompt_snippet=prompt_snippet, - first_timestamp=first_timestamp, - last_timestamp=last_timestamp, - line_count=line_count, - turn_count=turn_count, - model=model, - ) - - -def list_subagents(conv_path: Path) -> list[SubagentInfo]: - """List all subagents for a conversation with metadata. - - Args: - conv_path: Path to the conversation JSONL file. - - Returns: - List of SubagentInfo sorted by first_timestamp (earliest first). 
- """ - conv_uuid = conv_path.stem - project_key = conv_path.parent.name - sa_paths = get_subagent_paths(conv_path) - - results = [] - for sa_path in sa_paths: - results.append(_scan_subagent(sa_path, conv_uuid, project_key)) - - # Sort by timestamp (None sorts last) - results.sort(key=lambda s: s.first_timestamp or "") - return results - - -def get_conversation_cost(info: ConversationInfo, cache: "CostCache") -> float: - """Return cost for conversation + all subagents. Per-file costs cached independently. - - Also populates ``info.total_tokens`` as a side effect. - - Returns: - Total cost in USD (main + all subagents). - """ - from cchat.costs import compute_file_cost - - # Main conversation - mtime = info.path.stat().st_mtime - cached = cache.get(info.uuid, mtime, info.size) - if cached is not None: - total_cost, total_tokens = cached - else: - total_cost, total_tokens = compute_file_cost(info.path) - cache.set(info.uuid, mtime, info.size, total_cost, total_tokens) - - info.total_tokens = total_tokens - - # Subagents - for sa_path in get_subagent_paths(info.path): - sa_id = sa_path.stem - if sa_id.startswith("agent-"): - sa_id = sa_id[len("agent-") :] - sa_size = sa_path.stat().st_size - sa_mtime = sa_path.stat().st_mtime - sa_cached = cache.get(sa_id, sa_mtime, sa_size) - if sa_cached is not None: - sa_cost, sa_tokens = sa_cached - else: - sa_cost, sa_tokens = compute_file_cost(sa_path) - cache.set(sa_id, sa_mtime, sa_size, sa_cost, sa_tokens) - total_cost += sa_cost - info.total_tokens += sa_tokens - - return total_cost - - -def get_conversation_tokens( - info: ConversationInfo, *, include_subagents: bool = False -) -> "TokenBreakdown": - """Return a TokenBreakdown for a conversation. - - Args: - info: Conversation metadata. - include_subagents: If True, also sum tokens from subagent JSONL files. - Default False to match cost calculation scope (which only counts - the parent file). 
- """ - from cchat.costs import compute_file_tokens - - bd = compute_file_tokens(info.path) - if include_subagents: - for sa_path in get_subagent_paths(info.path): - bd += compute_file_tokens(sa_path) - return bd - - -def get_subagent_stats(subagent: SubagentInfo, cache: "CostCache") -> None: - """Populate cost and tokens on a SubagentInfo (mutates in place).""" - from cchat.costs import compute_file_cost - - sa_mtime = subagent.path.stat().st_mtime - cached = cache.get(subagent.agent_id, sa_mtime, subagent.size) - if cached is not None: - subagent.estimated_cost_usd, subagent.total_tokens = cached - else: - cost, tokens = compute_file_cost(subagent.path) - cache.set(subagent.agent_id, sa_mtime, subagent.size, cost, tokens) - subagent.estimated_cost_usd = cost - subagent.total_tokens = tokens +"""Conversation discovery and resolution module.""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from cchat.costs import CostCache, TokenBreakdown + +CLAUDE_DIR = Path.home() / ".claude" +PROJECTS_DIR = CLAUDE_DIR / "projects" +SESSIONS_DIR = CLAUDE_DIR / "sessions" +HISTORY_FILE = CLAUDE_DIR / "history.jsonl" + + +def _safe_subdirs(parent: Path) -> list[Path]: + """List subdirectories, skipping symlinks and inaccessible entries.""" + dirs: list[Path] = [] + for entry in parent.iterdir(): + if entry.is_symlink(): + continue + try: + if entry.is_dir(): + dirs.append(entry) + except OSError: + continue + return dirs + + +@dataclass +class ConversationInfo: + path: Path + uuid: str + project_key: str + size: int + first_timestamp: str | None = None + last_timestamp: str | None = None + slug: str | None = None + snippet: str | None = None + turn_count: int = 0 + agent_count: int = 0 + total_tokens: int = 0 + model: str | None = None + estimated_cost_usd: float | None = None + session_id: str | None = None + name: str | None = None + cwd: str | 
None = None + + +@dataclass +class SubagentInfo: + """Metadata for a single subagent JSONL file.""" + path: Path + agent_id: str + conversation_uuid: str + project_key: str + size: int + prompt_snippet: str | None = None + first_timestamp: str | None = None + last_timestamp: str | None = None + line_count: int = 0 + turn_count: int = 0 + total_tokens: int = 0 + model: str | None = None + estimated_cost_usd: float | None = None + + +def _parse_line(raw: str) -> dict | None: + """Parse a single JSONL line, returning None on failure.""" + try: + return json.loads(raw) + except (json.JSONDecodeError, ValueError): + return None + + +def _is_user_turn(obj: dict) -> bool: + """Check if this line is a human user message (string content, not tool result).""" + if obj.get("type") != "user": + return False + msg = obj.get("message", {}) + content = msg.get("content") if isinstance(msg, dict) else None + return isinstance(content, str) + + +def _is_agent_call(obj: dict) -> bool: + """Check if this assistant line contains an Agent tool_use call.""" + if obj.get("type") != "assistant": + return False + msg = obj.get("message", {}) + if not isinstance(msg, dict): + return False + content = msg.get("content", []) + if not isinstance(content, list): + return False + for item in content: + if isinstance(item, dict) and item.get("type") == "tool_use" and item.get("name") == "Agent": + return True + return False + + +def _extract_snippet(obj: dict) -> str | None: + """Extract first 60 chars of user message content.""" + msg = obj.get("message", {}) + if not isinstance(msg, dict): + return None + content = msg.get("content") + if isinstance(content, str): + text = content.strip().replace("\n", " ") + return text[:60] if len(text) > 60 else text + return None + + +def _scan_conversation(path: Path) -> ConversationInfo: + """Scan a conversation JSONL file for metadata.""" + uuid = path.stem + project_key = path.parent.name + size = os.path.getsize(path) + + first_timestamp = None + 
last_timestamp = None + slug = None + snippet = None + session_id = None + cwd = None + turn_count = 0 + agent_count = 0 + model = None + + # Scan first 100 lines for metadata + start counting + with open(path, "r", encoding="utf-8") as f: + for i, raw_line in enumerate(f): + if i >= 100: + break + obj = _parse_line(raw_line) + if obj is None: + continue + + ts = obj.get("timestamp") + if ts: + if first_timestamp is None: + first_timestamp = ts + last_timestamp = ts + + if slug is None and obj.get("slug"): + slug = obj["slug"] + + if session_id is None and obj.get("sessionId"): + session_id = obj["sessionId"] + + if cwd is None and obj.get("cwd"): + cwd = obj["cwd"] + + if snippet is None and _is_user_turn(obj): + snippet = _extract_snippet(obj) + + if model is None and obj.get("type") == "assistant": + msg = obj.get("message") or {} + if isinstance(msg, dict) and msg.get("model"): + model = msg["model"] + + if _is_user_turn(obj): + turn_count += 1 + if _is_agent_call(obj): + agent_count += 1 + + # Continue scanning remaining lines for counts and last_timestamp + for raw_line in f: + obj = _parse_line(raw_line) + if obj is None: + continue + + ts = obj.get("timestamp") + if ts: + last_timestamp = ts + + if _is_user_turn(obj): + turn_count += 1 + if _is_agent_call(obj): + agent_count += 1 + + # Optimize last_timestamp: read last 4KB for final timestamp + if size > 4096: + try: + with open(path, "rb") as f: + f.seek(max(0, size - 4096)) + tail = f.read().decode("utf-8", errors="replace") + for raw_line in reversed(tail.splitlines()): + obj = _parse_line(raw_line) + if obj and obj.get("timestamp"): + last_timestamp = obj["timestamp"] + break + except OSError: + pass + + return ConversationInfo( + path=path, + uuid=uuid, + project_key=project_key, + size=size, + first_timestamp=first_timestamp, + last_timestamp=last_timestamp, + slug=slug, + snippet=snippet, + turn_count=turn_count, + agent_count=agent_count, + model=model, + session_id=session_id, + cwd=cwd, + ) + + 
+def _load_session_names() -> dict[str, str]: + """Load session names from ~/.claude/sessions/*.json. + + Returns a mapping of sessionId -> name for sessions that have been renamed. + """ + names: dict[str, str] = {} + if not SESSIONS_DIR.exists(): + return names + for path in SESSIONS_DIR.glob("*.json"): + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + sid = data.get("sessionId") + name = data.get("name") + if sid and name: + names[sid] = name + except (json.JSONDecodeError, OSError): + continue + return names + + +def _resolve_project_key(project_key: str) -> str | None: + """Resolve a project key, trying exact match then substring match. + + Returns the matched directory name, or None if no match found. + """ + exact = PROJECTS_DIR / project_key + if exact.exists() and exact.is_dir(): + return project_key + + # Substring match against known project directory names + needle = project_key.lower() + candidates = [d.name for d in _safe_subdirs(PROJECTS_DIR)] + matches = [c for c in candidates if needle in c.lower()] + + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + import sys + + print( + f"Ambiguous project key '{project_key}' matches:\n" + + "\n".join(f" {m}" for m in sorted(matches)), + file=sys.stderr, + ) + return None + return None + + +def discover_conversations(project_key: str | None = None) -> list[ConversationInfo]: + """Discover all conversations under ~/.claude/projects/. + + Args: + project_key: If given, only scan that project subdirectory. + Supports exact directory names and substring matching. + + Returns: + List of ConversationInfo for each conversation found. 
+ """ + if not PROJECTS_DIR.exists(): + return [] + + results: list[ConversationInfo] = [] + + if project_key: + resolved = _resolve_project_key(project_key) + if resolved is None: + return [] + search_dirs = [PROJECTS_DIR / resolved] + else: + search_dirs = _safe_subdirs(PROJECTS_DIR) + + for proj_dir in search_dirs: + if not proj_dir.exists(): + continue + for jsonl_file in proj_dir.glob("*.jsonl"): + # Skip files in subagents/ subdirectories + if "subagents" in jsonl_file.parts: + continue + results.append(_scan_conversation(jsonl_file)) + + # Populate session names from ~/.claude/sessions/*.json + session_names = _load_session_names() + for conv in results: + if conv.session_id and conv.session_id in session_names: + conv.name = session_names[conv.session_id] + + return results + + +def resolve_conversation(identifier: str, project_key: str | None = None) -> Path: + """Resolve a conversation identifier to its JSONL file path. + + Resolution order: + 1. Valid file path that exists + 2. Exact UUID match + 3. UUID prefix (4+ chars) + 4. Slug match + 5. Raise SystemExit + + Args: + identifier: File path, UUID, UUID prefix, or slug. + project_key: Optional project key to narrow search. + + Returns: + Path to the conversation JSONL file. + """ + # 1. Direct file path + candidate = Path(identifier) + if candidate.exists() and candidate.is_file(): + return candidate + + if not PROJECTS_DIR.exists(): + raise SystemExit(f"No conversations found: {PROJECTS_DIR} does not exist") + + if project_key: + resolved = _resolve_project_key(project_key) + search_pattern = resolved if resolved else project_key + else: + search_pattern = "*" + + # 2. Exact UUID match + matches = list(PROJECTS_DIR.glob(f"{search_pattern}/{identifier}.jsonl")) + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + projects = [m.parent.name for m in matches] + raise SystemExit( + f"UUID {identifier} found in multiple projects: {', '.join(projects)}. " + "Use --project to disambiguate." 
+ ) + + # 3. UUID prefix (4+ chars) + if len(identifier) >= 4: + matches = list(PROJECTS_DIR.glob(f"{search_pattern}/{identifier}*.jsonl")) + # Filter out subagent files + matches = [m for m in matches if "subagents" not in m.parts] + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + uuids = [m.stem for m in matches] + raise SystemExit( + f"Prefix '{identifier}' matches multiple conversations:\n" + + "\n".join(f" {u}" for u in uuids) + ) + + # 4. Slug match + if project_key: + search_dirs = [PROJECTS_DIR / project_key] + else: + search_dirs = _safe_subdirs(PROJECTS_DIR) + + for proj_dir in search_dirs: + if not proj_dir.exists(): + continue + for jsonl_file in proj_dir.glob("*.jsonl"): + if "subagents" in jsonl_file.parts: + continue + try: + with open(jsonl_file, "r", encoding="utf-8") as f: + for i, raw_line in enumerate(f): + if i >= 50: + break + obj = _parse_line(raw_line) + if obj and obj.get("slug") == identifier: + return jsonl_file + except OSError: + continue + + raise SystemExit(f"Could not resolve conversation: '{identifier}'") + + +def resolve_agent(agent_id: str) -> Path: + """Resolve a subagent ID to its JSONL file path. + + Globs for ``~/.claude/projects/**/agent-.jsonl``. + + Args: + agent_id: Hex agent identifier (e.g. ``ad37c9994945aa7c4``). + + Returns: + Path to the agent JSONL file. + """ + if not PROJECTS_DIR.exists(): + raise SystemExit(f"No conversations found: {PROJECTS_DIR} does not exist") + + matches = sorted( + PROJECTS_DIR.glob(f"**/agent-{agent_id}.jsonl"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + if not matches: + raise SystemExit(f"No subagent found with ID '{agent_id}'") + + return matches[0] + + +def list_projects() -> list[str]: + """Return directory names under the projects dir.""" + if not PROJECTS_DIR.exists(): + return [] + return sorted(d.name for d in _safe_subdirs(PROJECTS_DIR)) + + +def get_subagent_paths(conv_path: Path) -> list[Path]: + """Find subagent JSONL files for a conversation. 
def _scan_subagent(path: Path, conv_uuid: str, project_key: str) -> SubagentInfo:
    """Scan a subagent JSONL file once and collect its metadata.

    Args:
        path: Subagent file, named ``agent-{id}.jsonl``.
        conv_uuid: UUID of the parent conversation (stored as given).
        project_key: Project directory name (stored as given).

    Returns:
        SubagentInfo with the file size, first/last ``timestamp`` seen, the
        first snippet ``_extract_snippet`` yields for a user turn, the first
        ``message.model`` on an ``assistant`` line, the number of lines
        ``_parse_line`` accepted, and the number of user turns.
    """
    # Agent ID is the filename stem without its "agent-" prefix.
    agent_id = path.stem.removeprefix("agent-")

    size = os.path.getsize(path)
    first_timestamp = None
    last_timestamp = None
    prompt_snippet = None
    line_count = 0
    turn_count = 0
    model = None

    with open(path, "r", encoding="utf-8") as f:
        for raw_line in f:
            obj = _parse_line(raw_line)
            if obj is None:
                # Rejected lines are not counted.
                continue
            line_count += 1

            ts = obj.get("timestamp")
            if ts:
                if first_timestamp is None:
                    first_timestamp = ts
                last_timestamp = ts

            if model is None and obj.get("type") == "assistant":
                msg = obj.get("message") or {}
                if isinstance(msg, dict) and msg.get("model"):
                    model = msg["model"]

            # Classify the line once; the result feeds both the turn counter
            # and the prompt lookup.
            if _is_user_turn(obj):
                turn_count += 1
                # Keep trying on later user turns until a snippet is found.
                if prompt_snippet is None:
                    prompt_snippet = _extract_snippet(obj)

    return SubagentInfo(
        path=path,
        agent_id=agent_id,
        conversation_uuid=conv_uuid,
        project_key=project_key,
        size=size,
        prompt_snippet=prompt_snippet,
        first_timestamp=first_timestamp,
        last_timestamp=last_timestamp,
        line_count=line_count,
        turn_count=turn_count,
        model=model,
    )


def list_subagents(conv_path: Path) -> list[SubagentInfo]:
    """List all subagents for a conversation with metadata.

    Args:
        conv_path: Path to the conversation JSONL file.  Its stem is used as
            the conversation UUID and its parent directory name as the
            project key.

    Returns:
        List of SubagentInfo sorted by first_timestamp (earliest first);
        subagents without any timestamp come last.
    """
    conv_uuid = conv_path.stem
    project_key = conv_path.parent.name

    results = [
        _scan_subagent(sa_path, conv_uuid, project_key)
        for sa_path in get_subagent_paths(conv_path)
    ]

    # The leading bool sorts undated subagents after every dated one; a bare
    # `first_timestamp or ""` key would put them first.
    # NOTE(review): assumes timestamps are strings that order chronologically
    # when compared lexically (e.g. ISO 8601) - confirm.
    results.sort(key=lambda s: (s.first_timestamp is None, s.first_timestamp or ""))
    return results


def get_conversation_cost(info: ConversationInfo, cache: "CostCache") -> float:
    """Return cost for a conversation plus all of its subagents.

    Per-file results are cached independently: the main file under
    ``info.uuid`` and each subagent under its agent ID, each validated
    against the file's mtime and size.

    Also populates ``info.total_tokens`` (main + all subagents) as a side
    effect.

    Args:
        info: Conversation metadata; ``path``, ``uuid`` and ``size`` are read.
        cache: Cost cache offering ``get(key, mtime, size)`` and
            ``set(key, mtime, size, cost, tokens)``.

    Returns:
        Total cost in USD (main + all subagents).
    """
    from cchat.costs import compute_file_cost

    def _file_cost(key, path, mtime, size):
        """Return (cost, tokens) for one file, filling the cache on a miss."""
        hit = cache.get(key, mtime, size)
        if hit is not None:
            return hit
        cost, tokens = compute_file_cost(path)
        cache.set(key, mtime, size, cost, tokens)
        return cost, tokens

    # Main conversation
    total_cost, total_tokens = _file_cost(
        info.uuid, info.path, info.path.stat().st_mtime, info.size
    )

    # Subagents
    for sa_path in get_subagent_paths(info.path):
        # One stat() call so size and mtime describe the same file state.
        sa_stat = sa_path.stat()
        sa_cost, sa_tokens = _file_cost(
            sa_path.stem.removeprefix("agent-"),
            sa_path,
            sa_stat.st_mtime,
            sa_stat.st_size,
        )
        total_cost += sa_cost
        total_tokens += sa_tokens

    info.total_tokens = total_tokens
    return total_cost
def get_subagent_stats(subagent: SubagentInfo, cache: "CostCache") -> None:
    """Fill in ``estimated_cost_usd`` and ``total_tokens`` on a SubagentInfo.

    The cache is consulted under the subagent's agent ID together with the
    file's current mtime and the recorded size.  On a miss the values are
    computed with ``compute_file_cost`` and written back to the cache.

    Args:
        subagent: Record to update; it is mutated in place.
        cache: Cost cache offering ``get(key, mtime, size)`` and
            ``set(key, mtime, size, cost, tokens)``.
    """
    from cchat.costs import compute_file_cost

    mtime = subagent.path.stat().st_mtime
    stats = cache.get(subagent.agent_id, mtime, subagent.size)

    if stats is None:
        # Cache miss: compute from the file and remember the result.
        cost, tokens = compute_file_cost(subagent.path)
        cache.set(subagent.agent_id, mtime, subagent.size, cost, tokens)
    else:
        cost, tokens = stats

    subagent.estimated_cost_usd = cost
    subagent.total_tokens = tokens
Has `promptId`, `permissionMode`. - - Tool result: `message.content` is array of `{type: "tool_result", tool_use_id, content: [{type: "text", text}]}`. Has `sourceToolAssistantUUID` (points to assistant line that made the call) and `toolUseResult` object. -- **`assistant`** — LLM response. `message` is an Anthropic API message object. - - `message.content` is array of content items: `{type: "text", text}`, `{type: "thinking", thinking, signature}`, or `{type: "tool_use", id, name, input}`. - - **Streaming**: multiple lines share the same `message.id`; intermediate lines have `stop_reason: null`, final line has `"end_turn"` or `"tool_use"`. - - `message.usage` has `input_tokens`, `output_tokens`, `cache_read_input_tokens`, etc. -- **`progress`** — subagent execution updates. `data.type` is `"agent_progress"`, `toolUseID` links to spawning tool_use. -- **`file-history-snapshot`** — file state for undo. Has `snapshot.trackedFileBackups` mapping paths to backup data. - -## Conversation Flow - -`system/bridge_status` → `user` (human) → `assistant` (streaming chunks) → `user` (tool results) → `assistant` → ... → `system/turn_duration` +# Claude Code Conversation JSONL Schema + +## Storage Location + +``` +~/.claude/projects//.jsonl +``` + +- Each project directory is named by its path with path separators replaced by `--` (e.g. `C--git-l-sc/` for `C:\git\l\sc`) +- Each conversation is a UUID-named `.jsonl` file (one JSON object per line) +- Sub-agent conversations are nested under `/subagents/agent-*.jsonl` +- `~/.claude/history.jsonl` contains a global index with timestamps, project paths, and session IDs + +## Line Format + +Each line is a JSON object. Lines chain via `parentUuid` → `uuid` (first line has `parentUuid: null`). + +**Common fields** on most lines: `type`, `uuid`, `parentUuid`, `timestamp`, `sessionId`, `cwd`, `gitBranch`, `version`, `slug`, `isSidechain`. + +## Line Types (`type` field) + +- **`system`** — metadata. 
`subtype` is `"bridge_status"` (session start, has `url`) or `"turn_duration"` (turn end, has `durationMs`). +- **`user`** — human message or tool result. + - Human message: `message.content` is a string. Has `promptId`, `permissionMode`. + - Tool result: `message.content` is array of `{type: "tool_result", tool_use_id, content: [{type: "text", text}]}`. Has `sourceToolAssistantUUID` (points to assistant line that made the call) and `toolUseResult` object. +- **`assistant`** — LLM response. `message` is an Anthropic API message object. + - `message.content` is array of content items: `{type: "text", text}`, `{type: "thinking", thinking, signature}`, or `{type: "tool_use", id, name, input}`. + - **Streaming**: multiple lines share the same `message.id`; intermediate lines have `stop_reason: null`, final line has `"end_turn"` or `"tool_use"`. + - `message.usage` has `input_tokens`, `output_tokens`, `cache_read_input_tokens`, etc. +- **`progress`** — subagent execution updates. `data.type` is `"agent_progress"`, `toolUseID` links to spawning tool_use. +- **`file-history-snapshot`** — file state for undo. Has `snapshot.trackedFileBackups` mapping paths to backup data. + +## Conversation Flow + +`system/bridge_status` → `user` (human) → `assistant` (streaming chunks) → `user` (tool results) → `assistant` → ... 
→ `system/turn_duration` diff --git a/pyproject.toml b/pyproject.toml index c134d5d..3be07a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,34 +1,34 @@ -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[project] -name = "cchat" -version = "0.1.0" -description = "Claude Code Chat Browser CLI" -requires-python = ">=3.10" - -[project.optional-dependencies] -serve = ["fastapi", "uvicorn[standard]"] - -[dependency-groups] -dev = ["pytest"] - -[project.scripts] -cchat = "cchat.cli:main" - -[tool.pytest.ini_options] -testpaths = ["tests"] - -[tool.coverage.run] -source = ["cchat"] -omit = ["tests/*", "help.py"] - -[tool.coverage.report] -show_missing = true -skip_empty = true -exclude_lines = [ - "pragma: no cover", - "if __name__ == .__main__.", - "raise NotImplementedError", -] +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "cchat" +version = "0.1.0" +description = "Claude Code Chat Browser CLI" +requires-python = ">=3.10" + +[project.optional-dependencies] +serve = ["fastapi", "uvicorn[standard]"] + +[dependency-groups] +dev = ["pytest"] + +[project.scripts] +cchat = "cchat.cli:main" + +[tool.pytest.ini_options] +testpaths = ["tests"] + +[tool.coverage.run] +source = ["cchat"] +omit = ["tests/*", "help.py"] + +[tool.coverage.report] +show_missing = true +skip_empty = true +exclude_lines = [ + "pragma: no cover", + "if __name__ == .__main__.", + "raise NotImplementedError", +] diff --git a/tests/test_formatters.py b/tests/test_formatters.py index 44723f3..f1f7403 100644 --- a/tests/test_formatters.py +++ b/tests/test_formatters.py @@ -21,6 +21,7 @@ format_size, format_table, format_timestamp, + format_workspace, set_no_color, supports_color, truncate, @@ -525,6 +526,10 @@ def test_valid_iso_with_tz_offset(self): def test_valid_iso_with_utc_offset(self): assert format_timestamp("2024-01-15T10:30:00+00:00") == "01-15 10:30" + def test_valid_iso_with_z_suffix(self): + # Python 3.10's 
class TestFormatWorkspace:
    """``format_workspace`` reduces a workspace path to its last component."""

    def test_windows_path(self):
        workspace = "C:\\Users\\evergr3n\\foo\\bar"
        assert format_workspace(workspace) == "bar"

    def test_posix_path(self):
        workspace = "/Users/ted/code/cchat"
        assert format_workspace(workspace) == "cchat"

    def test_posix_path_trailing_slash(self):
        # A trailing separator must not produce an empty name.
        workspace = "/Users/ted/code/cchat/"
        assert format_workspace(workspace) == "cchat"

    def test_windows_path_trailing_backslash(self):
        # Same as above, with the Windows separator.
        workspace = "C:\\Users\\evergr3n\\foo\\bar\\"
        assert format_workspace(workspace) == "bar"

    def test_empty_string(self):
        result = format_workspace("")
        assert result == ""

    def test_none(self):
        # None is reported as an empty string.
        result = format_workspace(None)
        assert result == ""

    def test_relative_path(self):
        result = format_workspace("relative/path")
        assert result == "path"

    def test_singlename(self):
        # A name without separators is returned unchanged.
        result = format_workspace("singlename")
        assert result == "singlename"