Skip to content

Commit 100b93d

Browse files
lanmowerclaude
andcommitted
refactor: extract WebSocket setup and legacy handlers from server.js
Moves wss creation, connection handling, heartbeat, hot-reload watcher, and all legacy WS message handlers (subscribe/unsubscribe/terminal/PM2) into lib/ws-setup.js (77L) and lib/ws-legacy-handlers.js (154L). server.js reduced from 1077L to 748L. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8aa2154 commit 100b93d

4 files changed

Lines changed: 241 additions & 338 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## [Unreleased]
22

33
### Refactor
4+
- Extract WebSocket server setup and legacy message handler from server.js to lib/ws-setup.js (77L, createWsSetup factory) and lib/ws-legacy-handlers.js (154L, registerLegacyHandler); server.js reduced from 1077L to 748L; wss.on('connection'), heartbeat interval, hot-reload watcher, subscribe/unsubscribe/terminal/PM2 legacy handlers all moved; unused WebSocketServer, spawn, createRequire imports removed from server.js
45
- Extract express upload + fsbrowse setup from server.js to lib/routes-upload.js (79L) exporting createExpressApp; server.js imports createExpressApp and no longer contains Busboy/fsbrowse/express inline code
56
- Extract maskKey, getProviderConfigs, saveProviderConfig, buildSystemPrompt, PROVIDER_CONFIGS from server.js to lib/provider-config.js (151L); extract logError, makeCleanupExecution, makeGetModelsForAgent, errLogPath from server.js to lib/server-utils.js (61L); server.js imports all via named imports; cleanupExecution wired after broadcastSync; _debugRoutes receives errLogPath
67
- Extract parseBody, acceptsEncoding, compressAndSend, sendJSON from server.js to lib/http-utils.js (43L); server.js imports from new module; zlib import removed from server.js

