From 6486f92c94ed5ae871d84c61c83e1705fa67eae9 Mon Sep 17 00:00:00 2001 From: "Huncho.Dev" Date: Sun, 28 Jun 2026 04:14:50 +0000 Subject: [PATCH] #219 Typing indicators (ephemeral, never stored) FIXED --- apps/backend/src/__tests__/typing.test.ts | 293 ++++++++++++++++++ apps/backend/src/routes/conversations.ts | 4 +- apps/backend/src/routes/treasury.ts | 5 +- apps/backend/src/socket/messaging.ts | 160 ++++++++-- apps/web/src/components/chat/MessageInput.tsx | 33 +- 5 files changed, 455 insertions(+), 40 deletions(-) create mode 100644 apps/backend/src/__tests__/typing.test.ts diff --git a/apps/backend/src/__tests__/typing.test.ts b/apps/backend/src/__tests__/typing.test.ts new file mode 100644 index 0000000..6907d73 --- /dev/null +++ b/apps/backend/src/__tests__/typing.test.ts @@ -0,0 +1,293 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { EventEmitter } from 'events'; + +// ── Mock DB ──────────────────────────────────────────────────────────────── + +const mockFindFirst = vi.fn(); +const mockFindMany = vi.fn(); +const mockInsert = vi.fn(); +const mockUpdate = vi.fn(); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { + findFirst: mockFindFirst, + findMany: mockFindMany, + }, + }, + insert: mockInsert, + update: mockUpdate, + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversationMembers: {}, + conversations: {}, + messages: {}, +})); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), + lt: vi.fn(), + desc: vi.fn(), + sql: vi.fn(), +})); + +vi.mock('../lib/conversationCache.js', () => ({ + invalidateConversationCaches: vi.fn(), +})); + +vi.mock('../lib/messages.js', () => ({ + serializeMessage: vi.fn(), +})); + +vi.mock('../lib/redis.js', () => ({ + redis: null, +})); + +// ── Mock Socket helpers ──────────────────────────────────────────────────── + +function makeSocket(userId: string, rooms: string[] = []) { + const emitter = new EventEmitter(); + const emitted: { event: string; data: unknown }[] = []; + const roomEmitted: { room: string; event: string; data: unknown }[] = []; + + const socket = Object.assign(emitter, { + id: `sock-${userId}`, + auth: { userId }, + rooms: new Set(rooms), + emit: vi.fn((event: string, data: unknown) => { + emitted.push({ event, data }); + }), + to: vi.fn((room: string) => ({ + emit: vi.fn((event: string, data: unknown) => { + roomEmitted.push({ room, event, data }); + }), + })), + join: vi.fn((room: string) => { + socket.rooms.add(room); + }), + emitted, + roomEmitted, + }); + + return socket; +} + +function makeIo() { + const roomEmitted: { room: string; event: string; data: unknown }[] = []; + const io = { + to: vi.fn((room: string) => ({ + emit: vi.fn((event: string, data: unknown) => { + roomEmitted.push({ room, event, data }); + }), + })), + roomEmitted, + }; + return io; +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('Typing indicator Socket events (typing_start / typing_stop)', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('relays typing_start to conversation room members with zero DB writes', async () => { + const userId = 'user-123'; + const conversationId = 'conv-abc'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const handler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await handler({ conversationId }); + + // Zero DB writes + expect(mockInsert).not.toHaveBeenCalled(); + expect(mockUpdate).not.toHaveBeenCalled(); + + // Relayed to room via socket.to(room).emit + expect(socket.to).toHaveBeenCalledWith(conversationId); + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_start', + data: { conversationId, userId }, + }); + }); + + it('includes optional deviceId but never relays content', async () => { + const userId = 'user-123'; + const conversationId = 'conv-abc'; + const deviceId = 'device-xyz'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const handler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await handler({ + conversationId, + deviceId, + content: 'SUPER SECRET CONFIDENTIAL TEXT', + extraField: 12345, + }); + + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_start', + data: { conversationId, userId, deviceId }, + }); + + const emittedPayload = socket.roomEmitted[0]!.data as Record; + expect(emittedPayload).not.toHaveProperty('content'); + expect(emittedPayload).not.toHaveProperty('extraField'); + }); + + it('auto-clears typing state after timeout (5 seconds) if no typing_stop', async () => { + const userId = 'user-timer'; + const conversationId = 'conv-timer'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await startHandler({ conversationId }); + + expect(socket.roomEmitted).toHaveLength(1); + expect(socket.roomEmitted[0]?.event).toBe('typing_start'); + + // Advance time by 4.9 seconds - should not clear yet + vi.advanceTimersByTime(4900); + expect(socket.roomEmitted).toHaveLength(1); + + // Advance time past 5 seconds + vi.advanceTimersByTime(100); + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]).toEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId }, + }); + }); + + it('manual typing_stop clears auto-expire timeout and relays typing_stop', async () => { + const userId = 'user-stop'; + const conversationId = 'conv-stop'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const stopHandler = (socket as EventEmitter).listeners('typing_stop')[0] as (p: unknown) => Promise; + + await startHandler({ conversationId }); + await stopHandler({ conversationId }); + + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]?.event).toBe('typing_stop'); + + // Advance time by 10 seconds - timer should have been cancelled, no duplicate typing_stop + vi.advanceTimersByTime(10000); + expect(socket.roomEmitted).toHaveLength(2); + }); + + it('guards non-members when socket not in room and DB membership check fails', async () => { + const userId = 'outsider'; + const conversationId = 'conv-private'; + const socket = makeSocket(userId, []); // not in room + const io = makeIo(); + + mockFindFirst.mockResolvedValueOnce(undefined); // DB check says not a member + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await startHandler({ conversationId }); + + expect(socket.to).not.toHaveBeenCalled(); + expect(socket.emit).toHaveBeenCalledWith( + 'error', + expect.objectContaining({ + event: 'typing_start', + message: expect.stringContaining('member'), + }), + ); + }); + + it('clears active typing state on disconnect', async () => { + const userId = 'user-dc'; + const conversationId = 'conv-dc'; + const deviceId = 'dev-dc'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await startHandler({ conversationId, deviceId }); + + expect(socket.roomEmitted).toHaveLength(1); + + // Trigger disconnect + const dcHandlers = (socket as EventEmitter).listeners('disconnect'); + for (const h of dcHandlers) { + h(); + } + + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]).toEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId, deviceId }, + }); + }); + + it('clears active typing state on send_message', async () => { + const userId = 'user-msg'; + const conversationId = 'conv-msg'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + mockFindFirst.mockResolvedValue({ id: 'mem-1', userId, conversationId }); + mockFindMany.mockResolvedValue([]); + const returnFn = vi.fn().mockResolvedValue([{ id: 'msg-1', content: 'hello' }]); + const valFn = vi.fn().mockReturnValue({ returning: returnFn }); + mockInsert.mockReturnValue({ values: valFn }); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const sendHandler = (socket as EventEmitter).listeners('send_message')[0] as (p: unknown) => Promise; + + await startHandler({ conversationId }); + expect(socket.roomEmitted).toHaveLength(1); + + await sendHandler({ conversationId, content: 'Done typing!' }); + + // Should emit new_message (io.to) AND typing_stop (socket.to) + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId }, + }); + }); +}); \ No newline at end of file diff --git a/apps/backend/src/routes/conversations.ts b/apps/backend/src/routes/conversations.ts index 822070f..dfcfb14 100644 --- a/apps/backend/src/routes/conversations.ts +++ b/apps/backend/src/routes/conversations.ts @@ -95,7 +95,7 @@ conversationsRouter.get('/', async (req: AuthRequest, res) => { with: { conversation: conversationRelations as never, }, - })) as unknown as Array<{ conversationId: string; conversation: ConversationPayload }>; + })) as unknown as Array<{ conversationId: string; isMuted: boolean; isArchived: boolean; conversation: ConversationPayload }>; // Single subquery for message counts — no N+1 const conversationIds = memberships.map((m) => m.conversationId); @@ -762,4 +762,4 @@ conversationsRouter.delete('/:id/leave', async (req: AuthRequest, res) => { await invalidateConversationCaches(members.map((member) => member.userId)); res.status(204).send(); -}); +}); \ No newline at end of file diff --git a/apps/backend/src/routes/treasury.ts b/apps/backend/src/routes/treasury.ts index 660f768..b9b28d4 100644 --- a/apps/backend/src/routes/treasury.ts +++ b/apps/backend/src/routes/treasury.ts @@ -1,9 +1,10 @@ import { Router } from 'express'; +import type { IRouter } from 'express'; import { z } from 'zod'; import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { validate } from '../middleware/validate.js'; -export const treasuryRouter = Router(); +export const treasuryRouter: IRouter = Router(); treasuryRouter.use(requireAuth); @@ -38,4 +39,4 @@ treasuryRouter.post('/propose', validate(proposeSchema), async (req, res) => { recipient, ttlLedgers: TTL_LEDGERS[ttl], }); -}); +}); \ No newline at end of file diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 17d3bab..a3291d7 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -11,6 +11,23 @@ const PAGE_SIZE = 30; export function registerMessagingHandlers(io: Server, socket: AuthSocket): void { const userId = socket.auth!.userId; + const typingTimers = new Map(); + + socket.on('disconnect', () => { + for (const [timerKey, timer] of typingTimers.entries()) { + clearTimeout(timer); + const idx = timerKey.indexOf(':'); + const cid = idx === -1 ? timerKey : timerKey.slice(0, idx); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId: cid, + userId, + }; + if (did) rp.deviceId = did; + socket.to(cid).emit('typing_stop', rp); + } + typingTimers.clear(); + }); // ── join_room ────────────────────────────────────────────────────────────── // Payload: { conversationId: string } @@ -64,6 +81,21 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void io.to(conversationId).emit('new_message', message); + for (const [timerKey, timer] of typingTimers.entries()) { + if (timerKey === conversationId || timerKey.startsWith(`${conversationId}:`)) { + clearTimeout(timer); + typingTimers.delete(timerKey); + const idx = timerKey.indexOf(':'); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + if (did) rp.deviceId = did; + socket.to(conversationId).emit('typing_stop', rp); + } + } + const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), columns: { userId: true }, @@ -196,46 +228,108 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); // ── typing_start ──────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Broadcasts to the room excluding the sender. No DB write. - socket.on('typing_start', async (payload: { conversationId: string }) => { - const { conversationId } = payload; + // Payload: { conversationId: string; deviceId?: string } + // Broadcasts to the room excluding the sender via Pub/Sub. Zero DB write. Auto-expires. + socket.on( + 'typing_start', + async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { + if (!payload || typeof payload.conversationId !== 'string' || !payload.conversationId.trim()) { + socket.emit('error', { event: 'typing_start', message: 'Invalid conversationId' }); + return; + } - const membership = await db.query.conversationMembers.findFirst({ - where: and( - eq(conversationMembers.conversationId, conversationId), - eq(conversationMembers.userId, userId), - ), - }); + const conversationId = payload.conversationId.trim(); - if (!membership) { - socket.emit('error', { event: 'typing_start', message: 'Not a member of this conversation' }); - return; - } + if (!socket.rooms?.has(conversationId)) { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); - socket.to(conversationId).emit('typing_start', { conversationId, userId }); - }); + if (!membership) { + socket.emit('error', { + event: 'typing_start', + message: 'Not a member of this conversation', + }); + return; + } + } + + const relayPayload: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { + relayPayload.deviceId = payload.deviceId.trim(); + } + + const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const existingTimer = typingTimers.get(timerKey); + if (existingTimer) { + clearTimeout(existingTimer); + } + + const timer = setTimeout(() => { + typingTimers.delete(timerKey); + socket.to(conversationId).emit('typing_stop', relayPayload); + }, 5000); + + typingTimers.set(timerKey, timer); + + socket.to(conversationId).emit('typing_start', relayPayload); + }, + ); // ── typing_stop ───────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Broadcasts to the room excluding the sender. No DB write. - socket.on('typing_stop', async (payload: { conversationId: string }) => { - const { conversationId } = payload; + // Payload: { conversationId: string; deviceId?: string } + // Broadcasts to the room excluding the sender via Pub/Sub. Zero DB write. + socket.on( + 'typing_stop', + async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { + if (!payload || typeof payload.conversationId !== 'string' || !payload.conversationId.trim()) { + socket.emit('error', { event: 'typing_stop', message: 'Invalid conversationId' }); + return; + } - const membership = await db.query.conversationMembers.findFirst({ - where: and( - eq(conversationMembers.conversationId, conversationId), - eq(conversationMembers.userId, userId), - ), - }); + const conversationId = payload.conversationId.trim(); - if (!membership) { - socket.emit('error', { event: 'typing_stop', message: 'Not a member of this conversation' }); - return; - } + if (!socket.rooms?.has(conversationId)) { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); - socket.to(conversationId).emit('typing_stop', { conversationId, userId }); - }); + if (!membership) { + socket.emit('error', { + event: 'typing_stop', + message: 'Not a member of this conversation', + }); + return; + } + } + + const relayPayload: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { + relayPayload.deviceId = payload.deviceId.trim(); + } + + const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const existingTimer = typingTimers.get(timerKey); + if (existingTimer) { + clearTimeout(existingTimer); + typingTimers.delete(timerKey); + } + + socket.to(conversationId).emit('typing_stop', relayPayload); + }, + ); // ── ask_assistant ────────────────────────────────────────────────────────── // Payload: { conversationId: string; content: string } @@ -334,4 +428,4 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void socket.emit('error', { event: 'ask_assistant', message: 'Failed to get AI reply' }); } }); -} +} \ No newline at end of file diff --git a/apps/web/src/components/chat/MessageInput.tsx b/apps/web/src/components/chat/MessageInput.tsx index 9ccc66a..85c2136 100644 --- a/apps/web/src/components/chat/MessageInput.tsx +++ b/apps/web/src/components/chat/MessageInput.tsx @@ -1,6 +1,6 @@ "use client"; -import React, { useState } from "react"; +import React, { useState, useRef } from "react"; import type { Socket } from "socket.io-client"; import transferToken from "../../lib/soroban"; @@ -15,9 +15,35 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop const [showPay, setShowPay] = useState(false); const [amount, setAmount] = useState(""); const [busy, setBusy] = useState(false); + const typingTimerRef = useRef | null>(null); + const isTypingRef = useRef(false); + + function stopTyping() { + if (socket && conversationId && isTypingRef.current) { + isTypingRef.current = false; + if (typingTimerRef.current) clearTimeout(typingTimerRef.current); + socket.emit("typing_stop", { conversationId }); + } + } + + function handleTextChange(e: React.ChangeEvent) { + setText(e.target.value); + if (socket && conversationId) { + if (!isTypingRef.current) { + isTypingRef.current = true; + socket.emit("typing_start", { conversationId }); + } + if (typingTimerRef.current) clearTimeout(typingTimerRef.current); + typingTimerRef.current = setTimeout(() => { + isTypingRef.current = false; + socket.emit("typing_stop", { conversationId }); + }, 2000); + } + } function handleSendText() { if (!text.trim() || !socket) return; + stopTyping(); socket.emit("send_message", { conversationId, content: text.trim(), @@ -38,6 +64,7 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop } setBusy(true); + stopTyping(); try { const txHash = await transferToken(recipient, Math.floor(n)); const transferMsg = { @@ -75,7 +102,7 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop className="flex-1 p-2 border rounded" placeholder="Type a message..." value={text} - onChange={(e) => setText(e.target.value)} + onChange={handleTextChange} onKeyDown={(e) => { if (e.key === "Enter") handleSendText(); }} @@ -131,4 +158,4 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop )} ); -} +} \ No newline at end of file