From 91c3791def706c619b0b87bc3753f5897eb3ee7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Thu, 11 Jun 2026 16:43:08 +0200 Subject: [PATCH 1/2] refactor: split daemon client facade --- fallow-baselines/health.json | 21 +- src/daemon-client-lifecycle.ts | 414 ++++++++++ src/daemon-client-metadata.ts | 268 +++++++ src/daemon-client-rpc.ts | 166 ++++ src/daemon-client-timeout.ts | 111 +++ src/daemon-client-transport.ts | 385 ++++++++++ src/daemon-client.ts | 1313 +------------------------------- 7 files changed, 1376 insertions(+), 1302 deletions(-) create mode 100644 src/daemon-client-lifecycle.ts create mode 100644 src/daemon-client-metadata.ts create mode 100644 src/daemon-client-rpc.ts create mode 100644 src/daemon-client-timeout.ts create mode 100644 src/daemon-client-transport.ts diff --git a/fallow-baselines/health.json b/fallow-baselines/health.json index e3329da5f..0caf8fb48 100644 --- a/fallow-baselines/health.json +++ b/fallow-baselines/health.json @@ -156,12 +156,27 @@ "count": 2 } }, - "src/daemon-client.ts": { + "src/daemon-client-lifecycle.ts": { "complexity_moderate": { - "count": 5 + "count": 1 }, "crap_moderate": { - "count": 5 + "count": 1 + } + }, + "src/daemon-client-metadata.ts": { + "complexity_moderate": { + "count": 1 + } + }, + "src/daemon-client-rpc.ts": { + "crap_moderate": { + "count": 2 + } + }, + "src/daemon-client.ts": { + "complexity_moderate": { + "count": 2 } }, "src/daemon/__tests__/network-log.test.ts": { diff --git a/src/daemon-client-lifecycle.ts b/src/daemon-client-lifecycle.ts new file mode 100644 index 000000000..ce8513f1b --- /dev/null +++ b/src/daemon-client-lifecycle.ts @@ -0,0 +1,414 @@ +import fs from 'node:fs'; +import net from 'node:net'; +import os from 'node:os'; +import path from 'node:path'; +import { AppError } from './utils/errors.ts'; +import type { DaemonRequest } from './daemon/types.ts'; +import { runCmdDetached } from './utils/exec.ts'; +import { findProjectRoot, readVersion } from './utils/version.ts'; +import { emitDiagnostic } from './utils/diagnostics.ts'; +import { + resolveDaemonPaths, + resolveDaemonServerMode, + resolveDaemonTransportPreference, + type DaemonPaths, + type DaemonServerMode, + type DaemonTransportPreference, +} from './daemon/config.ts'; +import { computeDaemonCodeSignature } from './daemon/code-signature.ts'; +import { PUBLIC_COMMANDS } from './command-catalog.ts'; +import { sleep } from './utils/timeouts.ts'; +import { + cleanupFailedDaemonStartupMetadata, + cleanupStaleDaemonLockIfSafe, + getDaemonMetadataState, + isRemoteDaemon, + readDaemonInfo, + recoverDaemonLockHolder, + removeDaemonInfo, + removeDaemonLock, + resolveDaemonStartupHint, + stopDaemonProcessForTakeover, + type DaemonInfo, + type DaemonStartupCleanupResult, +} from './daemon-client-metadata.ts'; +import { canConnect } from './daemon-client-transport.ts'; + +export type DaemonClientSettings = { + paths: DaemonPaths; + transportPreference: DaemonTransportPreference; + serverMode: DaemonServerMode; + ownedStateDir?: boolean; + remoteBaseUrl?: string; + remoteAuthToken?: string; +}; + +export type EnsuredDaemon = { + info: DaemonInfo; + startedByClient: boolean; +}; + +const DAEMON_STARTUP_TIMEOUT_MS = 15_000; +const DAEMON_STARTUP_ATTEMPTS = 2; +const LOOPBACK_BLOCK_LIST = new net.BlockList(); +LOOPBACK_BLOCK_LIST.addSubnet('127.0.0.0', 8, 'ipv4'); +LOOPBACK_BLOCK_LIST.addAddress('::1', 'ipv6'); +LOOPBACK_BLOCK_LIST.addSubnet('::ffff:127.0.0.0', 104, 'ipv6'); + +export function resolveClientSettings(req: Omit): DaemonClientSettings { + const explicitStateDir = resolveExplicitStateDir(req); + const remote = resolveRemoteClientSettings(req); + const transport = resolveTransportClientSettings(req, remote.remoteBaseUrl); + const ownedStateDir = shouldUseOwnedReplayStateDir(req, explicitStateDir, remote.rawBaseUrl); + const stateDir = ownedStateDir ? createOwnedReplayStateDir() : explicitStateDir; + return { + paths: resolveDaemonPaths(stateDir), + transportPreference: transport.preference, + serverMode: transport.serverMode, + ownedStateDir, + remoteBaseUrl: remote.remoteBaseUrl, + remoteAuthToken: remote.authToken, + }; +} + +function resolveExplicitStateDir(req: Omit): string | undefined { + return req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR; +} + +function resolveRemoteClientSettings(req: Omit): { + rawBaseUrl: string | undefined; + remoteBaseUrl?: string; + authToken?: string; +} { + const rawBaseUrl = req.flags?.daemonBaseUrl ?? process.env.AGENT_DEVICE_DAEMON_BASE_URL; + const remoteBaseUrl = resolveRemoteDaemonBaseUrl(rawBaseUrl); + const authToken = req.flags?.daemonAuthToken ?? process.env.AGENT_DEVICE_DAEMON_AUTH_TOKEN; + validateRemoteDaemonTrust(remoteBaseUrl, authToken); + return { rawBaseUrl, remoteBaseUrl, authToken }; +} + +function resolveTransportClientSettings( + req: Omit, + remoteBaseUrl: string | undefined, +): { preference: DaemonTransportPreference; serverMode: DaemonServerMode } { + const rawTransport = req.flags?.daemonTransport ?? process.env.AGENT_DEVICE_DAEMON_TRANSPORT; + const preference = resolveDaemonTransportPreference(rawTransport); + if (remoteBaseUrl && preference === 'socket') { + throw new AppError( + 'INVALID_ARGS', + 'Remote daemon base URL only supports HTTP transport. Remove --daemon-transport socket.', + { daemonBaseUrl: remoteBaseUrl }, + ); + } + const rawServerMode = + req.flags?.daemonServerMode ?? + process.env.AGENT_DEVICE_DAEMON_SERVER_MODE ?? + (rawTransport === 'dual' ? 'dual' : undefined); + return { + preference, + serverMode: resolveDaemonServerMode(rawServerMode), + }; +} + +function shouldUseOwnedReplayStateDir( + req: Omit, + explicitStateDir: string | undefined, + rawRemoteBaseUrl: string | undefined, +): boolean { + return isOneShotReplayCommand(req.command) && !explicitStateDir && !rawRemoteBaseUrl; +} + +function createOwnedReplayStateDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-replay-daemon-')); +} + +export async function ensureDaemon(settings: DaemonClientSettings): Promise { + if (settings.remoteBaseUrl) { + return await ensureRemoteDaemon(settings); + } + + const reusable = await readReusableLocalDaemon(settings); + if (reusable) return { info: reusable, startedByClient: false }; + + cleanupStaleDaemonLockIfSafe(settings.paths); + return await startLocalDaemon(settings); +} + +async function ensureRemoteDaemon(settings: DaemonClientSettings): Promise { + const remoteInfo: DaemonInfo = { + transport: 'http', + // Remote mode reuses the auth token as the daemon token so the existing JSON-RPC contract still works. + token: settings.remoteAuthToken ?? '', + pid: 0, + baseUrl: settings.remoteBaseUrl, + }; + if (await canConnect(remoteInfo, 'http')) { + return { info: remoteInfo, startedByClient: false }; + } + throw new AppError('COMMAND_FAILED', 'Remote daemon is unavailable', { + daemonBaseUrl: settings.remoteBaseUrl, + hint: 'Verify AGENT_DEVICE_DAEMON_BASE_URL points to a reachable daemon with GET /health and POST /rpc.', + }); +} + +async function readReusableLocalDaemon(settings: DaemonClientSettings): Promise { + const existing = readDaemonInfo(settings.paths.infoPath); + if (!existing) return null; + + const existingReachable = await canConnect(existing, settings.transportPreference); + if (isReusableDaemonInfo(existing, existingReachable)) return existing; + + emitDaemonTakeoverNotice(existing, existingReachable, settings.paths.baseDir); + await stopDaemonProcessForTakeover(existing); + removeDaemonInfo(settings.paths.infoPath); + return null; +} + +function isReusableDaemonInfo(info: DaemonInfo, reachable: boolean): boolean { + return ( + info.version === readVersion() && + info.codeSignature === resolveLocalDaemonCodeSignature() && + reachable + ); +} + +function emitDaemonTakeoverNotice(info: DaemonInfo, reachable: boolean, stateDir: string): void { + try { + const identity = info.version ? `pid ${info.pid}, v${info.version}` : `pid ${info.pid}`; + const reason = resolveDaemonTakeoverReason(info, reachable); + process.stderr.write(`Replacing daemon (${identity}) in ${stateDir}: ${reason}\n`); + } catch { + // The takeover notice is best effort; never fail the command on stderr issues. + } +} + +function resolveDaemonTakeoverReason(info: DaemonInfo, reachable: boolean): string { + if (info.version !== readVersion()) return `version mismatch (client v${readVersion()})`; + if (info.codeSignature !== resolveLocalDaemonCodeSignature()) return 'code-signature mismatch'; + if (!reachable) return 'unreachable'; + return 'not reusable'; +} + +async function startLocalDaemon(settings: DaemonClientSettings): Promise { + let lockRecoveryCount = 0; + const cleanupResults: DaemonStartupCleanupResult[] = []; + let startError: string | undefined; + for (let attempt = 1; attempt <= DAEMON_STARTUP_ATTEMPTS; attempt += 1) { + try { + await startDaemon(settings); + } catch (error) { + startError = error instanceof Error ? error.message : String(error); + cleanupResults.push(await cleanupFailedDaemonStartupMetadata(settings.paths, 'start_error')); + if (attempt < DAEMON_STARTUP_ATTEMPTS) { + await sleep(150); + continue; + } + break; + } + + const started = await waitForDaemonInfo(DAEMON_STARTUP_TIMEOUT_MS, settings); + if (started) return { info: started, startedByClient: true }; + + if (await recoverDaemonLockHolder(settings.paths)) { + lockRecoveryCount += 1; + continue; + } + + const metadataState = getDaemonMetadataState(settings.paths); + const hasAnotherAttempt = attempt < DAEMON_STARTUP_ATTEMPTS; + const cleanup = await cleanupFailedDaemonStartupMetadata(settings.paths, 'startup_timeout', { + stopLiveProcesses: false, + }); + cleanupResults.push(cleanup); + if (cleanup.retainedInfoProcess || cleanup.retainedLockProcess) { + const extended = await waitForDaemonInfo(DAEMON_STARTUP_TIMEOUT_MS, settings); + if (extended) return { info: extended, startedByClient: true }; + break; + } + if (!hasAnotherAttempt) break; + + // Detached daemon startup can race on busy CI hosts; retry when no metadata exists yet. + if (!metadataState.hasInfo && !metadataState.hasLock) await sleep(150); + } + + const state = getDaemonMetadataState(settings.paths); + throw new AppError('COMMAND_FAILED', 'Failed to start daemon', { + kind: 'daemon_startup_failed', + infoPath: settings.paths.infoPath, + lockPath: settings.paths.lockPath, + startupTimeoutMs: DAEMON_STARTUP_TIMEOUT_MS, + startupAttempts: DAEMON_STARTUP_ATTEMPTS, + lockRecoveryCount, + cleanupResults, + startError, + metadataState: state, + hint: resolveDaemonStartupHint(state, settings.paths), + }); +} + +export async function cleanupDaemonAfterRequest( + req: Omit, + daemon: EnsuredDaemon, + settings: DaemonClientSettings, +): Promise { + if ( + !isOneShotReplayCommand(req.command) || + (!daemon.startedByClient && !settings.ownedStateDir) || + isRemoteDaemon(daemon.info) + ) { + return; + } + + const result = { + pid: daemon.info.pid, + removedInfo: false, + removedLock: false, + removedStateDir: false, + error: undefined as string | undefined, + }; + + try { + await stopDaemonProcessForTakeover(daemon.info); + } catch (error) { + result.error = error instanceof Error ? error.message : String(error); + } finally { + const infoExists = fs.existsSync(settings.paths.infoPath); + removeDaemonInfo(settings.paths.infoPath); + result.removedInfo = infoExists && !fs.existsSync(settings.paths.infoPath); + + const lockExists = fs.existsSync(settings.paths.lockPath); + removeDaemonLock(settings.paths.lockPath); + result.removedLock = lockExists && !fs.existsSync(settings.paths.lockPath); + + if (settings.ownedStateDir) { + fs.rmSync(settings.paths.baseDir, { recursive: true, force: true }); + result.removedStateDir = !fs.existsSync(settings.paths.baseDir); + } + } + + emitDiagnostic({ + level: result.error ? 'warn' : 'info', + phase: 'daemon_replay_cleanup', + data: result, + }); +} + +function isOneShotReplayCommand(command: string | undefined): boolean { + return command === PUBLIC_COMMANDS.replay || command === PUBLIC_COMMANDS.test; +} + +async function waitForDaemonInfo( + timeoutMs: number, + settings: DaemonClientSettings, +): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + const info = readDaemonInfo(settings.paths.infoPath); + if (info && (await canConnect(info, settings.transportPreference))) return info; + await sleep(100); + } + return null; +} + +async function startDaemon(settings: DaemonClientSettings): Promise { + const launchSpec = resolveDaemonLaunchSpec(); + const args = launchSpec.useSrc + ? ['--experimental-strip-types', launchSpec.srcPath] + : [launchSpec.distPath]; + const env = { + ...process.env, + AGENT_DEVICE_STATE_DIR: settings.paths.baseDir, + AGENT_DEVICE_DAEMON_SERVER_MODE: settings.serverMode, + }; + + runCmdDetached(process.execPath, args, { env }); +} + +type DaemonLaunchSpec = { + root: string; + distPath: string; + distPaths: string[]; + srcPath: string; + useSrc: boolean; +}; + +function resolveDaemonLaunchSpec(): DaemonLaunchSpec { + const root = findProjectRoot(); + const distPaths = [ + path.join(root, 'dist', 'src', 'internal', 'daemon.js'), + path.join(root, 'dist', 'src', 'daemon.js'), + ]; + const defaultDistPath = distPaths[0]; + if (defaultDistPath === undefined) { + throw new AppError('COMMAND_FAILED', 'Daemon dist path list is empty'); + } + const distPath = distPaths.find((candidate) => fs.existsSync(candidate)) ?? defaultDistPath; + const srcPath = path.join(root, 'src', 'daemon.ts'); + + const hasDist = distPaths.some((candidate) => fs.existsSync(candidate)); + const hasSrc = fs.existsSync(srcPath); + if (!hasDist && !hasSrc) { + throw new AppError('COMMAND_FAILED', 'Daemon entry not found', { distPaths, srcPath }); + } + const runningFromSource = process.execArgv.includes('--experimental-strip-types'); + const useSrc = runningFromSource ? hasSrc : !hasDist && hasSrc; + return { root, distPath, distPaths, srcPath, useSrc }; +} + +function resolveLocalDaemonCodeSignature(): string { + const launchSpec = resolveDaemonLaunchSpec(); + const entryPath = launchSpec.useSrc ? launchSpec.srcPath : launchSpec.distPath; + return computeDaemonCodeSignature(entryPath, launchSpec.root); +} + +function resolveRemoteDaemonBaseUrl(raw: string | undefined): string | undefined { + if (!raw) return undefined; + let parsed: URL; + try { + parsed = new URL(raw); + } catch (error) { + throw new AppError( + 'INVALID_ARGS', + 'Invalid daemon base URL', + { + daemonBaseUrl: raw, + }, + error instanceof Error ? error : undefined, + ); + } + if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') { + throw new AppError('INVALID_ARGS', 'Daemon base URL must use http or https', { + daemonBaseUrl: raw, + }); + } + return parsed.toString().replace(/\/+$/, ''); +} + +function validateRemoteDaemonTrust( + remoteBaseUrl: string | undefined, + remoteAuthToken: string | undefined, +): void { + if (!remoteBaseUrl) return; + const hostname = new URL(remoteBaseUrl).hostname; + if (isLoopbackHostname(hostname)) return; + if (typeof remoteAuthToken === 'string' && remoteAuthToken.trim().length > 0) return; + throw new AppError( + 'INVALID_ARGS', + 'Remote daemon base URL for non-loopback hosts requires daemon authentication', + { + daemonBaseUrl: remoteBaseUrl, + hint: 'Provide --daemon-auth-token or AGENT_DEVICE_DAEMON_AUTH_TOKEN when using a non-loopback remote daemon URL.', + }, + ); +} + +function isLoopbackHostname(hostname: string): boolean { + const normalized = hostname + .trim() + .toLowerCase() + .replace(/^\[(.*)\]$/, '$1'); + if (normalized === 'localhost') return true; + if (net.isIPv4(normalized)) return LOOPBACK_BLOCK_LIST.check(normalized, 'ipv4'); + if (net.isIPv6(normalized)) return LOOPBACK_BLOCK_LIST.check(normalized, 'ipv6'); + return false; +} diff --git a/src/daemon-client-metadata.ts b/src/daemon-client-metadata.ts new file mode 100644 index 000000000..800b000fd --- /dev/null +++ b/src/daemon-client-metadata.ts @@ -0,0 +1,268 @@ +import fs from 'node:fs'; +import { emitDiagnostic } from './utils/diagnostics.ts'; +import { isAgentDeviceDaemonProcess, stopProcessForTakeover } from './utils/process-identity.ts'; +import { shellQuote } from './utils/shell-quote.ts'; +import { resolveDaemonPaths, type DaemonPaths, type DaemonServerMode } from './daemon/config.ts'; + +export type DaemonInfo = { + port?: number; + httpPort?: number; + transport?: DaemonServerMode; + token: string; + pid: number; + version?: string; + codeSignature?: string; + processStartTime?: string; + baseUrl?: string; +}; + +type DaemonLockInfo = { + pid: number; + processStartTime?: string; + startedAt?: number; +}; + +export type DaemonMetadataState = { + hasInfo: boolean; + hasLock: boolean; +}; + +type DaemonStartupCleanupReason = 'start_error' | 'startup_timeout'; + +export type DaemonStartupCleanupResult = { + reason: DaemonStartupCleanupReason; + removedInfo: boolean; + removedLock: boolean; + stoppedInfoProcess: boolean; + stoppedLockProcess: boolean; + retainedInfoProcess?: boolean; + retainedLockProcess?: boolean; + error?: string; +}; + +const DAEMON_TAKEOVER_TERM_TIMEOUT_MS = 3000; +const DAEMON_TAKEOVER_KILL_TIMEOUT_MS = 1000; + +export function readDaemonInfo(infoPath: string): DaemonInfo | null { + const data = readJsonFile(infoPath); + if (!data || typeof data !== 'object') return null; + const parsed = data as Partial; + const token = readRequiredDaemonToken(parsed); + if (!token) return null; + const ports = readDaemonInfoPorts(parsed); + if (!ports) return null; + return { + token, + ...ports, + transport: readDaemonInfoTransport(parsed.transport), + pid: readPositiveInteger(parsed.pid) ?? 0, + version: readOptionalString(parsed.version), + codeSignature: readOptionalString(parsed.codeSignature), + processStartTime: readOptionalString(parsed.processStartTime), + }; +} + +function readRequiredDaemonToken(parsed: Partial): string | null { + return typeof parsed.token === 'string' && parsed.token.length > 0 ? parsed.token : null; +} + +function readDaemonInfoPorts( + parsed: Partial, +): Pick | null { + const port = readPositiveInteger(parsed.port); + const httpPort = readPositiveInteger(parsed.httpPort); + if (port === undefined && httpPort === undefined) return null; + return { port, httpPort }; +} + +function readDaemonInfoTransport(value: unknown): DaemonInfo['transport'] { + return value === 'socket' || value === 'http' || value === 'dual' ? value : undefined; +} + +function readOptionalString(value: unknown): string | undefined { + return typeof value === 'string' ? value : undefined; +} + +function readPositiveInteger(value: unknown): number | undefined { + return Number.isInteger(value) && Number(value) > 0 ? Number(value) : undefined; +} + +function readDaemonLockInfo(lockPath: string): DaemonLockInfo | null { + const data = readJsonFile(lockPath); + if (!data || typeof data !== 'object') return null; + const parsed = data as Partial; + const hasPid = Number.isInteger(parsed.pid) && Number(parsed.pid) > 0; + if (!hasPid) { + return null; + } + return { + pid: Number(parsed.pid), + processStartTime: + typeof parsed.processStartTime === 'string' ? parsed.processStartTime : undefined, + startedAt: typeof parsed.startedAt === 'number' ? parsed.startedAt : undefined, + }; +} + +export function removeDaemonInfo(infoPath: string): void { + removeFileIfExists(infoPath); +} + +export function removeDaemonLock(lockPath: string): void { + removeFileIfExists(lockPath); +} + +export function cleanupStaleDaemonLockIfSafe(paths: DaemonPaths): void { + const state = getDaemonMetadataState(paths); + if (!state.hasLock || state.hasInfo) return; + const lockInfo = readDaemonLockInfo(paths.lockPath); + if (!lockInfo) { + removeDaemonLock(paths.lockPath); + return; + } + if (isAgentDeviceDaemonProcess(lockInfo.pid, lockInfo.processStartTime)) { + return; + } + removeDaemonLock(paths.lockPath); +} + +export async function cleanupFailedDaemonStartupMetadata( + paths: DaemonPaths, + reason: DaemonStartupCleanupReason, + options: { stopLiveProcesses?: boolean } = {}, +): Promise { + const stopLiveProcesses = options.stopLiveProcesses ?? true; + const result: DaemonStartupCleanupResult = { + reason, + removedInfo: false, + removedLock: false, + stoppedInfoProcess: false, + stoppedLockProcess: false, + }; + + try { + const infoExists = fs.existsSync(paths.infoPath); + const info = readDaemonInfo(paths.infoPath); + if (info) { + const liveInfoProcess = isAgentDeviceDaemonProcess(info.pid, info.processStartTime); + if (liveInfoProcess && !stopLiveProcesses) { + result.retainedInfoProcess = true; + } else { + if (liveInfoProcess) { + await stopDaemonProcessForTakeover(info); + result.stoppedInfoProcess = true; + } + removeDaemonInfo(paths.infoPath); + result.removedInfo = true; + } + } else if (infoExists) { + removeDaemonInfo(paths.infoPath); + result.removedInfo = true; + } + + const lockExists = fs.existsSync(paths.lockPath); + const lockInfo = readDaemonLockInfo(paths.lockPath); + if (lockInfo) { + const liveLockProcess = isAgentDeviceDaemonProcess(lockInfo.pid, lockInfo.processStartTime); + if (liveLockProcess && !stopLiveProcesses) { + result.retainedLockProcess = true; + } else { + if (liveLockProcess) { + await stopProcessForTakeover(lockInfo.pid, { + termTimeoutMs: DAEMON_TAKEOVER_TERM_TIMEOUT_MS, + killTimeoutMs: DAEMON_TAKEOVER_KILL_TIMEOUT_MS, + expectedStartTime: lockInfo.processStartTime, + }); + result.stoppedLockProcess = true; + } + removeDaemonLock(paths.lockPath); + result.removedLock = true; + } + } else if (lockExists) { + removeDaemonLock(paths.lockPath); + result.removedLock = true; + } + } catch (error) { + result.error = error instanceof Error ? error.message : String(error); + } + + emitDiagnostic({ + level: result.error ? 'warn' : 'info', + phase: 'daemon_startup_metadata_cleanup', + data: result, + }); + return result; +} + +export function getDaemonMetadataState(paths: DaemonPaths): DaemonMetadataState { + return { + hasInfo: fs.existsSync(paths.infoPath), + hasLock: fs.existsSync(paths.lockPath), + }; +} + +export async function recoverDaemonLockHolder(paths: DaemonPaths): Promise { + const state = getDaemonMetadataState(paths); + if (!state.hasLock || state.hasInfo) return false; + const lockInfo = readDaemonLockInfo(paths.lockPath); + if (!lockInfo) { + removeDaemonLock(paths.lockPath); + return true; + } + if (!isAgentDeviceDaemonProcess(lockInfo.pid, lockInfo.processStartTime)) { + removeDaemonLock(paths.lockPath); + return true; + } + return false; +} + +export async function stopDaemonProcessForTakeover(info: DaemonInfo): Promise { + await stopProcessForTakeover(info.pid, { + termTimeoutMs: DAEMON_TAKEOVER_TERM_TIMEOUT_MS, + killTimeoutMs: DAEMON_TAKEOVER_KILL_TIMEOUT_MS, + expectedStartTime: info.processStartTime, + }); +} + +export function isRemoteDaemon(info: DaemonInfo): boolean { + return typeof info.baseUrl === 'string' && info.baseUrl.length > 0; +} + +export function resolveDaemonStartupHint( + state: { hasInfo: boolean; hasLock: boolean }, + paths: Pick = resolveDaemonPaths( + process.env.AGENT_DEVICE_STATE_DIR, + ), +): string { + const cleanupCommand = buildDaemonMetadataCleanupCommand(paths); + if (state.hasLock && !state.hasInfo) { + return `agent-device attempted to clean stale daemon metadata automatically, but ${paths.lockPath} still exists without ${paths.infoPath}. Retry with --debug; if this persists after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; + } + if (state.hasLock && state.hasInfo) { + return `agent-device attempted to clean stale daemon metadata automatically, but ${paths.infoPath} and ${paths.lockPath} still remain. Retry with --debug; if this persists after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; + } + if (state.hasInfo) { + return `agent-device did not observe reachable daemon metadata after retrying, and ${paths.infoPath} still remains. Stale metadata was cleaned automatically when safe; retry with --debug. If this persists after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; + } + return `agent-device did not observe reachable daemon metadata after retrying. Stale metadata was cleaned automatically when safe; retry with --debug and check daemon diagnostics logs. If stale metadata returns after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; +} + +function buildDaemonMetadataCleanupCommand(paths: Pick) { + return `rm -f ${shellQuote(paths.infoPath)} ${shellQuote(paths.lockPath)}`; +} + +function readJsonFile(filePath: string): unknown | null { + if (!fs.existsSync(filePath)) return null; + try { + return JSON.parse(fs.readFileSync(filePath, 'utf8')) as unknown; + } catch { + return null; + } +} + +function removeFileIfExists(filePath: string): void { + try { + if (fs.existsSync(filePath)) fs.unlinkSync(filePath); + } catch { + // Best-effort cleanup only. + } +} diff --git a/src/daemon-client-rpc.ts b/src/daemon-client-rpc.ts new file mode 100644 index 000000000..a0b21a531 --- /dev/null +++ b/src/daemon-client-rpc.ts @@ -0,0 +1,166 @@ +import { AppError, toAppErrorCode } from './utils/errors.ts'; +import { createRequestId } from './utils/diagnostics.ts'; +import type { DaemonRequest, DaemonResponse } from './daemon/types.ts'; +import { materializeRemoteArtifacts } from './daemon-artifacts.ts'; +import type { DaemonInfo } from './daemon-client-metadata.ts'; + +export function handleDaemonHttpResponseBody( + body: string, + options: { + info: DaemonInfo; + req: DaemonRequest; + resolve: (response: DaemonResponse | PromiseLike) => void; + reject: (error: unknown) => void; + }, +): void { + const { info, req, resolve, reject } = options; + try { + const parsed = parseDaemonHttpResponseBody(body); + if (parsed.error) { + reject(toDaemonHttpRpcError(parsed.error, req.meta?.requestId)); + return; + } + if (!parsed.result || typeof parsed.result !== 'object') { + reject( + new AppError('COMMAND_FAILED', 'Invalid daemon RPC response', { + requestId: req.meta?.requestId, + }), + ); + return; + } + void resolveDaemonHttpResult(info, req, parsed.result, resolve, reject); + } catch (err) { + reject( + new AppError( + 'COMMAND_FAILED', + 'Invalid daemon response', + { + requestId: req.meta?.requestId, + line: body, + }, + err instanceof Error ? err : undefined, + ), + ); + } +} + +function parseDaemonHttpResponseBody(body: string): { + result?: DaemonResponse; + error?: { message?: string; data?: Record }; +} { + return JSON.parse(body) as { + result?: DaemonResponse; + error?: { message?: string; data?: Record }; + }; +} + +function toDaemonHttpRpcError( + error: { message?: string; data?: Record }, + requestId: string | undefined, +): AppError { + const data = error.data ?? {}; + return new AppError( + toAppErrorCode(data.code != null ? String(data.code) : undefined, 'COMMAND_FAILED'), + String(data.message ?? error.message ?? 'Daemon RPC request failed'), + { + ...(typeof data.details === 'object' && data.details ? data.details : {}), + hint: typeof data.hint === 'string' ? data.hint : undefined, + diagnosticId: typeof data.diagnosticId === 'string' ? data.diagnosticId : undefined, + logPath: typeof data.logPath === 'string' ? data.logPath : undefined, + requestId, + }, + ); +} + +async function resolveDaemonHttpResult( + info: DaemonInfo, + req: DaemonRequest, + result: DaemonResponse, + resolve: (response: DaemonResponse | PromiseLike) => void, + reject: (error: unknown) => void, +): Promise { + try { + resolve( + info.baseUrl && result.ok ? await materializeRemoteArtifacts(info, req, result) : result, + ); + } catch (error) { + reject(error); + } +} + +export function buildHttpRpcPayload( + req: DaemonRequest, + options: { includeTokenParam: boolean }, +): { + jsonrpc: '2.0'; + id: string; + method: string; + params: DaemonRequest | Record; +} { + const id = req.meta?.requestId ?? createRequestId(); + if (!isLeaseRpcCommand(req.command)) { + return { + jsonrpc: '2.0', + id, + method: 'agent_device.command', + params: req, + }; + } + return { + jsonrpc: '2.0', + id, + method: leaseRpcMethodForCommand(req.command), + params: buildLeaseRpcParams(req, req.command, options), + }; +} + +type LeaseRpcCommand = 'lease_allocate' | 'lease_heartbeat' | 'lease_release'; + +function isLeaseRpcCommand(command: string): command is LeaseRpcCommand { + return ( + command === 'lease_allocate' || command === 'lease_heartbeat' || command === 'lease_release' + ); +} + +function leaseRpcMethodForCommand(command: LeaseRpcCommand): string { + switch (command) { + case 'lease_allocate': + return 'agent_device.lease.allocate'; + case 'lease_heartbeat': + return 'agent_device.lease.heartbeat'; + case 'lease_release': + return 'agent_device.lease.release'; + } +} + +function buildLeaseRpcParams( + req: DaemonRequest, + command: LeaseRpcCommand, + options: { includeTokenParam: boolean }, +): Record { + const common = { + ...(options.includeTokenParam ? { token: req.token } : {}), + session: req.session, + tenantId: req.meta?.tenantId, + runId: req.meta?.runId, + }; + switch (command) { + case 'lease_allocate': + return { + ...common, + ttlMs: req.meta?.leaseTtlMs, + backend: req.meta?.leaseBackend, + }; + case 'lease_heartbeat': + return { + ...common, + leaseId: req.meta?.leaseId, + ttlMs: req.meta?.leaseTtlMs, + }; + case 'lease_release': + return { + ...common, + leaseId: req.meta?.leaseId, + }; + } +} diff --git a/src/daemon-client-timeout.ts b/src/daemon-client-timeout.ts new file mode 100644 index 000000000..31b77a7b6 --- /dev/null +++ b/src/daemon-client-timeout.ts @@ -0,0 +1,111 @@ +import { AppError } from './utils/errors.ts'; +import { runCmdSync } from './utils/exec.ts'; +import { emitDiagnostic } from './utils/diagnostics.ts'; +import { isAgentDeviceDaemonProcess } from './utils/process-identity.ts'; +import { PUBLIC_COMMANDS } from './command-catalog.ts'; +import type { DaemonPaths } from './daemon/config.ts'; +import { + removeDaemonInfo, + removeDaemonLock, + stopDaemonProcessForTakeover, + type DaemonInfo, +} from './daemon-client-metadata.ts'; + +const IOS_RUNNER_XCODEBUILD_KILL_PATTERNS = [ + 'xcodebuild .*AgentDeviceRunnerUITests/RunnerTests/testCommand', + 'xcodebuild .*AgentDeviceRunner\\.env\\.session-', + 'xcodebuild build-for-testing .*ios-runner/AgentDeviceRunner/AgentDeviceRunner\\.xcodeproj', +]; + +export function handleRequestTimeout( + info: DaemonInfo, + statePaths: DaemonPaths, + requestId: string | undefined, + command: string | undefined, + remote: boolean, + timeoutMs: number, +): AppError { + const cleanup = remote ? { terminated: 0 } : cleanupTimedOutIosRunnerBuilds(); + const resetDaemon = !remote && shouldResetDaemonAfterRequestTimeout(command); + const daemonReset = resetDaemon + ? resetDaemonAfterTimeout(info, statePaths) + : { forcedKill: false }; + emitDiagnostic({ + level: 'error', + phase: 'daemon_request_timeout', + data: { + timeoutMs, + requestId, + command, + timedOutRunnerPidsTerminated: cleanup.terminated, + timedOutRunnerCleanupError: cleanup.error, + daemonPidReset: resetDaemon ? info.pid : undefined, + daemonPidForceKilled: resetDaemon ? daemonReset.forcedKill : undefined, + daemonPreservedAfterTimeout: !remote && !resetDaemon, + daemonBaseUrl: info.baseUrl, + }, + }); + return new AppError('COMMAND_FAILED', 'Daemon request timed out', { + timeoutMs, + requestId, + hint: resolveRequestTimeoutHint({ remote, resetDaemon, command }), + }); +} + +export function shouldResetDaemonAfterRequestTimeout(command: string | undefined): boolean { + // Snapshot can block in platform accessibility bridges while the app is crashed or never idle. + // Keep the daemon/session alive so callers can still collect screenshot/perf/log evidence + // and close the session after the runner abort path has been triggered. + return command !== 'snapshot'; +} + +function resolveRequestTimeoutHint(params: { + remote: boolean; + resetDaemon: boolean; + command: string | undefined; +}): string { + const { remote, resetDaemon, command } = params; + if (remote) { + return 'Retry with --debug and verify the remote daemon URL, auth token, and remote host logs.'; + } + if (!resetDaemon) { + const iosPrepareHint = + command === PUBLIC_COMMANDS.snapshot + ? ' If this was the first Apple-platform snapshot on the device, run agent-device prepare ios-runner with the same --platform before snapshot/test so runner startup is handled explicitly.' + : ''; + return `Retry with --debug and check daemon diagnostics logs. The timed-out ${command ?? 'request'} request was canceled and Apple runner work was aborted when detected; the daemon was kept alive so the session can still be closed or inspected.${iosPrepareHint}`; + } + return 'Retry with --debug and check daemon diagnostics logs. Timed-out Apple runner xcodebuild processes were terminated when detected.'; +} + +function cleanupTimedOutIosRunnerBuilds(): { terminated: number; error?: string } { + let terminated = 0; + try { + for (const pattern of IOS_RUNNER_XCODEBUILD_KILL_PATTERNS) { + const result = runCmdSync('pkill', ['-f', pattern], { allowFailure: true }); + if (result.exitCode === 0) terminated += 1; + } + return { terminated }; + } catch (error) { + return { + terminated, + error: error instanceof Error ? error.message : String(error), + }; + } +} + +function resetDaemonAfterTimeout(info: DaemonInfo, paths: DaemonPaths): { forcedKill: boolean } { + let forcedKill = false; + try { + if (isAgentDeviceDaemonProcess(info.pid, info.processStartTime)) { + process.kill(info.pid, 'SIGKILL'); + forcedKill = true; + } + } catch { + void stopDaemonProcessForTakeover(info); + } finally { + removeDaemonInfo(paths.infoPath); + removeDaemonLock(paths.lockPath); + } + return { forcedKill }; +} diff --git a/src/daemon-client-transport.ts b/src/daemon-client-transport.ts new file mode 100644 index 000000000..f4a0c57ae --- /dev/null +++ b/src/daemon-client-transport.ts @@ -0,0 +1,385 @@ +import net from 'node:net'; +import http from 'node:http'; +import https from 'node:https'; +import { AppError } from './utils/errors.ts'; +import { readNodeHttpResponseBody } from './utils/node-http.ts'; +import type { DaemonRequest, DaemonResponse } from './daemon/types.ts'; +import { emitDiagnostic } from './utils/diagnostics.ts'; +import { resolveDaemonPaths, type DaemonTransportPreference } from './daemon/config.ts'; +import { + readDaemonHttpProgressResponse, + readDaemonSocketProgressResponse, + shouldReadDaemonProgressStream, +} from './daemon-client-progress.ts'; +import { buildHttpRpcPayload, handleDaemonHttpResponseBody } from './daemon-client-rpc.ts'; +import { handleRequestTimeout } from './daemon-client-timeout.ts'; +import { isRemoteDaemon, type DaemonInfo } from './daemon-client-metadata.ts'; + +type ResolvedDaemonTransport = 'socket' | 'http'; + +const LOCAL_DAEMON_HEALTHCHECK_TIMEOUT_MS = 500; +const REMOTE_DAEMON_HEALTHCHECK_TIMEOUT_MS = 3000; + +export async function canConnect( + info: DaemonInfo, + preference: DaemonTransportPreference, +): Promise { + const transport = chooseTransport(info, preference); + if (await canConnectWithTransport(info, transport)) return true; + + const fallback = chooseAutoFallbackTransport(info, preference, transport); + return fallback ? await canConnectWithTransport(info, fallback) : false; +} + +async function canConnectWithTransport( + info: DaemonInfo, + transport: ResolvedDaemonTransport, +): Promise { + return transport === 'http' ? await canConnectHttp(info) : await canConnectSocket(info.port); +} + +export function canConnectSocket(port: number | undefined): Promise { + if (!port) return Promise.resolve(false); + return new Promise((resolve) => { + let settled = false; + const socket = net.createConnection({ host: '127.0.0.1', port }, () => { + finish(true); + }); + const finish = (reachable: boolean) => { + if (settled) return; + settled = true; + socket.destroy(); + resolve(reachable); + }; + socket.setTimeout(LOCAL_DAEMON_HEALTHCHECK_TIMEOUT_MS); + socket.on('timeout', () => { + finish(false); + }); + socket.on('error', () => { + finish(false); + }); + }); +} + +function canConnectHttp(info: DaemonInfo): Promise { + const endpoint = info.baseUrl + ? buildDaemonHttpUrl(info.baseUrl, 'health') + : info.httpPort + ? `http://127.0.0.1:${info.httpPort}/health` + : null; + if (!endpoint) return Promise.resolve(false); + const url = new URL(endpoint); + const transport = url.protocol === 'https:' ? https : http; + const timeoutMs = info.baseUrl + ? REMOTE_DAEMON_HEALTHCHECK_TIMEOUT_MS + : LOCAL_DAEMON_HEALTHCHECK_TIMEOUT_MS; + return new Promise((resolve) => { + const req = transport.request( + { + protocol: url.protocol, + host: url.hostname, + port: url.port, + path: url.pathname + url.search, + method: 'GET', + timeout: timeoutMs, + }, + (res) => { + res.resume(); + resolve((res.statusCode ?? 500) < 500); + }, + ); + req.on('timeout', () => { + req.destroy(); + resolve(false); + }); + req.on('error', () => { + resolve(false); + }); + req.end(); + }); +} + +export async function sendRequest( + info: DaemonInfo, + req: DaemonRequest, + preference: DaemonTransportPreference, + timeoutMs: number | undefined, +): Promise { + const transport = chooseTransport(info, preference); + try { + return await sendRequestWithTransport(info, req, timeoutMs, transport); + } catch (error) { + const fallback = chooseAutoFallbackTransport(info, preference, transport); + if (!fallback || !isSafeAutoTransportFallbackError(error, transport)) throw error; + return await sendRequestWithTransport(info, req, timeoutMs, fallback); + } +} + +async function sendRequestWithTransport( + info: DaemonInfo, + req: DaemonRequest, + timeoutMs: number | undefined, + transport: ResolvedDaemonTransport, +): Promise { + return transport === 'http' + ? await sendHttpRequest(info, req, timeoutMs) + : await sendSocketRequest(info, req, timeoutMs); +} + +function chooseTransport( + info: DaemonInfo, + preference: DaemonTransportPreference, +): ResolvedDaemonTransport { + if (info.baseUrl) { + // Defensive guard: resolveClientSettings rejects this earlier for normal CLI flow. + if (preference === 'socket') { + throw new AppError('COMMAND_FAILED', 'Remote daemon endpoint only supports HTTP transport', { + daemonBaseUrl: info.baseUrl, + }); + } + return 'http'; + } + if (preference === 'http' || preference === 'socket') { + return requireDaemonTransport(info, preference); + } + const autoOrder: ResolvedDaemonTransport[] = + info.transport === 'socket' || info.transport === 'dual' + ? ['socket', 'http'] + : ['http', 'socket']; + const available = autoOrder.find((transport) => hasDaemonTransport(info, transport)); + if (available) return available; + throw new AppError('COMMAND_FAILED', 'Daemon metadata has no reachable transport'); +} + +function hasDaemonTransport(info: DaemonInfo, transport: ResolvedDaemonTransport): boolean { + return transport === 'http' ? Boolean(info.httpPort) : Boolean(info.port); +} + +function chooseAutoFallbackTransport( + info: DaemonInfo, + preference: DaemonTransportPreference, + attempted: ResolvedDaemonTransport, +): ResolvedDaemonTransport | null { + if (preference !== 'auto' || info.baseUrl) return null; + const fallback = attempted === 'socket' ? 'http' : 'socket'; + return hasDaemonTransport(info, fallback) ? fallback : null; +} + +function isSafeAutoTransportFallbackError( + error: unknown, + attempted: ResolvedDaemonTransport, +): boolean { + return ( + attempted === 'socket' && + error instanceof AppError && + error.code === 'COMMAND_FAILED' && + error.message === 'Failed to communicate with daemon' && + error.details?.daemonSocketRequestWritten === false + ); +} + +function requireDaemonTransport( + info: DaemonInfo, + transport: ResolvedDaemonTransport, +): ResolvedDaemonTransport { + if (hasDaemonTransport(info, transport)) return transport; + throw new AppError( + 'COMMAND_FAILED', + transport === 'http' + ? 'Daemon HTTP endpoint is unavailable' + : 'Daemon socket endpoint is unavailable', + ); +} + +function handleTransportError( + err: unknown, + requestId: string | undefined, + remote: boolean, + details: Record = {}, +): AppError { + emitDiagnostic({ + level: 'error', + phase: 'daemon_request_socket_error', + data: { + requestId, + message: err instanceof Error ? (err as Error).message : String(err), + }, + }); + return new AppError( + 'COMMAND_FAILED', + 'Failed to communicate with daemon', + { + ...details, + requestId, + hint: remote + ? 'Retry command. If this persists, verify the remote daemon URL, auth token, and remote host reachability.' + : 'Retry command. If this persists, clean stale daemon metadata and start a fresh session.', + }, + err instanceof Error ? err : undefined, + ); +} + +async function sendSocketRequest( + info: DaemonInfo, + req: DaemonRequest, + timeoutMs: number | undefined, +): Promise { + const port = info.port; + if (!port) throw new AppError('COMMAND_FAILED', 'Daemon socket endpoint is unavailable'); + return new Promise((resolve, reject) => { + let requestWritten = false; + const socket = net.createConnection({ host: '127.0.0.1', port }, () => { + requestWritten = true; + socket.write(`${JSON.stringify(req)}\n`); + }); + const statePaths = resolveDaemonPaths( + req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR, + ); + let settled = false; + const timeoutHandle = + typeof timeoutMs === 'number' + ? setTimeout(() => { + settled = true; + socket.destroy(); + reject( + handleRequestTimeout( + info, + statePaths, + req.meta?.requestId, + req.command, + false, + timeoutMs, + ), + ); + }, timeoutMs) + : undefined; + + readDaemonSocketProgressResponse(socket, { + req, + isSettled: () => settled, + clearTimeout: () => { + if (timeoutHandle) clearTimeout(timeoutHandle); + }, + resolve: (response) => { + settled = true; + resolve(response); + }, + reject: (error) => { + settled = true; + reject(error); + }, + }); + + socket.on('error', (err) => { + if (settled) return; + settled = true; + if (timeoutHandle) clearTimeout(timeoutHandle); + reject( + handleTransportError(err, req.meta?.requestId, false, { + daemonSocketRequestWritten: requestWritten, + }), + ); + }); + }); +} + +async function sendHttpRequest( + info: DaemonInfo, + req: DaemonRequest, + timeoutMs: number | undefined, +): Promise { + const rpcUrl = info.baseUrl + ? new URL(buildDaemonHttpUrl(info.baseUrl, 'rpc')) + : info.httpPort + ? new URL(`http://127.0.0.1:${info.httpPort}/rpc`) + : null; + if (!rpcUrl) throw new AppError('COMMAND_FAILED', 'Daemon HTTP endpoint is unavailable'); + const rpcPayload = JSON.stringify(buildHttpRpcPayload(req, { includeTokenParam: !info.baseUrl })); + const headers: Record = { + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(rpcPayload), + }; + if (info.baseUrl && info.token) { + headers.authorization = `Bearer ${info.token}`; + headers['x-agent-device-token'] = info.token; + } + + return await new Promise((resolve, reject) => { + const statePaths = resolveDaemonPaths( + req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR, + ); + const transport = rpcUrl.protocol === 'https:' ? https : http; + const request = transport.request( + { + protocol: rpcUrl.protocol, + host: rpcUrl.hostname, + port: rpcUrl.port, + method: 'POST', + path: rpcUrl.pathname + rpcUrl.search, + headers, + }, + (res) => { + if (shouldReadDaemonProgressStream(req, res.headers?.['content-type'])) { + readDaemonHttpProgressResponse(res, { + req, + reject, + clearTimeout: () => { + if (timeoutHandle) clearTimeout(timeoutHandle); + }, + handleResponseBody: (body) => + handleDaemonHttpResponseBody(body, { info, req, resolve, reject }), + }); + return; + } + void readNodeHttpResponseBody(res) + .then((body) => { + if (timeoutHandle) clearTimeout(timeoutHandle); + handleDaemonHttpResponseBody(body, { info, req, resolve, reject }); + }) + .catch((err: unknown) => { + if (timeoutHandle) clearTimeout(timeoutHandle); + reject( + new AppError( + 'COMMAND_FAILED', + 'Failed to read daemon response', + { requestId: req.meta?.requestId }, + err instanceof Error ? err : undefined, + ), + ); + }); + }, + ); + + const remote = isRemoteDaemon(info); + const timeoutHandle = + typeof timeoutMs === 'number' + ? setTimeout(() => { + request.destroy(); + reject( + handleRequestTimeout( + info, + statePaths, + req.meta?.requestId, + req.command, + remote, + timeoutMs, + ), + ); + }, timeoutMs) + : undefined; + + request.on('error', (err) => { + if (timeoutHandle) clearTimeout(timeoutHandle); + reject(handleTransportError(err, req.meta?.requestId, remote)); + }); + + request.write(rpcPayload); + request.end(); + }); +} + +function buildDaemonHttpUrl(baseUrl: string, route: 'health' | 'rpc'): string { + // URL(base, relative) treats a base without trailing slash as a file path, so normalize to a directory-like base. + const normalizedBase = baseUrl.endsWith('/') ? baseUrl : `${baseUrl}/`; + return new URL(route, normalizedBase).toString(); +} diff --git a/src/daemon-client.ts b/src/daemon-client.ts index 0da7bf50e..1e77ef84f 100644 --- a/src/daemon-client.ts +++ b/src/daemon-client.ts @@ -1,39 +1,25 @@ -import net from 'node:net'; -import http from 'node:http'; -import https from 'node:https'; -import fs from 'node:fs'; -import os from 'node:os'; -import path from 'node:path'; -import { sleep } from './utils/timeouts.ts'; -import { AppError, toAppErrorCode } from './utils/errors.ts'; -import { readNodeHttpResponseBody } from './utils/node-http.ts'; import type { DaemonRequest as SharedDaemonRequest, DaemonResponse as SharedDaemonResponse, } from './daemon/types.ts'; -import { runCmdDetached, runCmdSync } from './utils/exec.ts'; -import { findProjectRoot, readVersion } from './utils/version.ts'; import { createRequestId, emitDiagnostic, withDiagnosticTimer } from './utils/diagnostics.ts'; -import { isAgentDeviceDaemonProcess, stopProcessForTakeover } from './utils/process-identity.ts'; -import { - resolveDaemonPaths, - resolveDaemonServerMode, - resolveDaemonTransportPreference, - type DaemonPaths, - type DaemonServerMode, - type DaemonTransportPreference, -} from './daemon/config.ts'; -import { computeDaemonCodeSignature } from './daemon/code-signature.ts'; import { PUBLIC_COMMANDS } from './command-catalog.ts'; -import { shellQuote } from './utils/shell-quote.ts'; +import { prepareRemoteRequestArtifacts } from './daemon-artifacts.ts'; import { - readDaemonHttpProgressResponse, - readDaemonSocketProgressResponse, - shouldReadDaemonProgressStream, -} from './daemon-client-progress.ts'; -import { materializeRemoteArtifacts, prepareRemoteRequestArtifacts } from './daemon-artifacts.ts'; + cleanupDaemonAfterRequest, + ensureDaemon, + resolveClientSettings, +} from './daemon-client-lifecycle.ts'; +import { sendRequest } from './daemon-client-transport.ts'; + export { computeDaemonCodeSignature } from './daemon/code-signature.ts'; export { downloadRemoteArtifact } from './daemon-artifacts.ts'; +export { + cleanupFailedDaemonStartupMetadata, + resolveDaemonStartupHint, +} from './daemon-client-metadata.ts'; +export { canConnectSocket } from './daemon-client-transport.ts'; +export { shouldResetDaemonAfterRequestTimeout } from './daemon-client-timeout.ts'; export type DaemonRequest = SharedDaemonRequest; export type DaemonResponse = SharedDaemonResponse; @@ -59,75 +45,8 @@ export type OpenAppOptions = { meta?: Omit, 'uploadedArtifactId' | 'clientArtifactPaths'>; }; -type DaemonInfo = { - port?: number; - httpPort?: number; - transport?: DaemonServerMode; - token: string; - pid: number; - version?: string; - codeSignature?: string; - processStartTime?: string; - baseUrl?: string; -}; - -type DaemonLockInfo = { - pid: number; - processStartTime?: string; - startedAt?: number; -}; - -type DaemonMetadataState = { - hasInfo: boolean; - hasLock: boolean; -}; - -type DaemonStartupCleanupReason = 'start_error' | 'startup_timeout'; - -type DaemonStartupCleanupResult = { - reason: DaemonStartupCleanupReason; - removedInfo: boolean; - removedLock: boolean; - stoppedInfoProcess: boolean; - stoppedLockProcess: boolean; - retainedInfoProcess?: boolean; - retainedLockProcess?: boolean; - error?: string; -}; - -type DaemonClientSettings = { - paths: DaemonPaths; - transportPreference: DaemonTransportPreference; - serverMode: DaemonServerMode; - ownedStateDir?: boolean; - remoteBaseUrl?: string; - remoteAuthToken?: string; -}; - -type EnsuredDaemon = { - info: DaemonInfo; - startedByClient: boolean; -}; - -type ResolvedDaemonTransport = 'socket' | 'http'; - const REQUEST_TIMEOUT_MS = 90_000; const PREPARE_REQUEST_TIMEOUT_MS = 240_000; -const DAEMON_STARTUP_TIMEOUT_MS = 15_000; -const DAEMON_STARTUP_ATTEMPTS = 2; -const DAEMON_TAKEOVER_TERM_TIMEOUT_MS = 3000; -const DAEMON_TAKEOVER_KILL_TIMEOUT_MS = 1000; -const LOCAL_DAEMON_HEALTHCHECK_TIMEOUT_MS = 500; -const REMOTE_DAEMON_HEALTHCHECK_TIMEOUT_MS = 3000; -const IOS_RUNNER_XCODEBUILD_KILL_PATTERNS = [ - 'xcodebuild .*AgentDeviceRunnerUITests/RunnerTests/testCommand', - 'xcodebuild .*AgentDeviceRunner\\.env\\.session-', - 'xcodebuild build-for-testing .*ios-runner/AgentDeviceRunner/AgentDeviceRunner\\.xcodeproj', -]; -const LOOPBACK_BLOCK_LIST = new net.BlockList(); -LOOPBACK_BLOCK_LIST.addSubnet('127.0.0.0', 8, 'ipv4'); -LOOPBACK_BLOCK_LIST.addAddress('::1', 'ipv6'); -LOOPBACK_BLOCK_LIST.addSubnet('::ffff:127.0.0.0', 104, 'ipv6'); export async function sendToDaemon(req: Omit): Promise { const requestId = req.meta?.requestId ?? createRequestId(); @@ -142,7 +61,7 @@ export async function sendToDaemon(req: Omit): Promise): DaemonClientSettings { - const explicitStateDir = resolveExplicitStateDir(req); - const remote = resolveRemoteClientSettings(req); - const transport = resolveTransportClientSettings(req, remote.remoteBaseUrl); - const ownedStateDir = shouldUseOwnedReplayStateDir(req, explicitStateDir, remote.rawBaseUrl); - const stateDir = ownedStateDir ? createOwnedReplayStateDir() : explicitStateDir; - return { - paths: resolveDaemonPaths(stateDir), - transportPreference: transport.preference, - serverMode: transport.serverMode, - ownedStateDir, - remoteBaseUrl: remote.remoteBaseUrl, - remoteAuthToken: remote.authToken, - }; -} - -function resolveExplicitStateDir(req: Omit): string | undefined { - return req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR; -} - -function resolveRemoteClientSettings(req: Omit): { - rawBaseUrl: string | undefined; - remoteBaseUrl?: string; - authToken?: string; -} { - const rawBaseUrl = req.flags?.daemonBaseUrl ?? process.env.AGENT_DEVICE_DAEMON_BASE_URL; - const remoteBaseUrl = resolveRemoteDaemonBaseUrl(rawBaseUrl); - const authToken = req.flags?.daemonAuthToken ?? process.env.AGENT_DEVICE_DAEMON_AUTH_TOKEN; - validateRemoteDaemonTrust(remoteBaseUrl, authToken); - return { rawBaseUrl, remoteBaseUrl, authToken }; -} - -function resolveTransportClientSettings( - req: Omit, - remoteBaseUrl: string | undefined, -): { preference: DaemonTransportPreference; serverMode: DaemonServerMode } { - const rawTransport = req.flags?.daemonTransport ?? process.env.AGENT_DEVICE_DAEMON_TRANSPORT; - const preference = resolveDaemonTransportPreference(rawTransport); - if (remoteBaseUrl && preference === 'socket') { - throw new AppError( - 'INVALID_ARGS', - 'Remote daemon base URL only supports HTTP transport. Remove --daemon-transport socket.', - { daemonBaseUrl: remoteBaseUrl }, - ); - } - const rawServerMode = - req.flags?.daemonServerMode ?? - process.env.AGENT_DEVICE_DAEMON_SERVER_MODE ?? - (rawTransport === 'dual' ? 'dual' : undefined); - return { - preference, - serverMode: resolveDaemonServerMode(rawServerMode), - }; -} - -function shouldUseOwnedReplayStateDir( - req: Omit, - explicitStateDir: string | undefined, - rawRemoteBaseUrl: string | undefined, -): boolean { - return isOneShotReplayCommand(req.command) && !explicitStateDir && !rawRemoteBaseUrl; -} - -function createOwnedReplayStateDir(): string { - return fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-replay-daemon-')); -} - -async function ensureDaemon(settings: DaemonClientSettings): Promise { - if (settings.remoteBaseUrl) { - return await ensureRemoteDaemon(settings); - } - - const reusable = await readReusableLocalDaemon(settings); - if (reusable) return { info: reusable, startedByClient: false }; - - cleanupStaleDaemonLockIfSafe(settings.paths); - return await startLocalDaemon(settings); -} - -async function ensureRemoteDaemon(settings: DaemonClientSettings): Promise { - const remoteInfo: DaemonInfo = { - transport: 'http', - // Remote mode reuses the auth token as the daemon token so the existing JSON-RPC contract still works. - token: settings.remoteAuthToken ?? '', - pid: 0, - baseUrl: settings.remoteBaseUrl, - }; - if (await canConnect(remoteInfo, 'http')) { - return { info: remoteInfo, startedByClient: false }; - } - throw new AppError('COMMAND_FAILED', 'Remote daemon is unavailable', { - daemonBaseUrl: settings.remoteBaseUrl, - hint: 'Verify AGENT_DEVICE_DAEMON_BASE_URL points to a reachable daemon with GET /health and POST /rpc.', - }); -} - -async function readReusableLocalDaemon(settings: DaemonClientSettings): Promise { - const existing = readDaemonInfo(settings.paths.infoPath); - if (!existing) return null; - - const existingReachable = await canConnect(existing, settings.transportPreference); - if (isReusableDaemonInfo(existing, existingReachable)) return existing; - - emitDaemonTakeoverNotice(existing, existingReachable, settings.paths.baseDir); - await stopDaemonProcessForTakeover(existing); - removeDaemonInfo(settings.paths.infoPath); - return null; -} - -function isReusableDaemonInfo(info: DaemonInfo, reachable: boolean): boolean { - return ( - info.version === readVersion() && - info.codeSignature === resolveLocalDaemonCodeSignature() && - reachable - ); -} - -function emitDaemonTakeoverNotice(info: DaemonInfo, reachable: boolean, stateDir: string): void { - try { - const identity = info.version ? `pid ${info.pid}, v${info.version}` : `pid ${info.pid}`; - const reason = resolveDaemonTakeoverReason(info, reachable); - process.stderr.write(`Replacing daemon (${identity}) in ${stateDir}: ${reason}\n`); - } catch { - // The takeover notice is best effort; never fail the command on stderr issues. - } -} - -function resolveDaemonTakeoverReason(info: DaemonInfo, reachable: boolean): string { - if (info.version !== readVersion()) return `version mismatch (client v${readVersion()})`; - if (info.codeSignature !== resolveLocalDaemonCodeSignature()) return 'code-signature mismatch'; - if (!reachable) return 'unreachable'; - return 'not reusable'; -} - -async function startLocalDaemon(settings: DaemonClientSettings): Promise { - let lockRecoveryCount = 0; - const cleanupResults: DaemonStartupCleanupResult[] = []; - let startError: string | undefined; - for (let attempt = 1; attempt <= DAEMON_STARTUP_ATTEMPTS; attempt += 1) { - try { - await startDaemon(settings); - } catch (error) { - startError = error instanceof Error ? error.message : String(error); - cleanupResults.push(await cleanupFailedDaemonStartupMetadata(settings.paths, 'start_error')); - if (attempt < DAEMON_STARTUP_ATTEMPTS) { - await sleep(150); - continue; - } - break; - } - - const started = await waitForDaemonInfo(DAEMON_STARTUP_TIMEOUT_MS, settings); - if (started) return { info: started, startedByClient: true }; - - if (await recoverDaemonLockHolder(settings.paths)) { - lockRecoveryCount += 1; - continue; - } - - const metadataState = getDaemonMetadataState(settings.paths); - const hasAnotherAttempt = attempt < DAEMON_STARTUP_ATTEMPTS; - const cleanup = await cleanupFailedDaemonStartupMetadata(settings.paths, 'startup_timeout', { - stopLiveProcesses: false, - }); - cleanupResults.push(cleanup); - if (cleanup.retainedInfoProcess || cleanup.retainedLockProcess) { - const extended = await waitForDaemonInfo(DAEMON_STARTUP_TIMEOUT_MS, settings); - if (extended) return { info: extended, startedByClient: true }; - break; - } - if (!hasAnotherAttempt) break; - - // Detached daemon startup can race on busy CI hosts; retry when no metadata exists yet. - if (!metadataState.hasInfo && !metadataState.hasLock) await sleep(150); - } - - const state = getDaemonMetadataState(settings.paths); - throw new AppError('COMMAND_FAILED', 'Failed to start daemon', { - kind: 'daemon_startup_failed', - infoPath: settings.paths.infoPath, - lockPath: settings.paths.lockPath, - startupTimeoutMs: DAEMON_STARTUP_TIMEOUT_MS, - startupAttempts: DAEMON_STARTUP_ATTEMPTS, - lockRecoveryCount, - cleanupResults, - startError, - metadataState: state, - hint: resolveDaemonStartupHint(state, settings.paths), - }); -} - -async function cleanupDaemonAfterRequest( - req: Omit, - daemon: EnsuredDaemon, - settings: DaemonClientSettings, -): Promise { - if ( - !isOneShotReplayCommand(req.command) || - (!daemon.startedByClient && !settings.ownedStateDir) || - isRemoteDaemon(daemon.info) - ) { - return; - } - - const result = { - pid: daemon.info.pid, - removedInfo: false, - removedLock: false, - removedStateDir: false, - error: undefined as string | undefined, - }; - - try { - await stopDaemonProcessForTakeover(daemon.info); - } catch (error) { - result.error = error instanceof Error ? error.message : String(error); - } finally { - const infoExists = fs.existsSync(settings.paths.infoPath); - removeDaemonInfo(settings.paths.infoPath); - result.removedInfo = infoExists && !fs.existsSync(settings.paths.infoPath); - - const lockExists = fs.existsSync(settings.paths.lockPath); - removeDaemonLock(settings.paths.lockPath); - result.removedLock = lockExists && !fs.existsSync(settings.paths.lockPath); - - if (settings.ownedStateDir) { - fs.rmSync(settings.paths.baseDir, { recursive: true, force: true }); - result.removedStateDir = !fs.existsSync(settings.paths.baseDir); - } - } - - emitDiagnostic({ - level: result.error ? 'warn' : 'info', - phase: 'daemon_replay_cleanup', - data: result, - }); -} - -function isOneShotReplayCommand(command: string | undefined): boolean { - return command === PUBLIC_COMMANDS.replay || command === PUBLIC_COMMANDS.test; -} - -async function waitForDaemonInfo( - timeoutMs: number, - settings: DaemonClientSettings, -): Promise { - const start = Date.now(); - while (Date.now() - start < timeoutMs) { - const info = readDaemonInfo(settings.paths.infoPath); - if (info && (await canConnect(info, settings.transportPreference))) return info; - await sleep(100); - } - return null; -} - -async function recoverDaemonLockHolder(paths: DaemonPaths): Promise { - const state = getDaemonMetadataState(paths); - if (!state.hasLock || state.hasInfo) return false; - const lockInfo = readDaemonLockInfo(paths.lockPath); - if (!lockInfo) { - removeDaemonLock(paths.lockPath); - return true; - } - if (!isAgentDeviceDaemonProcess(lockInfo.pid, lockInfo.processStartTime)) { - removeDaemonLock(paths.lockPath); - return true; - } - return false; -} - -async function stopDaemonProcessForTakeover(info: DaemonInfo): Promise { - await stopProcessForTakeover(info.pid, { - termTimeoutMs: DAEMON_TAKEOVER_TERM_TIMEOUT_MS, - killTimeoutMs: DAEMON_TAKEOVER_KILL_TIMEOUT_MS, - expectedStartTime: info.processStartTime, - }); -} - -function readDaemonInfo(infoPath: string): DaemonInfo | null { - const data = readJsonFile(infoPath); - if (!data || typeof data !== 'object') return null; - const parsed = data as Partial; - const token = readRequiredDaemonToken(parsed); - if (!token) return null; - const ports = readDaemonInfoPorts(parsed); - if (!ports) return null; - return { - token, - ...ports, - transport: readDaemonInfoTransport(parsed.transport), - pid: readPositiveInteger(parsed.pid) ?? 0, - version: readOptionalString(parsed.version), - codeSignature: readOptionalString(parsed.codeSignature), - processStartTime: readOptionalString(parsed.processStartTime), - }; -} - -function readRequiredDaemonToken(parsed: Partial): string | null { - return typeof parsed.token === 'string' && parsed.token.length > 0 ? parsed.token : null; -} - -function readDaemonInfoPorts( - parsed: Partial, -): Pick | null { - const port = readPositiveInteger(parsed.port); - const httpPort = readPositiveInteger(parsed.httpPort); - if (port === undefined && httpPort === undefined) return null; - return { port, httpPort }; -} - -function readDaemonInfoTransport(value: unknown): DaemonInfo['transport'] { - return value === 'socket' || value === 'http' || value === 'dual' ? value : undefined; -} - -function readOptionalString(value: unknown): string | undefined { - return typeof value === 'string' ? value : undefined; -} - -function readPositiveInteger(value: unknown): number | undefined { - return Number.isInteger(value) && Number(value) > 0 ? Number(value) : undefined; -} - -function readDaemonLockInfo(lockPath: string): DaemonLockInfo | null { - const data = readJsonFile(lockPath); - if (!data || typeof data !== 'object') return null; - const parsed = data as Partial; - const hasPid = Number.isInteger(parsed.pid) && Number(parsed.pid) > 0; - if (!hasPid) { - return null; - } - return { - pid: Number(parsed.pid), - processStartTime: - typeof parsed.processStartTime === 'string' ? parsed.processStartTime : undefined, - startedAt: typeof parsed.startedAt === 'number' ? parsed.startedAt : undefined, - }; -} - -function removeDaemonInfo(infoPath: string): void { - removeFileIfExists(infoPath); -} - -function removeDaemonLock(lockPath: string): void { - removeFileIfExists(lockPath); -} - -function cleanupStaleDaemonLockIfSafe(paths: DaemonPaths): void { - const state = getDaemonMetadataState(paths); - if (!state.hasLock || state.hasInfo) return; - const lockInfo = readDaemonLockInfo(paths.lockPath); - if (!lockInfo) { - removeDaemonLock(paths.lockPath); - return; - } - if (isAgentDeviceDaemonProcess(lockInfo.pid, lockInfo.processStartTime)) { - return; - } - removeDaemonLock(paths.lockPath); -} - -export async function cleanupFailedDaemonStartupMetadata( - paths: DaemonPaths, - reason: DaemonStartupCleanupReason, - options: { stopLiveProcesses?: boolean } = {}, -): Promise { - const stopLiveProcesses = options.stopLiveProcesses ?? true; - const result: DaemonStartupCleanupResult = { - reason, - removedInfo: false, - removedLock: false, - stoppedInfoProcess: false, - stoppedLockProcess: false, - }; - - try { - const infoExists = fs.existsSync(paths.infoPath); - const info = readDaemonInfo(paths.infoPath); - if (info) { - const liveInfoProcess = isAgentDeviceDaemonProcess(info.pid, info.processStartTime); - if (liveInfoProcess && !stopLiveProcesses) { - result.retainedInfoProcess = true; - } else { - if (liveInfoProcess) { - await stopDaemonProcessForTakeover(info); - result.stoppedInfoProcess = true; - } - removeDaemonInfo(paths.infoPath); - result.removedInfo = true; - } - } else if (infoExists) { - removeDaemonInfo(paths.infoPath); - result.removedInfo = true; - } - - const lockExists = fs.existsSync(paths.lockPath); - const lockInfo = readDaemonLockInfo(paths.lockPath); - if (lockInfo) { - const liveLockProcess = isAgentDeviceDaemonProcess(lockInfo.pid, lockInfo.processStartTime); - if (liveLockProcess && !stopLiveProcesses) { - result.retainedLockProcess = true; - } else { - if (liveLockProcess) { - await stopProcessForTakeover(lockInfo.pid, { - termTimeoutMs: DAEMON_TAKEOVER_TERM_TIMEOUT_MS, - killTimeoutMs: DAEMON_TAKEOVER_KILL_TIMEOUT_MS, - expectedStartTime: lockInfo.processStartTime, - }); - result.stoppedLockProcess = true; - } - removeDaemonLock(paths.lockPath); - result.removedLock = true; - } - } else if (lockExists) { - removeDaemonLock(paths.lockPath); - result.removedLock = true; - } - } catch (error) { - result.error = error instanceof Error ? error.message : String(error); - } - - emitDiagnostic({ - level: result.error ? 'warn' : 'info', - phase: 'daemon_startup_metadata_cleanup', - data: result, - }); - return result; -} - -function getDaemonMetadataState(paths: DaemonPaths): DaemonMetadataState { - return { - hasInfo: fs.existsSync(paths.infoPath), - hasLock: fs.existsSync(paths.lockPath), - }; -} - -function readJsonFile(filePath: string): unknown | null { - if (!fs.existsSync(filePath)) return null; - try { - return JSON.parse(fs.readFileSync(filePath, 'utf8')) as unknown; - } catch { - return null; - } -} - -function removeFileIfExists(filePath: string): void { - try { - if (fs.existsSync(filePath)) fs.unlinkSync(filePath); - } catch { - // Best-effort cleanup only. - } -} - -async function canConnect( - info: DaemonInfo, - preference: DaemonTransportPreference, -): Promise { - const transport = chooseTransport(info, preference); - if (await canConnectWithTransport(info, transport)) return true; - - const fallback = chooseAutoFallbackTransport(info, preference, transport); - return fallback ? await canConnectWithTransport(info, fallback) : false; -} - -async function canConnectWithTransport( - info: DaemonInfo, - transport: ResolvedDaemonTransport, -): Promise { - return transport === 'http' ? await canConnectHttp(info) : await canConnectSocket(info.port); -} - -export function canConnectSocket(port: number | undefined): Promise { - if (!port) return Promise.resolve(false); - return new Promise((resolve) => { - let settled = false; - const socket = net.createConnection({ host: '127.0.0.1', port }, () => { - finish(true); - }); - const finish = (reachable: boolean) => { - if (settled) return; - settled = true; - socket.destroy(); - resolve(reachable); - }; - socket.setTimeout(LOCAL_DAEMON_HEALTHCHECK_TIMEOUT_MS); - socket.on('timeout', () => { - finish(false); - }); - socket.on('error', () => { - finish(false); - }); - }); -} - -function canConnectHttp(info: DaemonInfo): Promise { - const endpoint = info.baseUrl - ? buildDaemonHttpUrl(info.baseUrl, 'health') - : info.httpPort - ? `http://127.0.0.1:${info.httpPort}/health` - : null; - if (!endpoint) return Promise.resolve(false); - const url = new URL(endpoint); - const transport = url.protocol === 'https:' ? https : http; - const timeoutMs = info.baseUrl - ? REMOTE_DAEMON_HEALTHCHECK_TIMEOUT_MS - : LOCAL_DAEMON_HEALTHCHECK_TIMEOUT_MS; - return new Promise((resolve) => { - const req = transport.request( - { - protocol: url.protocol, - host: url.hostname, - port: url.port, - path: url.pathname + url.search, - method: 'GET', - timeout: timeoutMs, - }, - (res) => { - res.resume(); - resolve((res.statusCode ?? 500) < 500); - }, - ); - req.on('timeout', () => { - req.destroy(); - resolve(false); - }); - req.on('error', () => { - resolve(false); - }); - req.end(); - }); -} - -async function startDaemon(settings: DaemonClientSettings): Promise { - const launchSpec = resolveDaemonLaunchSpec(); - const args = launchSpec.useSrc - ? ['--experimental-strip-types', launchSpec.srcPath] - : [launchSpec.distPath]; - const env = { - ...process.env, - AGENT_DEVICE_STATE_DIR: settings.paths.baseDir, - AGENT_DEVICE_DAEMON_SERVER_MODE: settings.serverMode, - }; - - runCmdDetached(process.execPath, args, { env }); -} - -type DaemonLaunchSpec = { - root: string; - distPath: string; - distPaths: string[]; - srcPath: string; - useSrc: boolean; -}; - -function resolveDaemonLaunchSpec(): DaemonLaunchSpec { - const root = findProjectRoot(); - const distPaths = [ - path.join(root, 'dist', 'src', 'internal', 'daemon.js'), - path.join(root, 'dist', 'src', 'daemon.js'), - ]; - const defaultDistPath = distPaths[0]; - if (defaultDistPath === undefined) { - throw new AppError('COMMAND_FAILED', 'Daemon dist path list is empty'); - } - const distPath = distPaths.find((candidate) => fs.existsSync(candidate)) ?? defaultDistPath; - const srcPath = path.join(root, 'src', 'daemon.ts'); - - const hasDist = distPaths.some((candidate) => fs.existsSync(candidate)); - const hasSrc = fs.existsSync(srcPath); - if (!hasDist && !hasSrc) { - throw new AppError('COMMAND_FAILED', 'Daemon entry not found', { distPaths, srcPath }); - } - const runningFromSource = process.execArgv.includes('--experimental-strip-types'); - const useSrc = runningFromSource ? hasSrc : !hasDist && hasSrc; - return { root, distPath, distPaths, srcPath, useSrc }; -} - -function resolveLocalDaemonCodeSignature(): string { - const launchSpec = resolveDaemonLaunchSpec(); - const entryPath = launchSpec.useSrc ? launchSpec.srcPath : launchSpec.distPath; - return computeDaemonCodeSignature(entryPath, launchSpec.root); -} - -async function sendRequest( - info: DaemonInfo, - req: DaemonRequest, - preference: DaemonTransportPreference, - timeoutMs: number | undefined, -): Promise { - const transport = chooseTransport(info, preference); - try { - return await sendRequestWithTransport(info, req, timeoutMs, transport); - } catch (error) { - const fallback = chooseAutoFallbackTransport(info, preference, transport); - if (!fallback || !isSafeAutoTransportFallbackError(error, transport)) throw error; - return await sendRequestWithTransport(info, req, timeoutMs, fallback); - } -} - -async function sendRequestWithTransport( - info: DaemonInfo, - req: DaemonRequest, - timeoutMs: number | undefined, - transport: ResolvedDaemonTransport, -): Promise { - return transport === 'http' - ? await sendHttpRequest(info, req, timeoutMs) - : await sendSocketRequest(info, req, timeoutMs); -} - -function chooseTransport( - info: DaemonInfo, - preference: DaemonTransportPreference, -): ResolvedDaemonTransport { - if (info.baseUrl) { - // Defensive guard: resolveClientSettings rejects this earlier for normal CLI flow. - if (preference === 'socket') { - throw new AppError('COMMAND_FAILED', 'Remote daemon endpoint only supports HTTP transport', { - daemonBaseUrl: info.baseUrl, - }); - } - return 'http'; - } - if (preference === 'http' || preference === 'socket') { - return requireDaemonTransport(info, preference); - } - const autoOrder: ResolvedDaemonTransport[] = - info.transport === 'socket' || info.transport === 'dual' - ? ['socket', 'http'] - : ['http', 'socket']; - const available = autoOrder.find((transport) => hasDaemonTransport(info, transport)); - if (available) return available; - throw new AppError('COMMAND_FAILED', 'Daemon metadata has no reachable transport'); -} - -function hasDaemonTransport(info: DaemonInfo, transport: ResolvedDaemonTransport): boolean { - return transport === 'http' ? Boolean(info.httpPort) : Boolean(info.port); -} - -function chooseAutoFallbackTransport( - info: DaemonInfo, - preference: DaemonTransportPreference, - attempted: ResolvedDaemonTransport, -): ResolvedDaemonTransport | null { - if (preference !== 'auto' || info.baseUrl) return null; - const fallback = attempted === 'socket' ? 'http' : 'socket'; - return hasDaemonTransport(info, fallback) ? fallback : null; -} - -function isSafeAutoTransportFallbackError( - error: unknown, - attempted: ResolvedDaemonTransport, -): boolean { - return ( - attempted === 'socket' && - error instanceof AppError && - error.code === 'COMMAND_FAILED' && - error.message === 'Failed to communicate with daemon' && - error.details?.daemonSocketRequestWritten === false - ); -} - -function requireDaemonTransport( - info: DaemonInfo, - transport: ResolvedDaemonTransport, -): ResolvedDaemonTransport { - if (hasDaemonTransport(info, transport)) return transport; - throw new AppError( - 'COMMAND_FAILED', - transport === 'http' - ? 'Daemon HTTP endpoint is unavailable' - : 'Daemon socket endpoint is unavailable', - ); -} - -function handleRequestTimeout( - info: DaemonInfo, - statePaths: DaemonPaths, - requestId: string | undefined, - command: string | undefined, - remote: boolean, - timeoutMs: number, -): AppError { - const cleanup = remote ? { terminated: 0 } : cleanupTimedOutIosRunnerBuilds(); - const resetDaemon = !remote && shouldResetDaemonAfterRequestTimeout(command); - const daemonReset = resetDaemon - ? resetDaemonAfterTimeout(info, statePaths) - : { forcedKill: false }; - emitDiagnostic({ - level: 'error', - phase: 'daemon_request_timeout', - data: { - timeoutMs, - requestId, - command, - timedOutRunnerPidsTerminated: cleanup.terminated, - timedOutRunnerCleanupError: cleanup.error, - daemonPidReset: resetDaemon ? info.pid : undefined, - daemonPidForceKilled: resetDaemon ? daemonReset.forcedKill : undefined, - daemonPreservedAfterTimeout: !remote && !resetDaemon, - daemonBaseUrl: info.baseUrl, - }, - }); - return new AppError('COMMAND_FAILED', 'Daemon request timed out', { - timeoutMs, - requestId, - hint: resolveRequestTimeoutHint({ remote, resetDaemon, command }), - }); -} - -export function shouldResetDaemonAfterRequestTimeout(command: string | undefined): boolean { - // Snapshot can block in platform accessibility bridges while the app is crashed or never idle. - // Keep the daemon/session alive so callers can still collect screenshot/perf/log evidence - // and close the session after the runner abort path has been triggered. - return command !== 'snapshot'; -} - -function resolveRequestTimeoutHint(params: { - remote: boolean; - resetDaemon: boolean; - command: string | undefined; -}): string { - const { remote, resetDaemon, command } = params; - if (remote) { - return 'Retry with --debug and verify the remote daemon URL, auth token, and remote host logs.'; - } - if (!resetDaemon) { - const iosPrepareHint = - command === PUBLIC_COMMANDS.snapshot - ? ' If this was the first Apple-platform snapshot on the device, run agent-device prepare ios-runner with the same --platform before snapshot/test so runner startup is handled explicitly.' - : ''; - return `Retry with --debug and check daemon diagnostics logs. The timed-out ${command ?? 'request'} request was canceled and Apple runner work was aborted when detected; the daemon was kept alive so the session can still be closed or inspected.${iosPrepareHint}`; - } - return 'Retry with --debug and check daemon diagnostics logs. Timed-out Apple runner xcodebuild processes were terminated when detected.'; -} - -function handleTransportError( - err: unknown, - requestId: string | undefined, - remote: boolean, - details: Record = {}, -): AppError { - emitDiagnostic({ - level: 'error', - phase: 'daemon_request_socket_error', - data: { - requestId, - message: err instanceof Error ? (err as Error).message : String(err), - }, - }); - return new AppError( - 'COMMAND_FAILED', - 'Failed to communicate with daemon', - { - ...details, - requestId, - hint: remote - ? 'Retry command. If this persists, verify the remote daemon URL, auth token, and remote host reachability.' - : 'Retry command. If this persists, clean stale daemon metadata and start a fresh session.', - }, - err instanceof Error ? err : undefined, - ); -} - -async function sendSocketRequest( - info: DaemonInfo, - req: DaemonRequest, - timeoutMs: number | undefined, -): Promise { - const port = info.port; - if (!port) throw new AppError('COMMAND_FAILED', 'Daemon socket endpoint is unavailable'); - return new Promise((resolve, reject) => { - let requestWritten = false; - const socket = net.createConnection({ host: '127.0.0.1', port }, () => { - requestWritten = true; - socket.write(`${JSON.stringify(req)}\n`); - }); - const statePaths = resolveDaemonPaths( - req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR, - ); - let settled = false; - const timeoutHandle = - typeof timeoutMs === 'number' - ? setTimeout(() => { - settled = true; - socket.destroy(); - reject( - handleRequestTimeout( - info, - statePaths, - req.meta?.requestId, - req.command, - false, - timeoutMs, - ), - ); - }, timeoutMs) - : undefined; - - readDaemonSocketProgressResponse(socket, { - req, - isSettled: () => settled, - clearTimeout: () => { - if (timeoutHandle) clearTimeout(timeoutHandle); - }, - resolve: (response) => { - settled = true; - resolve(response); - }, - reject: (error) => { - settled = true; - reject(error); - }, - }); - - socket.on('error', (err) => { - if (settled) return; - settled = true; - if (timeoutHandle) clearTimeout(timeoutHandle); - reject( - handleTransportError(err, req.meta?.requestId, false, { - daemonSocketRequestWritten: requestWritten, - }), - ); - }); - }); -} - -async function sendHttpRequest( - info: DaemonInfo, - req: DaemonRequest, - timeoutMs: number | undefined, -): Promise { - const rpcUrl = info.baseUrl - ? new URL(buildDaemonHttpUrl(info.baseUrl, 'rpc')) - : info.httpPort - ? new URL(`http://127.0.0.1:${info.httpPort}/rpc`) - : null; - if (!rpcUrl) throw new AppError('COMMAND_FAILED', 'Daemon HTTP endpoint is unavailable'); - const rpcPayload = JSON.stringify(buildHttpRpcPayload(req, { includeTokenParam: !info.baseUrl })); - const headers: Record = { - 'content-type': 'application/json', - 'content-length': Buffer.byteLength(rpcPayload), - }; - if (info.baseUrl && info.token) { - headers.authorization = `Bearer ${info.token}`; - headers['x-agent-device-token'] = info.token; - } - - return await new Promise((resolve, reject) => { - const statePaths = resolveDaemonPaths( - req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR, - ); - const transport = rpcUrl.protocol === 'https:' ? https : http; - const request = transport.request( - { - protocol: rpcUrl.protocol, - host: rpcUrl.hostname, - port: rpcUrl.port, - method: 'POST', - path: rpcUrl.pathname + rpcUrl.search, - headers, - }, - (res) => { - if (shouldReadDaemonProgressStream(req, res.headers?.['content-type'])) { - readDaemonHttpProgressResponse(res, { - req, - reject, - clearTimeout: () => { - if (timeoutHandle) clearTimeout(timeoutHandle); - }, - handleResponseBody: (body) => - handleDaemonHttpResponseBody(body, { info, req, resolve, reject }), - }); - return; - } - void readNodeHttpResponseBody(res) - .then((body) => { - if (timeoutHandle) clearTimeout(timeoutHandle); - handleDaemonHttpResponseBody(body, { info, req, resolve, reject }); - }) - .catch((err: unknown) => { - if (timeoutHandle) clearTimeout(timeoutHandle); - reject( - new AppError( - 'COMMAND_FAILED', - 'Failed to read daemon response', - { requestId: req.meta?.requestId }, - err instanceof Error ? err : undefined, - ), - ); - }); - }, - ); - - const remote = isRemoteDaemon(info); - const timeoutHandle = - typeof timeoutMs === 'number' - ? setTimeout(() => { - request.destroy(); - reject( - handleRequestTimeout( - info, - statePaths, - req.meta?.requestId, - req.command, - remote, - timeoutMs, - ), - ); - }, timeoutMs) - : undefined; - - request.on('error', (err) => { - if (timeoutHandle) clearTimeout(timeoutHandle); - reject(handleTransportError(err, req.meta?.requestId, remote)); - }); - - request.write(rpcPayload); - request.end(); - }); -} - -function handleDaemonHttpResponseBody( - body: string, - options: { - info: DaemonInfo; - req: DaemonRequest; - resolve: (response: DaemonResponse | PromiseLike) => void; - reject: (error: unknown) => void; - }, -): void { - const { info, req, resolve, reject } = options; - try { - const parsed = parseDaemonHttpResponseBody(body); - if (parsed.error) { - reject(toDaemonHttpRpcError(parsed.error, req.meta?.requestId)); - return; - } - if (!parsed.result || typeof parsed.result !== 'object') { - reject( - new AppError('COMMAND_FAILED', 'Invalid daemon RPC response', { - requestId: req.meta?.requestId, - }), - ); - return; - } - void resolveDaemonHttpResult(info, req, parsed.result, resolve, reject); - } catch (err) { - reject( - new AppError( - 'COMMAND_FAILED', - 'Invalid daemon response', - { - requestId: req.meta?.requestId, - line: body, - }, - err instanceof Error ? err : undefined, - ), - ); - } -} - -function parseDaemonHttpResponseBody(body: string): { - result?: DaemonResponse; - error?: { message?: string; data?: Record }; -} { - return JSON.parse(body) as { - result?: DaemonResponse; - error?: { message?: string; data?: Record }; - }; -} - -function toDaemonHttpRpcError( - error: { message?: string; data?: Record }, - requestId: string | undefined, -): AppError { - const data = error.data ?? {}; - return new AppError( - toAppErrorCode(data.code != null ? String(data.code) : undefined, 'COMMAND_FAILED'), - String(data.message ?? error.message ?? 'Daemon RPC request failed'), - { - ...(typeof data.details === 'object' && data.details ? data.details : {}), - hint: typeof data.hint === 'string' ? data.hint : undefined, - diagnosticId: typeof data.diagnosticId === 'string' ? data.diagnosticId : undefined, - logPath: typeof data.logPath === 'string' ? data.logPath : undefined, - requestId, - }, - ); -} - -async function resolveDaemonHttpResult( - info: DaemonInfo, - req: DaemonRequest, - result: DaemonResponse, - resolve: (response: DaemonResponse | PromiseLike) => void, - reject: (error: unknown) => void, -): Promise { - try { - resolve( - info.baseUrl && result.ok ? await materializeRemoteArtifacts(info, req, result) : result, - ); - } catch (error) { - reject(error); - } -} - -function buildHttpRpcPayload( - req: DaemonRequest, - options: { includeTokenParam: boolean }, -): { - jsonrpc: '2.0'; - id: string; - method: string; - params: DaemonRequest | Record; -} { - const id = req.meta?.requestId ?? createRequestId(); - if (!isLeaseRpcCommand(req.command)) { - return { - jsonrpc: '2.0', - id, - method: 'agent_device.command', - params: req, - }; - } - return { - jsonrpc: '2.0', - id, - method: leaseRpcMethodForCommand(req.command), - params: buildLeaseRpcParams(req, req.command, options), - }; -} - -type LeaseRpcCommand = 'lease_allocate' | 'lease_heartbeat' | 'lease_release'; - -function isLeaseRpcCommand(command: string): command is LeaseRpcCommand { - return ( - command === 'lease_allocate' || command === 'lease_heartbeat' || command === 'lease_release' - ); -} - -function leaseRpcMethodForCommand(command: LeaseRpcCommand): string { - switch (command) { - case 'lease_allocate': - return 'agent_device.lease.allocate'; - case 'lease_heartbeat': - return 'agent_device.lease.heartbeat'; - case 'lease_release': - return 'agent_device.lease.release'; - } -} - -function buildLeaseRpcParams( - req: DaemonRequest, - command: LeaseRpcCommand, - options: { includeTokenParam: boolean }, -): Record { - const common = { - ...(options.includeTokenParam ? { token: req.token } : {}), - session: req.session, - tenantId: req.meta?.tenantId, - runId: req.meta?.runId, - }; - switch (command) { - case 'lease_allocate': - return { - ...common, - ttlMs: req.meta?.leaseTtlMs, - backend: req.meta?.leaseBackend, - }; - case 'lease_heartbeat': - return { - ...common, - leaseId: req.meta?.leaseId, - ttlMs: req.meta?.leaseTtlMs, - }; - case 'lease_release': - return { - ...common, - leaseId: req.meta?.leaseId, - }; - } -} - -function cleanupTimedOutIosRunnerBuilds(): { terminated: number; error?: string } { - let terminated = 0; - try { - for (const pattern of IOS_RUNNER_XCODEBUILD_KILL_PATTERNS) { - const result = runCmdSync('pkill', ['-f', pattern], { allowFailure: true }); - if (result.exitCode === 0) terminated += 1; - } - return { terminated }; - } catch (error) { - return { - terminated, - error: error instanceof Error ? error.message : String(error), - }; - } -} - -function resetDaemonAfterTimeout(info: DaemonInfo, paths: DaemonPaths): { forcedKill: boolean } { - let forcedKill = false; - try { - if (isAgentDeviceDaemonProcess(info.pid, info.processStartTime)) { - process.kill(info.pid, 'SIGKILL'); - forcedKill = true; - } - } catch { - void stopProcessForTakeover(info.pid, { - termTimeoutMs: DAEMON_TAKEOVER_TERM_TIMEOUT_MS, - killTimeoutMs: DAEMON_TAKEOVER_KILL_TIMEOUT_MS, - expectedStartTime: info.processStartTime, - }); - } finally { - removeDaemonInfo(paths.infoPath); - removeDaemonLock(paths.lockPath); - } - return { forcedKill }; -} - -function isRemoteDaemon(info: DaemonInfo): boolean { - return typeof info.baseUrl === 'string' && info.baseUrl.length > 0; -} - -function resolveRemoteDaemonBaseUrl(raw: string | undefined): string | undefined { - if (!raw) return undefined; - let parsed: URL; - try { - parsed = new URL(raw); - } catch (error) { - throw new AppError( - 'INVALID_ARGS', - 'Invalid daemon base URL', - { - daemonBaseUrl: raw, - }, - error instanceof Error ? error : undefined, - ); - } - if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') { - throw new AppError('INVALID_ARGS', 'Daemon base URL must use http or https', { - daemonBaseUrl: raw, - }); - } - return parsed.toString().replace(/\/+$/, ''); -} - -function validateRemoteDaemonTrust( - remoteBaseUrl: string | undefined, - remoteAuthToken: string | undefined, -): void { - if (!remoteBaseUrl) return; - const hostname = new URL(remoteBaseUrl).hostname; - if (isLoopbackHostname(hostname)) return; - if (typeof remoteAuthToken === 'string' && remoteAuthToken.trim().length > 0) return; - throw new AppError( - 'INVALID_ARGS', - 'Remote daemon base URL for non-loopback hosts requires daemon authentication', - { - daemonBaseUrl: remoteBaseUrl, - hint: 'Provide --daemon-auth-token or AGENT_DEVICE_DAEMON_AUTH_TOKEN when using a non-loopback remote daemon URL.', - }, - ); -} - -function isLoopbackHostname(hostname: string): boolean { - const normalized = hostname - .trim() - .toLowerCase() - .replace(/^\[(.*)\]$/, '$1'); - if (normalized === 'localhost') return true; - if (net.isIPv4(normalized)) return LOOPBACK_BLOCK_LIST.check(normalized, 'ipv4'); - if (net.isIPv6(normalized)) return LOOPBACK_BLOCK_LIST.check(normalized, 'ipv6'); - return false; -} - -function buildDaemonHttpUrl(baseUrl: string, route: 'health' | 'rpc'): string { - // URL(base, relative) treats a base without trailing slash as a file path, so normalize to a directory-like base. - const normalizedBase = baseUrl.endsWith('/') ? baseUrl : `${baseUrl}/`; - return new URL(route, normalizedBase).toString(); -} - -export function resolveDaemonStartupHint( - state: { hasInfo: boolean; hasLock: boolean }, - paths: Pick = resolveDaemonPaths( - process.env.AGENT_DEVICE_STATE_DIR, - ), -): string { - const cleanupCommand = buildDaemonMetadataCleanupCommand(paths); - if (state.hasLock && !state.hasInfo) { - return `agent-device attempted to clean stale daemon metadata automatically, but ${paths.lockPath} still exists without ${paths.infoPath}. Retry with --debug; if this persists after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; - } - if (state.hasLock && state.hasInfo) { - return `agent-device attempted to clean stale daemon metadata automatically, but ${paths.infoPath} and ${paths.lockPath} still remain. Retry with --debug; if this persists after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; - } - if (state.hasInfo) { - return `agent-device did not observe reachable daemon metadata after retrying, and ${paths.infoPath} still remains. Stale metadata was cleaned automatically when safe; retry with --debug. If this persists after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; - } - return `agent-device did not observe reachable daemon metadata after retrying. Stale metadata was cleaned automatically when safe; retry with --debug and check daemon diagnostics logs. If stale metadata returns after confirming no agent-device daemon process is running, run: ${cleanupCommand}`; -} - -function buildDaemonMetadataCleanupCommand(paths: Pick) { - return `rm -f ${shellQuote(paths.infoPath)} ${shellQuote(paths.lockPath)}`; -} From 0b420f250feb09b65b6789497ee7646f94f11234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Fri, 12 Jun 2026 09:05:42 +0200 Subject: [PATCH 2/2] fix: keep daemon timeout cleanup on resolved paths --- src/daemon-client-transport.ts | 20 ++- src/daemon-client.ts | 9 +- .../__tests__/daemon-client-lifecycle.test.ts | 114 +++++++++++++++++- 3 files changed, 128 insertions(+), 15 deletions(-) diff --git a/src/daemon-client-transport.ts b/src/daemon-client-transport.ts index f4a0c57ae..a7cdf4fae 100644 --- a/src/daemon-client-transport.ts +++ b/src/daemon-client-transport.ts @@ -5,7 +5,7 @@ import { AppError } from './utils/errors.ts'; import { readNodeHttpResponseBody } from './utils/node-http.ts'; import type { DaemonRequest, DaemonResponse } from './daemon/types.ts'; import { emitDiagnostic } from './utils/diagnostics.ts'; -import { resolveDaemonPaths, type DaemonTransportPreference } from './daemon/config.ts'; +import type { DaemonPaths, DaemonTransportPreference } from './daemon/config.ts'; import { readDaemonHttpProgressResponse, readDaemonSocketProgressResponse, @@ -103,27 +103,29 @@ export async function sendRequest( info: DaemonInfo, req: DaemonRequest, preference: DaemonTransportPreference, + statePaths: DaemonPaths, timeoutMs: number | undefined, ): Promise { const transport = chooseTransport(info, preference); try { - return await sendRequestWithTransport(info, req, timeoutMs, transport); + return await sendRequestWithTransport(info, req, statePaths, timeoutMs, transport); } catch (error) { const fallback = chooseAutoFallbackTransport(info, preference, transport); if (!fallback || !isSafeAutoTransportFallbackError(error, transport)) throw error; - return await sendRequestWithTransport(info, req, timeoutMs, fallback); + return await sendRequestWithTransport(info, req, statePaths, timeoutMs, fallback); } } async function sendRequestWithTransport( info: DaemonInfo, req: DaemonRequest, + statePaths: DaemonPaths, timeoutMs: number | undefined, transport: ResolvedDaemonTransport, ): Promise { return transport === 'http' - ? await sendHttpRequest(info, req, timeoutMs) - : await sendSocketRequest(info, req, timeoutMs); + ? await sendHttpRequest(info, req, statePaths, timeoutMs) + : await sendSocketRequest(info, req, statePaths, timeoutMs); } function chooseTransport( @@ -222,6 +224,7 @@ function handleTransportError( async function sendSocketRequest( info: DaemonInfo, req: DaemonRequest, + statePaths: DaemonPaths, timeoutMs: number | undefined, ): Promise { const port = info.port; @@ -232,9 +235,6 @@ async function sendSocketRequest( requestWritten = true; socket.write(`${JSON.stringify(req)}\n`); }); - const statePaths = resolveDaemonPaths( - req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR, - ); let settled = false; const timeoutHandle = typeof timeoutMs === 'number' @@ -286,6 +286,7 @@ async function sendSocketRequest( async function sendHttpRequest( info: DaemonInfo, req: DaemonRequest, + statePaths: DaemonPaths, timeoutMs: number | undefined, ): Promise { const rpcUrl = info.baseUrl @@ -305,9 +306,6 @@ async function sendHttpRequest( } return await new Promise((resolve, reject) => { - const statePaths = resolveDaemonPaths( - req.flags?.stateDir ?? process.env.AGENT_DEVICE_STATE_DIR, - ); const transport = rpcUrl.protocol === 'https:' ? https : http; const request = transport.request( { diff --git a/src/daemon-client.ts b/src/daemon-client.ts index 1e77ef84f..c821fca5c 100644 --- a/src/daemon-client.ts +++ b/src/daemon-client.ts @@ -101,7 +101,14 @@ export async function sendToDaemon(req: Omit): Promise await sendRequest(info, request, settings.transportPreference, requestTimeoutMs), + async () => + await sendRequest( + info, + request, + settings.transportPreference, + settings.paths, + requestTimeoutMs, + ), { requestId, command: req.command }, ); } finally { diff --git a/src/utils/__tests__/daemon-client-lifecycle.test.ts b/src/utils/__tests__/daemon-client-lifecycle.test.ts index d0859976c..d55254c6d 100644 --- a/src/utils/__tests__/daemon-client-lifecycle.test.ts +++ b/src/utils/__tests__/daemon-client-lifecycle.test.ts @@ -9,7 +9,11 @@ import { afterEach, test, vi } from 'vitest'; vi.mock('../exec.ts', async (importOriginal) => { const actual = await importOriginal(); - return { ...actual, runCmdDetached: vi.fn() }; + return { + ...actual, + runCmdDetached: vi.fn(), + runCmdSync: vi.fn(() => ({ exitCode: 1, stdout: '', stderr: '' })), + }; }); vi.mock('../timeouts.ts', async (importOriginal) => { @@ -18,14 +22,19 @@ vi.mock('../timeouts.ts', async (importOriginal) => { }); import { resolveDaemonPaths, type DaemonPaths } from '../../daemon/config.ts'; -import { computeDaemonCodeSignature, sendToDaemon } from '../../daemon-client.ts'; +import { + computeDaemonCodeSignature, + sendToDaemon, + type DaemonRequest, +} from '../../daemon-client.ts'; +import { sendRequest } from '../../daemon-client-transport.ts'; import { closeLoopbackServer, listenOnLoopback, supportsLoopbackBind, } from '../../__tests__/test-utils/index.ts'; import { AppError } from '../errors.ts'; -import { runCmdDetached } from '../exec.ts'; +import { runCmdDetached, runCmdSync } from '../exec.ts'; import { readProcessStartTime } from '../process-identity.ts'; import { sleep } from '../timeouts.ts'; import { findProjectRoot, readVersion } from '../version.ts'; @@ -49,10 +58,12 @@ type HttpDaemonFixture = { }; const mockRunCmdDetached = vi.mocked(runCmdDetached); +const mockRunCmdSync = vi.mocked(runCmdSync); const mockSleep = vi.mocked(sleep); afterEach(() => { mockRunCmdDetached.mockReset(); + mockRunCmdSync.mockClear(); mockSleep.mockClear(); vi.unstubAllEnvs(); }); @@ -147,6 +158,37 @@ async function startHttpDaemonFixture( return { server, port, seenPaths, rpcRequests }; } +async function startHangingHttpDaemonFixture(): Promise { + const seenPaths: string[] = []; + const rpcRequests: Record[] = []; + const server = http.createServer((req, res) => { + const url = new URL(req.url || '/', 'http://127.0.0.1'); + seenPaths.push(`${req.method ?? 'GET'} ${url.pathname}`); + + if (req.method === 'GET' && url.pathname === '/health') { + res.writeHead(200); + res.end('ok'); + return; + } + + if (req.method === 'POST' && url.pathname === '/rpc') { + const chunks: Buffer[] = []; + req.on('data', (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + req.on('end', () => { + rpcRequests.push(JSON.parse(Buffer.concat(chunks).toString('utf8')) as Record); + }); + return; + } + + res.writeHead(404); + res.end('not found'); + }); + const port = await listenOnLoopback(server); + return { server, port, seenPaths, rpcRequests }; +} + function installSpawnedHttpDaemon(paths: DaemonPaths, httpPort: number): void { mockRunCmdDetached.mockImplementation((_command, _args, options) => { assert.equal(options?.env?.AGENT_DEVICE_STATE_DIR, paths.baseDir); @@ -476,6 +518,72 @@ function captureStderr(): { read: () => string; restore: () => void } { }; } +test('sendRequest timeout cleanup uses resolved daemon paths instead of request flags', async (t) => { + if (!(await supportsLoopbackBind())) { + t.skip('loopback listeners are not permitted in this environment'); + return; + } + + const daemonStateDir = makeTempStateDir('agent-device-daemon-timeout-active-'); + const requestFlagStateDir = makeTempStateDir('agent-device-daemon-timeout-request-'); + const daemonPaths = resolveDaemonPaths(daemonStateDir); + const requestFlagPaths = resolveDaemonPaths(requestFlagStateDir); + const daemon = await startHangingHttpDaemonFixture(); + writeDaemonInfo(daemonPaths, { + httpPort: daemon.port, + transport: 'http', + pid: 999_999, + }); + writeDaemonLock(daemonPaths, { pid: 999_999 }); + writeDaemonInfo(requestFlagPaths, { + httpPort: daemon.port, + transport: 'http', + pid: 999_998, + }); + writeDaemonLock(requestFlagPaths, { pid: 999_998 }); + + const request: DaemonRequest = { + session: 'default', + command: 'replay', + positionals: [], + flags: { stateDir: requestFlagStateDir, daemonTransport: 'http' }, + token: 'local-secret', + meta: { requestId: 'req-timeout-paths' }, + }; + + try { + let thrown: unknown; + try { + await sendRequest( + { + token: 'local-secret', + pid: 999_999, + httpPort: daemon.port, + transport: 'http', + }, + request, + 'http', + daemonPaths, + 50, + ); + } catch (error) { + thrown = error; + } + + assert.ok(thrown instanceof AppError); + assert.equal(thrown.message, 'Daemon request timed out'); + assert.deepEqual(daemon.seenPaths, ['POST /rpc']); + assert.equal(fs.existsSync(daemonPaths.infoPath), false); + assert.equal(fs.existsSync(daemonPaths.lockPath), false); + assert.equal(fs.existsSync(requestFlagPaths.infoPath), true); + assert.equal(fs.existsSync(requestFlagPaths.lockPath), true); + } finally { + await closeLoopbackServer(daemon.server); + fs.rmSync(daemonStateDir, { recursive: true, force: true }); + fs.rmSync(requestFlagStateDir, { recursive: true, force: true }); + } +}); + test('sendToDaemon falls back from failed socket transport to HTTP using daemon metadata ports', async (t) => { if (!(await supportsLoopbackBind())) { t.skip('loopback listeners are not permitted in this environment');