Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
977 changes: 637 additions & 340 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
"@nestjs/config": "^3.2.3",
"@nestjs/core": "^10.4.22",
"@nestjs/platform-express": "^10.4.22",
"@nestjs/platform-socket.io": "^10.0.0",
"@nestjs/platform-socket.io": "^10.4.22",
"@nestjs/swagger": "^7.1.1",
"@nestjs/throttler": "^6.5.0",
"@nestjs/typeorm": "^10.0.2",
"@nestjs/websockets": "^10.0.0",
"@nestjs/websockets": "^10.4.22",
"@prisma/client": "^5.19.1",
"@stellar/stellar-sdk": "^11.0.0",
"@types/jsonwebtoken": "^9.0.10",
Expand Down
98 changes: 98 additions & 0 deletions scripts/ws-load-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* WebSocket connection-churn load test (issue #158).
*
* Simulates many clients repeatedly connecting and disconnecting to verify that
* the TradeGateway connection pool stays flat (no memory leak) under flaky-network
* conditions. Use it together with heap snapshots to confirm dropped sockets are
* garbage collected.
*
* Prerequisites:
* - The API running locally: npm run start
* - socket.io-client installed: npm i -D socket.io-client
*
* Usage:
* node scripts/ws-load-test.js
* URL=http://localhost:3000 CLIENTS=500 ROUNDS=20 HOLD_MS=500 node scripts/ws-load-test.js
*
* Capture heap snapshots around the run to confirm stability:
* node --expose-gc scripts/ws-load-test.js # forces GC between rounds
* # then compare process RSS / heapUsed printed each round; it should plateau.
*/

const URL = process.env.URL || 'http://localhost:3000';
const CLIENTS = Number(process.env.CLIENTS || 500);
const ROUNDS = Number(process.env.ROUNDS || 20);
const HOLD_MS = Number(process.env.HOLD_MS || 500);

let io;
try {
io = require('socket.io-client');
} catch {
console.error(
'socket.io-client is required to run this load test.\n' +
'Install it with: npm i -D socket.io-client',
);
process.exit(1);
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function mb(bytes) {
return (bytes / 1024 / 1024).toFixed(1);
}

function logMemory(label) {
const m = process.memoryUsage();
console.log(
`[${label}] rss=${mb(m.rss)}MB heapUsed=${mb(m.heapUsed)}MB heapTotal=${mb(m.heapTotal)}MB`,
);
}

function connectOnce() {
return new Promise((resolve) => {
const socket = io(URL, {
transports: ['websocket'],
reconnection: false,
timeout: 5000,
});
const done = () => resolve(socket);
socket.on('connect', done);
socket.on('connect_error', done);
});
}

async function round(n) {
const sockets = await Promise.all(
Array.from({ length: CLIENTS }, () => connectOnce()),
);
await sleep(HOLD_MS);
// Abruptly drop every connection — mimics flaky clients vanishing.
sockets.forEach((s) => s.disconnect());
await sleep(HOLD_MS);
if (global.gc) {
global.gc();
}
logMemory(`round ${n + 1}/${ROUNDS}`);
}

async function main() {
console.log(
`Churning ${CLIENTS} clients x ${ROUNDS} rounds against ${URL} ` +
`(${global.gc ? 'gc enabled' : 'run with --expose-gc for GC between rounds'})`,
);
logMemory('baseline');
for (let i = 0; i < ROUNDS; i++) {
await round(i);
}
logMemory('final');
console.log(
'If heapUsed/rss plateau across rounds rather than growing monotonically, ' +
'dropped connections are being pruned correctly.',
);
process.exit(0);
}

main().catch((err) => {
console.error('Load test failed:', err);
process.exit(1);
});
4 changes: 2 additions & 2 deletions src/common/logger/custom.logger.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConsoleLogger, Injectable, Scope } from '@nestjs/common';
import { ConsoleLogger, Injectable, LogLevel, Scope } from '@nestjs/common';
import { requestContextStorage } from '../storage/request-context.storage';

/**
Expand All @@ -19,7 +19,7 @@ export class CustomLogger extends ConsoleLogger {
* @returns The fully formatted log string.
*/
formatMessage(
logLevel: string,
logLevel: LogLevel,
message: unknown,
pidMessage: string,
formattedLogLevel: string,
Expand Down
7 changes: 4 additions & 3 deletions src/dynamic/dynamic.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Module } from '@nestjs/common';
import { InvoicesController } from './invoices.controller';
import { PdfService } from './pdf.service';
import { NetworkController } from './dynamic.controller';
import { PdfService } from './dynamic.serivice';

@Module({
controllers: [InvoicesController],
controllers: [NetworkController],
providers: [PdfService],
exports: [PdfService],
})
export class InvoicesModule {}
12 changes: 7 additions & 5 deletions src/pools/pools.controller.int.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ describe('PoolsController (integration)', () => {
await app.close();
});

it('GET /api/v1/pools/:poolId/apy-history returns 200 and success envelope', async () => {
it('GET /api/v1/pools/:poolId/apy-history returns 200 and a 7-point history array', async () => {
const res = await request(app.getHttpServer())
.get('/api/v1/pools/pool-123/apy-history')
.expect(200);

// The endpoint returns the APY history array directly (see the controller's
// `ApyHistoryPoint[]` return type / Swagger `isArray: true`); this app has no
// global response-envelope interceptor.
expect(res.body).toBeDefined();
expect(res.body.status).toBe('success');
expect(Array.isArray(res.body.data)).toBe(true);
expect(res.body.data).toHaveLength(7);
expect(Array.isArray(res.body)).toBe(true);
expect(res.body).toHaveLength(7);

for (const item of res.body.data) {
for (const item of res.body) {
expect(item).toHaveProperty('date');
expect(item).toHaveProperty('apyPercentage');
expect(typeof item.date).toBe('string');
Expand Down
183 changes: 183 additions & 0 deletions src/trade/trade.gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import { Test, TestingModule } from '@nestjs/testing';
import { TradeGateway } from './trade.gateway';
import { RedisService } from '../common/redis/redis.service';

/**
* Unit tests for the TradeGateway connection pool (issue #158).
*
* Verifies that connections are tracked on connect, fully removed on disconnect
* (no leaked references or timers), that the heartbeat terminates ghost/dead
* sockets, and that churning 500 clients leaves the pool empty.
*/
const HEARTBEAT_INTERVAL_MS = 30_000;

interface MockSocket {
id: string;
connected: boolean;
conn: { on: jest.Mock; off: jest.Mock };
on: jest.Mock;
off: jest.Mock;
emit: jest.Mock;
disconnect: jest.Mock;
trigger: (event: string, ...args: unknown[]) => void;
triggerConn: (event: string, ...args: unknown[]) => void;
}

function createMockSocket(id: string): MockSocket {
const handlers: Record<string, (...args: unknown[]) => void> = {};
const connHandlers: Record<string, (...args: unknown[]) => void> = {};
const socket: MockSocket = {
id,
connected: true,
conn: {
on: jest.fn((event: string, handler: (...args: unknown[]) => void) => {
connHandlers[event] = handler;
}),
off: jest.fn(),
},
on: jest.fn((event: string, handler: (...args: unknown[]) => void) => {
handlers[event] = handler;
}),
off: jest.fn(),
emit: jest.fn(),
disconnect: jest.fn(function (this: MockSocket) {
this.connected = false;
}),
trigger: (event, ...args) => handlers[event]?.(...args),
triggerConn: (event, ...args) => connHandlers[event]?.(...args),
};
return socket;
}

describe('TradeGateway (connection pool)', () => {
let gateway: TradeGateway;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
TradeGateway,
{ provide: RedisService, useValue: { subscribe: jest.fn() } },
],
}).compile();

gateway = module.get<TradeGateway>(TradeGateway);

// Use jest's fake timers for the whole suite so the per-socket heartbeat is
// deterministic and timer globals are consistently available. Enabled after
// the async module compile so it doesn't interfere with promise resolution.
jest.useFakeTimers();
});

afterEach(() => {
gateway.onModuleDestroy();
jest.clearAllTimers();
jest.useRealTimers();
});

it('should be defined', () => {
expect(gateway).toBeDefined();
expect(gateway.getConnectionCount()).toBe(0);
});

it('tracks a connection on connect and releases it on disconnect', () => {
const socket = createMockSocket('a');

gateway.handleConnection(socket as never);
expect(gateway.getConnectionCount()).toBe(1);

gateway.handleDisconnect(socket as never);
expect(gateway.getConnectionCount()).toBe(0);
});

it('clears the per-socket heartbeat on disconnect (no lingering interval)', () => {
jest.useFakeTimers();
const clearSpy = jest.spyOn(global, 'clearInterval');
const socket = createMockSocket('a');

gateway.handleConnection(socket as never);
gateway.handleDisconnect(socket as never);

expect(clearSpy).toHaveBeenCalled();

// After disconnect, advancing time must not emit further heartbeats.
socket.emit.mockClear();
jest.advanceTimersByTime(HEARTBEAT_INTERVAL_MS * 3);
expect(socket.emit).not.toHaveBeenCalled();
});

it('detaches every listener it attached on disconnect', () => {
const socket = createMockSocket('a');

gateway.handleConnection(socket as never);
gateway.handleDisconnect(socket as never);

expect(socket.off).toHaveBeenCalledWith('heartbeat:pong', expect.any(Function));
expect(socket.off).toHaveBeenCalledWith('error', expect.any(Function));
expect(socket.conn.off).toHaveBeenCalledWith('packet', expect.any(Function));
});

it('sends a heartbeat ping and keeps a responsive client alive', () => {
jest.useFakeTimers();
const socket = createMockSocket('a');
gateway.handleConnection(socket as never);

// First probe: emits an app-level ping and marks the client unproven.
jest.advanceTimersByTime(HEARTBEAT_INTERVAL_MS);
expect(socket.emit).toHaveBeenCalledWith('heartbeat:ping');

// Client proves liveness (engine pong packet).
socket.triggerConn('packet', { type: 'pong' });

// Next probe: still alive, so it is NOT disconnected.
jest.advanceTimersByTime(HEARTBEAT_INTERVAL_MS);
expect(socket.disconnect).not.toHaveBeenCalled();
expect(gateway.getConnectionCount()).toBe(1);
});

it('terminates an unresponsive (ghost) client', () => {
jest.useFakeTimers();
const socket = createMockSocket('a');
gateway.handleConnection(socket as never);

// Two probes with no pong in between → second probe terminates it.
jest.advanceTimersByTime(HEARTBEAT_INTERVAL_MS); // marks unproven, pings
jest.advanceTimersByTime(HEARTBEAT_INTERVAL_MS); // still unproven → disconnect
expect(socket.disconnect).toHaveBeenCalledWith(true);
});

it('terminates a client whose transport already dropped', () => {
jest.useFakeTimers();
const socket = createMockSocket('a');
gateway.handleConnection(socket as never);

socket.connected = false; // transport gone but still in the pool
jest.advanceTimersByTime(HEARTBEAT_INTERVAL_MS);
expect(socket.disconnect).toHaveBeenCalledWith(true);
});

it('does not leak references when 500 clients connect and disconnect', () => {
const sockets = Array.from({ length: 500 }, (_, i) =>
createMockSocket(`client-${i}`),
);

sockets.forEach((s) => gateway.handleConnection(s as never));
expect(gateway.getConnectionCount()).toBe(500);

sockets.forEach((s) => gateway.handleDisconnect(s as never));
expect(gateway.getConnectionCount()).toBe(0);
});

it('clears all timers on module destroy', () => {
jest.useFakeTimers();
const clearSpy = jest.spyOn(global, 'clearInterval');
[createMockSocket('a'), createMockSocket('b')].forEach((s) =>
gateway.handleConnection(s as never),
);
expect(gateway.getConnectionCount()).toBe(2);

gateway.onModuleDestroy();

expect(gateway.getConnectionCount()).toBe(0);
expect(clearSpy).toHaveBeenCalledTimes(2);
});
});
Loading