lib/ws-legacy-handlers.js

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import { spawn } from 'child_process';
2+
import { createRequire } from 'module';
3+
4+
const _req = createRequire(import.meta.url);
5+
6+
export function registerLegacyHandler(wsRouter, { subscriptionIndex, execMachine, activeExecutions, messageQueues, checkpointManager, queries, pm2Manager, pm2Subscribers, getSeq, sendWs, debugLog }) {
7+
wsRouter.onLegacy((data, ws) => {
8+
try {
9+
if (data.type === 'subscribe') {
10+
if (data.sessionId) {
11+
ws.subscriptions.add(data.sessionId);
12+
if (!subscriptionIndex.has(data.sessionId)) subscriptionIndex.set(data.sessionId, new Set());
13+
subscriptionIndex.get(data.sessionId).add(ws);
14+
}
15+
if (data.conversationId) {
16+
const key = `conv-${data.conversationId}`;
17+
ws.subscriptions.add(key);
18+
if (!subscriptionIndex.has(key)) subscriptionIndex.set(key, new Set());
19+
subscriptionIndex.get(key).add(ws);
20+
}
21+
const subTarget = data.sessionId || data.conversationId;
22+
debugLog(`[WebSocket] Client ${ws.clientId} subscribed to ${subTarget}`);
23+
sendWs(ws, { type: 'subscription_confirmed', sessionId: data.sessionId, conversationId: data.conversationId, timestamp: Date.now() });
24+
if (data.conversationId && execMachine.isActive(data.conversationId)) {
25+
const ctx = execMachine.getContext(data.conversationId);
26+
const execution = activeExecutions.get(data.conversationId);
27+
const sessionId = ctx?.sessionId || execution?.sessionId;
28+
const conv = queries.getConversation(data.conversationId);
29+
const queueLength = execMachine.getQueue(data.conversationId).length || messageQueues.get(data.conversationId)?.length || 0;
30+
sendWs(ws, { type: 'streaming_start', sessionId, conversationId: data.conversationId, agentId: conv?.agentType || conv?.agentId || 'claude-code', queueLength, resumed: true, seq: getSeq(), timestamp: Date.now() });
31+
}
32+
if (data.conversationId && checkpointManager.hasPendingCheckpoint(data.conversationId)) {
33+
const checkpoint = checkpointManager.getPendingCheckpoint(data.conversationId);
34+
if (checkpoint) {
35+
debugLog(`[checkpoint] Injecting ${checkpoint.events.length} events to client for ${data.conversationId}`);
36+
const latestSession = queries.getLatestSession(data.conversationId);
37+
if (latestSession) {
38+
sendWs(ws, { type: 'streaming_resumed', sessionId: latestSession.id, conversationId: data.conversationId, resumeFrom: checkpoint.sessionId, eventCount: checkpoint.events.length, chunkCount: checkpoint.chunks.length, timestamp: Date.now() });
39+
checkpointManager.injectCheckpointEvents(latestSession.id, checkpoint, (evt) => {
40+
sendWs(ws, { ...evt, sessionId: latestSession.id, conversationId: data.conversationId });
41+
});
42+
}
43+
}
44+
}
45+
} else if (data.type === 'unsubscribe') {
46+
if (data.sessionId) {
47+
ws.subscriptions.delete(data.sessionId);
48+
const idx = subscriptionIndex.get(data.sessionId);
49+
if (idx) { idx.delete(ws); if (idx.size === 0) subscriptionIndex.delete(data.sessionId); }
50+
}
51+
if (data.conversationId) {
52+
const key = `conv-${data.conversationId}`;
53+
ws.subscriptions.delete(key);
54+
const idx = subscriptionIndex.get(key);
55+
if (idx) { idx.delete(ws); if (idx.size === 0) subscriptionIndex.delete(key); }
56+
}
57+
debugLog(`[WebSocket] Client ${ws.clientId} unsubscribed from ${data.sessionId || data.conversationId}`);
58+
} else if (data.type === 'get_subscriptions') {
59+
sendWs(ws, { type: 'subscriptions', subscriptions: Array.from(ws.subscriptions), timestamp: Date.now() });
60+
} else if (data.type === 'set_voice') {
61+
ws.ttsVoiceId = data.voiceId || 'default';
62+
} else if (data.type === 'latency_report') {
63+
ws.latencyTier = data.quality || 'good';
64+
ws.latencyAvg = data.avg || 0;
65+
ws.latencyTrend = data.trend || 'stable';
66+
} else if (data.type === 'ping') {
67+
sendWs(ws, { type: 'pong', requestId: data.requestId, timestamp: Date.now() });
68+
} else if (data.type === 'terminal_start') {
69+
if (ws.terminalProc) { try { ws.terminalProc.kill(); } catch (_) {} }
70+
try {
71+
const pty = _req('node-pty');
72+
const shell = process.env.SHELL || '/bin/bash';
73+
const cwd = data.cwd || process.env.STARTUP_CWD || process.env.HOME || '/';
74+
const proc = pty.spawn(shell, [], { name: 'xterm-256color', cols: data.cols || 80, rows: data.rows || 24, cwd, env: { ...process.env, TERM: 'xterm-256color', COLORTERM: 'truecolor' } });
75+
ws.terminalProc = proc;
76+
ws.terminalPty = true;
77+
proc.on('data', (chunk) => { if (ws.readyState === 1) sendWs(ws, { type: 'terminal_output', data: Buffer.from(chunk).toString('base64'), encoding: 'base64' }); });
78+
proc.on('exit', (code) => { if (ws.readyState === 1) sendWs(ws, { type: 'terminal_exit', code }); ws.terminalProc = null; });
79+
proc.on('error', (err) => { console.error('[TERMINAL] PTY error (contained):', err.message); if (ws.readyState === 1) sendWs(ws, { type: 'terminal_exit', code: 1, error: err.message }); ws.terminalProc = null; });
80+
sendWs(ws, { type: 'terminal_started', timestamp: Date.now() });
81+
} catch (_e) {
82+
console.error('[TERMINAL] Failed to spawn PTY, falling back to pipes:', _e.message);
83+
const shell = process.env.SHELL || '/bin/bash';
84+
const cwd = data.cwd || process.env.STARTUP_CWD || process.env.HOME || '/';
85+
const proc = spawn(shell, ['-i'], { cwd, env: { ...process.env, TERM: 'xterm-256color', COLORTERM: 'truecolor' }, stdio: ['pipe', 'pipe', 'pipe'] });
86+
ws.terminalProc = proc;
87+
ws.terminalPty = false;
88+
proc.stdout.on('data', (chunk) => { if (ws.readyState === 1) sendWs(ws, { type: 'terminal_output', data: chunk.toString('base64'), encoding: 'base64' }); });
89+
proc.stderr.on('data', (chunk) => { if (ws.readyState === 1) sendWs(ws, { type: 'terminal_output', data: chunk.toString('base64'), encoding: 'base64' }); });
90+
proc.on('exit', (code) => { if (ws.readyState === 1) sendWs(ws, { type: 'terminal_exit', code }); ws.terminalProc = null; });
91+
proc.on('error', (err) => { console.error('[TERMINAL] Spawn error (contained):', err.message); if (ws.readyState === 1) sendWs(ws, { type: 'terminal_exit', code: 1, error: err.message }); ws.terminalProc = null; });
92+
proc.stdin.on('error', () => {});
93+
proc.stdout.on('error', () => {});
94+
proc.stderr.on('error', () => {});
95+
sendWs(ws, { type: 'terminal_started', timestamp: Date.now() });
96+
}
97+
} else if (data.type === 'terminal_input') {
98+
if (ws.terminalProc) {
99+
try {
100+
const input = Buffer.from(data.data, 'base64');
101+
if (ws.terminalPty) { ws.terminalProc.write(input); }
102+
else if (ws.terminalProc.stdin && ws.terminalProc.stdin.writable) { ws.terminalProc.stdin.write(input); }
103+
} catch (_) {}
104+
}
105+
} else if (data.type === 'terminal_resize') {
106+
if (ws.terminalProc && ws.terminalPty) {
107+
try {
108+
const { cols, rows } = data;
109+
if (cols && rows && typeof ws.terminalProc.resize === 'function') { ws.terminalProc.resize(cols, rows); }
110+
} catch (_) {}
111+
}
112+
} else if (data.type === 'terminal_stop') {
113+
if (ws.terminalProc) { try { ws.terminalProc.kill(); } catch (_) {} ws.terminalProc = null; }
114+
} else if (data.type === 'pm2_list') {
115+
if (!pm2Manager.connected) {
116+
if (ws.readyState === 1) sendWs(ws, { type: 'pm2_unavailable', reason: 'PM2 not connected', timestamp: Date.now() });
117+
} else {
118+
pm2Manager.listProcesses().then(processes => {
119+
if (ws.readyState === 1) {
120+
const hasActive = processes.some(p => ['online', 'launching', 'stopping', 'waiting restart'].includes(p.status));
121+
sendWs(ws, { type: 'pm2_list_response', processes, hasActive });
122+
}
123+
}).catch(() => { if (ws.readyState === 1) sendWs(ws, { type: 'pm2_unavailable', reason: 'list failed', timestamp: Date.now() }); });
124+
}
125+
} else if (data.type === 'pm2_start_monitoring') {
126+
pm2Subscribers.add(ws);
127+
ws.pm2Subscribed = true;
128+
if (!pm2Manager.connected) {
129+
if (ws.readyState === 1) sendWs(ws, { type: 'pm2_unavailable', reason: 'PM2 not connected', timestamp: Date.now() });
130+
} else {
131+
sendWs(ws, { type: 'pm2_monitoring_started' });
132+
}
133+
} else if (data.type === 'pm2_stop_monitoring') {
134+
pm2Subscribers.delete(ws);
135+
ws.pm2Subscribed = false;
136+
sendWs(ws, { type: 'pm2_monitoring_stopped' });
137+
} else if (data.type === 'pm2_start') {
138+
pm2Manager.startProcess(data.name).then(result => { sendWs(ws, { type: 'pm2_start_response', name: data.name, ...result }); });
139+
} else if (data.type === 'pm2_stop') {
140+
pm2Manager.stopProcess(data.name).then(result => { sendWs(ws, { type: 'pm2_stop_response', name: data.name, ...result }); });
141+
} else if (data.type === 'pm2_restart') {
142+
pm2Manager.restartProcess(data.name).then(result => { sendWs(ws, { type: 'pm2_restart_response', name: data.name, ...result }); });
143+
} else if (data.type === 'pm2_delete') {
144+
pm2Manager.deleteProcess(data.name).then(result => { sendWs(ws, { type: 'pm2_delete_response', name: data.name, ...result }); });
145+
} else if (data.type === 'pm2_logs') {
146+
pm2Manager.getLogs(data.name, { lines: data.lines || 100 }).then(result => { sendWs(ws, { type: 'pm2_logs_response', name: data.name, ...result }); });
147+
} else if (data.type === 'pm2_flush_logs') {
148+
pm2Manager.flushLogs(data.name).then(result => { sendWs(ws, { type: 'pm2_flush_logs_response', name: data.name, ...result }); });
149+
} else if (data.type === 'pm2_ping') {
150+
pm2Manager.ping().then(result => { sendWs(ws, { type: 'pm2_ping_response', ...result }); });
151+
}
152+
} catch (err) { console.error('[WS-LEGACY] Handler error (contained):', err.message); }
153+
});
154+
}

