Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions plugins/codex/scripts/lib/broker-lifecycle.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import process from "node:process";
import { spawn } from "node:child_process";
import { fileURLToPath } from "node:url";
import { createBrokerEndpoint, parseBrokerEndpoint } from "./broker-endpoint.mjs";
import { runCommand, terminateProcessTree } from "./process.mjs";
import { resolveStateDir } from "./state.mjs";

export const PID_FILE_ENV = "CODEX_COMPANION_APP_SERVER_PID_FILE";
Expand Down Expand Up @@ -110,7 +111,66 @@ async function isBrokerEndpointReady(endpoint) {
}
}

function readPidFile(pidFile) {
if (!pidFile || !fs.existsSync(pidFile)) {
return null;
}
const value = Number(fs.readFileSync(pidFile, "utf8").trim());
return Number.isFinite(value) ? value : null;
}

function getProcessCommand(pid, options = {}) {
const runCommandImpl = options.runCommandImpl ?? runCommand;
const platform = options.platform ?? process.platform;
const result =
platform === "win32"
? runCommandImpl(
"powershell",
[
"-NoProfile",
"-Command",
`$process = Get-CimInstance Win32_Process -Filter "ProcessId = ${pid}"; if ($process) { $process.CommandLine }`
],
{
cwd: options.cwd,
env: options.env
}
)
: runCommandImpl("ps", ["-p", String(pid), "-o", "command="], {
cwd: options.cwd,
env: options.env
});
if (result.error || result.status !== 0) {
return null;
}
return result.stdout.trim();
}

function isExpectedBrokerProcess({ pid, endpoint = null, pidFile = null, platform = process.platform, runCommandImpl = runCommand }) {
if (!Number.isFinite(pid)) {
return false;
}

const recordedPid = readPidFile(pidFile);
if (recordedPid !== null && recordedPid !== pid) {
return false;
}

const command = getProcessCommand(pid, { platform, runCommandImpl });
if (!command) {
return false;
}

return (
command.includes("app-server-broker.mjs") &&
command.includes("serve") &&
(!endpoint || command.includes(endpoint)) &&
(!pidFile || command.includes(pidFile))
);
}

export async function ensureBrokerSession(cwd, options = {}) {
const killProcess = options.killProcess ?? terminateProcessTree;
const existing = loadBrokerSession(cwd);
if (existing && (await isBrokerEndpointReady(existing.endpoint))) {
return existing;
Expand All @@ -123,7 +183,10 @@ export async function ensureBrokerSession(cwd, options = {}) {
logFile: existing.logFile ?? null,
sessionDir: existing.sessionDir ?? null,
pid: existing.pid ?? null,
killProcess: options.killProcess ?? null
killProcess,
validateProcess: options.validateProcess,
platform: options.platform,
runCommandImpl: options.runCommandImpl
});
clearBrokerSession(cwd);
}
Expand Down Expand Up @@ -154,7 +217,10 @@ export async function ensureBrokerSession(cwd, options = {}) {
logFile,
sessionDir,
pid: child.pid ?? null,
killProcess: options.killProcess ?? null
killProcess,
validateProcess: options.validateProcess,
platform: options.platform,
runCommandImpl: options.runCommandImpl
});
return null;
}
Expand All @@ -170,8 +236,18 @@ export async function ensureBrokerSession(cwd, options = {}) {
return session;
}

