Skip to content

Commit c9cf5e3

Browse files
lanmowerclaude
andcommitted
refactor: extract broadcastSync and recovery functions to lib/ modules
- lib/broadcast.js: createBroadcast factory (29L) wrapping broadcastSync logic - lib/recovery.js: createRecovery factory (166L) wrapping killActiveExecutions, recoverStaleSessions, resumeInterruptedStreams, markAgentDead, resumeConversation, performAgentHealthCheck - server.js reduced from 3419L to 3226L Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent eda4bae commit c9cf5e3

4 files changed

Lines changed: 217 additions & 381 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## [Unreleased]
22

33
### Refactor
4+
- refactor: extract broadcastSync to lib/broadcast.js (createBroadcast factory) and recovery functions to lib/recovery.js (createRecovery factory); server.js reduced from 3419L to 3226L
45
- refactor: remove JSDoc and standalone code comments from scripts/patch-fsbrowse.js; reduce from 229L to 200L
56
- Split database.js (651L) into database.js (81L) + database-schema.js (176L) + database-migrations.js (150L) + database-migrations-acp.js (134L); all files ≤200L; no circular imports; migration functions receive db as parameter
67
- Split claude-runner.js (1267L) into claude-runner.js (56L, AgentRunner class+helpers), claude-runner-direct.js (117L, runDirect method), claude-runner-acp.js (156L, runACP+_runACPOnce methods), claude-runner-agents.js (105L, AgentRegistry+registrations using acp-protocol.js), claude-runner-run.js (50L, runClaudeWithStreaming export); server.js updated to import from claude-runner-run.js

lib/broadcast.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
export function createBroadcast({ syncClients, subscriptionIndex, wsOptimizer, broadcastTypes, getSeq }) {
2+
return function broadcastSync(event) {
3+
try {
4+
if (!event.seq) {
5+
event.seq = getSeq();
6+
}
7+
const isBroadcast = broadcastTypes.has(event.type);
8+
if (syncClients.size > 0) {
9+
if (isBroadcast) {
10+
for (const ws of syncClients) { try { wsOptimizer.sendToClient(ws, event); } catch (e) {} }
11+
} else {
12+
const targets = new Set();
13+
if (event.sessionId) {
14+
const subs = subscriptionIndex.get(event.sessionId);
15+
if (subs) for (const ws of subs) targets.add(ws);
16+
}
17+
if (event.conversationId) {
18+
const subs = subscriptionIndex.get(`conv-${event.conversationId}`);
19+
if (subs) for (const ws of subs) targets.add(ws);
20+
}
21+
for (const ws of targets) { try { wsOptimizer.sendToClient(ws, event); } catch (e) {} }
22+
}
23+
}
24+
} catch (err) {
25+
console.error('[BROADCAST] Error (contained):', err.message);
26+
}
27+
};
28+
}

