diff --git a/examples/alerts_and_drift/budget_breach_demo.py b/examples/alerts_and_drift/budget_breach_demo.py index 89a7151..b5766bf 100644 --- a/examples/alerts_and_drift/budget_breach_demo.py +++ b/examples/alerts_and_drift/budget_breach_demo.py @@ -22,7 +22,7 @@ # --------------------------------------------------------------------------- -# Pricing reference (from pricing/models.toml for claude-sonnet-4-20250514): +# Pricing reference (from pricing/models.toml for claude-sonnet-4-6): # input = $3.00 / MTok -> $0.000003 per token # output = $15.00 / MTok -> $0.000015 per token # @@ -44,7 +44,7 @@ def run_expensive_agent() -> None: input_tokens = 1000 + (i * 200) output_tokens = 500 + (i * 100) - # Estimate cost (using claude-sonnet-4-20250514 rates) + # Estimate cost (using claude-sonnet-4-6 rates) est_cost = (input_tokens * 3e-6) + (output_tokens * 15e-6) cumulative_cost += est_cost @@ -52,7 +52,7 @@ def run_expensive_agent() -> None: f" ~${est_cost:.4f} (cumulative ~${cumulative_cost:.4f})") record_llm_call( - model="claude-sonnet-4-20250514", + model="claude-sonnet-4-6", provider="anthropic", input_tokens=input_tokens, output_tokens=output_tokens, diff --git a/tokenjam/api/app.py b/tokenjam/api/app.py index 5ffae46..2d7daf5 100644 --- a/tokenjam/api/app.py +++ b/tokenjam/api/app.py @@ -3,7 +3,7 @@ from html import escape as html_escape from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, AsyncContextManager, Callable from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -23,18 +23,25 @@ def create_app( config: TjConfig, db: StorageBackend, ingest_pipeline: IngestPipeline, + lifespan: Callable[[FastAPI], AsyncContextManager[Any]] | None = None, ) -> FastAPI: """ Build and return the FastAPI app. db and ingest_pipeline are passed in (not imported globally) so tests can inject mocks easily. 
+ + `lifespan`, if provided, is a FastAPI lifespan context manager — used by + `tj serve` to start/stop the retention scheduler and write server.state. The + intent is that this happens only after uvicorn has bound the port (so a failed bind can't clobber a + running daemon's state file). NOTE(review): uvicorn appears to run lifespan startup before it binds the listening socket — confirm the ordering. """ app = FastAPI( title="TokenJam", version="0.1.0", docs_url="/docs", redoc_url=None, + lifespan=lifespan, ) # CORS — local only by default diff --git a/tokenjam/cli/cmd_doctor.py b/tokenjam/cli/cmd_doctor.py index 119f8f8..9f633d7 100644 --- a/tokenjam/cli/cmd_doctor.py +++ b/tokenjam/cli/cmd_doctor.py @@ -127,13 +127,24 @@ def _check_schema_vs_capture(config: object) -> dict: def _check_drift_inactive(config: object, db: object) -> dict: + """Report drift-baseline progress for every drift-enabled agent that hasn't reached its baseline threshold yet. + + Drift detection is enabled by default, so brand-new agents (fewer than `baseline_sessions` completed sessions) + would otherwise trip a warning on every `tj doctor` run — pure noise, + since collection-in-progress is the expected state. Downgraded to `info` + so the user can see which agents are still building a baseline without + treating it as a problem. 
+ """ + in_progress: list[str] = [] for agent_id, ac in config.agents.items(): - if ac.drift.enabled: - count = db.get_completed_session_count(agent_id) - if count < ac.drift.baseline_sessions: - return {"name": "Drift detection", "level": "warning", - "message": f"Agent '{agent_id}' has drift enabled but only " - f"{count}/{ac.drift.baseline_sessions} baseline sessions."} + if not ac.drift.enabled: + continue + count = db.get_completed_session_count(agent_id) + if count < ac.drift.baseline_sessions: + in_progress.append(f"{agent_id} ({count}/{ac.drift.baseline_sessions})") + if in_progress: + return {"name": "Drift detection", "level": "info", + "message": "Collecting baseline: " + ", ".join(in_progress)} return {"name": "Drift detection", "level": "ok", "message": "Drift detection status is consistent."} @@ -188,7 +199,8 @@ def _check_spans_stats(db: object) -> dict: conn = getattr(db, "conn", None) if conn is None: return {"name": "Spans column statistics", "level": "info", - "message": "Skipped — non-DuckDB backend."} + "message": "Skipped — CLI is running through the HTTP API " + "fallback (stop `tj serve` to access the DB directly)."} try: corrupt = check_spans_stats_corruption(conn) except duckdb.Error as e: @@ -222,7 +234,9 @@ def _attempt_repairs(checks: list[dict], db: object, output_json: bool) -> None: if conn is None: if not output_json: console.print( - " [yellow]Repair skipped — non-DuckDB backend.[/yellow]" + " [yellow]Repair skipped — CLI is using the HTTP API " + "fallback. 
Stop `tj serve` and retry so doctor has " + "direct DB access.[/yellow]" ) continue try: diff --git a/tokenjam/cli/cmd_serve.py b/tokenjam/cli/cmd_serve.py index 0862ca2..5d006b3 100644 --- a/tokenjam/cli/cmd_serve.py +++ b/tokenjam/cli/cmd_serve.py @@ -1,7 +1,9 @@ from __future__ import annotations import click +from contextlib import asynccontextmanager from pathlib import Path +from typing import AsyncIterator from tokenjam.utils.formatting import console @@ -19,12 +21,12 @@ def cmd_serve(ctx: click.Context, host: str | None, port: int | None, bind_port = port or config.api.port import uvicorn + from fastapi import FastAPI from tokenjam.api.app import create_app from tokenjam.core.ingest import build_default_pipeline db = ctx.obj["db"] pipeline = build_default_pipeline(db, config) - app = create_app(config, db, pipeline) # Schedule retention cleanup using a separate DB connection per run # to avoid concurrent write conflicts with uvicorn worker threads. @@ -46,22 +48,20 @@ def _retention_job() -> None: hour=0, minute=0, ) - scheduler.start() - @app.on_event("shutdown") - async def _shutdown_scheduler() -> None: - scheduler.shutdown(wait=False) - - # Write the resolved config path so other subcommands (e.g. onboard --codex) - # can find the secret this server is using regardless of CWD. Defer the - # write to a FastAPI startup event so it only fires after uvicorn binds - # the port — otherwise a failed-to-bind serve clobbers the state file - # of the running daemon (D2). + # ~/.local/share/tj/server.state lets other subcommands (e.g. `tj onboard + # --codex`) find the config this server is using regardless of CWD. We + # write it from the lifespan, intending that it only happens after uvicorn binds the + # port — a failed bind must NOT clobber the running daemon's state file. + # Same reasoning for `scheduler.start()`: don't fire off a background + # thread for a server that's about to exit with EADDRINUSE. NOTE(review): uvicorn appears to run lifespan startup before it binds the listening socket — confirm the ordering. 
import json as _json _state_path = Path.home() / ".local" / "share" / "tj" / "server.state" - @app.on_event("startup") - async def _write_server_state() -> None: + @asynccontextmanager + async def _lifespan(_app: FastAPI) -> AsyncIterator[None]: + # startup + scheduler.start() _state_path.parent.mkdir(parents=True, exist_ok=True) _state_path.write_text( _json.dumps({ @@ -70,6 +70,13 @@ async def _write_server_state() -> None: "pid": __import__("os").getpid(), }) ) + try: + yield + finally: + # shutdown + scheduler.shutdown(wait=False) + + app = create_app(config, db, pipeline, lifespan=_lifespan) console.print(f"[bold]tj serve[/bold] starting on http://{bind_host}:{bind_port}") console.print(f" API docs: http://{bind_host}:{bind_port}/docs")