Skip to content

Commit 3ec7cb3

Browse files
lanmowerclaude
and committed
refactor: extract processMessageWithStreaming and queue functions to lib/
Move processMessageWithStreaming (539L), scheduleRetry, drainMessageQueue, and parseRateLimitResetTime out of server.js into dedicated lib files using factory pattern (createProcessMessage, createMessageQueue). Also extract the onEvent handler into lib/stream-event-handler.js (createEventHandler). All new files are ≤200L. server.js reduced by ~660 lines and imports the factories, wiring them after broadcastSync is created. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 0819aa4 commit 3ec7cb3

6 files changed

Lines changed: 343 additions & 674 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## [Unreleased]
22

33
### Refactor
4+
- Extract processMessageWithStreaming (539L), scheduleRetry, drainMessageQueue, and parseRateLimitResetTime from server.js into lib/process-message.js (127L, createProcessMessage factory), lib/stream-event-handler.js (116L, createEventHandler), lib/message-queue.js (63L, createMessageQueue), lib/process-message-rate-limit.js (19L); all files ≤200L; server.js reduced by ~660L and imports/wires all factories after broadcastSync is created
45
- refactor: extract broadcastSync to lib/broadcast.js (createBroadcast factory) and recovery functions to lib/recovery.js (createRecovery factory); server.js reduced from 3419L to 3226L
56
- refactor: remove JSDoc and standalone code comments from scripts/patch-fsbrowse.js; reduce from 229L to 200L
67
- Split database.js (651L) into database.js (81L) + database-schema.js (176L) + database-migrations.js (150L) + database-migrations-acp.js (134L); all files ≤200L; no circular imports; migration functions receive db as parameter

lib/message-queue.js

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
 * Factory for per-conversation message-queue helpers.
 *
 * Dependencies are injected so server.js can wire this after broadcastSync
 * exists; `getProcessMessageWithStreaming` is a getter to break the circular
 * dependency between the queue and the message processor.
 *
 * @returns {{ scheduleRetry: Function, drainMessageQueue: Function }}
 */
