diff --git a/plugins/codex/scripts/lib/broker-lifecycle.mjs b/plugins/codex/scripts/lib/broker-lifecycle.mjs index ef763819..06eebf69 100644 --- a/plugins/codex/scripts/lib/broker-lifecycle.mjs +++ b/plugins/codex/scripts/lib/broker-lifecycle.mjs @@ -6,6 +6,7 @@ import process from "node:process"; import { spawn } from "node:child_process"; import { fileURLToPath } from "node:url"; import { createBrokerEndpoint, parseBrokerEndpoint } from "./broker-endpoint.mjs"; +import { runCommand, terminateProcessTree } from "./process.mjs"; import { resolveStateDir } from "./state.mjs"; export const PID_FILE_ENV = "CODEX_COMPANION_APP_SERVER_PID_FILE"; @@ -110,7 +111,66 @@ async function isBrokerEndpointReady(endpoint) { } } +function readPidFile(pidFile) { + if (!pidFile || !fs.existsSync(pidFile)) { + return null; + } + const value = Number(fs.readFileSync(pidFile, "utf8").trim()); + return Number.isFinite(value) ? value : null; +} + +function getProcessCommand(pid, options = {}) { + const runCommandImpl = options.runCommandImpl ?? runCommand; + const platform = options.platform ?? process.platform; + const result = + platform === "win32" + ? runCommandImpl( + "powershell", + [ + "-NoProfile", + "-Command", + `$process = Get-CimInstance Win32_Process -Filter "ProcessId = ${pid}"; if ($process) { $process.CommandLine }` + ], + { + cwd: options.cwd, + env: options.env + } + ) + : runCommandImpl("ps", ["-p", String(pid), "-o", "command="], { + cwd: options.cwd, + env: options.env + }); + if (result.error || result.status !== 0) { + return null; + } + return result.stdout.trim(); +} + +function isExpectedBrokerProcess({ pid, endpoint = null, pidFile = null, platform = process.platform, runCommandImpl = runCommand }) { + if (!Number.isFinite(pid)) { + return false; + } + + const recordedPid = readPidFile(pidFile); + if (recordedPid !== null && recordedPid !== pid) { + return false; + } + + const command = getProcessCommand(pid, { platform, runCommandImpl }); + if (!command) { + return false; + } + + return ( + command.includes("app-server-broker.mjs") && + command.includes("serve") && + (!endpoint || command.includes(endpoint)) && + (!pidFile || command.includes(pidFile)) + ); +} + export async function ensureBrokerSession(cwd, options = {}) { + const killProcess = options.killProcess ?? terminateProcessTree; const existing = loadBrokerSession(cwd); if (existing && (await isBrokerEndpointReady(existing.endpoint))) { return existing; @@ -123,7 +183,10 @@ export async function ensureBrokerSession(cwd, options = {}) { logFile: existing.logFile ?? null, sessionDir: existing.sessionDir ?? null, pid: existing.pid ?? null, - killProcess: options.killProcess ?? null + killProcess, + validateProcess: options.validateProcess, + platform: options.platform, + runCommandImpl: options.runCommandImpl }); clearBrokerSession(cwd); } @@ -154,7 +217,10 @@ export async function ensureBrokerSession(cwd, options = {}) { logFile, sessionDir, pid: child.pid ?? null, - killProcess: options.killProcess ?? null + killProcess, + validateProcess: options.validateProcess, + platform: options.platform, + runCommandImpl: options.runCommandImpl }); return null; } @@ -170,8 +236,18 @@ export async function ensureBrokerSession(cwd, options = {}) { return session; } -export function teardownBrokerSession({ endpoint = null, pidFile, logFile, sessionDir = null, pid = null, killProcess = null }) { - if (Number.isFinite(pid) && killProcess) { +export function teardownBrokerSession({ + endpoint = null, + pidFile, + logFile, + sessionDir = null, + pid = null, + killProcess = terminateProcessTree, + validateProcess = isExpectedBrokerProcess, + platform = process.platform, + runCommandImpl = runCommand +}) { + if (Number.isFinite(pid) && validateProcess({ pid, endpoint, pidFile, platform, runCommandImpl })) { try { killProcess(pid); } catch { diff --git a/plugins/codex/scripts/lib/state.mjs b/plugins/codex/scripts/lib/state.mjs index 2da23498..9014800f 100644 --- a/plugins/codex/scripts/lib/state.mjs +++ b/plugins/codex/scripts/lib/state.mjs @@ -3,6 +3,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { runCommand } from "./process.mjs"; import { resolveWorkspaceRoot } from "./workspace.mjs"; const STATE_VERSION = 1; @@ -10,7 +11,13 @@ const PLUGIN_DATA_ENV = "CLAUDE_PLUGIN_DATA"; const FALLBACK_STATE_ROOT_DIR = path.join(os.tmpdir(), "codex-companion"); const STATE_FILE_NAME = "state.json"; const JOBS_DIR_NAME = "jobs"; +const STATE_LOCK_DIR_NAME = ".state.lock"; const MAX_JOBS = 50; +const STATE_LOCK_STALE_MS = 30000; +const STATE_LOCK_TIMEOUT_MS = STATE_LOCK_STALE_MS + 5000; +const STATE_LOCK_RETRY_MS = 20; +const STATE_LOCK_TIMEOUT_ENV = "CODEX_COMPANION_STATE_LOCK_TIMEOUT_MS"; +let currentProcessStartedAt = undefined; function nowIso() { return new Date().toISOString(); @@ -89,7 +96,147 @@ function removeFileIfExists(filePath) { } } -export function saveState(cwd, state) { +function sleepSync(ms) { + const buffer = new SharedArrayBuffer(4); + const view = new Int32Array(buffer); + Atomics.wait(view, 0, 0, ms); +} + +function resolveStateLockDir(cwd) { + return path.join(resolveStateDir(cwd), STATE_LOCK_DIR_NAME); +} + +function getProcessStartedAt(pid, options = {}) { + const runCommandImpl = options.runCommandImpl ?? runCommand; + const platform = options.platform ?? process.platform; + const result = + platform === "win32" + ? runCommandImpl( + "powershell", + [ + "-NoProfile", + "-Command", + `$process = Get-CimInstance Win32_Process -Filter "ProcessId = ${pid}"; if ($process) { $process.CreationDate.ToUniversalTime().ToString("o") }` + ], + { + cwd: options.cwd, + env: options.env + } + ) + : runCommandImpl("ps", ["-p", String(pid), "-o", "lstart="], { + cwd: options.cwd, + env: options.env + }); + + if (result.error || result.status !== 0) { + return null; + } + return result.stdout.trim() || null; +} + +function getCurrentProcessStartedAt() { + if (currentProcessStartedAt === undefined) { + currentProcessStartedAt = getProcessStartedAt(process.pid); + } + return currentProcessStartedAt; +} + +function readLockOwner(lockDir) { + try { + return JSON.parse(fs.readFileSync(path.join(lockDir, "owner"), "utf8")); + } catch { + return null; + } +} + +function processExists(pid) { + if (!Number.isFinite(pid)) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch (error) { + if (error?.code === "ESRCH") { + return false; + } + return true; + } +} + +function isLockOwnerAlive(owner) { + if (!processExists(owner?.pid)) { + return false; + } + if (!owner?.startedAt) { + return true; + } + const startedAt = getProcessStartedAt(owner.pid); + return startedAt === null || startedAt === owner.startedAt; +} + +function ownsLock(lockDir, token) { + const owner = readLockOwner(lockDir); + return owner?.token === token && owner?.pid === process.pid; +} + +function removeStaleLock(lockDir) { + try { + const stats = fs.statSync(lockDir); + const owner = readLockOwner(lockDir); + if (Date.now() - stats.mtimeMs > STATE_LOCK_STALE_MS && !isLockOwnerAlive(owner)) { + fs.rmSync(lockDir, { recursive: true, force: true }); + } + } catch { + // Another process may have released the lock between attempts. + } +} + +function acquireStateLock(cwd, options = {}) { + const lockDir = resolveStateLockDir(cwd); + const configuredTimeoutMs = Number(process.env[STATE_LOCK_TIMEOUT_ENV]); + const timeoutMs = options.timeoutMs ?? (Number.isFinite(configuredTimeoutMs) ? configuredTimeoutMs : STATE_LOCK_TIMEOUT_MS); + const start = Date.now(); + fs.mkdirSync(resolveStateDir(cwd), { recursive: true }); + + while (true) { + try { + fs.mkdirSync(lockDir); + const token = `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2)}`; + fs.writeFileSync( + path.join(lockDir, "owner"), + `${JSON.stringify({ pid: process.pid, startedAt: getCurrentProcessStartedAt(), token })}\n`, + "utf8" + ); + return () => { + if (ownsLock(lockDir, token)) { + fs.rmSync(lockDir, { recursive: true, force: true }); + } + }; + } catch (error) { + if (error?.code !== "EEXIST") { + throw error; + } + + removeStaleLock(lockDir); + if (Date.now() - start >= timeoutMs) { + throw new Error(`Timed out waiting for state lock: ${lockDir}`); + } + sleepSync(STATE_LOCK_RETRY_MS); + } + } +} + +function withStateLock(cwd, fn) { + const release = acquireStateLock(cwd); + try { + return fn(); + } finally { + release(); + } +} + +function saveStateUnlocked(cwd, state) { const previousJobs = loadState(cwd).jobs; ensureStateDir(cwd); const nextJobs = pruneJobs(state.jobs ?? []); @@ -115,10 +262,16 @@ export function saveState(cwd, state) { return nextState; } +export function saveState(cwd, state) { + return withStateLock(cwd, () => saveStateUnlocked(cwd, state)); +} + export function updateState(cwd, mutate) { - const state = loadState(cwd); - mutate(state); - return saveState(cwd, state); + return withStateLock(cwd, () => { + const state = loadState(cwd); + mutate(state); + return saveStateUnlocked(cwd, state); + }); } export function generateJobId(prefix = "job") { diff --git a/tests/broker-lifecycle.test.mjs b/tests/broker-lifecycle.test.mjs new file mode 100644 index 00000000..4fd35780 --- /dev/null +++ b/tests/broker-lifecycle.test.mjs @@ -0,0 +1,204 @@ +import fs from "node:fs"; +import path from "node:path"; +import test from "node:test"; +import assert from "node:assert/strict"; +import { spawn } from "node:child_process"; + +import { makeTempDir, writeExecutable } from "./helpers.mjs"; +import { createBrokerEndpoint } from "../plugins/codex/scripts/lib/broker-endpoint.mjs"; +import { ensureBrokerSession, saveBrokerSession, teardownBrokerSession } from "../plugins/codex/scripts/lib/broker-lifecycle.mjs"; + +function processExists(pid) { + try { + process.kill(pid, 0); + return true; + } catch (error) { + if (error?.code === "ESRCH") { + return false; + } + throw error; + } +} + +async function waitForExit(pid) { + const start = Date.now(); + while (Date.now() - start < 2000) { + if (!processExists(pid)) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + throw new Error(`Process ${pid} did not exit.`); +} + +function terminateTestProcess(pid) { + if (!pid || !processExists(pid)) { + return; + } + if (process.platform === "win32") { + process.kill(pid, "SIGTERM"); + return; + } + try { + process.kill(-pid, "SIGTERM"); + } catch { + process.kill(pid, "SIGTERM"); + } +} + +test("ensureBrokerSession terminates a stale broker pid without an injected killer", async () => { + const workspace = makeTempDir(); + const sessionDir = makeTempDir("cxc-stale-"); + const endpoint = createBrokerEndpoint(sessionDir, process.platform); + const pidFile = path.join(sessionDir, "broker.pid"); + const logFile = path.join(sessionDir, "broker.log"); + const brokerScript = path.join(workspace, "app-server-broker.mjs"); + writeExecutable(brokerScript, "setInterval(() => {}, 1000);\n"); + const child = spawn(process.execPath, [brokerScript, "serve", "--endpoint", endpoint, "--cwd", workspace, "--pid-file", pidFile], { + detached: true, + stdio: "ignore" + }); + child.unref(); + + fs.writeFileSync(pidFile, `${child.pid}\n`, "utf8"); + fs.writeFileSync(logFile, "stale broker\n", "utf8"); + saveBrokerSession(workspace, { + endpoint, + pidFile, + logFile, + sessionDir, + pid: child.pid + }); + const missingBroker = path.join(workspace, "missing-broker.mjs"); + + try { + const session = await ensureBrokerSession(workspace, { + timeoutMs: 1, + scriptPath: missingBroker + }); + + assert.equal(session, null); + await waitForExit(child.pid); + assert.equal(fs.existsSync(pidFile), false); + assert.equal(fs.existsSync(logFile), false); + } finally { + terminateTestProcess(child.pid); + } +}); + +test("ensureBrokerSession does not terminate a reused stale pid", async () => { + const workspace = makeTempDir(); + const sessionDir = makeTempDir("cxc-reused-"); + const endpoint = createBrokerEndpoint(sessionDir, process.platform); + const pidFile = path.join(sessionDir, "broker.pid"); + const logFile = path.join(sessionDir, "broker.log"); + const child = spawn(process.execPath, ["-e", "setInterval(() => {}, 1000);"], { + detached: true, + stdio: "ignore" + }); + child.unref(); + + fs.writeFileSync(pidFile, `${child.pid}\n`, "utf8"); + fs.writeFileSync(logFile, "stale broker\n", "utf8"); + saveBrokerSession(workspace, { + endpoint, + pidFile, + logFile, + sessionDir, + pid: child.pid + }); + + try { + const session = await ensureBrokerSession(workspace, { + timeoutMs: 1, + scriptPath: path.join(workspace, "missing-broker.mjs") + }); + + assert.equal(session, null); + assert.equal(processExists(child.pid), true); + } finally { + terminateTestProcess(child.pid); + } +}); + +test("teardownBrokerSession validates broker identity on Windows before killing", () => { + const sessionDir = makeTempDir("cxc-win-"); + const endpoint = createBrokerEndpoint(sessionDir, "win32"); + const pidFile = path.join(sessionDir, "broker.pid"); + const logFile = path.join(sessionDir, "broker.log"); + fs.writeFileSync(pidFile, "12345\n", "utf8"); + fs.writeFileSync(logFile, "broker log\n", "utf8"); + + const killedPids = []; + teardownBrokerSession({ + endpoint, + pidFile, + logFile, + sessionDir, + pid: 12345, + platform: "win32", + runCommandImpl(command, args) { + assert.equal(command, "powershell"); + assert.deepEqual(args.slice(0, 2), ["-NoProfile", "-Command"]); + return { + command, + args, + status: 0, + signal: null, + stdout: `node app-server-broker.mjs serve --endpoint ${endpoint} --pid-file ${pidFile}\n`, + stderr: "", + error: null + }; + }, + killProcess(pid) { + killedPids.push(pid); + } + }); + + assert.deepEqual(killedPids, [12345]); +}); + +test("ensureBrokerSession uses injected broker killers in tests", async () => { + const workspace = makeTempDir(); + const sessionDir = makeTempDir("cxc-injected-"); + const endpoint = createBrokerEndpoint(sessionDir, process.platform); + const pidFile = path.join(sessionDir, "broker.pid"); + const logFile = path.join(sessionDir, "broker.log"); + const brokerScript = path.join(workspace, "broker.mjs"); + fs.writeFileSync(pidFile, "12345\n", "utf8"); + fs.writeFileSync(logFile, "stale broker\n", "utf8"); + writeExecutable(brokerScript, "setInterval(() => {}, 1000);\n"); + saveBrokerSession(workspace, { + endpoint, + pidFile, + logFile, + sessionDir, + pid: 12345 + }); + + const killedPids = []; + try { + const session = await ensureBrokerSession(workspace, { + timeoutMs: 1, + scriptPath: brokerScript, + validateProcess() { + return true; + }, + killProcess(pid) { + killedPids.push(pid); + if (pid !== 12345) { + terminateTestProcess(pid); + } + } + }); + + assert.equal(session, null); + assert.equal(killedPids.includes(12345), true); + } finally { + for (const pid of killedPids) { + if (pid !== 12345) { + terminateTestProcess(pid); + } + } + } +}); diff --git a/tests/state-concurrent-worker.mjs b/tests/state-concurrent-worker.mjs new file mode 100644 index 00000000..0c80403b --- /dev/null +++ b/tests/state-concurrent-worker.mjs @@ -0,0 +1,13 @@ +import { upsertJob } from "../plugins/codex/scripts/lib/state.mjs"; + +const cwd = process.argv[2]; +const workerId = process.argv[3]; +const jobCount = Number(process.argv[4] ?? 0); + +for (let index = 0; index < jobCount; index += 1) { + upsertJob(cwd, { + id: `worker-${workerId}-job-${index}`, + status: "running", + title: `Worker ${workerId} job ${index}` + }); +} diff --git a/tests/state.test.mjs b/tests/state.test.mjs index 0f8f57ce..8fb5ac88 100644 --- a/tests/state.test.mjs +++ b/tests/state.test.mjs @@ -3,9 +3,20 @@ import os from "node:os"; import path from "node:path"; import test from "node:test"; import assert from "node:assert/strict"; +import { spawn } from "node:child_process"; +import { fileURLToPath } from "node:url"; import { makeTempDir } from "./helpers.mjs"; -import { resolveJobFile, resolveJobLogFile, resolveStateDir, resolveStateFile, saveState } from "../plugins/codex/scripts/lib/state.mjs"; +import { + listJobs, + resolveJobFile, + resolveJobLogFile, + resolveStateDir, + resolveStateFile, + saveState +} from "../plugins/codex/scripts/lib/state.mjs"; + +const STATE_CONCURRENT_WORKER = path.join(path.dirname(fileURLToPath(import.meta.url)), "state-concurrent-worker.mjs"); test("resolveStateDir uses a temp-backed per-workspace directory", () => { const workspace = makeTempDir(); @@ -103,3 +114,86 @@ test("saveState prunes dropped job artifacts when indexed jobs exceed the cap", .sort() ); }); + +function runWorker(workspace, workerId, jobsPerWorker) { + return new Promise((resolve, reject) => { + const child = spawn(process.execPath, [STATE_CONCURRENT_WORKER, workspace, String(workerId), String(jobsPerWorker)], { + env: process.env, + stdio: ["ignore", "pipe", "pipe"] + }); + let stderr = ""; + child.stderr.on("data", (chunk) => { + stderr += chunk; + }); + child.on("error", reject); + child.on("exit", (code, signal) => { + resolve({ code, signal, stderr }); + }); + }); +} + +test("upsertJob preserves concurrent state updates from multiple processes", async () => { + const workspace = makeTempDir(); + const workerCount = 5; + const jobsPerWorker = 8; + const expectedJobCount = workerCount * jobsPerWorker; + + const results = await Promise.all( + Array.from({ length: workerCount }, (_, workerId) => runWorker(workspace, workerId, jobsPerWorker)) + ); + for (const result of results) { + assert.equal(result.code, 0, result.stderr); + assert.equal(result.signal, null); + } + + const jobs = listJobs(workspace); + assert.equal(jobs.length, expectedJobCount); + assert.equal(new Set(jobs.map((job) => job.id)).size, expectedJobCount); +}); + +test("upsertJob waits for a live stale-looking state lock owner", async () => { + const workspace = makeTempDir(); + const previousTimeout = process.env.CODEX_COMPANION_STATE_LOCK_TIMEOUT_MS; + process.env.CODEX_COMPANION_STATE_LOCK_TIMEOUT_MS = "100"; + const stateDir = resolveStateDir(workspace); + const lockDir = path.join(stateDir, ".state.lock"); + fs.mkdirSync(lockDir, { recursive: true }); + fs.writeFileSync(path.join(lockDir, "owner"), `${JSON.stringify({ pid: process.pid, token: "live-owner" })}\n`, "utf8"); + const old = new Date(Date.now() - 60000); + fs.utimesSync(lockDir, old, old); + + try { + const result = await runWorker(workspace, "blocked", 1); + + assert.equal(result.code, 1); + assert.match(result.stderr, /Timed out waiting for state lock/); + assert.equal(fs.existsSync(lockDir), true); + } finally { + if (previousTimeout == null) { + delete process.env.CODEX_COMPANION_STATE_LOCK_TIMEOUT_MS; + } else { + process.env.CODEX_COMPANION_STATE_LOCK_TIMEOUT_MS = previousTimeout; + } + fs.rmSync(lockDir, { recursive: true, force: true }); + } +}); + +test("upsertJob recovers a stale lock after pid reuse", async () => { + const workspace = makeTempDir(); + const stateDir = resolveStateDir(workspace); + const lockDir = path.join(stateDir, ".state.lock"); + fs.mkdirSync(lockDir, { recursive: true }); + fs.writeFileSync( + path.join(lockDir, "owner"), + `${JSON.stringify({ pid: process.pid, startedAt: "reused-process", token: "stale-owner" })}\n`, + "utf8" + ); + const old = new Date(Date.now() - 60000); + fs.utimesSync(lockDir, old, old); + + const result = await runWorker(workspace, "recovered", 1); + + assert.equal(result.code, 0, result.stderr); + assert.equal(fs.existsSync(lockDir), false); + assert.equal(listJobs(workspace).length, 1); +});