Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ Logs serve two purposes: real-time observability and session resume. Every ACP m

`SessionLogWriter` (`src/session-log-writer.ts`) is a per-session multiplexer that buffers raw ndJson lines. On flush (auto-scheduled 500ms after writes, or explicit), it dispatches to whichever backend is configured:

- **OTEL** (`src/otel-log-writer.ts`) — preferred path. Creates an OpenTelemetry `LoggerProvider` per session with resource attributes (`task_id`, `run_id`, `device_type`) set once and indexed via `resource_fingerprint`. Each ndJson line is emitted as an OTEL log record with an `event_type` attribute (the ACP method name) and exported via OTLP HTTP to PostHog's `/i/v1/agent-logs` endpoint. Batch flush interval defaults to 500ms.
- **Legacy S3** — falls back to `PostHogAPIClient.appendTaskRunLog()`, which POSTs batched `StoredNotification` entries to the Django API. The API stores them as the task run's `log_url`.
- **S3** — `PostHogAPIClient.appendTaskRunLog()` POSTs batched `StoredNotification` entries to the Django API, which stores them as the task run's `log_url` (used for full log download and session resume).
- **Local cache** — when `localCachePath` is set, raw ndJson is also appended to `<cache>/sessions/<runId>/logs.ndjson` for instant local loading.

Both backends can be active simultaneously — OTEL for fast indexed queries, S3 for full log download.
Separately from `SessionLogWriter`, the cloud `AgentServer` ships its own operational logs to PostHog Logs via `OtelLogWriter` (`src/otel-log-writer.ts`). It creates an OpenTelemetry `LoggerProvider` with resource attributes (`service.name=posthog-code-agent`, `service.version`, `task_id`, `run_id`, `device_type`) set once and indexed via `resource_fingerprint`, then emits each server log line as an OTEL log record (severity mapped from the log level, scope in the `log.scope` attribute) via OTLP HTTP to PostHog's `/i/v1/logs` endpoint. This export is enabled only when the sandbox injects `POSTHOG_OTEL_LOGS_HOST` and `POSTHOG_OTEL_LOGS_API_KEY`; otherwise it is a no-op.

### Resuming from logs