export function createMessageQueue({ queries, activeExecutions, messageQueues, broadcastSync, execMachine, cleanupExecution, debugLog, getProcessMessageWithStreaming }) {
  /**
   * Start a fresh session to retry a message after a rate-limit cooldown.
   * When `content` is falsy it is recovered from the last user message,
   * falling back to the literal string 'continue'.
   */
  function scheduleRetry(conversationId, messageId, content, agentId, model, subAgent) {
    debugLog(`[rate-limit] scheduleRetry called for conv ${conversationId}, messageId=${messageId}`);
    if (!content) {
      // NOTE(review): the result of getConversation is discarded here —
      // presumably kept for a side effect; confirm before removing.
      queries.getConversation(conversationId);
      const lastUserMessage = queries.getLastUserMessage(conversationId);
      content = lastUserMessage?.content || 'continue';
      debugLog(`[rate-limit] Recovered content from last message: ${content?.substring?.(0, 50)}...`);
    }

    const retrySession = queries.createSession(conversationId);
    queries.createEvent('session.created', { messageId, sessionId: retrySession.id, retryReason: 'rate_limit' }, conversationId, retrySession.id);
    debugLog(`[rate-limit] Broadcasting streaming_start for retry session ${retrySession.id}`);
    const pendingCount = messageQueues.get(conversationId)?.length || 0;
    broadcastSync({ type: 'streaming_start', sessionId: retrySession.id, conversationId, messageId, agentId, queueLength: pendingCount, timestamp: Date.now() });

    const launchedAt = Date.now();
    activeExecutions.set(conversationId, { pid: null, startTime: launchedAt, sessionId: retrySession.id, lastActivity: launchedAt });
    debugLog(`[rate-limit] Calling processMessageWithStreaming for retry`);
    const run = getProcessMessageWithStreaming();
    run(conversationId, messageId, retrySession.id, content, agentId, model, subAgent).catch((err) => {
      debugLog(`[rate-limit] Retry failed: ${err.message}`);
      console.error(`[rate-limit] Retry error for conv ${conversationId}:`, err);
      cleanupExecution(conversationId);
      broadcastSync({ type: 'streaming_error', sessionId: retrySession.id, conversationId, error: `Rate limit retry failed: ${err.message}`, recoverable: false, timestamp: Date.now() });
    });
  }

  /**
   * Pop the next queued message for a conversation — preferring the state
   * machine's queue over the legacy Map-based queue — and start processing it.
   * No-op when both queues are empty.
   */
  function drainMessageQueue(conversationId) {
    const stateQueue = execMachine.getQueue(conversationId);
    const legacyQueue = messageQueues.get(conversationId);
    const legacyEmpty = !legacyQueue || legacyQueue.length === 0;
    if (stateQueue.length === 0 && legacyEmpty) return;

    let next;
    if (stateQueue.length > 0) {
      // The machine dequeues on COMPLETE; keep the legacy Map in sync.
      execMachine.send(conversationId, { type: 'COMPLETE' });
      next = execMachine.getContext(conversationId)?.nextItem;
      if (legacyQueue && legacyQueue.length > 0) legacyQueue.shift();
      if (legacyQueue && legacyQueue.length === 0) messageQueues.delete(conversationId);
    } else {
      next = legacyQueue.shift();
      if (legacyQueue.length === 0) messageQueues.delete(conversationId);
    }
    if (!next) return;

    debugLog(`[queue] Draining next message for ${conversationId}, messageId=${next.messageId}`);
    const remaining = execMachine.getQueue(conversationId).length || messageQueues.get(conversationId)?.length || 0;
    broadcastSync({ type: 'queue_item_dequeued', conversationId, messageId: next.messageId, queueLength: remaining, timestamp: Date.now() });

    const session = queries.createSession(conversationId);
    queries.createEvent('session.created', { messageId: next.messageId, sessionId: session.id }, conversationId, session.id);
    broadcastSync({ type: 'streaming_start', sessionId: session.id, conversationId, messageId: next.messageId, agentId: next.agentId, queueLength: remaining, timestamp: Date.now() });
    broadcastSync({ type: 'queue_status', conversationId, queueLength: remaining, timestamp: Date.now() });

    const launchedAt = Date.now();
    execMachine.send(conversationId, { type: 'START', sessionId: session.id });
    activeExecutions.set(conversationId, { pid: null, startTime: launchedAt, sessionId: session.id, lastActivity: launchedAt });
    const run = getProcessMessageWithStreaming();
    run(conversationId, next.messageId, session.id, next.content, next.agentId, next.model, next.subAgent).catch((err) => {
      debugLog(`[queue] Error processing queued message: ${err.message}`);
      cleanupExecution(conversationId);
      broadcastSync({ type: 'streaming_error', sessionId: session.id, conversationId, error: `Queue processing failed: ${err.message}`, recoverable: true, timestamp: Date.now() });
      // Give the failure a beat, then try the next queued item.
      setTimeout(() => drainMessageQueue(conversationId), 100);
    });
  }

  return { scheduleRetry, drainMessageQueue };
}

lib/process-message-rate-limit.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
 * Parse the rate-limit reset time from a text message.
 * Returns seconds until reset (minimum 60, default 300).
 *
 * Times are interpreted as UTC wall-clock times; a time already in the
 * past is assumed to mean tomorrow. NOTE(review): the regex captures a
 * timezone token (match[4]) that is never consulted — a non-UTC zone in
 * the text is still treated as UTC; confirm this is intended.
 */
