From d3d6e5af039c5c921a2c4835ef62cd48b6c624d4 Mon Sep 17 00:00:00 2001 From: "Huncho.Dev" Date: Sun, 28 Jun 2026 04:28:33 +0000 Subject: [PATCH 1/2] #221 Presence recovery after gateway restart FIXED --- .../__tests__/presence.reconciliation.test.ts | 165 ++++++++++++++++++ apps/backend/src/index.ts | 54 +++++- apps/backend/src/routes/conversations.ts | 7 +- apps/backend/src/routes/treasury.ts | 3 +- apps/backend/src/services/presence.ts | 79 +++++++++ 5 files changed, 303 insertions(+), 5 deletions(-) create mode 100644 apps/backend/src/__tests__/presence.reconciliation.test.ts diff --git a/apps/backend/src/__tests__/presence.reconciliation.test.ts b/apps/backend/src/__tests__/presence.reconciliation.test.ts new file mode 100644 index 0000000..08f7649 --- /dev/null +++ b/apps/backend/src/__tests__/presence.reconciliation.test.ts @@ -0,0 +1,165 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { reconcileBoot, cleanupStaleSockets, setOffline } from '../services/presence.js'; + +// ── DB mock ──────────────────────────────────────────────────────────────── +const { mockFindMany } = vi.hoisted(() => ({ + mockFindMany: vi.fn(), +})); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { findMany: mockFindMany }, + }, + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversationMembers: { + userId: 'userId', + conversationId: 'conversationId', + }, +})); + +vi.mock('drizzle-orm', () => ({ + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), +})); + +// ── Redis & Socket mock ──────────────────────────────────────────────────── + +describe('Presence Reconciliation & Gateway Boot (#...)', () => { + let mockRedis: any; + let mockIo: any; + let mockSocketsJoin: any; + let mockFetchSockets: any; + + beforeEach(() => { + vi.clearAllMocks(); + + mockSocketsJoin = vi.fn(); + mockFetchSockets = vi.fn().mockResolvedValue([]); + + mockIo = { + in: vi.fn((sid: string) => ({ + socketsJoin: mockSocketsJoin, + fetchSockets: () => mockFetchSockets(sid), + })), + }; + + mockRedis = { + scan: vi.fn(), + keys: vi.fn(), + smembers: vi.fn(), + srem: vi.fn(), + scard: vi.fn(), + del: vi.fn(), + }; + }); + + describe('reconcileBoot', () => { + it('rebuilds room subscriptions from active Redis socket mappings on boot', async () => { + // redis.scan returns presence keys + mockRedis.scan + .mockResolvedValueOnce(['10', ['presence:user-1', 'presence:user-2']]) + .mockResolvedValueOnce(['0', []]); + + mockRedis.smembers.mockImplementation(async (key: string) => { + if (key === 'presence:user-1') return ['socket-1a', 'socket-1b']; + if (key === 'presence:user-2') return ['socket-2a']; + return []; + }); + + mockFindMany.mockImplementation(async ({ where }: any) => { + if (where.val === 'user-1') { + return [{ conversationId: 'room-alpha' }, { conversationId: 'room-beta' }]; + } + if (where.val === 'user-2') { + return [{ conversationId: 'room-gamma' }]; + } + return []; + }); + + await reconcileBoot(mockIo as any, mockRedis as any); + + expect(mockRedis.scan).toHaveBeenCalledTimes(2); + expect(mockFindMany).toHaveBeenCalledTimes(2); + + // user-1 sockets joined room-alpha & room-beta + expect(mockIo.in).toHaveBeenCalledWith('socket-1a'); + expect(mockIo.in).toHaveBeenCalledWith('socket-1b'); + expect(mockIo.in).toHaveBeenCalledWith('socket-2a'); + expect(mockSocketsJoin).toHaveBeenCalledWith('room-alpha'); + expect(mockSocketsJoin).toHaveBeenCalledWith('room-beta'); + expect(mockSocketsJoin).toHaveBeenCalledWith('room-gamma'); + }); + + it('falls back to redis.keys if redis.scan throws', async () => { + mockRedis.scan.mockRejectedValue(new Error('scan not supported')); + mockRedis.keys.mockResolvedValue(['presence:user-3']); + mockRedis.smembers.mockResolvedValue(['socket-3a']); + mockFindMany.mockResolvedValue([{ conversationId: 'room-delta' }]); + + await reconcileBoot(mockIo as any, mockRedis as any); + + expect(mockRedis.keys).toHaveBeenCalledWith('presence:*'); + expect(mockSocketsJoin).toHaveBeenCalledWith('room-delta'); + }); + }); + + describe('cleanupStaleSockets', () => { + it('removes stale socket IDs from Redis presence set and deletes empty sets', async () => { + mockRedis.smembers.mockResolvedValue(['socket-dead', 'socket-alive']); + + mockFetchSockets.mockImplementation(async (sid: string) => { + if (sid === 'socket-alive') return [{ id: 'socket-alive' }]; // still connected + return []; // dead socket + }); + + mockRedis.scard.mockResolvedValue(1); + + await cleanupStaleSockets(mockIo as any, mockRedis as any, 'user-1'); + + expect(mockRedis.srem).toHaveBeenCalledWith('presence:user-1', 'socket-dead'); + expect(mockRedis.srem).not.toHaveBeenCalledWith('presence:user-1', 'socket-alive'); + expect(mockRedis.del).not.toHaveBeenCalled(); + }); + + it('deletes presence key if all sockets were stale and removed', async () => { + mockRedis.smembers.mockResolvedValue(['socket-dead-1']); + mockFetchSockets.mockResolvedValue([]); // dead socket + mockRedis.scard.mockResolvedValue(0); + + await cleanupStaleSockets(mockIo as any, mockRedis as any, 'user-2'); + + expect(mockRedis.srem).toHaveBeenCalledWith('presence:user-2', 'socket-dead-1'); + expect(mockRedis.del).toHaveBeenCalledWith('presence:user-2'); + }); + + it('ignores activeSocketId if passed', async () => { + mockRedis.smembers.mockResolvedValue(['socket-new']); + + await cleanupStaleSockets(mockIo as any, mockRedis as any, 'user-3', 'socket-new'); + + expect(mockFetchSockets).not.toHaveBeenCalled(); + expect(mockRedis.srem).not.toHaveBeenCalled(); + }); + }); + + describe('setOffline', () => { + it('removes socket ID and returns true when no sockets remain', async () => { + mockRedis.scard.mockResolvedValue(0); + const offline = await setOffline(mockRedis as any, 'user-1', 'socket-1'); + expect(mockRedis.srem).toHaveBeenCalledWith('presence:user-1', 'socket-1'); + expect(mockRedis.del).toHaveBeenCalledWith('presence:user-1'); + expect(offline).toBe(true); + }); + + it('returns false when surviving connections remain', async () => { + mockRedis.scard.mockResolvedValue(1); + const offline = await setOffline(mockRedis as any, 'user-1', 'socket-1'); + expect(mockRedis.srem).toHaveBeenCalledWith('presence:user-1', 'socket-1'); + expect(mockRedis.del).not.toHaveBeenCalled(); + expect(offline).toBe(false); + }); + }); +}); diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index f8d60b7..ea17bd0 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -11,7 +11,13 @@ import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; import { redis as appRedis } from './lib/redis.js'; import { setSocketServer } from './lib/socket.js'; -import { setOnline, setOffline, refreshPresence } from './services/presence.js'; +import { + setOnline, + setOffline, + refreshPresence, + reconcileBoot, + cleanupStaleSockets, +} from './services/presence.js'; import { buildRpcFetcher, buildTreasuryRpcFetcher, @@ -30,6 +36,27 @@ const io = new Server(httpServer, { cors: { origin: '*' }, }); +let isShuttingDown = false; + +const handleShutdown = () => { + isShuttingDown = true; +}; + +process.on('SIGTERM', handleShutdown); +process.on('SIGINT', handleShutdown); + +const origIoClose = io.close.bind(io); +io.close = ((fn?: () => void) => { + isShuttingDown = true; + return origIoClose(fn); +}) as typeof io.close; + +const origHttpClose = httpServer.close.bind(httpServer); +httpServer.close = ((fn?: (err?: Error) => void) => { + isShuttingDown = true; + return origHttpClose(fn); +}) as typeof httpServer.close; + setSocketServer(io); io.use(socketAuthMiddleware); @@ -49,6 +76,7 @@ io.on('connection', async (socket: AuthSocket) => { } if (appRedis) { + await cleanupStaleSockets(io, appRedis, userId, socket.id); await setOnline(appRedis, userId, socket.id); for (const m of memberships) { io.to(m.conversationId).emit('user_online', { userId }); @@ -58,15 +86,24 @@ io.on('connection', async (socket: AuthSocket) => { socket.on('heartbeat', async () => { if (appRedis) { + await cleanupStaleSockets(io, appRedis, userId, socket.id); await refreshPresence(appRedis, userId); } }); registerMessagingHandlers(io, socket); - socket.on('disconnect', async () => { - console.log('User disconnected:', userId); + socket.on('disconnect', async (reason: string) => { + console.log('User disconnected:', userId, reason); + if ( + isShuttingDown || + reason === 'server shutting down' || + reason === 'server namespace disconnect' + ) { + return; + } if (appRedis) { + await cleanupStaleSockets(io, appRedis, userId, socket.id); const fullyOffline = await setOffline(appRedis, userId, socket.id); if (fullyOffline) { const memberships = await db.query.conversationMembers.findMany({ @@ -111,6 +148,15 @@ async function attachRedisAdapter(): Promise { const message = err instanceof Error ? err.message : String(err); console.warn(`[socket.io] Redis unavailable (${message}) — running in single-instance mode`); await Promise.allSettled([pubClient.quit(), subClient.quit()]); + } finally { + if (appRedis) { + try { + await reconcileBoot(io, appRedis); + console.log('[presence] Boot reconciliation complete'); + } catch (err) { + console.warn('[presence] Boot reconciliation failed:', err); + } + } } } @@ -149,3 +195,5 @@ if (stellarRpcUrl && tokenTransferContractId) { '[stellar-listener] STELLAR_RPC_URL or TOKEN_TRANSFER_CONTRACT_ID unset; listener disabled.', ); } + +export { httpServer, io }; diff --git a/apps/backend/src/routes/conversations.ts b/apps/backend/src/routes/conversations.ts index 822070f..8b9a0d0 100644 --- a/apps/backend/src/routes/conversations.ts +++ b/apps/backend/src/routes/conversations.ts @@ -95,7 +95,12 @@ conversationsRouter.get('/', async (req: AuthRequest, res) => { with: { conversation: conversationRelations as never, }, - })) as unknown as Array<{ conversationId: string; conversation: ConversationPayload }>; + })) as unknown as Array<{ + conversationId: string; + isMuted: boolean; + isArchived: boolean; + conversation: ConversationPayload; + }>; // Single subquery for message counts — no N+1 const conversationIds = memberships.map((m) => m.conversationId); diff --git a/apps/backend/src/routes/treasury.ts b/apps/backend/src/routes/treasury.ts index 660f768..e66acc5 100644 --- a/apps/backend/src/routes/treasury.ts +++ b/apps/backend/src/routes/treasury.ts @@ -1,9 +1,10 @@ import { Router } from 'express'; +import type { IRouter } from 'express'; import { z } from 'zod'; import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { validate } from '../middleware/validate.js'; -export const treasuryRouter = Router(); +export const treasuryRouter: IRouter = Router(); treasuryRouter.use(requireAuth); diff --git a/apps/backend/src/services/presence.ts b/apps/backend/src/services/presence.ts index ccda9cd..cf4a250 100644 --- a/apps/backend/src/services/presence.ts +++ b/apps/backend/src/services/presence.ts @@ -10,7 +10,11 @@ * - On disconnect: remove socketId from set, if set empty → user_offline * - GET /users/:id/presence → { online: boolean } */ +import type { Server } from 'socket.io'; import type { Redis } from 'ioredis'; +import { eq } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { conversationMembers } from '../db/schema.js'; const PRESENCE_TTL = 60; // seconds @@ -62,3 +66,78 @@ export async function isOnline(redis: Redis, userId: string): Promise { const count = await redis.scard(key); return count > 0; } + +/** + * Remove any socket IDs in the user's presence set that are no longer + * connected anywhere in the Socket.IO cluster. + */ +export async function cleanupStaleSockets( + io: Server, + redis: Redis, + userId: string, + ignoredSocketId?: string, +): Promise { + const key = presenceKey(userId); + const socketIds = await redis.smembers(key); + if (socketIds.length === 0) return; + + await Promise.all( + socketIds.map(async (sid) => { + if (ignoredSocketId && sid === ignoredSocketId) return; + try { + const sockets = await io.in(sid).fetchSockets(); + if (sockets.length === 0) { + await redis.srem(key, sid); + } + } catch (err) { + console.warn(`[presence] Failed to check socket status for ${sid}:`, err); + } + }), + ); + + const remaining = await redis.scard(key); + if (remaining === 0) { + await redis.del(key); + } +} + +/** + * Rebuild room subscriptions from active Redis socket mappings on gateway boot. + */ +export async function reconcileBoot(io: Server, redis: Redis): Promise { + let presenceKeys: string[]; + try { + let cursor = '0'; + presenceKeys = []; + do { + const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', 'presence:*', 'COUNT', 100); + cursor = nextCursor; + presenceKeys.push(...keys); + } while (cursor !== '0'); + } catch { + presenceKeys = await redis.keys('presence:*'); + } + + for (const key of presenceKeys) { + const userId = key.slice('presence:'.length); + if (!userId) continue; + + const socketIds = await redis.smembers(key); + if (socketIds.length === 0) continue; + + try { + const memberships = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.userId, userId), + columns: { conversationId: true }, + }); + + for (const socketId of socketIds) { + for (const m of memberships) { + io.in(socketId).socketsJoin(m.conversationId); + } + } + } catch (err) { + console.warn(`[presence] Failed to rebuild subscriptions for ${userId}:`, err); + } + } +} From f23f00c5adf23d7856d9bba74348285de0e0917f Mon Sep 17 00:00:00 2001 From: "Huncho.Dev" Date: Mon, 29 Jun 2026 04:27:32 +0000 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20resolve=20merge=20conflicts=20in=20i?= =?UTF-8?q?ndex.ts=20and=20treasury.ts=20=E2=80=93=20CI=20green?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/src/index.ts | 121 +++++++++++++++++++++++++--- apps/backend/src/routes/treasury.ts | 4 +- 2 files changed, 113 insertions(+), 12 deletions(-) diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index ea17bd0..195888e 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -5,12 +5,13 @@ import { createClient } from 'redis'; import dotenv from 'dotenv'; import { eq } from 'drizzle-orm'; import { db } from './db/index.js'; -import { conversationMembers } from './db/schema.js'; +import { conversationMembers, users } from './db/schema.js'; import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js'; import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; import { redis as appRedis } from './lib/redis.js'; import { setSocketServer } from './lib/socket.js'; + import { setOnline, setOffline, @@ -18,6 +19,22 @@ import { reconcileBoot, cleanupStaleSockets, } from './services/presence.js'; + +import { startHeartbeatTimer, clearHeartbeatTimer } from './services/heartbeat.js'; +import { + registerDeviceSocket, + unregisterDeviceSocket, + isDeviceRevoked, + startDeviceRevocationListener, +} from './services/deviceRevocation.js'; +import { + checkRateLimit, + checkPayloadSize, + recordViolation, + clearViolations, +} from './services/rateLimit.js'; +import { registerForBackpressure, unregisterForBackpressure } from './services/backpressure.js'; + import { buildRpcFetcher, buildTreasuryRpcFetcher, @@ -63,8 +80,57 @@ io.use(socketAuthMiddleware); io.on('connection', async (socket: AuthSocket) => { const userId = socket.auth!.userId; + const deviceId = socket.auth!.deviceId; console.log('User connected:', userId, socket.id); + // Register socket for device-revocation tracking (cross-instance via Redis pub/sub). + if (appRedis) { + registerDeviceSocket(deviceId, socket.id); + } + + // Start the server-side heartbeat watchdog (90 s timeout). + startHeartbeatTimer(socket, userId, deviceId, appRedis, io); + + // Per-socket middleware: intercept every incoming event before handlers. + const EXCLUDED_EVENTS = new Set(['heartbeat']); + socket.use(async ([event, ...args], next) => { + // Skip internal heartbeat pings. + if (EXCLUDED_EVENTS.has(event)) { + return next(); + } + + // Reject events from a device that was revoked mid-session. + if (isDeviceRevoked(deviceId)) { + socket.emit('error', { event: 'device_revoked', message: 'Device has been revoked' }); + socket.disconnect(true); + return; + } + + // Enforce maximum payload size (configurable via MAX_PAYLOAD_SIZE env). + const payloadArgs = args.filter((a) => typeof a !== 'function'); + const { valid, size } = checkPayloadSize(payloadArgs); + if (!valid) { + socket.emit('error', { + event: 'payload_too_large', + message: `Payload size ${size} exceeds limit`, + }); + return; + } + + // Per-socket rate limiting (configurable via SOCKET_RATE_LIMIT_PER_SEC env). + const { allowed } = await checkRateLimit(appRedis, socket.id); + if (!allowed) { + const violations = recordViolation(socket.id); + socket.emit('error', { event: 'rate_limited', message: 'Rate limit exceeded' }); + if (violations >= 3) { + socket.disconnect(true); + } + return; + } + + next(); + }); + // Auto-join all conversation rooms so the socket receives new_message events // for every conversation the user belongs to (needed for unread badge tracking). const memberships = await db.query.conversationMembers.findMany({ @@ -75,12 +141,20 @@ io.on('connection', async (socket: AuthSocket) => { await socket.join(m.conversationId); } + const user = await db.query.users.findFirst({ + where: eq(users.id, userId), + columns: { presenceVisible: true }, + }); + const presenceVisible = user?.presenceVisible ?? true; + if (appRedis) { await cleanupStaleSockets(io, appRedis, userId, socket.id); await setOnline(appRedis, userId, socket.id); - for (const m of memberships) { - io.to(m.conversationId).emit('user_online', { userId }); - io.to(m.conversationId).emit('presence_update', { userId, online: true }); + if (presenceVisible) { + for (const m of memberships) { + io.to(m.conversationId).emit('user_online', { userId }); + io.to(m.conversationId).emit('presence_update', { userId, online: true }); + } } } @@ -93,8 +167,19 @@ io.on('connection', async (socket: AuthSocket) => { registerMessagingHandlers(io, socket); + // Monitor send-buffer to detect slow/stalled consumers. + registerForBackpressure(socket); + socket.on('disconnect', async (reason: string) => { console.log('User disconnected:', userId, reason); + clearHeartbeatTimer(socket.id); + unregisterDeviceSocket(socket.id); + unregisterForBackpressure(socket); + clearViolations(socket.id); + + // During a gateway restart we must NOT wipe presence — surviving + // devices re-assert via heartbeat and Redis TTLs. This satisfies + // #221: Gateway restart does not drop still-connected users to offline. if ( isShuttingDown || reason === 'server shutting down' || @@ -102,17 +187,26 @@ io.on('connection', async (socket: AuthSocket) => { ) { return; } + if (appRedis) { await cleanupStaleSockets(io, appRedis, userId, socket.id); const fullyOffline = await setOffline(appRedis, userId, socket.id); if (fullyOffline) { - const memberships = await db.query.conversationMembers.findMany({ - where: eq(conversationMembers.userId, userId), - columns: { conversationId: true }, + const user = await db.query.users.findFirst({ + where: eq(users.id, userId), + columns: { presenceVisible: true }, }); - for (const m of memberships) { - io.to(m.conversationId).emit('user_offline', { userId }); - io.to(m.conversationId).emit('presence_update', { userId, online: false }); + const presenceVisible = user?.presenceVisible ?? true; + + if (presenceVisible) { + const memberships = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.userId, userId), + columns: { conversationId: true }, + }); + for (const m of memberships) { + io.to(m.conversationId).emit('user_offline', { userId }); + io.to(m.conversationId).emit('presence_update', { userId, online: false }); + } } } } @@ -169,6 +263,13 @@ httpServer.listen(PORT, () => { // Redis is unreachable; on failure we fall back to the in-process adapter. void attachRedisAdapter(); +// Subscribe to device_revoked:* channels so any gateway instance can +// disconnect a revoked device's sockets within seconds, even when the +// revocation was issued on a different node. +if (appRedis) { + void startDeviceRevocationListener(appRedis, appRedis); +} + // #46 — Stellar transfer event listener. Only spin up when the contract // id is configured so local-dev and unit-test runs don't try to talk to // Soroban RPC. The listener never throws out of runForever, so a failed diff --git a/apps/backend/src/routes/treasury.ts b/apps/backend/src/routes/treasury.ts index e66acc5..9a9363f 100644 --- a/apps/backend/src/routes/treasury.ts +++ b/apps/backend/src/routes/treasury.ts @@ -1,5 +1,5 @@ -import { Router } from 'express'; -import type { IRouter } from 'express'; +import { Router, type IRouter } from 'express'; + import { z } from 'zod'; import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { validate } from '../middleware/validate.js';