lib/recovery.js

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
export function createRecovery({ activeExecutions, processMessageWithStreaming, queries, broadcastSync, checkpointManager, drainMessageQueue, stuckThresholdMs, noPidGracePeriodMs }) {
2+
function isProcessAlive(pid) {
3+
try {
4+
process.kill(pid, 0);
5+
return true;
6+
} catch (err) {
7+
if (err.code === 'EPERM') return true;
8+
return false;
9+
}
10+
}
11+
12+
function killActiveExecutions() {
13+
for (const [convId, entry] of activeExecutions.entries()) {
14+
if (entry.pid) {
15+
try { process.kill(-entry.pid, 'SIGTERM'); } catch { try { process.kill(entry.pid, 'SIGTERM'); } catch (_) {} }
16+
}
17+
if (entry.proc) {
18+
try { entry.proc.kill('SIGTERM'); } catch (_) {}
19+
}
20+
}
21+
activeExecutions.clear();
22+
}
23+
24+
function recoverStaleSessions() {
25+
try {
26+
const RESUME_WINDOW_MS = 600000;
27+
const cutoff = Date.now() - RESUME_WINDOW_MS;
28+
const staleSessions = queries.getActiveSessions();
29+
for (const session of staleSessions) {
30+
queries.updateSession(session.id, {
31+
status: session.started_at > cutoff ? 'interrupted' : 'error',
32+
error: 'Server restarted',
33+
completed_at: Date.now()
34+
});
35+
}
36+
queries.clearAllStreamingFlags();
37+
if (staleSessions.length > 0) {
38+
console.log(`[RECOVERY] Marked ${staleSessions.length} stale session(s); cleared streaming flags`);
39+
}
40+
} catch (err) {
41+
console.error('[RECOVERY] Error:', err.message);
42+
}
43+
}
44+
45+
async function resumeConversation(conversationId, previousSessionId, reason) {
46+
const conv = queries.getConversation(conversationId);
47+
if (!conv) throw new Error('Conversation not found');
48+
const checkpoint = previousSessionId ? checkpointManager.loadCheckpoint(previousSessionId) : null;
49+
if (previousSessionId) {
50+
const prev = queries.getSession ? queries.getSession(previousSessionId) : null;
51+
if (prev && prev.status !== 'interrupted') {
52+
queries.updateSession(previousSessionId, { status: 'interrupted', error: reason || 'Restarting', completed_at: Date.now() });
53+
}
54+
if (checkpoint) {
55+
checkpointManager.markSessionResumed(previousSessionId);
56+
}
57+
}
58+
const lastMsg = queries.getLastUserMessage(conversationId);
59+
const promptText = typeof lastMsg?.content === 'string' ? lastMsg.content : JSON.stringify(lastMsg?.content || 'continue');
60+
const session = queries.createSession(conversationId);
61+
queries.createEvent('session.created', {
62+
sessionId: session.id,
63+
resumeReason: 'interrupted',
64+
claudeSessionId: conv.claudeSessionId,
65+
checkpointFrom: previousSessionId || null
66+
}, conversationId, session.id);
67+
activeExecutions.set(conversationId, {
68+
pid: null,
69+
startTime: Date.now(),
70+
sessionId: session.id,
71+
lastActivity: Date.now()
72+
});
73+
broadcastSync({
74+
type: 'streaming_start',
75+
sessionId: session.id,
76+
conversationId,
77+
agentId: conv.agentType,
78+
resumed: true,
79+
checkpointAvailable: !!checkpoint,
80+
timestamp: Date.now()
81+
});
82+
if (checkpoint) {
83+
checkpointManager.storeCheckpointForDelay(conversationId, checkpoint);
84+
console.log(`[RESUME] Checkpoint stored for conv ${conversationId}`);
85+
}
86+
console.log(`[RESUME] Restarting conv ${conversationId} (reason: ${reason})`);
87+
await processMessageWithStreaming(conversationId, lastMsg?.id || null, session.id, promptText, conv.agentType, conv.model, conv.subAgent);
88+
}
89+
90+
function markAgentDead(conversationId, entry, reason) {
91+
if (!activeExecutions.has(conversationId)) return;
92+
activeExecutions.delete(conversationId);
93+
const RESUME_WINDOW_MS = 600000;
94+
const sessionAge = entry.startTime ? Date.now() - entry.startTime : Infinity;
95+
const shouldRestart = sessionAge < RESUME_WINDOW_MS;
96+
queries.setIsStreaming(conversationId, false);
97+
if (entry.sessionId) {
98+
queries.updateSession(entry.sessionId, {
99+
status: shouldRestart ? 'interrupted' : 'error',
100+
error: reason,
101+
completed_at: Date.now()
102+
});
103+
}
104+
if (shouldRestart) {
105+
resumeConversation(conversationId, entry.sessionId, reason).catch(err => {
106+
console.error(`[RESUME] Auto-restart failed for conv ${conversationId}: ${err.message}`);
107+
queries.setIsStreaming(conversationId, false);
108+
});
109+
return;
110+
}
111+
broadcastSync({
112+
type: 'streaming_error',
113+
sessionId: entry.sessionId,
114+
conversationId,
115+
error: reason,
116+
recoverable: false,
117+
timestamp: Date.now()
118+
});
119+
drainMessageQueue(conversationId);
120+
}
121+
122+
async function resumeInterruptedStreams() {
123+
try {
124+
const toResume = queries.getResumableConversations(600000);
125+
if (toResume.length === 0) return;
126+
console.log(`[RESUME] Resuming ${toResume.length} interrupted conversation(s)`);
127+
for (let i = 0; i < toResume.length; i++) {
128+
const conv = toResume[i];
129+
try {
130+
const lastSession = queries.getLatestSession(conv.id);
131+
await resumeConversation(conv.id, lastSession?.id || null, 'Server restarted');
132+
if (i < toResume.length - 1) await new Promise(r => setTimeout(r, 200));
133+
} catch (err) {
134+
console.error(`[RESUME] Failed to resume conv ${conv.id}: ${err.message}`);
135+
queries.setIsStreaming(conv.id, false);
136+
}
137+
}
138+
} catch (err) {
139+
console.error('[RESUME] Error:', err.message);
140+
}
141+
}
142+
143+
function performAgentHealthCheck() {
144+
const now = Date.now();
145+
for (const [conversationId, entry] of activeExecutions) {
146+
if (!entry) continue;
147+
if (entry.pid) {
148+
if (!isProcessAlive(entry.pid)) {
149+
console.error(`[HEALTH] Agent PID ${entry.pid} for conv ${conversationId} is dead`);
150+
markAgentDead(conversationId, entry, 'Agent process died unexpectedly');
151+
} else if (now - entry.lastActivity > stuckThresholdMs) {
152+
console.error(`[HEALTH] Agent PID ${entry.pid} for conv ${conversationId} has no activity for ${Math.round((now - entry.lastActivity) / 1000)}s`);
153+
try { process.kill(entry.pid, 'SIGTERM'); } catch (e) {}
154+
markAgentDead(conversationId, entry, 'Agent was stuck (no activity for 30 minutes)');
155+
}
156+
} else {
157+
if (now - entry.startTime > noPidGracePeriodMs) {
158+
console.error(`[HEALTH] Agent for conv ${conversationId} never reported PID after ${Math.round((now - entry.startTime) / 1000)}s`);
159+
markAgentDead(conversationId, entry, 'Agent failed to start (no PID reported)');
160+
}
161+
}
162+
}
163+
}
164+
165+
return { killActiveExecutions, recoverStaleSessions, resumeInterruptedStreams, isProcessAlive, markAgentDead, resumeConversation, performAgentHealthCheck };
166+
}

0 commit comments

Comments
 (0)