export function parseRateLimitResetTime(text) {
  const match = text.match(/resets?\s+(?:at\s+)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?\s*\(?(UTC|[A-Z]{2,4})\)?/i);
  // No recognizable reset time: fall back to a 5-minute cooldown.
  if (!match) return 300;

  const [, hourText, minuteText, meridiem] = match;
  let hour = parseInt(hourText, 10);
  const minute = minuteText ? parseInt(minuteText, 10) : 0;
  const period = meridiem?.toLowerCase();
  // Convert 12-hour clock to 24-hour (12am -> 0, 12pm stays 12).
  if (period === 'pm' && hour !== 12) hour += 12;
  if (period === 'am' && hour === 12) hour = 0;

  const now = new Date();
  const reset = new Date(now);
  reset.setUTCHours(hour, minute, 0, 0);
  if (reset <= now) reset.setUTCDate(reset.getUTCDate() + 1);

  const seconds = Math.ceil((reset.getTime() - now.getTime()) / 1000);
  return Math.max(60, seconds);
}

lib/process-message.js

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
 * Factory for the main streaming message processor.
 *
 * All collaborators are injected so server.js can wire this factory after
 * broadcastSync exists. Returns { processMessageWithStreaming }.
 *
 * NOTE(review): `checkpointManager` and `logError` are accepted but not
 * referenced in this body — confirm whether they are still needed.
 */
