Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/alerts_and_drift/budget_breach_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


# ---------------------------------------------------------------------------
# Pricing reference (from pricing/models.toml for claude-sonnet-4-20250514):
# Pricing reference (from pricing/models.toml for claude-sonnet-4-6):
# input = $3.00 / MTok -> $0.000003 per token
# output = $15.00 / MTok -> $0.000015 per token
#
Expand All @@ -44,15 +44,15 @@ def run_expensive_agent() -> None:
input_tokens = 1000 + (i * 200)
output_tokens = 500 + (i * 100)

# Estimate cost (using claude-sonnet-4-20250514 rates)
# Estimate cost (using claude-sonnet-4-6 rates)
est_cost = (input_tokens * 3e-6) + (output_tokens * 15e-6)
cumulative_cost += est_cost

print(f" Call {i:2d}: {input_tokens:5d} in / {output_tokens:4d} out "
f" ~${est_cost:.4f} (cumulative ~${cumulative_cost:.4f})")

record_llm_call(
model="claude-sonnet-4-20250514",
model="claude-sonnet-4-6",
provider="anthropic",
input_tokens=input_tokens,
output_tokens=output_tokens,
Expand Down
9 changes: 8 additions & 1 deletion tokenjam/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from html import escape as html_escape
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, AsyncContextManager, Callable

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
Expand All @@ -23,18 +23,25 @@ def create_app(
config: TjConfig,
db: StorageBackend,
ingest_pipeline: IngestPipeline,
lifespan: Callable[[FastAPI], AsyncContextManager[Any]] | None = None,
) -> FastAPI:
"""
Build and return the FastAPI app.

db and ingest_pipeline are passed in (not imported globally) so tests
can inject mocks easily.

`lifespan`, if provided, is a FastAPI lifespan context manager — used by
`tj serve` to start/stop the retention scheduler and write server.state
only after uvicorn has bound the port (so a failed bind can't clobber a
running daemon's state file).
"""
app = FastAPI(
title="TokenJam",
version="0.1.0",
docs_url="/docs",
redoc_url=None,
lifespan=lifespan,
)

# CORS — local only by default
Expand Down
30 changes: 22 additions & 8 deletions tokenjam/cli/cmd_doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,24 @@ def _check_schema_vs_capture(config: object) -> dict:


def _check_drift_inactive(config: object, db: object) -> dict:
"""Report drift-baseline progress for any agent that hasn't reached threshold yet.

Drift detection is enabled by default, so brand-new agents (0–9 sessions)
would otherwise trip a warning on every `tj doctor` run — pure noise,
since collection-in-progress is the expected state. Downgraded to `info`
so the user can see which agents are still building a baseline without
treating it as a problem.
"""
in_progress: list[str] = []
for agent_id, ac in config.agents.items():
if ac.drift.enabled:
count = db.get_completed_session_count(agent_id)
if count < ac.drift.baseline_sessions:
return {"name": "Drift detection", "level": "warning",
"message": f"Agent '{agent_id}' has drift enabled but only "
f"{count}/{ac.drift.baseline_sessions} baseline sessions."}
if not ac.drift.enabled:
continue
count = db.get_completed_session_count(agent_id)
if count < ac.drift.baseline_sessions:
in_progress.append(f"{agent_id} ({count}/{ac.drift.baseline_sessions})")
if in_progress:
return {"name": "Drift detection", "level": "info",
"message": "Collecting baseline: " + ", ".join(in_progress)}
return {"name": "Drift detection", "level": "ok",
"message": "Drift detection status is consistent."}

Expand Down Expand Up @@ -188,7 +199,8 @@ def _check_spans_stats(db: object) -> dict:
conn = getattr(db, "conn", None)
if conn is None:
return {"name": "Spans column statistics", "level": "info",
"message": "Skipped — non-DuckDB backend."}
"message": "Skipped — CLI is running through the HTTP API "
"fallback (stop `tj serve` to access the DB directly)."}
try:
corrupt = check_spans_stats_corruption(conn)
except duckdb.Error as e:
Expand Down Expand Up @@ -222,7 +234,9 @@ def _attempt_repairs(checks: list[dict], db: object, output_json: bool) -> None:
if conn is None:
if not output_json:
console.print(
" [yellow]Repair skipped — non-DuckDB backend.[/yellow]"
" [yellow]Repair skipped — CLI is using the HTTP API "
"fallback. Stop `tj serve` and retry so doctor has "
"direct DB access.[/yellow]"
)
continue
try:
Expand Down
33 changes: 20 additions & 13 deletions tokenjam/cli/cmd_serve.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import click
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncIterator

from tokenjam.utils.formatting import console

Expand All @@ -19,12 +21,12 @@ def cmd_serve(ctx: click.Context, host: str | None, port: int | None,
bind_port = port or config.api.port

import uvicorn
from fastapi import FastAPI
from tokenjam.api.app import create_app
from tokenjam.core.ingest import build_default_pipeline

db = ctx.obj["db"]
pipeline = build_default_pipeline(db, config)
app = create_app(config, db, pipeline)

# Schedule retention cleanup using a separate DB connection per run
# to avoid concurrent write conflicts with uvicorn worker threads.
Expand All @@ -46,22 +48,20 @@ def _retention_job() -> None:
hour=0,
minute=0,
)
scheduler.start()

@app.on_event("shutdown")
async def _shutdown_scheduler() -> None:
scheduler.shutdown(wait=False)

# Write the resolved config path so other subcommands (e.g. onboard --codex)
# can find the secret this server is using regardless of CWD. Defer the
# write to a FastAPI startup event so it only fires after uvicorn binds
# the port — otherwise a failed-to-bind serve clobbers the state file
# of the running daemon (D2).
# ~/.local/share/tj/server.state lets other subcommands (e.g. `tj onboard
# --codex`) find the config this server is using regardless of CWD. We
# write it from the lifespan so it only happens after uvicorn binds the
# port — a failed bind must NOT clobber the running daemon's state file.
# Same reasoning for `scheduler.start()`: don't fire off a background
# thread for a server that's about to exit with EADDRINUSE.
import json as _json
_state_path = Path.home() / ".local" / "share" / "tj" / "server.state"

@app.on_event("startup")
async def _write_server_state() -> None:
@asynccontextmanager
async def _lifespan(_app: FastAPI) -> AsyncIterator[None]:
# startup
scheduler.start()
_state_path.parent.mkdir(parents=True, exist_ok=True)
_state_path.write_text(
_json.dumps({
Expand All @@ -70,6 +70,13 @@ async def _write_server_state() -> None:
"pid": __import__("os").getpid(),
})
)
try:
yield
finally:
# shutdown
scheduler.shutdown(wait=False)

app = create_app(config, db, pipeline, lifespan=_lifespan)

console.print(f"[bold]tj serve[/bold] starting on http://{bind_host}:{bind_port}")
console.print(f" API docs: http://{bind_host}:{bind_port}/docs")
Expand Down
Loading