export function teardownBrokerSession({ endpoint = null, pidFile, logFile, sessionDir = null, pid = null, killProcess = null }) {
if (Number.isFinite(pid) && killProcess) {
export function teardownBrokerSession({
endpoint = null,
pidFile,
logFile,
sessionDir = null,
pid = null,
killProcess = terminateProcessTree,
validateProcess = isExpectedBrokerProcess,
platform = process.platform,
runCommandImpl = runCommand
}) {
if (Number.isFinite(pid) && validateProcess({ pid, endpoint, pidFile, platform, runCommandImpl })) {
try {
killProcess(pid);
} catch {
Expand Down
161 changes: 157 additions & 4 deletions plugins/codex/scripts/lib/state.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";

import { runCommand } from "./process.mjs";
import { resolveWorkspaceRoot } from "./workspace.mjs";

const STATE_VERSION = 1;
const PLUGIN_DATA_ENV = "CLAUDE_PLUGIN_DATA";
const FALLBACK_STATE_ROOT_DIR = path.join(os.tmpdir(), "codex-companion");
const STATE_FILE_NAME = "state.json";
const JOBS_DIR_NAME = "jobs";
const STATE_LOCK_DIR_NAME = ".state.lock";
const MAX_JOBS = 50;
const STATE_LOCK_STALE_MS = 30000;
const STATE_LOCK_TIMEOUT_MS = STATE_LOCK_STALE_MS + 5000;
const STATE_LOCK_RETRY_MS = 20;
const STATE_LOCK_TIMEOUT_ENV = "CODEX_COMPANION_STATE_LOCK_TIMEOUT_MS";
let currentProcessStartedAt = undefined;

function nowIso() {
return new Date().toISOString();
Expand Down Expand Up @@ -89,7 +96,147 @@ function removeFileIfExists(filePath) {
}
}

export function saveState(cwd, state) {
function sleepSync(ms) {
const buffer = new SharedArrayBuffer(4);
const view = new Int32Array(buffer);
Atomics.wait(view, 0, 0, ms);
}

function resolveStateLockDir(cwd) {
return path.join(resolveStateDir(cwd), STATE_LOCK_DIR_NAME);
}

function getProcessStartedAt(pid, options = {}) {
const runCommandImpl = options.runCommandImpl ?? runCommand;
const platform = options.platform ?? process.platform;
const result =
platform === "win32"
? runCommandImpl(
"powershell",
[
"-NoProfile",
"-Command",
`$process = Get-CimInstance Win32_Process -Filter "ProcessId = ${pid}"; if ($process) { $process.CreationDate.ToUniversalTime().ToString("o") }`
],
{
cwd: options.cwd,
env: options.env
}
)
: runCommandImpl("ps", ["-p", String(pid), "-o", "lstart="], {
cwd: options.cwd,
env: options.env
});

if (result.error || result.status !== 0) {
return null;
}
return result.stdout.trim() || null;
}

function getCurrentProcessStartedAt() {
if (currentProcessStartedAt === undefined) {
currentProcessStartedAt = getProcessStartedAt(process.pid);
}
return currentProcessStartedAt;
}

function readLockOwner(lockDir) {
try {
return JSON.parse(fs.readFileSync(path.join(lockDir, "owner"), "utf8"));
} catch {
return null;
}
}

function processExists(pid) {
if (!Number.isFinite(pid)) {
return false;
}
try {
process.kill(pid, 0);
return true;
} catch (error) {
if (error?.code === "ESRCH") {
return false;
}
return true;
}
}

function isLockOwnerAlive(owner) {
if (!processExists(owner?.pid)) {
return false;
}
if (!owner?.startedAt) {
return true;
}
const startedAt = getProcessStartedAt(owner.pid);
return startedAt === null || startedAt === owner.startedAt;
}

function ownsLock(lockDir, token) {
const owner = readLockOwner(lockDir);
return owner?.token === token && owner?.pid === process.pid;
}

function removeStaleLock(lockDir) {
try {
const stats = fs.statSync(lockDir);
const owner = readLockOwner(lockDir);
if (Date.now() - stats.mtimeMs > STATE_LOCK_STALE_MS && !isLockOwnerAlive(owner)) {
fs.rmSync(lockDir, { recursive: true, force: true });
}
} catch {
// Another process may have released the lock between attempts.
}
}

function acquireStateLock(cwd, options = {}) {
const lockDir = resolveStateLockDir(cwd);
const configuredTimeoutMs = Number(process.env[STATE_LOCK_TIMEOUT_ENV]);
const timeoutMs = options.timeoutMs ?? (Number.isFinite(configuredTimeoutMs) ? configuredTimeoutMs : STATE_LOCK_TIMEOUT_MS);
const start = Date.now();
fs.mkdirSync(resolveStateDir(cwd), { recursive: true });

while (true) {
try {
fs.mkdirSync(lockDir);
const token = `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2)}`;
fs.writeFileSync(
path.join(lockDir, "owner"),
`${JSON.stringify({ pid: process.pid, startedAt: getCurrentProcessStartedAt(), token })}\n`,
"utf8"
);
return () => {
if (ownsLock(lockDir, token)) {
fs.rmSync(lockDir, { recursive: true, force: true });
}
};
} catch (error) {
if (error?.code !== "EEXIST") {
throw error;
}

removeStaleLock(lockDir);
if (Date.now() - start >= timeoutMs) {
throw new Error(`Timed out waiting for state lock: ${lockDir}`);
}
sleepSync(STATE_LOCK_RETRY_MS);
}
}
}

function withStateLock(cwd, fn) {
const release = acquireStateLock(cwd);
try {
return fn();
} finally {
release();
}
}

function saveStateUnlocked(cwd, state) {
const previousJobs = loadState(cwd).jobs;
ensureStateDir(cwd);
const nextJobs = pruneJobs(state.jobs ?? []);
Expand All @@ -115,10 +262,16 @@ export function saveState(cwd, state) {
return nextState;
}

export function saveState(cwd, state) {
return withStateLock(cwd, () => saveStateUnlocked(cwd, state));
}

export function updateState(cwd, mutate) {
const state = loadState(cwd);
mutate(state);
return saveState(cwd, state);
return withStateLock(cwd, () => {
const state = loadState(cwd);
mutate(state);
return saveStateUnlocked(cwd, state);
});
}

export function generateJobId(prefix = "job") {
Expand Down
Loading