export function createProcessMessage({ queries, activeExecutions, rateLimitState, execMachine, broadcastSync, runClaudeWithStreaming, cleanupExecution, checkpointManager, discoveredAgents, ownedSessionIds, STARTUP_CWD, buildSystemPrompt, parseRateLimitResetTime, eagerTTS, touchACP, createChunkBatcher, debugLog, logError, scheduleRetry, drainMessageQueue, createEventHandler }) {
  /**
   * Run one user message through the streaming agent pipeline: guards
   * against duplicate/cooldown executions, streams events via the handler
   * from createEventHandler, and on error classifies auth failures,
   * rate limits, and session conflicts. Always drains batched chunks and
   * (outside a rate-limit cooldown) cleans up and drains the queue.
   */
  async function processMessageWithStreaming(conversationId, messageId, sessionId, content, agentId, model, subAgent) {
    const startTime = Date.now();
    touchACP(agentId);
    const conv = queries.getConversation(conversationId);
    // Guard: conversation must still exist.
    if (!conv) {
      console.error(`[stream] Conversation ${conversationId} not found, aborting`);
      queries.updateSession(sessionId, { status: 'error', error: 'Conversation not found' });
      queries.setIsStreaming(conversationId, false);
      return;
    }
    // Guard: don't start a second execution for the same conversation
    // (re-entry with the same sessionId is allowed to fall through).
    if (activeExecutions.has(conversationId)) {
      const existing = activeExecutions.get(conversationId);
      if (existing.sessionId !== sessionId) {
        debugLog(`[stream] Conversation ${conversationId} already has active execution (different session), aborting duplicate`);
        return;
      }
    }
    // Guard: respect an active rate-limit cooldown window.
    if (rateLimitState.has(conversationId)) {
      const rlState = rateLimitState.get(conversationId);
      if (rlState.retryAt > Date.now()) {
        debugLog(`[stream] Conversation ${conversationId} is in rate limit cooldown, aborting`);
        return;
      }
    }
    // Register the execution and mark the session/conversation as live.
    activeExecutions.set(conversationId, { pid: null, startTime, sessionId, lastActivity: startTime });
    execMachine.send(conversationId, { type: 'START', sessionId });
    queries.setIsStreaming(conversationId, true);
    queries.updateSession(sessionId, { status: 'active' });
    const batcher = createChunkBatcher(queries, debugLog);
    const cwd = conv?.workingDirectory || STARTUP_CWD;
    // Mutable refs shared with the event handler (plain objects so the
    // handler can update them across events).
    const allBlocksRef = { val: [] };
    const currentSequenceRef = { val: queries.getMaxSequence(sessionId) ?? -1 };
    const batcherRef = { batcher, eventCount: 0, resumeSessionId: conv?.claudeSessionId || null };
    const onEvent = createEventHandler({ queries, activeExecutions, broadcastSync, rateLimitState, batcherRef, sessionId, conversationId, messageId, content, agentId, model, subAgent, ownedSessionIds, allBlocksRef, currentSequenceRef, scheduleRetry, eagerTTS, debugLog, parseRateLimitResetTime });
    try {
      debugLog(`[stream] Starting: conversationId=${conversationId}, sessionId=${sessionId}`);
      let resolvedAgentId = agentId || 'claude-code';
      // cli-wrapper agents are addressed by their underlying ACP id.
      const wrapperAgent = discoveredAgents.find(a => a.id === resolvedAgentId && a.protocol === 'cli-wrapper' && a.acpId);
      if (wrapperAgent) resolvedAgentId = wrapperAgent.acpId;
      // Explicit args win over conversation defaults.
      const resolvedModel = model || conv?.model || null;
      const resolvedSubAgent = subAgent || conv?.subAgent || null;
      const config = {
        verbose: true, outputFormat: 'stream-json', timeout: 1800000, print: true,
        resumeSessionId: batcherRef.resumeSessionId,
        systemPrompt: buildSystemPrompt(agentId, resolvedModel, resolvedSubAgent),
        model: resolvedModel || undefined, subAgent: resolvedSubAgent || undefined, onEvent,
        // Record the child pid/proc on both the execution entry and the machine.
        onPid: (pid) => { const e = activeExecutions.get(conversationId); if (e) e.pid = pid; execMachine.send(conversationId, { type: 'SET_PID', pid }); },
        onProcess: (proc) => { const e = activeExecutions.get(conversationId); if (e) e.proc = proc; execMachine.send(conversationId, { type: 'SET_PROC', proc }); }
      };
      const { outputs, sessionId: claudeSessionId } = await runClaudeWithStreaming(content, cwd, resolvedAgentId, config);
      // If the event handler already dealt with an in-stream rate limit,
      // skip the normal success path.
      if (rateLimitState.get(conversationId)?.isStreamDetected) {
        debugLog(`[rate-limit] Rate limit already handled in stream for conv ${conversationId}, skipping success handler`);
        return;
      }
      activeExecutions.delete(conversationId);
      execMachine.send(conversationId, { type: 'COMPLETE' });
      batcher.drain();
      if (claudeSessionId) ownedSessionIds.delete(claudeSessionId);
      debugLog(`[stream] Claude returned ${outputs.length} outputs, sessionId=${claudeSessionId}`);
      queries.updateSession(sessionId, { status: 'complete', response: JSON.stringify({ outputs, eventCount: batcherRef.eventCount }), completed_at: Date.now() });
      broadcastSync({ type: 'streaming_complete', sessionId, conversationId, agentId, eventCount: batcherRef.eventCount, seq: currentSequenceRef.val, timestamp: Date.now() });
      debugLog(`[stream] Completed: ${outputs.length} outputs, ${batcherRef.eventCount} events`);
    } catch (error) {
      const elapsed = Date.now() - startTime;
      debugLog(`[stream] Error after ${elapsed}ms: ${error.message}`);
      const conv2 = queries.getConversation(conversationId);
      if (conv2?.claudeSessionId) ownedSessionIds.delete(conv2.claudeSessionId);
      // Same in-stream rate-limit short-circuit as the success path.
      if (rateLimitState.get(conversationId)?.isStreamDetected) {
        debugLog(`[rate-limit] Rate limit already handled in stream for conv ${conversationId}, skipping catch handler`);
        return;
      }
      // Classify the failure: auth errors are terminal, rate limits retry.
      const isAuthError = error.authError || error.nonRetryable || /401|unauthorized|invalid.*auth|invalid.*token|auth.*failed|permission denied|access denied/i.test(error.message);
      const isRateLimit = error.rateLimited || /rate.?limit|429|too many requests|overloaded|throttl/i.test(error.message);
      queries.updateSession(sessionId, { status: 'error', error: error.message, completed_at: Date.now() });
      if (isAuthError) {
        // Non-recoverable: surface the error, post an assistant message,
        // and tear down immediately.
        debugLog(`[auth-error] Auth error for conv ${conversationId}: ${error.message}`);
        broadcastSync({ type: 'streaming_error', sessionId, conversationId, error: `Authentication failed: ${error.message}. Please check your API credentials.`, recoverable: false, isAuthError: true, timestamp: Date.now() });
        const errMsg = queries.createMessage(conversationId, 'assistant', `Error: Authentication failed. ${error.message}. Please update your credentials and try again.`);
        broadcastSync({ type: 'message_created', conversationId, message: errMsg, timestamp: Date.now() });
        queries.setIsStreaming(conversationId, false);
        batcher.drain();
        activeExecutions.delete(conversationId);
        return;
      }
      if (isRateLimit) {
        const existingState = rateLimitState.get(conversationId) || {};
        const retryCount = (existingState.retryCount || 0) + 1;
        const maxRateLimitRetries = 3;
        // Give up after the retry budget is exhausted.
        if (retryCount > maxRateLimitRetries) {
          broadcastSync({ type: 'streaming_error', sessionId, conversationId, error: `Rate limit exceeded after ${retryCount} attempts. Please try again later.`, recoverable: false, timestamp: Date.now() });
          const errMsg = queries.createMessage(conversationId, 'assistant', `Error: Rate limit exceeded after ${retryCount} attempts. Please try again later.`);
          broadcastSync({ type: 'message_created', conversationId, message: errMsg, timestamp: Date.now() });
          queries.setIsStreaming(conversationId, false);
          return;
        }
        // Enter cooldown, then retry via scheduleRetry after it elapses.
        const cooldownMs = (error.retryAfterSec || 60) * 1000;
        const retryAt = Date.now() + cooldownMs;
        rateLimitState.set(conversationId, { retryAt, cooldownMs, retryCount });
        broadcastSync({ type: 'rate_limit_hit', sessionId, conversationId, retryAfterMs: cooldownMs, retryAt, retryCount, timestamp: Date.now() });
        batcher.drain();
        debugLog(`[rate-limit] Scheduling retry for conv ${conversationId} in ${cooldownMs}ms (attempt ${retryCount + 1})`);
        setTimeout(() => {
          debugLog(`[rate-limit] Timeout fired for conv ${conversationId}, calling scheduleRetry`);
          rateLimitState.delete(conversationId);
          broadcastSync({ type: 'rate_limit_clear', conversationId, timestamp: Date.now() });
          scheduleRetry(conversationId, messageId, content, agentId, model, subAgent);
        }, cooldownMs);
        return;
      }
      // Heuristic: a null exit code with zero events looks like a session
      // conflict rather than a real failure — presumably another process
      // owns the session; confirm against runClaudeWithStreaming's contract.
      const isSessionConflict = error.exitCode === null && batcherRef.eventCount === 0;
      broadcastSync({ type: 'streaming_error', sessionId, conversationId, error: error.message, isPrematureEnd: error.isPrematureEnd || false, exitCode: error.exitCode, stderrText: error.stderrText, recoverable: elapsed < 60000, isSessionConflict, timestamp: Date.now() });
      if (!isSessionConflict) {
        const errMsg = queries.createMessage(conversationId, 'assistant', `Error: ${error.message}`);
        broadcastSync({ type: 'message_created', conversationId, message: errMsg, timestamp: Date.now() });
      }
    } finally {
      // Always flush batched chunks; skip cleanup/drain while a rate-limit
      // cooldown is pending so the scheduled retry can run.
      batcher.drain();
      if (!rateLimitState.has(conversationId)) {
        cleanupExecution(conversationId);
        drainMessageQueue(conversationId);
      }
    }
  }
  return { processMessageWithStreaming };
}

0 commit comments

Comments
 (0)