diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index 3335e09..368142b 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -5,27 +5,13 @@ import { createClient } from 'redis'; import dotenv from 'dotenv'; import { eq } from 'drizzle-orm'; import { db } from './db/index.js'; -import { conversationMembers, users } from './db/schema.js'; +import { conversationMembers } from './db/schema.js'; import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js'; import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; import { redis as appRedis } from './lib/redis.js'; import { setSocketServer } from './lib/socket.js'; -import { setOnline, setOffline } from './services/presence.js'; -import { startHeartbeatTimer, clearHeartbeatTimer } from './services/heartbeat.js'; -import { - registerDeviceSocket, - unregisterDeviceSocket, - isDeviceRevoked, - startDeviceRevocationListener, -} from './services/deviceRevocation.js'; -import { - checkRateLimit, - checkPayloadSize, - recordViolation, - clearViolations, -} from './services/rateLimit.js'; -import { registerForBackpressure, unregisterForBackpressure } from './services/backpressure.js'; +import { setOnline, setOffline, refreshPresence, isOnline } from './services/presence.js'; import { buildRpcFetcher, buildTreasuryRpcFetcher, @@ -50,57 +36,8 @@ io.use(socketAuthMiddleware); io.on('connection', async (socket: AuthSocket) => { const userId = socket.auth!.userId; - const deviceId = socket.auth!.deviceId; console.log('User connected:', userId, socket.id); - // Register socket for device-revocation tracking (cross-instance via Redis pub/sub). - if (appRedis) { - registerDeviceSocket(deviceId, socket.id); - } - - // Start the server-side heartbeat watchdog (90 s timeout). - startHeartbeatTimer(socket, userId, deviceId, appRedis, io); - - // Per-socket middleware: intercept every incoming event before handlers. - const EXCLUDED_EVENTS = new Set(['heartbeat']); - socket.use(async ([event, ...args], next) => { - // Skip internal heartbeat pings. - if (EXCLUDED_EVENTS.has(event)) { - return next(); - } - - // Reject events from a device that was revoked mid-session. - if (isDeviceRevoked(deviceId)) { - socket.emit('error', { event: 'device_revoked', message: 'Device has been revoked' }); - socket.disconnect(true); - return; - } - - // Enforce maximum payload size (configurable via MAX_PAYLOAD_SIZE env). - const payloadArgs = args.filter((a) => typeof a !== 'function'); - const { valid, size } = checkPayloadSize(payloadArgs); - if (!valid) { - socket.emit('error', { - event: 'payload_too_large', - message: `Payload size ${size} exceeds limit`, - }); - return; - } - - // Per-socket rate limiting (configurable via SOCKET_RATE_LIMIT_PER_SEC env). - const { allowed } = await checkRateLimit(appRedis, socket.id); - if (!allowed) { - const violations = recordViolation(socket.id); - socket.emit('error', { event: 'rate_limited', message: 'Rate limit exceeded' }); - if (violations >= 3) { - socket.disconnect(true); - } - return; - } - - next(); - }); - // Auto-join all conversation rooms so the socket receives new_message events // for every conversation the user belongs to (needed for unread badge tracking). const memberships = await db.query.conversationMembers.findMany({ @@ -111,53 +48,52 @@ io.on('connection', async (socket: AuthSocket) => { await socket.join(m.conversationId); } - const user = await db.query.users.findFirst({ - where: eq(users.id, userId), - columns: { presenceVisible: true }, - }); - const presenceVisible = user?.presenceVisible ?? true; - if (appRedis) { - await setOnline(appRedis, userId, socket.id); - if (presenceVisible) { + const shouldBroadcast = await setOnline(appRedis, userId, socket.id); + if (shouldBroadcast) { for (const m of memberships) { io.to(m.conversationId).emit('user_online', { userId }); - io.to(m.conversationId).emit('presence_update', { userId, online: true }); + io.to(m.conversationId).emit('presence_update', { + userId, + online: true, + status: 'online', + lastSeen: Date.now(), + }); } } } - registerMessagingHandlers(io, socket); + socket.on('heartbeat', async () => { + if (appRedis) { + await refreshPresence(appRedis, userId); + } + }); - // Monitor send-buffer to detect slow/stalled consumers. - registerForBackpressure(socket); + registerMessagingHandlers(io, socket); socket.on('disconnect', async () => { console.log('User disconnected:', userId); - clearHeartbeatTimer(socket.id); - unregisterDeviceSocket(socket.id); - unregisterForBackpressure(socket); - clearViolations(socket.id); - if (appRedis) { - const fullyOffline = await setOffline(appRedis, userId, socket.id); - if (fullyOffline) { - const user = await db.query.users.findFirst({ - where: eq(users.id, userId), - columns: { presenceVisible: true }, - }); - const presenceVisible = user?.presenceVisible ?? true; - - if (presenceVisible) { - const memberships = await db.query.conversationMembers.findMany({ - where: eq(conversationMembers.userId, userId), - columns: { conversationId: true }, - }); - for (const m of memberships) { - io.to(m.conversationId).emit('user_offline', { userId }); - io.to(m.conversationId).emit('presence_update', { userId, online: false }); + const startDebounce = await setOffline(appRedis, userId, socket.id); + if (startDebounce) { + setTimeout(async () => { + const currentlyOnline = await isOnline(appRedis, userId); + if (!currentlyOnline) { + const memberships = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.userId, userId), + columns: { conversationId: true }, + }); + for (const m of memberships) { + io.to(m.conversationId).emit('user_offline', { userId }); + io.to(m.conversationId).emit('presence_update', { + userId, + online: false, + status: 'offline', + lastSeen: Date.now(), + }); + } } - } + }, 3000); } } }); @@ -204,13 +140,6 @@ httpServer.listen(PORT, () => { // Redis is unreachable; on failure we fall back to the in-process adapter. void attachRedisAdapter(); -// Subscribe to device_revoked:* channels so any gateway instance can -// disconnect a revoked device's sockets within seconds, even when the -// revocation was issued on a different node. -if (appRedis) { - void startDeviceRevocationListener(appRedis, appRedis); -} - // #46 — Stellar transfer event listener. Only spin up when the contract // id is configured so local-dev and unit-test runs don't try to talk to // Soroban RPC. The listener never throws out of runForever, so a failed @@ -236,4 +165,4 @@ if (stellarRpcUrl && tokenTransferContractId) { console.log( '[stellar-listener] STELLAR_RPC_URL or TOKEN_TRANSFER_CONTRACT_ID unset; listener disabled.', ); -} +} \ No newline at end of file diff --git a/apps/backend/src/services/presence.ts b/apps/backend/src/services/presence.ts index 1013131..59a3b65 100644 --- a/apps/backend/src/services/presence.ts +++ b/apps/backend/src/services/presence.ts @@ -18,14 +18,22 @@ function presenceKey(userId: string): string { return `presence:${userId}`; } -/** - * Register a socket connection for a user. Adds the socketId to the - * user's presence set and sets/refreshes the TTL. - */ -export async function setOnline(redis: Redis, userId: string, socketId: string): Promise { +export async function setOnline(redis: Redis, userId: string, socketId: string): Promise { const key = presenceKey(userId); + const debounceKey = `presence_debounce:${userId}`; + + const count = await redis.scard(key); await redis.sadd(key, socketId); await redis.expire(key, PRESENCE_TTL); + + if (count === 0) { + const debouncing = await redis.del(debounceKey); + if (debouncing === 1) { + return false; // Flap detected, don't broadcast online + } + return true; // First socket connected + } + return false; } /** @@ -39,16 +47,15 @@ export async function refreshPresence(redis: Redis, userId: string): Promise { const key = presenceKey(userId); + const debounceKey = `presence_debounce:${userId}`; + await redis.srem(key, socketId); const remaining = await redis.scard(key); if (remaining === 0) { await redis.del(key); + await redis.set(debounceKey, '1', 'EX', 3); return true; } return false; diff --git a/apps/web/src/components/conversations/ConversationListSidebar.tsx b/apps/web/src/components/conversations/ConversationListSidebar.tsx index a7dc39c..3e5d3fe 100644 --- a/apps/web/src/components/conversations/ConversationListSidebar.tsx +++ b/apps/web/src/components/conversations/ConversationListSidebar.tsx @@ -240,8 +240,9 @@ export function ConversationListSidebar() { handleOffline(data.userId); } - function onPresenceUpdate(data: { userId: string; online: boolean }) { - if (data.online) { + function onPresenceUpdate(data: { userId: string; online?: boolean; status?: 'online' | 'offline'; lastSeen?: number }) { + const isOnline = data.status ? data.status === 'online' : !!data.online; + if (isOnline) { handleOnline(data.userId); } else { handleOffline(data.userId);