Expand Down
39 changes: 39 additions & 0 deletions packages/agent/src/otel-log-writer.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { OtelLogWriter } from "./otel-log-writer";
import type { StoredNotification } from "./types";
Expand All @@ -19,6 +20,7 @@ describe("OtelLogWriter", () => {

beforeEach(() => {
mockExport.mockClear();
vi.mocked(OTLPLogExporter).mockClear();
// Session context (taskId, runId) is now passed in constructor as resource attributes
writer = new OtelLogWriter(
{
Expand All @@ -37,6 +39,43 @@ describe("OtelLogWriter", () => {
await writer.shutdown();
});

it("defaults to the /i/v1/logs endpoint", () => {
expect(OTLPLogExporter).toHaveBeenCalledWith({
url: "https://us.i.posthog.com/i/v1/logs",
headers: { Authorization: "Bearer phc_test_key" },
});
});

it.each([
["debug", "DEBUG"],
["info", "INFO"],
["warn", "WARN"],
["error", "ERROR"],
] as const)(
"maps the %s level to OTEL severity %s and stores the scope",
async (level, severityText) => {
writer.emitLog(level, "suspension", "checkpoint failed");

await writer.flush();

const log = mockExport.mock.calls[0][0][0];
expect(log.severityText).toBe(severityText);
expect(log.attributes["log.scope"]).toBe("suspension");
},
);

it.each([
["with data", { taskId: "t1" }, `boom ${JSON.stringify({ taskId: "t1" })}`],
["without data", undefined, "boom"],
] as const)("formats the body %s", async (_label, data, expectedBody) => {
writer.emitLog("info", "fs", "boom", data);

await writer.flush();

const log = mockExport.mock.calls[0][0][0];
expect(log.body).toBe(expectedBody);
});

it("should emit a log entry with event_type as regular attribute", async () => {
const notification: StoredNotification = {
type: "notification",
Expand Down
60 changes: 56 additions & 4 deletions packages/agent/src/otel-log-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import {
BatchLogRecordProcessor,
LoggerProvider,
} from "@opentelemetry/sdk-logs";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import type { StoredNotification } from "./types";
import {
ATTR_SERVICE_NAME,
ATTR_SERVICE_VERSION,
} from "@opentelemetry/semantic-conventions";
import type { LogLevel, StoredNotification } from "./types";
import type { Logger } from "./utils/logger";

export interface OtelLogConfig {
Expand All @@ -16,7 +19,7 @@ export interface OtelLogConfig {
apiKey: string;
/** Batch flush interval in ms (default: 500) */
flushIntervalMs?: number;
/** Override the logs endpoint path (default: /i/v1/agent-logs) */
/** Override the logs endpoint path (default: /i/v1/logs) */
logsPath?: string;
}

Expand All @@ -31,6 +34,27 @@ export interface SessionContext {
runId: string;
/** Deployment environment - "local" for desktop, "cloud" for cloud sandbox */
deviceType?: "local" | "cloud";
/** Agent version, surfaced as the OTEL service.version resource attribute */
serviceVersion?: string;
}

/** Maps the agent's log levels onto OTEL severities. */
const LOG_LEVEL_TO_SEVERITY: Record<
LogLevel,
{ number: SeverityNumber; text: string }
> = {
debug: { number: SeverityNumber.DEBUG, text: "DEBUG" },
info: { number: SeverityNumber.INFO, text: "INFO" },
warn: { number: SeverityNumber.WARN, text: "WARN" },
error: { number: SeverityNumber.ERROR, text: "ERROR" },
};

function safeStringify(data: unknown): string {
try {
return JSON.stringify(data);
} catch {
return String(data);
}
}

export class OtelLogWriter {
Expand All @@ -42,7 +66,7 @@ export class OtelLogWriter {
sessionContext: SessionContext,
_debugLogger?: Logger,
) {
const logsPath = config.logsPath ?? "/i/v1/agent-logs";
const logsPath = config.logsPath ?? "/i/v1/logs";
const exporter = new OTLPLogExporter({
url: `${config.posthogHost}${logsPath}`,
headers: { Authorization: `Bearer ${config.apiKey}` },
Expand All @@ -57,6 +81,9 @@ export class OtelLogWriter {
this.loggerProvider = new LoggerProvider({
resource: resourceFromAttributes({
[ATTR_SERVICE_NAME]: "posthog-code-agent",
...(sessionContext.serviceVersion
? { [ATTR_SERVICE_VERSION]: sessionContext.serviceVersion }
: {}),
run_id: sessionContext.runId,
task_id: sessionContext.taskId,
device_type: sessionContext.deviceType ?? "local",
Expand All @@ -67,6 +94,31 @@ export class OtelLogWriter {
this.logger = this.loggerProvider.getLogger("agent-session");
}

/**
* Emit a structured agent log line to PostHog Logs.
*
* Maps the agent's log level onto an OTEL severity and stores the scope as a
* `log.scope` attribute (mirroring the desktop electron-log OTEL transport),
* so cloud-run logs are queryable by run_id / task_id / severity.
*/
emitLog(
level: LogLevel,
scope: string,
message: string,
data?: unknown,
): void {
const severity = LOG_LEVEL_TO_SEVERITY[level] ?? LOG_LEVEL_TO_SEVERITY.info;
const body =
data !== undefined ? `${message} ${safeStringify(data)}` : message;

this.logger.emit({
severityNumber: severity.number,
severityText: severity.text,
body,
attributes: { "log.scope": scope },
});
}

/**
* Emit an agent event to PostHog Logs via OTEL.
*/
Expand Down
87 changes: 83 additions & 4 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
import type { PermissionMode } from "../execution-mode";
import { DEFAULT_CODEX_MODEL } from "../gateway-models";
import { HandoffCheckpointTracker } from "../handoff-checkpoint";
import { OtelLogWriter } from "../otel-log-writer";
import { PostHogAPIClient } from "../posthog-api";
import { extractCreatedPrUrl } from "../pr-url-detector";
import {
Expand Down Expand Up @@ -238,6 +239,7 @@ export class AgentServer {
private session: ActiveSession | null = null;
private app: Hono;
private posthogAPI: PostHogAPIClient;
private otelLogWriter: OtelLogWriter | null = null;
private eventStreamSender: TaskRunEventStreamSender | null = null;
private questionRelayedToSlack = false;
private detectedPrUrl: string | null = null;
Expand Down Expand Up @@ -295,9 +297,39 @@ export class AgentServer {
);
};

// Ship a log line to PostHog Logs (no-op until OTEL is configured). Safe to
// call before a session exists — it never touches session state.
private emitOtelLog = (
level: LogLevel,
scope: string,
message: string,
data?: unknown,
): void => {
this.otelLogWriter?.emitLog(level, scope, message, data);
};

// Post-session choke point: ship to PostHog Logs, then mirror to the SSE
// console stream for any connected desktop client.
private handleLog = (
level: LogLevel,
scope: string,
message: string,
data?: unknown,
): void => {
this.emitOtelLog(level, scope, message, data);
this.emitConsoleLog(level, scope, message, data);
};

constructor(config: AgentServerConfig) {
this.config = config;
this.logger = new Logger({ debug: true, prefix: "[AgentServer]" });
this.logger = new Logger({
debug: true,
prefix: "[AgentServer]",
// Pre-session logs go to OTEL only; the SSE console stream needs an
// active session, which the post-init logger wires in via handleLog.
onLog: this.emitOtelLog,
});
this.otelLogWriter = this.createOtelLogWriter();
this.posthogAPI = new PostHogAPIClient({
apiUrl: config.apiUrl,
projectId: config.projectId,
Expand All @@ -318,6 +350,35 @@ export class AgentServer {
this.app = this.createApp();
}

private createOtelLogWriter(): OtelLogWriter | null {
const otelLogs = this.config.otelLogs;
// Disabled unless the sandbox injects an OTEL logs host + ingest key,
// mirroring how the desktop electron-log OTEL transport silently no-ops
// when its env vars are absent.
if (!otelLogs?.host || !otelLogs?.apiKey) {
return null;
}

try {
return new OtelLogWriter(
{
posthogHost: otelLogs.host,
apiKey: otelLogs.apiKey,
...(otelLogs.logsPath ? { logsPath: otelLogs.logsPath } : {}),
},
{
taskId: this.config.taskId,
runId: this.config.runId,
deviceType: "cloud",
serviceVersion: this.config.version ?? packageJson.version,
},
);
} catch (error) {
this.logger.error("Failed to initialize OTEL log writer", error);
return null;
}
}

private getRuntimeAdapter(): "claude" | "codex" {
return this.config.runtimeAdapter ?? "claude";
}
Expand Down Expand Up @@ -589,6 +650,15 @@ export class AgentServer {
}

this.logger.debug("Agent server stopped");

if (this.otelLogWriter) {
try {
await this.otelLogWriter.shutdown();
} catch (error) {
this.logger.debug("Failed to shut down OTEL log writer", error);
}
this.otelLogWriter = null;
}
}

/**
Expand Down Expand Up @@ -627,6 +697,17 @@ export class AgentServer {
stopError,
);
}

// Flush buffered logs (including the fatal error logged above) to PostHog
// Logs before the process exits. bin.ts races this against a 5s deadline.
try {
await this.otelLogWriter?.flush();
} catch (flushError) {
this.logger.error(
"Failed to flush OTEL logs after fatal error",
flushError,
);
}
}

private authenticateRequest(
Expand Down Expand Up @@ -1089,9 +1170,7 @@ export class AgentServer {
this.logger = new Logger({
debug: true,
prefix: "[AgentServer]",
onLog: (level, scope, message, data) => {
this.emitConsoleLog(level, scope, message, data);
},
onLog: this.handleLog,
});

this.logger.debug("Session initialized successfully");
Expand Down
17 changes: 17 additions & 0 deletions packages/agent/src/server/bin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const envSchema = z.object({
)
.transform((value) => parseInt(value, 10))
.optional(),
// Optional OTEL logs export. When both host and key are present the agent
// ships its logs to PostHog Logs; otherwise export is disabled (no-op).
POSTHOG_OTEL_LOGS_HOST: z.url().optional(),
POSTHOG_OTEL_LOGS_API_KEY: z.string().min(1).optional(),
POSTHOG_OTEL_LOGS_PATH: z.string().min(1).optional(),
});

const program = new Command();
Expand Down Expand Up @@ -154,6 +159,17 @@ program
);
}

const otelLogs =
env.POSTHOG_OTEL_LOGS_HOST && env.POSTHOG_OTEL_LOGS_API_KEY
? {
host: env.POSTHOG_OTEL_LOGS_HOST,
apiKey: env.POSTHOG_OTEL_LOGS_API_KEY,
...(env.POSTHOG_OTEL_LOGS_PATH
? { logsPath: env.POSTHOG_OTEL_LOGS_PATH }
: {}),
}
: undefined;

const server = new AgentServer({
port: parseInt(options.port, 10),
jwtPublicKey: env.JWT_PUBLIC_KEY,
Expand All @@ -175,6 +191,7 @@ program
runtimeAdapter: env.POSTHOG_CODE_RUNTIME_ADAPTER,
model: env.POSTHOG_CODE_MODEL,
reasoningEffort: env.POSTHOG_CODE_REASONING_EFFORT,
otelLogs,
});

process.on("SIGINT", async () => {
Expand Down
15 changes: 15 additions & 0 deletions packages/agent/src/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,19 @@ export interface AgentServerConfig {
runtimeAdapter?: "claude" | "codex";
model?: string;
reasoningEffort?: "low" | "medium" | "high" | "xhigh" | "max";
otelLogs?: AgentServerOtelLogsConfig;
}

/**
* OTEL logs export config for the cloud agent server. When present, the server
* ships its logs to PostHog Logs via OTLP HTTP. Injected by the sandbox
* environment; absent in local/dev runs, where export stays disabled.
*/
export interface AgentServerOtelLogsConfig {
/** PostHog logs ingest host, e.g. "https://us.i.posthog.com" */
host: string;
/** Ingest-capable API key for the logs endpoint */
apiKey: string;
/** Override the logs endpoint path (default: /i/v1/logs) */
logsPath?: string;
}
Loading