lib/ws-setup.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import fs from 'fs';
2+
import path from 'path';
3+
import { WebSocketServer } from 'ws';
4+
import { registerLegacyHandler } from './ws-legacy-handlers.js';
5+
6+
export function createWsSetup(server, { BASE_URL, watch, staticDir, _assetCache, htmlState, sendWs, wsRouter, debugLog, subscriptionIndex, syncClients, pm2Subscribers, wsOptimizer, legacyDeps }) {
7+
const hotReloadClients = [];
8+
9+
const wss = new WebSocketServer({ server, perMessageDeflate: false });
10+
wss.on('error', (err) => { console.error('[WSS] WebSocket server error (contained):', err.message); });
11+
12+
wss.on('connection', (ws, req) => {
13+
const _pwd = process.env.PASSWORD;
14+
if (_pwd) {
15+
const url = new URL(req.url, 'http://localhost');
16+
const token = url.searchParams.get('token');
17+
if (token !== _pwd) { ws.close(4001, 'Unauthorized'); return; }
18+
}
19+
const wsPath = req.url.split('?')[0];
20+
const wsRoute = wsPath.startsWith(BASE_URL) ? wsPath.slice(BASE_URL.length) : wsPath;
21+
if (wsRoute === '/hot-reload') {
22+
hotReloadClients.push(ws);
23+
ws.on('close', () => { const i = hotReloadClients.indexOf(ws); if (i > -1) hotReloadClients.splice(i, 1); });
24+
} else if (wsRoute === '/sync') {
25+
syncClients.add(ws);
26+
ws.isAlive = true;
27+
ws.subscriptions = new Set();
28+
ws.clientId = `client-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
29+
sendWs(ws, { type: 'sync_connected', clientId: ws.clientId, timestamp: Date.now() });
30+
ws.on('error', (err) => { console.error('[WS] Client error (contained):', ws.clientId, err.message); });
31+
ws.on('message', (msg) => { try { wsRouter.onMessage(ws, msg); } catch (e) { console.error('[WS] Message handler error (contained):', e.message); } });
32+
ws.on('pong', () => { ws.isAlive = true; });
33+
ws.on('close', () => {
34+
if (ws.terminalProc) { try { ws.terminalProc.kill(); } catch (_) {} ws.terminalProc = null; }
35+
syncClients.delete(ws);
36+
wsOptimizer.removeClient(ws);
37+
for (const sub of ws.subscriptions) {
38+
const idx = subscriptionIndex.get(sub);
39+
if (idx) { idx.delete(ws); if (idx.size === 0) subscriptionIndex.delete(sub); }
40+
}
41+
if (ws.pm2Subscribed) pm2Subscribers.delete(ws);
42+
debugLog(`[WebSocket] Client ${ws.clientId} disconnected`);
43+
});
44+
}
45+
});
46+
47+
registerLegacyHandler(wsRouter, legacyDeps);
48+
49+
setInterval(() => {
50+
syncClients.forEach(ws => {
51+
if (!ws.isAlive) { syncClients.delete(ws); wsOptimizer.removeClient(ws); return ws.terminate(); }
52+
ws.isAlive = false;
53+
ws.ping();
54+
});
55+
}, 30000);
56+
57+
if (watch) {
58+
const watchedFiles = new Map();
59+
try {
60+
fs.readdirSync(staticDir).forEach(file => {
61+
const fp = path.join(staticDir, file);
62+
if (watchedFiles.has(fp)) return;
63+
fs.watchFile(fp, { interval: 100 }, (curr, prev) => {
64+
if (curr.mtime > prev.mtime) {
65+
_assetCache.clear();
66+
htmlState.cache = null;
67+
htmlState.etag = null;
68+
hotReloadClients.forEach(c => { if (c.readyState === 1) c.send(JSON.stringify({ type: 'reload' })); });
69+
}
70+
});
71+
watchedFiles.set(fp, true);
72+
});
73+
} catch (e) { console.error('Watch error:', e.message); }
74+
}
75+
76+
return { wss, hotReloadClients };
77+
}

0 commit comments

Comments
 (0)