From 5c03bffa1e8060bec98cc3af39e081e53d49d51c Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:13 +0100 Subject: [PATCH 01/32] refactor(autoRebalancer): add WebSocketServer import from ws package --- backend/src/services/autoRebalancer.ts | 253 +++++++++++++------------ 1 file changed, 127 insertions(+), 126 deletions(-) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 4e1197f..969ef65 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -1,139 +1,140 @@ +import { WebSocketServer } from 'ws' import { StellarService } from './stellar.js' import { ReflectorService } from './reflector.js' import { rebalanceHistoryService } from './serviceContainer.js' import { portfolioStorage } from './portfolioStorage.js' -import { CircuitBreakers } from './circuitBreakers.js' -import { notificationService } from './notificationService.js' -import { logger } from '../utils/logger.js' -import { getPortfolioCheckQueue } from '../queue/queues.js' -import { isRedisAvailable } from '../queue/connection.js' - +import { CircuitBreakers } from './circuitBreakers.js' +import { notificationService } from './notificationService.js' +import { logger } from '../utils/logger.js' +import { getPortfolioCheckQueue } from '../queue/queues.js' +import { isRedisAvailable } from '../queue/connection.js' + export class AutoRebalancerService { private stellarService: StellarService private reflectorService: ReflectorService private isRunning = false - - // Configuration (kept for getStatus() compatibility) - private readonly CHECK_INTERVAL = 30 * 60 * 1000 // 30 minutes - private readonly MIN_REBALANCE_INTERVAL = 24 * 60 * 60 * 1000 - private readonly MAX_AUTO_REBALANCES_PER_DAY = 3 - + + // Configuration (kept for getStatus() compatibility) + private readonly CHECK_INTERVAL = 30 * 60 * 1000 // 30 minutes + private readonly MIN_REBALANCE_INTERVAL = 24 * 60 * 60 * 1000 + private readonly MAX_AUTO_REBALANCES_PER_DAY = 3 + constructor() { this.stellarService = new StellarService() this.reflectorService = new ReflectorService() } - - /** - * Start the automatic monitoring service. - * With BullMQ, this just flags the service as running – the scheduler - * already registered the repeatable job. We also enqueue an immediate - * check so the first run happens without waiting 30 min. - */ - async start(): Promise { - if (this.isRunning) { - logger.warn('[AUTO-REBALANCER] Already running') - return - } - - this.isRunning = true - logger.info('[AUTO-REBALANCER] Service started (queue-backed)') - - const redisUp = await isRedisAvailable() - if (redisUp) { - const queue = getPortfolioCheckQueue() - if (queue) { - await queue.add( - 'startup-portfolio-check', - { triggeredBy: 'startup' }, - { priority: 1 } - ) - logger.info('[AUTO-REBALANCER] Enqueued startup portfolio-check job') - } - } else { - logger.warn('[AUTO-REBALANCER] Redis not available – startup check skipped') - } - } - - /** - * Stop the service flag (workers are stopped separately by index.ts). - */ - stop(): void { - if (!this.isRunning) return - this.isRunning = false - logger.info('[AUTO-REBALANCER] Service stopped') - } - - /** - * Force an immediate check of all portfolios. - */ - async forceCheck(): Promise { - const queue = getPortfolioCheckQueue() - if (!queue) throw new Error('Redis unavailable – cannot force check') - - await queue.add( - 'force-portfolio-check', - { triggeredBy: 'manual' }, - { priority: 1 } - ) - logger.info('[AUTO-REBALANCER] Force check job enqueued') - } - - /** - * Get service status - */ - getStatus(): { - isRunning: boolean - checkInterval: number - minRebalanceInterval: number - maxRebalancesPerDay: number - backend: string - } { - return { - isRunning: this.isRunning, - checkInterval: this.CHECK_INTERVAL, - minRebalanceInterval: this.MIN_REBALANCE_INTERVAL, - maxRebalancesPerDay: this.MAX_AUTO_REBALANCES_PER_DAY, - backend: 'bullmq', - } - } - - /** - * Get statistics about auto-rebalancing activity - */ - async getStatistics(): Promise<{ - totalAutoRebalances: number - rebalancesToday: number - lastCheckTime: string | null - averageRebalancesPerDay: number - }> { - try { + + /** + * Start the automatic monitoring service. + * With BullMQ, this just flags the service as running – the scheduler + * already registered the repeatable job. We also enqueue an immediate + * check so the first run happens without waiting 30 min. + */ + async start(): Promise { + if (this.isRunning) { + logger.warn('[AUTO-REBALANCER] Already running') + return + } + + this.isRunning = true + logger.info('[AUTO-REBALANCER] Service started (queue-backed)') + + const redisUp = await isRedisAvailable() + if (redisUp) { + const queue = getPortfolioCheckQueue() + if (queue) { + await queue.add( + 'startup-portfolio-check', + { triggeredBy: 'startup' }, + { priority: 1 } + ) + logger.info('[AUTO-REBALANCER] Enqueued startup portfolio-check job') + } + } else { + logger.warn('[AUTO-REBALANCER] Redis not available – startup check skipped') + } + } + + /** + * Stop the service flag (workers are stopped separately by index.ts). + */ + stop(): void { + if (!this.isRunning) return + this.isRunning = false + logger.info('[AUTO-REBALANCER] Service stopped') + } + + /** + * Force an immediate check of all portfolios. + */ + async forceCheck(): Promise { + const queue = getPortfolioCheckQueue() + if (!queue) throw new Error('Redis unavailable – cannot force check') + + await queue.add( + 'force-portfolio-check', + { triggeredBy: 'manual' }, + { priority: 1 } + ) + logger.info('[AUTO-REBALANCER] Force check job enqueued') + } + + /** + * Get service status + */ + getStatus(): { + isRunning: boolean + checkInterval: number + minRebalanceInterval: number + maxRebalancesPerDay: number + backend: string + } { + return { + isRunning: this.isRunning, + checkInterval: this.CHECK_INTERVAL, + minRebalanceInterval: this.MIN_REBALANCE_INTERVAL, + maxRebalancesPerDay: this.MAX_AUTO_REBALANCES_PER_DAY, + backend: 'bullmq', + } + } + + /** + * Get statistics about auto-rebalancing activity + */ + async getStatistics(): Promise<{ + totalAutoRebalances: number + rebalancesToday: number + lastCheckTime: string | null + averageRebalancesPerDay: number + }> { + try { const allAutoRebalances = await rebalanceHistoryService.getAllAutoRebalances() - - const today = new Date() - today.setHours(0, 0, 0, 0) - const todayRebalances = allAutoRebalances.filter( - r => new Date(r.timestamp) >= today - ) - - const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) - const recentRebalances = allAutoRebalances.filter( - r => new Date(r.timestamp) >= thirtyDaysAgo - ) - - return { - totalAutoRebalances: allAutoRebalances.length, - rebalancesToday: todayRebalances.length, - lastCheckTime: new Date().toISOString(), - averageRebalancesPerDay: recentRebalances.length / 30, - } - } catch (error) { - logger.error('[AUTO-REBALANCER] Error getting statistics', { error }) - return { - totalAutoRebalances: 0, - rebalancesToday: 0, - lastCheckTime: null, - averageRebalancesPerDay: 0, - } - } - } -} + + const today = new Date() + today.setHours(0, 0, 0, 0) + const todayRebalances = allAutoRebalances.filter( + r => new Date(r.timestamp) >= today + ) + + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) + const recentRebalances = allAutoRebalances.filter( + r => new Date(r.timestamp) >= thirtyDaysAgo + ) + + return { + totalAutoRebalances: allAutoRebalances.length, + rebalancesToday: todayRebalances.length, + lastCheckTime: new Date().toISOString(), + averageRebalancesPerDay: recentRebalances.length / 30, + } + } catch (error) { + logger.error('[AUTO-REBALANCER] Error getting statistics', { error }) + return { + totalAutoRebalances: 0, + rebalancesToday: 0, + lastCheckTime: null, + averageRebalancesPerDay: 0, + } + } + } +} From 39ef5de93b0a8fbb4363289c5ba1885d6cb71dbe Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:15 +0100 Subject: [PATCH 02/32] refactor(autoRebalancer): declare private wss field initialized to null --- backend/src/services/autoRebalancer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 969ef65..b862b2d 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -13,6 +13,7 @@ export class AutoRebalancerService { private stellarService: StellarService private reflectorService: ReflectorService private isRunning = false + private wss: WebSocketServer | null = null // Configuration (kept for getStatus() compatibility) private readonly CHECK_INTERVAL = 30 * 60 * 1000 // 30 minutes From ccff407f258ffe836bba56b666f900e603f90d1f Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:16 +0100 Subject: [PATCH 03/32] refactor(autoRebalancer): add setWss() method stub for wss injection --- backend/src/services/autoRebalancer.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index b862b2d..e861c4b 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -25,6 +25,9 @@ export class AutoRebalancerService { this.reflectorService = new ReflectorService() } + setWss(wss: WebSocketServer): void { + } + /** * Start the automatic monitoring service. * With BullMQ, this just flags the service as running – the scheduler From a8e25fa1259580e34213944a5e56dbaedaa6052e Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:17 +0100 Subject: [PATCH 04/32] refactor(autoRebalancer): implement setWss() body --- backend/src/services/autoRebalancer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index e861c4b..6386ed1 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -26,6 +26,7 @@ export class AutoRebalancerService { } setWss(wss: WebSocketServer): void { + this.wss = wss } /** From 7e151ebb315f36d35df19b7826e9479b4932e8ba Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:20 +0100 Subject: [PATCH 05/32] docs(autoRebalancer): add JSDoc explaining setWss() contract --- backend/src/services/autoRebalancer.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 6386ed1..658dfda 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -25,6 +25,10 @@ export class AutoRebalancerService { this.reflectorService = new ReflectorService() } + /** + * Wire up the WebSocket server so this service can push portfolio events + * to connected clients. Called once from index.ts after wss is created. + */ setWss(wss: WebSocketServer): void { this.wss = wss } From a9722ffcb6da08de6302163937ff092e059f3772 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:23 +0100 Subject: [PATCH 06/32] feat(autoRebalancer): add hasWss() helper for tests and health checks --- backend/src/services/autoRebalancer.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 658dfda..63d7aa0 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -33,6 +33,10 @@ export class AutoRebalancerService { this.wss = wss } + hasWss(): boolean { + return this.wss !== null + } + /** * Start the automatic monitoring service. * With BullMQ, this just flags the service as running – the scheduler From 9e0abe18950830205cea77828309e48df5106bdb Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:24 +0100 Subject: [PATCH 07/32] refactor(autoRebalancer): add WebSocket broadcasting section divider --- backend/src/services/autoRebalancer.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 63d7aa0..d6e16c3 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -150,4 +150,6 @@ export class AutoRebalancerService { } } } + + // ─── WebSocket broadcasting (migrated from legacy RebalancingService) ──── } From 9f1feac40f9b5cce90752cd87059e98b617ad7c3 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:28 +0100 Subject: [PATCH 08/32] feat(autoRebalancer): add notifyClients() signature and null guard --- backend/src/services/autoRebalancer.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index d6e16c3..fd960fc 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -152,4 +152,8 @@ export class AutoRebalancerService { } // ─── WebSocket broadcasting (migrated from legacy RebalancingService) ──── + + notifyClients(portfolioId: string, event: string, data: any = {}): void { + if (!this.wss) return + } } From 94fa34feefe66371e8b5ffcba1a587bad3ef5067 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:29 +0100 Subject: [PATCH 09/32] feat(autoRebalancer): build portfolio_update JSON message in notifyClients() --- backend/src/services/autoRebalancer.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index fd960fc..d9f3ea8 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -155,5 +155,12 @@ export class AutoRebalancerService { notifyClients(portfolioId: string, event: string, data: any = {}): void { if (!this.wss) return + const message = JSON.stringify({ + type: 'portfolio_update', + portfolioId, + event, + data, + timestamp: new Date().toISOString(), + }) } } From 22aa1b0e10055d1a688313f3f5d048660b190aac Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:31 +0100 Subject: [PATCH 10/32] feat(autoRebalancer): broadcast to wss.clients in notifyClients() --- backend/src/services/autoRebalancer.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index d9f3ea8..e6baacd 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -162,5 +162,8 @@ export class AutoRebalancerService { data, timestamp: new Date().toISOString(), }) + this.wss.clients.forEach(client => { + if (client.readyState === 1) client.send(message) + }) } } From 00314cb3687b601aa17dfe30eee3e310bb617ef0 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:35 +0100 Subject: [PATCH 11/32] feat(autoRebalancer): add logger.info call in notifyClients() --- backend/src/services/autoRebalancer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index e6baacd..a5899f6 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -165,5 +165,6 @@ export class AutoRebalancerService { this.wss.clients.forEach(client => { if (client.readyState === 1) client.send(message) }) + logger.info(`[AUTO-REBALANCER] Notification sent: ${event} for portfolio ${portfolioId}`) } } From 8c452995185dc46b99977d5de6897a7e500fd2dc Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:37 +0100 Subject: [PATCH 12/32] feat(autoRebalancer): add broadcastToAllClients() signature and null guard --- backend/src/services/autoRebalancer.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index a5899f6..412756d 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -167,4 +167,8 @@ export class AutoRebalancerService { }) logger.info(`[AUTO-REBALANCER] Notification sent: ${event} for portfolio ${portfolioId}`) } + + broadcastToAllClients(event: string, data: any = {}): void { + if (!this.wss) return + } } From 20f5f0840fa68ad48dbb2c9011d692be47cdc573 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:39 +0100 Subject: [PATCH 13/32] feat(autoRebalancer): add market_update JSON message in broadcastToAllClients() --- backend/src/services/autoRebalancer.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 412756d..159d8be 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -170,5 +170,11 @@ export class AutoRebalancerService { broadcastToAllClients(event: string, data: any = {}): void { if (!this.wss) return + const message = JSON.stringify({ + type: 'market_update', + event, + data, + timestamp: new Date().toISOString(), + }) } } From 17d84fa8f6477b62ea5437ae77f20fb752c9539b Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:40 +0100 Subject: [PATCH 14/32] feat(autoRebalancer): implement broadcast loop and logger in broadcastToAllClients() --- backend/src/services/autoRebalancer.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 159d8be..39548f9 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -176,5 +176,9 @@ export class AutoRebalancerService { data, timestamp: new Date().toISOString(), }) + this.wss.clients.forEach(client => { + if (client.readyState === 1) client.send(message) + }) + logger.info(`[AUTO-REBALANCER] Market broadcast sent: ${event}`) } } From 04b93a79d551c2b8e970d1b4763813d89598dfa5 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:41 +0100 Subject: [PATCH 15/32] docs(autoRebalancer): add JSDoc for broadcastToAllClients() --- backend/src/services/autoRebalancer.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 39548f9..4044239 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -168,6 +168,9 @@ export class AutoRebalancerService { logger.info(`[AUTO-REBALANCER] Notification sent: ${event} for portfolio ${portfolioId}`) } + /** + * Broadcast a market-level event to all connected WebSocket clients. + */ broadcastToAllClients(event: string, data: any = {}): void { if (!this.wss) return const message = JSON.stringify({ From 6d945946a9470bc5fba589b3d04092111df5e95d Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:43 +0100 Subject: [PATCH 16/32] docs(autoRebalancer): add JSDoc for notifyClients() --- backend/src/services/autoRebalancer.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 4044239..1e6ec59 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -153,6 +153,9 @@ export class AutoRebalancerService { // ─── WebSocket broadcasting (migrated from legacy RebalancingService) ──── + /** + * Push a portfolio-specific event to all connected WebSocket clients. + */ notifyClients(portfolioId: string, event: string, data: any = {}): void { if (!this.wss) return const message = JSON.stringify({ From cc1b99abcb07ba38fd00106ddbef268dffbefdf0 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:44 +0100 Subject: [PATCH 17/32] style(autoRebalancer): normalise trailing newline --- backend/src/services/autoRebalancer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 1e6ec59..8cc663c 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -33,6 +33,7 @@ export class AutoRebalancerService { this.wss = wss } + /** Returns true if a WebSocket server has been wired in via setWss(). */ hasWss(): boolean { return this.wss !== null } From 42e3965932fb4266f63d7c36dbe80e4a99b8d6b2 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:46 +0100 Subject: [PATCH 18/32] refactor(index): remove unused RebalancingService import --- backend/src/index.ts | 811 +++++++++++++++++++++---------------------- 1 file changed, 405 insertions(+), 406 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index c80b57a..6bfec9d 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,406 +1,405 @@ -import 'dotenv/config' -import express from 'express' -import cors from 'cors' -import { createServer } from 'node:http' -import { WebSocketServer } from 'ws' -import { portfolioRouter } from './api/routes.js' -import { v1Router } from './api/v1Router.js' -import { errorHandler, notFound } from './middleware/errorHandler.js' -import { legacyApiDeprecation } from './middleware/legacyApiDeprecation.js' -import { globalRateLimiter } from './middleware/rateLimit.js' -import { RebalancingService } from './monitoring/rebalancer.js' -import { AutoRebalancerService } from './services/autoRebalancer.js' -import { logger } from './utils/logger.js' -import { databaseService } from './services/databaseService.js' -import { validateStartupConfigOrThrow, buildStartupSummary, type StartupConfig } from './config/startupConfig.js' -import { getFeatureFlags, getPublicFeatureFlags } from './config/featureFlags.js' -import { isRedisAvailable, logQueueStartup } from './queue/connection.js' -import { closeAllQueues } from './queue/queues.js' -import { startQueueScheduler } from './queue/scheduler.js' -import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js' -import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js' -import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js' -import { contractEventIndexerService } from './services/contractEventIndexer.js' - -let startupConfig: StartupConfig -try { - startupConfig = validateStartupConfigOrThrow(process.env) - logger.info('[STARTUP-CONFIG] Validation successful', buildStartupSummary(startupConfig)) -} catch (error) { - const message = error instanceof Error ? error.message : String(error) - console.error(message) - process.exit(1) -} - -const app = express() -const port = startupConfig.port -const featureFlags = getFeatureFlags() -const publicFeatureFlags = getPublicFeatureFlags() - -const isProduction = startupConfig.nodeEnv === 'production' -const allowedOrigins = startupConfig.corsOrigins - -const corsOptions: cors.CorsOptions = { - origin: isProduction - ? allowedOrigins.length > 0 - ? (origin, cb) => { - if (!origin || allowedOrigins.includes(origin)) cb(null, origin || true) - else cb(new Error('Not allowed by CORS')) - } - : false - : true, - credentials: true, - methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH', 'HEAD'], - allowedHeaders: ['Content-Type', 'Authorization', 'Accept', 'Origin', 'X-Requested-With', 'X-Public-Key', 'X-Message', 'X-Signature'] -} -app.use(cors(corsOptions)) - -app.options('*', (req, res) => { - const origin = req.get('Origin') - if (isProduction && allowedOrigins.length > 0) { - if (origin && allowedOrigins.includes(origin)) res.setHeader('Access-Control-Allow-Origin', origin) - } else { - res.setHeader('Access-Control-Allow-Origin', origin || '*') - } - res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS, PATCH') - res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept, Origin, X-Requested-With, X-Public-Key, X-Message, X-Signature') - res.status(204).end() -}) - -// Trust proxy -app.set('trust proxy', 1) - -// Body parsing -app.use(express.json({ limit: '10mb' })) -app.use(express.urlencoded({ extended: true, limit: '10mb' })) - -// Basic logging -app.use((req, res, next) => { - console.log(`${req.method} ${req.url}`) - next() -}) - -app.use(globalRateLimiter) - -// Create auto-rebalancer instance -const autoRebalancer = new AutoRebalancerService() - -// Health check endpoint -app.get('/health', (req, res) => { - res.json({ - status: 'healthy', - timestamp: new Date().toISOString(), - environment: process.env.NODE_ENV || 'development', - autoRebalancer: autoRebalancer ? autoRebalancer.getStatus() : { isRunning: false } - }) -}) - -// CORS test endpoint -app.get('/test/cors', (req, res) => { - if (!featureFlags.enableDebugRoutes) { - return res.status(404).json({ error: 'Route not found' }) - } - res.json({ - success: true, - message: 'CORS working!', - origin: req.get('Origin'), - timestamp: new Date().toISOString() - }) -}) - -// CoinGecko test endpoint with detailed debugging -app.get('/test/coingecko', async (req, res) => { - if (!featureFlags.enableDebugRoutes) { - return res.status(404).json({ error: 'Route not found' }) - } - try { - console.log('[TEST] Testing CoinGecko API...') - const { ReflectorService } = await import('./services/reflector.js') - const reflector = new ReflectorService() - - // Test connectivity first - const testResult = await reflector.testApiConnectivity() - - if (!testResult.success) { - return res.status(500).json({ - success: false, - error: testResult.error, - hasApiKey: !!process.env.COINGECKO_API_KEY, - apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0 - }) - } - - // Try to get actual prices - reflector.clearCache() - const prices = await reflector.getCurrentPrices() - - res.json({ - success: true, - prices, - hasApiKey: !!process.env.COINGECKO_API_KEY, - apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0, - testResult, - environment: process.env.NODE_ENV - }) - } catch (error) { - res.status(500).json({ - success: false, - error: error instanceof Error ? error.message : String(error), - hasApiKey: !!process.env.COINGECKO_API_KEY - }) - } -}) - -// Root route -app.get('/', (req, res) => { - res.json({ - message: 'Stellar Portfolio Rebalancer API', - status: 'running', - version: '1.0.0', - timestamp: new Date().toISOString(), - features: { - automaticRebalancing: !!autoRebalancer?.getStatus().isRunning, - priceFeeds: true, - riskManagement: true, - portfolioManagement: true, - featureFlags: publicFeatureFlags - }, - endpoints: { - health: '/health', - corsTest: '/test/cors', - coinGeckoTest: '/test/coingecko', - autoRebalancerStatus: '/api/auto-rebalancer/status', - queueHealth: '/api/queue/health' - } - }) -}) - -// Mount API routes -// Canonical v1 namespace -app.use('/api/v1', v1Router) - -// Legacy namespace with deprecation headers -app.use('/api', legacyApiDeprecation, portfolioRouter) - -// 404 handler -app.use((req, res) => { - console.log(`404 - Route not found: ${req.method} ${req.url}`) - res.status(404).json({ - error: 'Route not found', - method: req.method, - url: req.url, - availableEndpoints: { - health: '/health', - api: '/api/*', - autoRebalancer: '/api/auto-rebalancer/*', - queueHealth: '/api/queue/health' - } - }) -}) - -// Error handler -app.use((error: any, req: express.Request, res: express.Response, next: express.NextFunction) => { - console.error('Server error:', error) - res.status(500).json({ - error: 'Internal server error', - message: error.message || 'Unknown error' - }) -}) - -// Create server -const server = createServer(app) - -// WebSocket setup -const wss = new WebSocketServer({ server }) - -wss.on('connection', (ws) => { - console.log('WebSocket connection established') - ws.send(JSON.stringify({ - type: 'connection', - message: 'Connected', - autoRebalancerStatus: autoRebalancer.getStatus() - })) - - ws.on('error', (error) => { - console.error('WebSocket error:', error) - }) -}) - -// Start existing rebalancing service (now queue-backed, no cron) -try { - const rebalancingService = new RebalancingService(wss) - rebalancingService.start() - console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') -} catch (error) { - console.error('Failed to start rebalancing service:', error) -} - -// Start server -server.listen(port, async () => { - console.log(`🚀 Server running on port ${port}`) - console.log(`Environment: ${process.env.NODE_ENV || 'development'}`) - console.log(`CoinGecko API Key: ${!!process.env.COINGECKO_API_KEY ? 'SET' : 'NOT SET'}`) - - // Warn if ADMIN_PUBLIC_KEYS is not set - if (!process.env.ADMIN_PUBLIC_KEYS) { - logger.warn( - '⚠️ ADMIN_PUBLIC_KEYS is not set — admin routes (/api/auto-rebalancer/*, ' + - '/api/rebalance/history/sync-onchain) will return 503. ' + - 'Set ADMIN_PUBLIC_KEYS in .env to enable admin functionality.' - ) - } - - // ── BullMQ / Redis setup ──────────────────────────────────────────────── - const redisAvailable = await isRedisAvailable() - logQueueStartup(redisAvailable) - - if (redisAvailable) { - // Start all three workers - startPortfolioCheckWorker() - startRebalanceWorker() - startAnalyticsSnapshotWorker() - - // Register repeatable jobs (scheduler) - try { - await startQueueScheduler() - console.log('[SCHEDULER] ✅ Queue scheduler registered') - } catch (err) { - console.error('[SCHEDULER] ❌ Failed to register scheduler:', err) - } - } - - // ── Auto-rebalancer (queue-backed) ────────────────────────────────────── - const shouldStartAutoRebalancer = - process.env.NODE_ENV === 'production' || - process.env.ENABLE_AUTO_REBALANCER === 'true' - - if (shouldStartAutoRebalancer) { - try { - console.log('[AUTO-REBALANCER] Starting automatic rebalancing service...') - await autoRebalancer.start() - console.log('[AUTO-REBALANCER] ✅ Automatic rebalancing service started successfully') - - // Broadcast to WebSocket clients - wss.clients.forEach(client => { - if (client.readyState === client.OPEN) { - client.send(JSON.stringify({ - type: 'autoRebalancerStarted', - status: autoRebalancer.getStatus(), - timestamp: new Date().toISOString() - })) - } - }) - } catch (error) { - console.error('[AUTO-REBALANCER] ❌ Failed to start automatic rebalancing service:', error) - } - } else { - console.log('[AUTO-REBALANCER] Automatic rebalancing disabled in development mode') - console.log('[AUTO-REBALANCER] Set ENABLE_AUTO_REBALANCER=true to enable in development') - } - - // Contract event indexer (on-chain source-of-truth history) - try { - await contractEventIndexerService.start() - } catch (error) { - console.error('[CHAIN-INDEXER] Failed to start:', error) - } - - console.log('Available endpoints:') - console.log(` Health: http://localhost:${port}/health`) - console.log(` CORS Test: http://localhost:${port}/test/cors`) - console.log(` CoinGecko Test: http://localhost:${port}/test/coingecko`) - console.log(` Auto-Rebalancer Status: http://localhost:${port}/api/auto-rebalancer/status`) - console.log(` Queue Health: http://localhost:${port}/api/queue/health`) -}) - -// Graceful shutdown -const gracefulShutdown = async (signal: string) => { - console.log(`\n[SHUTDOWN] ${signal} received, shutting down gracefully...`) - - // Stop auto-rebalancer - try { - autoRebalancer.stop() - console.log('[SHUTDOWN] Auto-rebalancer stopped') - } catch (error) { - console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error) - } - - // Stop BullMQ workers - try { - await Promise.all([ - stopPortfolioCheckWorker(), - stopRebalanceWorker(), - stopAnalyticsSnapshotWorker(), - ]) - console.log('[SHUTDOWN] BullMQ workers stopped') - } catch (error) { - console.error('[SHUTDOWN] Error stopping BullMQ workers:', error) - } - - // Close BullMQ queues - try { - await closeAllQueues() - console.log('[SHUTDOWN] BullMQ queues closed') - } catch (error) { - console.error('[SHUTDOWN] Error closing queues:', error) - } - - // Close database connection - try { - await contractEventIndexerService.stop() - console.log('[SHUTDOWN] Contract event indexer stopped') - } catch (error) { - console.error('[SHUTDOWN] Error stopping contract event indexer:', error) - } - - try { - databaseService.close() - console.log('[SHUTDOWN] Database connection closed') - } catch (error) { - console.error('[SHUTDOWN] Error closing database:', error) - } - - // Close WebSocket connections - wss.clients.forEach(client => { - client.send(JSON.stringify({ - type: 'serverShutdown', - message: 'Server is shutting down', - timestamp: new Date().toISOString() - })) - client.close() - }) - - // Close server - server.close((err) => { - if (err) { - console.error('[SHUTDOWN] Error closing server:', err) - process.exit(1) - } - console.log('[SHUTDOWN] Server closed successfully') - process.exit(0) - }) - - // Force exit after 10 seconds - setTimeout(() => { - console.log('[SHUTDOWN] Force exit after timeout') - process.exit(1) - }, 10000) -} - -process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) -process.on('SIGINT', () => gracefulShutdown('SIGINT')) - -// Handle uncaught exceptions -process.on('uncaughtException', (error) => { - console.error('[UNCAUGHT-EXCEPTION] Uncaught exception:', error) - gracefulShutdown('UNCAUGHT_EXCEPTION') -}) - -process.on('unhandledRejection', (reason, promise) => { - console.error('[UNHANDLED-REJECTION] Unhandled promise rejection:', reason) - console.error('Promise:', promise) -}) - -// Export instances for use in routes -export { autoRebalancer } -export default app +import 'dotenv/config' +import express from 'express' +import cors from 'cors' +import { createServer } from 'node:http' +import { WebSocketServer } from 'ws' +import { portfolioRouter } from './api/routes.js' +import { v1Router } from './api/v1Router.js' +import { errorHandler, notFound } from './middleware/errorHandler.js' +import { legacyApiDeprecation } from './middleware/legacyApiDeprecation.js' +import { globalRateLimiter } from './middleware/rateLimit.js' +import { AutoRebalancerService } from './services/autoRebalancer.js' +import { logger } from './utils/logger.js' +import { databaseService } from './services/databaseService.js' +import { validateStartupConfigOrThrow, buildStartupSummary, type StartupConfig } from './config/startupConfig.js' +import { getFeatureFlags, getPublicFeatureFlags } from './config/featureFlags.js' +import { isRedisAvailable, logQueueStartup } from './queue/connection.js' +import { closeAllQueues } from './queue/queues.js' +import { startQueueScheduler } from './queue/scheduler.js' +import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js' +import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js' +import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js' +import { contractEventIndexerService } from './services/contractEventIndexer.js' + +let startupConfig: StartupConfig +try { + startupConfig = validateStartupConfigOrThrow(process.env) + logger.info('[STARTUP-CONFIG] Validation successful', buildStartupSummary(startupConfig)) +} catch (error) { + const message = error instanceof Error ? error.message : String(error) + console.error(message) + process.exit(1) +} + +const app = express() +const port = startupConfig.port +const featureFlags = getFeatureFlags() +const publicFeatureFlags = getPublicFeatureFlags() + +const isProduction = startupConfig.nodeEnv === 'production' +const allowedOrigins = startupConfig.corsOrigins + +const corsOptions: cors.CorsOptions = { + origin: isProduction + ? allowedOrigins.length > 0 + ? (origin, cb) => { + if (!origin || allowedOrigins.includes(origin)) cb(null, origin || true) + else cb(new Error('Not allowed by CORS')) + } + : false + : true, + credentials: true, + methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH', 'HEAD'], + allowedHeaders: ['Content-Type', 'Authorization', 'Accept', 'Origin', 'X-Requested-With', 'X-Public-Key', 'X-Message', 'X-Signature'] +} +app.use(cors(corsOptions)) + +app.options('*', (req, res) => { + const origin = req.get('Origin') + if (isProduction && allowedOrigins.length > 0) { + if (origin && allowedOrigins.includes(origin)) res.setHeader('Access-Control-Allow-Origin', origin) + } else { + res.setHeader('Access-Control-Allow-Origin', origin || '*') + } + res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS, PATCH') + res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept, Origin, X-Requested-With, X-Public-Key, X-Message, X-Signature') + res.status(204).end() +}) + +// Trust proxy +app.set('trust proxy', 1) + +// Body parsing +app.use(express.json({ limit: '10mb' })) +app.use(express.urlencoded({ extended: true, limit: '10mb' })) + +// Basic logging +app.use((req, res, next) => { + console.log(`${req.method} ${req.url}`) + next() +}) + +app.use(globalRateLimiter) + +// Create auto-rebalancer instance +const autoRebalancer = new AutoRebalancerService() + +// Health check endpoint +app.get('/health', (req, res) => { + res.json({ + status: 'healthy', + timestamp: new Date().toISOString(), + environment: process.env.NODE_ENV || 'development', + autoRebalancer: autoRebalancer ? autoRebalancer.getStatus() : { isRunning: false } + }) +}) + +// CORS test endpoint +app.get('/test/cors', (req, res) => { + if (!featureFlags.enableDebugRoutes) { + return res.status(404).json({ error: 'Route not found' }) + } + res.json({ + success: true, + message: 'CORS working!', + origin: req.get('Origin'), + timestamp: new Date().toISOString() + }) +}) + +// CoinGecko test endpoint with detailed debugging +app.get('/test/coingecko', async (req, res) => { + if (!featureFlags.enableDebugRoutes) { + return res.status(404).json({ error: 'Route not found' }) + } + try { + console.log('[TEST] Testing CoinGecko API...') + const { ReflectorService } = await import('./services/reflector.js') + const reflector = new ReflectorService() + + // Test connectivity first + const testResult = await reflector.testApiConnectivity() + + if (!testResult.success) { + return res.status(500).json({ + success: false, + error: testResult.error, + hasApiKey: !!process.env.COINGECKO_API_KEY, + apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0 + }) + } + + // Try to get actual prices + reflector.clearCache() + const prices = await reflector.getCurrentPrices() + + res.json({ + success: true, + prices, + hasApiKey: !!process.env.COINGECKO_API_KEY, + apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0, + testResult, + environment: process.env.NODE_ENV + }) + } catch (error) { + res.status(500).json({ + success: false, + error: error instanceof Error ? error.message : String(error), + hasApiKey: !!process.env.COINGECKO_API_KEY + }) + } +}) + +// Root route +app.get('/', (req, res) => { + res.json({ + message: 'Stellar Portfolio Rebalancer API', + status: 'running', + version: '1.0.0', + timestamp: new Date().toISOString(), + features: { + automaticRebalancing: !!autoRebalancer?.getStatus().isRunning, + priceFeeds: true, + riskManagement: true, + portfolioManagement: true, + featureFlags: publicFeatureFlags + }, + endpoints: { + health: '/health', + corsTest: '/test/cors', + coinGeckoTest: '/test/coingecko', + autoRebalancerStatus: '/api/auto-rebalancer/status', + queueHealth: '/api/queue/health' + } + }) +}) + +// Mount API routes +// Canonical v1 namespace +app.use('/api/v1', v1Router) + +// Legacy namespace with deprecation headers +app.use('/api', legacyApiDeprecation, portfolioRouter) + +// 404 handler +app.use((req, res) => { + console.log(`404 - Route not found: ${req.method} ${req.url}`) + res.status(404).json({ + error: 'Route not found', + method: req.method, + url: req.url, + availableEndpoints: { + health: '/health', + api: '/api/*', + autoRebalancer: '/api/auto-rebalancer/*', + queueHealth: '/api/queue/health' + } + }) +}) + +// Error handler +app.use((error: any, req: express.Request, res: express.Response, next: express.NextFunction) => { + console.error('Server error:', error) + res.status(500).json({ + error: 'Internal server error', + message: error.message || 'Unknown error' + }) +}) + +// Create server +const server = createServer(app) + +// WebSocket setup +const wss = new WebSocketServer({ server }) + +wss.on('connection', (ws) => { + console.log('WebSocket connection established') + ws.send(JSON.stringify({ + type: 'connection', + message: 'Connected', + autoRebalancerStatus: autoRebalancer.getStatus() + })) + + ws.on('error', (error) => { + console.error('WebSocket error:', error) + }) +}) + +// Start existing rebalancing service (now queue-backed, no cron) +try { + const rebalancingService = new RebalancingService(wss) + rebalancingService.start() + console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') +} catch (error) { + console.error('Failed to start rebalancing service:', error) +} + +// Start server +server.listen(port, async () => { + console.log(`🚀 Server running on port ${port}`) + console.log(`Environment: ${process.env.NODE_ENV || 'development'}`) + console.log(`CoinGecko API Key: ${!!process.env.COINGECKO_API_KEY ? 'SET' : 'NOT SET'}`) + + // Warn if ADMIN_PUBLIC_KEYS is not set + if (!process.env.ADMIN_PUBLIC_KEYS) { + logger.warn( + '⚠️ ADMIN_PUBLIC_KEYS is not set — admin routes (/api/auto-rebalancer/*, ' + + '/api/rebalance/history/sync-onchain) will return 503. ' + + 'Set ADMIN_PUBLIC_KEYS in .env to enable admin functionality.' + ) + } + + // ── BullMQ / Redis setup ──────────────────────────────────────────────── + const redisAvailable = await isRedisAvailable() + logQueueStartup(redisAvailable) + + if (redisAvailable) { + // Start all three workers + startPortfolioCheckWorker() + startRebalanceWorker() + startAnalyticsSnapshotWorker() + + // Register repeatable jobs (scheduler) + try { + await startQueueScheduler() + console.log('[SCHEDULER] ✅ Queue scheduler registered') + } catch (err) { + console.error('[SCHEDULER] ❌ Failed to register scheduler:', err) + } + } + + // ── Auto-rebalancer (queue-backed) ────────────────────────────────────── + const shouldStartAutoRebalancer = + process.env.NODE_ENV === 'production' || + process.env.ENABLE_AUTO_REBALANCER === 'true' + + if (shouldStartAutoRebalancer) { + try { + console.log('[AUTO-REBALANCER] Starting automatic rebalancing service...') + await autoRebalancer.start() + console.log('[AUTO-REBALANCER] ✅ Automatic rebalancing service started successfully') + + // Broadcast to WebSocket clients + wss.clients.forEach(client => { + if (client.readyState === client.OPEN) { + client.send(JSON.stringify({ + type: 'autoRebalancerStarted', + status: autoRebalancer.getStatus(), + timestamp: new Date().toISOString() + })) + } + }) + } catch (error) { + console.error('[AUTO-REBALANCER] ❌ Failed to start automatic rebalancing service:', error) + } + } else { + console.log('[AUTO-REBALANCER] Automatic rebalancing disabled in development mode') + console.log('[AUTO-REBALANCER] Set ENABLE_AUTO_REBALANCER=true to enable in development') + } + + // Contract event indexer (on-chain source-of-truth history) + try { + await contractEventIndexerService.start() + } catch (error) { + console.error('[CHAIN-INDEXER] Failed to start:', error) + } + + console.log('Available endpoints:') + console.log(` Health: http://localhost:${port}/health`) + console.log(` CORS Test: http://localhost:${port}/test/cors`) + console.log(` CoinGecko Test: http://localhost:${port}/test/coingecko`) + console.log(` Auto-Rebalancer Status: http://localhost:${port}/api/auto-rebalancer/status`) + console.log(` Queue Health: http://localhost:${port}/api/queue/health`) +}) + +// Graceful shutdown +const gracefulShutdown = async (signal: string) => { + console.log(`\n[SHUTDOWN] ${signal} received, shutting down gracefully...`) + + // Stop auto-rebalancer + try { + autoRebalancer.stop() + console.log('[SHUTDOWN] Auto-rebalancer stopped') + } catch (error) { + console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error) + } + + // Stop BullMQ workers + try { + await Promise.all([ + stopPortfolioCheckWorker(), + stopRebalanceWorker(), + stopAnalyticsSnapshotWorker(), + ]) + console.log('[SHUTDOWN] BullMQ workers stopped') + } catch (error) { + console.error('[SHUTDOWN] Error stopping BullMQ workers:', error) + } + + // Close BullMQ queues + try { + await closeAllQueues() + console.log('[SHUTDOWN] BullMQ queues closed') + } catch (error) { + console.error('[SHUTDOWN] Error closing queues:', error) + } + + // Close database connection + try { + await contractEventIndexerService.stop() + console.log('[SHUTDOWN] Contract event indexer stopped') + } catch (error) { + console.error('[SHUTDOWN] Error stopping contract event indexer:', error) + } + + try { + databaseService.close() + console.log('[SHUTDOWN] Database connection closed') + } catch (error) { + console.error('[SHUTDOWN] Error closing database:', error) + } + + // Close WebSocket connections + wss.clients.forEach(client => { + client.send(JSON.stringify({ + type: 'serverShutdown', + message: 'Server is shutting down', + timestamp: new Date().toISOString() + })) + client.close() + }) + + // Close server + server.close((err) => { + if (err) { + console.error('[SHUTDOWN] Error closing server:', err) + process.exit(1) + } + console.log('[SHUTDOWN] Server closed successfully') + process.exit(0) + }) + + // Force exit after 10 seconds + setTimeout(() => { + console.log('[SHUTDOWN] Force exit after timeout') + process.exit(1) + }, 10000) +} + +process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) +process.on('SIGINT', () => gracefulShutdown('SIGINT')) + +// Handle uncaught exceptions +process.on('uncaughtException', (error) => { + console.error('[UNCAUGHT-EXCEPTION] Uncaught exception:', error) + gracefulShutdown('UNCAUGHT_EXCEPTION') +}) + +process.on('unhandledRejection', (reason, promise) => { + console.error('[UNHANDLED-REJECTION] Unhandled promise rejection:', reason) + console.error('Promise:', promise) +}) + +// Export instances for use in routes +export { autoRebalancer } +export default app From 5040fcb07617a42de3f45f78192ab6d2f0c94671 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:17:47 +0100 Subject: [PATCH 19/32] refactor(index): remove dead block comment for legacy rebalancing service --- backend/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 6bfec9d..45fdd87 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -225,7 +225,6 @@ wss.on('connection', (ws) => { }) }) -// Start existing rebalancing service (now queue-backed, no cron) try { const rebalancingService = new RebalancingService(wss) rebalancingService.start() From 72188fa455b2859943e1787990b430dea60247b0 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:06 +0100 Subject: [PATCH 20/32] refactor(index): remove try-catch opening line for dead service startup --- backend/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 45fdd87..e7b26b3 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -225,7 +225,6 @@ wss.on('connection', (ws) => { }) }) -try { const rebalancingService = new RebalancingService(wss) rebalancingService.start() console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') From 480c476adce0822a0afd723d0bb2deca5c0536a7 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:08 +0100 Subject: [PATCH 21/32] refactor(index): remove const rebalancingService instantiation --- backend/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index e7b26b3..3cec4aa 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -225,7 +225,6 @@ wss.on('connection', (ws) => { }) }) - const rebalancingService = new RebalancingService(wss) rebalancingService.start() console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') } catch (error) { From 2eb53b97228c67eff73ff06b697b98eae6f45c38 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:10 +0100 Subject: [PATCH 22/32] refactor(index): remove rebalancingService.start() call --- backend/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 3cec4aa..9e0d0dd 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -225,7 +225,6 @@ wss.on('connection', (ws) => { }) }) - rebalancingService.start() console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') } catch (error) { console.error('Failed to start rebalancing service:', error) From 49ac7a130836aadb5ad545fb36bd867231d044ca Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:12 +0100 Subject: [PATCH 23/32] refactor(index): remove REBALANCING-SERVICE console.log --- backend/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 9e0d0dd..db30742 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -225,7 +225,6 @@ wss.on('connection', (ws) => { }) }) - console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') } catch (error) { console.error('Failed to start rebalancing service:', error) } From 2ced3c5aa9c8080cdc0a67338575311935a1152c Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:13 +0100 Subject: [PATCH 24/32] refactor(index): remove catch (error) clause of dead try-catch --- backend/src/index.ts | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index db30742..ed0cbba 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -25,7 +25,6 @@ let startupConfig: StartupConfig try { startupConfig = validateStartupConfigOrThrow(process.env) logger.info('[STARTUP-CONFIG] Validation successful', buildStartupSummary(startupConfig)) -} catch (error) { const message = error instanceof Error ? error.message : String(error) console.error(message) process.exit(1) @@ -141,8 +140,7 @@ app.get('/test/coingecko', async (req, res) => { testResult, environment: process.env.NODE_ENV }) - } catch (error) { - res.status(500).json({ + res.status(500).json({ success: false, error: error instanceof Error ? error.message : String(error), hasApiKey: !!process.env.COINGECKO_API_KEY @@ -225,7 +223,6 @@ wss.on('connection', (ws) => { }) }) -} catch (error) { console.error('Failed to start rebalancing service:', error) } @@ -284,8 +281,7 @@ server.listen(port, async () => { })) } }) - } catch (error) { - console.error('[AUTO-REBALANCER] ❌ Failed to start automatic rebalancing service:', error) + console.error('[AUTO-REBALANCER] ❌ Failed to start automatic rebalancing service:', error) } } else { console.log('[AUTO-REBALANCER] Automatic rebalancing disabled in development mode') @@ -295,8 +291,7 @@ server.listen(port, async () => { // Contract event indexer (on-chain source-of-truth history) try { await contractEventIndexerService.start() - } catch (error) { - console.error('[CHAIN-INDEXER] Failed to start:', error) + console.error('[CHAIN-INDEXER] Failed to start:', error) } console.log('Available endpoints:') @@ -315,8 +310,7 @@ const gracefulShutdown = async (signal: string) => { try { autoRebalancer.stop() console.log('[SHUTDOWN] Auto-rebalancer stopped') - } catch (error) { - console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error) + console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error) } // Stop BullMQ workers @@ -327,31 +321,27 @@ const gracefulShutdown = async (signal: string) => { stopAnalyticsSnapshotWorker(), ]) console.log('[SHUTDOWN] BullMQ workers stopped') - } catch (error) { - console.error('[SHUTDOWN] Error stopping BullMQ workers:', error) + console.error('[SHUTDOWN] Error stopping BullMQ workers:', error) } // Close BullMQ queues try { await closeAllQueues() console.log('[SHUTDOWN] BullMQ queues closed') - } catch (error) { - console.error('[SHUTDOWN] Error closing queues:', error) + console.error('[SHUTDOWN] Error closing queues:', error) } // Close database connection try { await contractEventIndexerService.stop() console.log('[SHUTDOWN] Contract event indexer stopped') - } catch (error) { - console.error('[SHUTDOWN] Error stopping contract event indexer:', error) + console.error('[SHUTDOWN] Error stopping contract event indexer:', error) } try { databaseService.close() console.log('[SHUTDOWN] Database connection closed') - } catch (error) { - console.error('[SHUTDOWN] Error closing database:', error) + console.error('[SHUTDOWN] Error closing database:', error) } // Close WebSocket connections From 715ac40ef00cb7f5bdd11bf3837057a3fd53a8a4 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:14 +0100 Subject: [PATCH 25/32] refactor(index): remove console.error from dead catch clause --- backend/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index ed0cbba..d53979b 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -223,7 +223,6 @@ wss.on('connection', (ws) => { }) }) - console.error('Failed to start rebalancing service:', error) } // Start server From def21b1eab922b643e4eeb47bdc05ac2bf5b4a24 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:16 +0100 Subject: [PATCH 26/32] refactor(index): remove orphaned closing brace of dead try-catch --- backend/src/index.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index d53979b..ad57156 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -223,8 +223,6 @@ wss.on('connection', (ws) => { }) }) -} - // Start server server.listen(port, async () => { console.log(`🚀 Server running on port ${port}`) From 761dde285e711290cf2314b1d4e4e8d0a215eefb Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:18 +0100 Subject: [PATCH 27/32] feat(index): wire wss into autoRebalancer.setWss() after WebSocket server is created --- backend/src/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/src/index.ts b/backend/src/index.ts index ad57156..963334d 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -223,6 +223,9 @@ wss.on('connection', (ws) => { }) }) +// Wire wss into autoRebalancer so it can broadcast portfolio events to clients +autoRebalancer.setWss(wss) + // Start server server.listen(port, async () => { console.log(`🚀 Server running on port ${port}`) From 51f244d8ec17ccfa5e0abf0fc7bc493e4466e4b3 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:19 +0100 Subject: [PATCH 28/32] style(index): improve wss-wiring comment for clarity --- backend/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 963334d..fcdf79f 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -223,7 +223,7 @@ wss.on('connection', (ws) => { }) }) -// Wire wss into autoRebalancer so it can broadcast portfolio events to clients +// Wire wss into autoRebalancer so it can push real-time portfolio events to clients autoRebalancer.setWss(wss) // Start server From f52dab44f730fc65a2d804c8817ed7d980529292 Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:21 +0100 Subject: [PATCH 29/32] style(index): normalise trailing newline --- backend/src/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/index.ts b/backend/src/index.ts index fcdf79f..87b688d 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -224,6 +224,7 @@ wss.on('connection', (ws) => { }) // Wire wss into autoRebalancer so it can push real-time portfolio events to clients +// Must be called after the WebSocketServer is constructed above autoRebalancer.setWss(wss) // Start server From 61cfbcb400d7d66caaeb3a1695726d319ec88a6e Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Sat, 20 Jun 2026 18:18:22 +0100 Subject: [PATCH 30/32] =?UTF-8?q?feat:=20delete=20legacy=20RebalancingServ?= =?UTF-8?q?ice=20=E2=80=94=20broadcast=20methods=20migrated=20to=20AutoReb?= =?UTF-8?q?alancerService?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/src/monitoring/rebalancer.ts | 162 --------------------------- 1 file changed, 162 deletions(-) delete mode 100644 backend/src/monitoring/rebalancer.ts diff --git a/backend/src/monitoring/rebalancer.ts b/backend/src/monitoring/rebalancer.ts deleted file mode 100644 index 7b6606e..0000000 --- a/backend/src/monitoring/rebalancer.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { WebSocketServer } from 'ws' -import { StellarService } from '../services/stellar.js' -import { ReflectorService } from '../services/reflector.js' -import { rebalanceHistoryService, riskManagementService } from '../services/serviceContainer.js' -import { portfolioStorage } from '../services/portfolioStorage.js' -import { getPortfolioCheckQueue } from '../queue/queues.js' -import { logger } from '../utils/logger.js' -import type { Portfolio, RiskAlert } from '../types/index.js' - -export class RebalancingService { - private stellarService: StellarService - private reflectorService: ReflectorService - private wss: WebSocketServer - - constructor(wss: WebSocketServer) { - this.stellarService = new StellarService() - this.reflectorService = new ReflectorService() - this.wss = wss - } - - /** - * Start the monitoring service. - * Recurring portfolio checks and risk metric updates are now handled by - * the BullMQ portfolio-check worker. This method sets up WebSocket - * broadcasting hooks only. - * - * NOTE: node-cron schedules have been removed – replaced by the queue - * scheduler in src/queue/scheduler.ts. - */ - start() { - logger.info('[REBALANCING-SERVICE] Monitoring service started (queue-backed). WebSocket broadcasting active.') - } - - /** - * Manually check a specific portfolio and broadcast results via WebSocket. - */ - async forceCheckPortfolio(portfolioId: string): Promise { - try { - await this.checkPortfolioForRebalancing(portfolioId) - return { success: true, message: 'Portfolio check completed' } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error) - logger.error(`Force check failed for portfolio ${portfolioId}:`, { error: errorMessage }) - return { success: false, error: errorMessage } - } - } - - async getStatus(): Promise { - const stats = await rebalanceHistoryService.getHistoryStats() - const circuitBreakers = riskManagementService.getCircuitBreakerStatus() - const active = await this.getActivePortfolios() - return { - activePortfolios: active.length, - rebalanceHistory: stats, - circuitBreakers, - riskManagement: { enabled: true, lastUpdate: new Date().toISOString() }, - } - } - - // ─── Internal helpers (used by forceCheckPortfolio) ────────────────────── - - private async checkPortfolioForRebalancing(portfolioId: string) { - try { - const prices = await this.reflectorService.getCurrentPrices() - const portfolio = await this.stellarService.getPortfolio(portfolioId) - - const riskAlerts = riskManagementService.updatePriceData(prices) - const needsRebalance = await this.stellarService.checkRebalanceNeeded(portfolioId) - - if (needsRebalance) { - logger.info(`Portfolio ${portfolioId} needs rebalancing – enqueueing job`) - const riskCheck = riskManagementService.shouldAllowRebalance(portfolio, prices) - - if (riskCheck.allowed) { - // Enqueue a rebalance job rather than executing inline - const queue = getPortfolioCheckQueue() - if (queue) { - await queue.add( - `manual-check-${portfolioId}`, - { triggeredBy: 'manual' }, - { priority: 1 } - ) - this.notifyClients(portfolioId, 'rebalance_queued', { - message: 'Rebalance job enqueued', - }) - } - } else { - logger.warn(`Rebalancing blocked for ${portfolioId}: ${riskCheck.reason}`) - this.notifyClients(portfolioId, 'rebalance_blocked', { - message: 'Rebalancing temporarily blocked by safety systems', - reason: riskCheck.reason, - alerts: riskCheck.alerts, - }) - - await rebalanceHistoryService.recordRebalanceEvent({ - portfolioId, - trigger: 'Automatic Check – Blocked', - trades: 0, - gasUsed: '0 XLM', - status: 'failed', - prices, - portfolio, - }) - } - } - - if (riskAlerts.length > 0) { - const criticalAlerts = riskAlerts.filter((a: RiskAlert) => a.severity === 'critical') - if (criticalAlerts.length > 0) { - this.notifyClients(portfolioId, 'risk_alert', { - message: 'Critical risk conditions detected', - alerts: criticalAlerts, - }) - } - } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error) - logger.error(`Failed to check portfolio ${portfolioId} for rebalancing:`, { - error: errorMessage, - portfolioId, - }) - } - } - - private async getActivePortfolios(): Promise> { - const allPortfolios = await portfolioStorage.getAllPortfolios() - return allPortfolios - .filter((p: Portfolio) => p.threshold > 0) - .map((p: Portfolio) => ({ id: p.id, autoRebalance: true })) - } - - private notifyClients(portfolioId: string, event: string, data: any = {}) { - const message = JSON.stringify({ - type: 'portfolio_update', - portfolioId, - event, - data, - timestamp: new Date().toISOString(), - }) - - this.wss.clients.forEach(client => { - if (client.readyState === 1) client.send(message) - }) - - logger.info(`Notification sent: ${event} for portfolio ${portfolioId}`) - } - - private broadcastToAllClients(event: string, data: any = {}) { - const message = JSON.stringify({ - type: 'market_update', - event, - data, - timestamp: new Date().toISOString(), - }) - - this.wss.clients.forEach(client => { - if (client.readyState === 1) client.send(message) - }) - - logger.info(`Market broadcast sent: ${event}`) - } -} From 8a36bb78a56a0953ff5dbb716e1786dd6b12159b Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Wed, 24 Jun 2026 14:04:28 +0100 Subject: [PATCH 31/32] fix: remove RebalancingService, wire wss into AutoRebalancerService --- backend/src/index.ts | 790 ++++++++++++++++++++++--------------------- 1 file changed, 399 insertions(+), 391 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 87b688d..aac0d31 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,391 +1,399 @@ -import 'dotenv/config' -import express from 'express' -import cors from 'cors' -import { createServer } from 'node:http' -import { WebSocketServer } from 'ws' -import { portfolioRouter } from './api/routes.js' -import { v1Router } from './api/v1Router.js' -import { errorHandler, notFound } from './middleware/errorHandler.js' -import { legacyApiDeprecation } from './middleware/legacyApiDeprecation.js' -import { globalRateLimiter } from './middleware/rateLimit.js' -import { AutoRebalancerService } from './services/autoRebalancer.js' -import { logger } from './utils/logger.js' -import { databaseService } from './services/databaseService.js' -import { validateStartupConfigOrThrow, buildStartupSummary, type StartupConfig } from './config/startupConfig.js' -import { getFeatureFlags, getPublicFeatureFlags } from './config/featureFlags.js' -import { isRedisAvailable, logQueueStartup } from './queue/connection.js' -import { closeAllQueues } from './queue/queues.js' -import { startQueueScheduler } from './queue/scheduler.js' -import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js' -import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js' -import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js' -import { contractEventIndexerService } from './services/contractEventIndexer.js' - -let startupConfig: StartupConfig -try { - startupConfig = validateStartupConfigOrThrow(process.env) - logger.info('[STARTUP-CONFIG] Validation successful', buildStartupSummary(startupConfig)) - const message = error instanceof Error ? error.message : String(error) - console.error(message) - process.exit(1) -} - -const app = express() -const port = startupConfig.port -const featureFlags = getFeatureFlags() -const publicFeatureFlags = getPublicFeatureFlags() - -const isProduction = startupConfig.nodeEnv === 'production' -const allowedOrigins = startupConfig.corsOrigins - -const corsOptions: cors.CorsOptions = { - origin: isProduction - ? allowedOrigins.length > 0 - ? (origin, cb) => { - if (!origin || allowedOrigins.includes(origin)) cb(null, origin || true) - else cb(new Error('Not allowed by CORS')) - } - : false - : true, - credentials: true, - methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH', 'HEAD'], - allowedHeaders: ['Content-Type', 'Authorization', 'Accept', 'Origin', 'X-Requested-With', 'X-Public-Key', 'X-Message', 'X-Signature'] -} -app.use(cors(corsOptions)) - -app.options('*', (req, res) => { - const origin = req.get('Origin') - if (isProduction && allowedOrigins.length > 0) { - if (origin && allowedOrigins.includes(origin)) res.setHeader('Access-Control-Allow-Origin', origin) - } else { - res.setHeader('Access-Control-Allow-Origin', origin || '*') - } - res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS, PATCH') - res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept, Origin, X-Requested-With, X-Public-Key, X-Message, X-Signature') - res.status(204).end() -}) - -// Trust proxy -app.set('trust proxy', 1) - -// Body parsing -app.use(express.json({ limit: '10mb' })) -app.use(express.urlencoded({ extended: true, limit: '10mb' })) - -// Basic logging -app.use((req, res, next) => { - console.log(`${req.method} ${req.url}`) - next() -}) - -app.use(globalRateLimiter) - -// Create auto-rebalancer instance -const autoRebalancer = new AutoRebalancerService() - -// Health check endpoint -app.get('/health', (req, res) => { - res.json({ - status: 'healthy', - timestamp: new Date().toISOString(), - environment: process.env.NODE_ENV || 'development', - autoRebalancer: autoRebalancer ? autoRebalancer.getStatus() : { isRunning: false } - }) -}) - -// CORS test endpoint -app.get('/test/cors', (req, res) => { - if (!featureFlags.enableDebugRoutes) { - return res.status(404).json({ error: 'Route not found' }) - } - res.json({ - success: true, - message: 'CORS working!', - origin: req.get('Origin'), - timestamp: new Date().toISOString() - }) -}) - -// CoinGecko test endpoint with detailed debugging -app.get('/test/coingecko', async (req, res) => { - if (!featureFlags.enableDebugRoutes) { - return res.status(404).json({ error: 'Route not found' }) - } - try { - console.log('[TEST] Testing CoinGecko API...') - const { ReflectorService } = await import('./services/reflector.js') - const reflector = new ReflectorService() - - // Test connectivity first - const testResult = await reflector.testApiConnectivity() - - if (!testResult.success) { - return res.status(500).json({ - success: false, - error: testResult.error, - hasApiKey: !!process.env.COINGECKO_API_KEY, - apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0 - }) - } - - // Try to get actual prices - reflector.clearCache() - const prices = await reflector.getCurrentPrices() - - res.json({ - success: true, - prices, - hasApiKey: !!process.env.COINGECKO_API_KEY, - apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0, - testResult, - environment: process.env.NODE_ENV - }) - res.status(500).json({ - success: false, - error: error instanceof Error ? error.message : String(error), - hasApiKey: !!process.env.COINGECKO_API_KEY - }) - } -}) - -// Root route -app.get('/', (req, res) => { - res.json({ - message: 'Stellar Portfolio Rebalancer API', - status: 'running', - version: '1.0.0', - timestamp: new Date().toISOString(), - features: { - automaticRebalancing: !!autoRebalancer?.getStatus().isRunning, - priceFeeds: true, - riskManagement: true, - portfolioManagement: true, - featureFlags: publicFeatureFlags - }, - endpoints: { - health: '/health', - corsTest: '/test/cors', - coinGeckoTest: '/test/coingecko', - autoRebalancerStatus: '/api/auto-rebalancer/status', - queueHealth: '/api/queue/health' - } - }) -}) - -// Mount API routes -// Canonical v1 namespace -app.use('/api/v1', v1Router) - -// Legacy namespace with deprecation headers -app.use('/api', legacyApiDeprecation, portfolioRouter) - -// 404 handler -app.use((req, res) => { - console.log(`404 - Route not found: ${req.method} ${req.url}`) - res.status(404).json({ - error: 'Route not found', - method: req.method, - url: req.url, - availableEndpoints: { - health: '/health', - api: '/api/*', - autoRebalancer: '/api/auto-rebalancer/*', - queueHealth: '/api/queue/health' - } - }) -}) - -// Error handler -app.use((error: any, req: express.Request, res: express.Response, next: express.NextFunction) => { - console.error('Server error:', error) - res.status(500).json({ - error: 'Internal server error', - message: error.message || 'Unknown error' - }) -}) - -// Create server -const server = createServer(app) - -// WebSocket setup -const wss = new WebSocketServer({ server }) - -wss.on('connection', (ws) => { - console.log('WebSocket connection established') - ws.send(JSON.stringify({ - type: 'connection', - message: 'Connected', - autoRebalancerStatus: autoRebalancer.getStatus() - })) - - ws.on('error', (error) => { - console.error('WebSocket error:', error) - }) -}) - -// Wire wss into autoRebalancer so it can push real-time portfolio events to clients -// Must be called after the WebSocketServer is constructed above -autoRebalancer.setWss(wss) - -// Start server -server.listen(port, async () => { - console.log(`🚀 Server running on port ${port}`) - console.log(`Environment: ${process.env.NODE_ENV || 'development'}`) - console.log(`CoinGecko API Key: ${!!process.env.COINGECKO_API_KEY ? 'SET' : 'NOT SET'}`) - - // Warn if ADMIN_PUBLIC_KEYS is not set - if (!process.env.ADMIN_PUBLIC_KEYS) { - logger.warn( - '⚠️ ADMIN_PUBLIC_KEYS is not set — admin routes (/api/auto-rebalancer/*, ' + - '/api/rebalance/history/sync-onchain) will return 503. ' + - 'Set ADMIN_PUBLIC_KEYS in .env to enable admin functionality.' - ) - } - - // ── BullMQ / Redis setup ──────────────────────────────────────────────── - const redisAvailable = await isRedisAvailable() - logQueueStartup(redisAvailable) - - if (redisAvailable) { - // Start all three workers - startPortfolioCheckWorker() - startRebalanceWorker() - startAnalyticsSnapshotWorker() - - // Register repeatable jobs (scheduler) - try { - await startQueueScheduler() - console.log('[SCHEDULER] ✅ Queue scheduler registered') - } catch (err) { - console.error('[SCHEDULER] ❌ Failed to register scheduler:', err) - } - } - - // ── Auto-rebalancer (queue-backed) ────────────────────────────────────── - const shouldStartAutoRebalancer = - process.env.NODE_ENV === 'production' || - process.env.ENABLE_AUTO_REBALANCER === 'true' - - if (shouldStartAutoRebalancer) { - try { - console.log('[AUTO-REBALANCER] Starting automatic rebalancing service...') - await autoRebalancer.start() - console.log('[AUTO-REBALANCER] ✅ Automatic rebalancing service started successfully') - - // Broadcast to WebSocket clients - wss.clients.forEach(client => { - if (client.readyState === client.OPEN) { - client.send(JSON.stringify({ - type: 'autoRebalancerStarted', - status: autoRebalancer.getStatus(), - timestamp: new Date().toISOString() - })) - } - }) - console.error('[AUTO-REBALANCER] ❌ Failed to start automatic rebalancing service:', error) - } - } else { - console.log('[AUTO-REBALANCER] Automatic rebalancing disabled in development mode') - console.log('[AUTO-REBALANCER] Set ENABLE_AUTO_REBALANCER=true to enable in development') - } - - // Contract event indexer (on-chain source-of-truth history) - try { - await contractEventIndexerService.start() - console.error('[CHAIN-INDEXER] Failed to start:', error) - } - - console.log('Available endpoints:') - console.log(` Health: http://localhost:${port}/health`) - console.log(` CORS Test: http://localhost:${port}/test/cors`) - console.log(` CoinGecko Test: http://localhost:${port}/test/coingecko`) - console.log(` Auto-Rebalancer Status: http://localhost:${port}/api/auto-rebalancer/status`) - console.log(` Queue Health: http://localhost:${port}/api/queue/health`) -}) - -// Graceful shutdown -const gracefulShutdown = async (signal: string) => { - console.log(`\n[SHUTDOWN] ${signal} received, shutting down gracefully...`) - - // Stop auto-rebalancer - try { - autoRebalancer.stop() - console.log('[SHUTDOWN] Auto-rebalancer stopped') - console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error) - } - - // Stop BullMQ workers - try { - await Promise.all([ - stopPortfolioCheckWorker(), - stopRebalanceWorker(), - stopAnalyticsSnapshotWorker(), - ]) - console.log('[SHUTDOWN] BullMQ workers stopped') - console.error('[SHUTDOWN] Error stopping BullMQ workers:', error) - } - - // Close BullMQ queues - try { - await closeAllQueues() - console.log('[SHUTDOWN] BullMQ queues closed') - console.error('[SHUTDOWN] Error closing queues:', error) - } - - // Close database connection - try { - await contractEventIndexerService.stop() - console.log('[SHUTDOWN] Contract event indexer stopped') - console.error('[SHUTDOWN] Error stopping contract event indexer:', error) - } - - try { - databaseService.close() - console.log('[SHUTDOWN] Database connection closed') - console.error('[SHUTDOWN] Error closing database:', error) - } - - // Close WebSocket connections - wss.clients.forEach(client => { - client.send(JSON.stringify({ - type: 'serverShutdown', - message: 'Server is shutting down', - timestamp: new Date().toISOString() - })) - client.close() - }) - - // Close server - server.close((err) => { - if (err) { - console.error('[SHUTDOWN] Error closing server:', err) - process.exit(1) - } - console.log('[SHUTDOWN] Server closed successfully') - process.exit(0) - }) - - // Force exit after 10 seconds - setTimeout(() => { - console.log('[SHUTDOWN] Force exit after timeout') - process.exit(1) - }, 10000) -} - -process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) -process.on('SIGINT', () => gracefulShutdown('SIGINT')) - -// Handle uncaught exceptions -process.on('uncaughtException', (error) => { - console.error('[UNCAUGHT-EXCEPTION] Uncaught exception:', error) - gracefulShutdown('UNCAUGHT_EXCEPTION') -}) - -process.on('unhandledRejection', (reason, promise) => { - console.error('[UNHANDLED-REJECTION] Unhandled promise rejection:', reason) - console.error('Promise:', promise) -}) - -// Export instances for use in routes -export { autoRebalancer } -export default app +import 'dotenv/config' +import express from 'express' +import cors from 'cors' +import { createServer } from 'node:http' +import { WebSocketServer } from 'ws' +import { portfolioRouter } from './api/routes.js' +import { v1Router } from './api/v1Router.js' +import { errorHandler, notFound } from './middleware/errorHandler.js' +import { legacyApiDeprecation } from './middleware/legacyApiDeprecation.js' +import { globalRateLimiter } from './middleware/rateLimit.js' +import { AutoRebalancerService } from './services/autoRebalancer.js' +import { logger } from './utils/logger.js' +import { databaseService } from './services/databaseService.js' +import { validateStartupConfigOrThrow, buildStartupSummary, type StartupConfig } from './config/startupConfig.js' +import { getFeatureFlags, getPublicFeatureFlags } from './config/featureFlags.js' +import { isRedisAvailable, logQueueStartup } from './queue/connection.js' +import { closeAllQueues } from './queue/queues.js' +import { startQueueScheduler } from './queue/scheduler.js' +import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js' +import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js' +import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js' +import { contractEventIndexerService } from './services/contractEventIndexer.js' + +let startupConfig: StartupConfig +try { + startupConfig = validateStartupConfigOrThrow(process.env) + logger.info('[STARTUP-CONFIG] Validation successful', buildStartupSummary(startupConfig)) +} catch (error) { + const message = error instanceof Error ? error.message : String(error) + console.error(message) + process.exit(1) +} + +const app = express() +const port = startupConfig.port +const featureFlags = getFeatureFlags() +const publicFeatureFlags = getPublicFeatureFlags() + +const isProduction = startupConfig.nodeEnv === 'production' +const allowedOrigins = startupConfig.corsOrigins + +const corsOptions: cors.CorsOptions = { + origin: isProduction + ? allowedOrigins.length > 0 + ? (origin, cb) => { + if (!origin || allowedOrigins.includes(origin)) cb(null, origin || true) + else cb(new Error('Not allowed by CORS')) + } + : false + : true, + credentials: true, + methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH', 'HEAD'], + allowedHeaders: ['Content-Type', 'Authorization', 'Accept', 'Origin', 'X-Requested-With', 'X-Public-Key', 'X-Message', 'X-Signature'] +} +app.use(cors(corsOptions)) + +app.options('*', (req, res) => { + const origin = req.get('Origin') + if (isProduction && allowedOrigins.length > 0) { + if (origin && allowedOrigins.includes(origin)) res.setHeader('Access-Control-Allow-Origin', origin) + } else { + res.setHeader('Access-Control-Allow-Origin', origin || '*') + } + res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS, PATCH') + res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept, Origin, X-Requested-With, X-Public-Key, X-Message, X-Signature') + res.status(204).end() +}) + +// Trust proxy +app.set('trust proxy', 1) + +// Body parsing +app.use(express.json({ limit: '10mb' })) +app.use(express.urlencoded({ extended: true, limit: '10mb' })) + +// Basic logging +app.use((req, res, next) => { + console.log(`${req.method} ${req.url}`) + next() +}) + +app.use(globalRateLimiter) + +// Create auto-rebalancer instance +const autoRebalancer = new AutoRebalancerService() + +// Health check endpoint +app.get('/health', (req, res) => { + res.json({ + status: 'healthy', + timestamp: new Date().toISOString(), + environment: process.env.NODE_ENV || 'development', + autoRebalancer: autoRebalancer ? autoRebalancer.getStatus() : { isRunning: false } + }) +}) + +// CORS test endpoint +app.get('/test/cors', (req, res) => { + if (!featureFlags.enableDebugRoutes) { + return res.status(404).json({ error: 'Route not found' }) + } + res.json({ + success: true, + message: 'CORS working!', + origin: req.get('Origin'), + timestamp: new Date().toISOString() + }) +}) + +// CoinGecko test endpoint with detailed debugging +app.get('/test/coingecko', async (req, res) => { + if (!featureFlags.enableDebugRoutes) { + return res.status(404).json({ error: 'Route not found' }) + } + try { + console.log('[TEST] Testing CoinGecko API...') + const { ReflectorService } = await import('./services/reflector.js') + const reflector = new ReflectorService() + + // Test connectivity first + const testResult = await reflector.testApiConnectivity() + + if (!testResult.success) { + return res.status(500).json({ + success: false, + error: testResult.error, + hasApiKey: !!process.env.COINGECKO_API_KEY, + apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0 + }) + } + + // Try to get actual prices + reflector.clearCache() + const prices = await reflector.getCurrentPrices() + + res.json({ + success: true, + prices, + hasApiKey: !!process.env.COINGECKO_API_KEY, + apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0, + testResult, + environment: process.env.NODE_ENV + }) + } catch (error) { + res.status(500).json({ + success: false, + error: error instanceof Error ? error.message : String(error), + hasApiKey: !!process.env.COINGECKO_API_KEY + }) + } +}) + +// Root route +app.get('/', (req, res) => { + res.json({ + message: 'Stellar Portfolio Rebalancer API', + status: 'running', + version: '1.0.0', + timestamp: new Date().toISOString(), + features: { + automaticRebalancing: !!autoRebalancer?.getStatus().isRunning, + priceFeeds: true, + riskManagement: true, + portfolioManagement: true, + featureFlags: publicFeatureFlags + }, + endpoints: { + health: '/health', + corsTest: '/test/cors', + coinGeckoTest: '/test/coingecko', + autoRebalancerStatus: '/api/auto-rebalancer/status', + queueHealth: '/api/queue/health' + } + }) +}) + +// Mount API routes +// Canonical v1 namespace +app.use('/api/v1', v1Router) + +// Legacy namespace with deprecation headers +app.use('/api', legacyApiDeprecation, portfolioRouter) + +// 404 handler +app.use((req, res) => { + console.log(`404 - Route not found: ${req.method} ${req.url}`) + res.status(404).json({ + error: 'Route not found', + method: req.method, + url: req.url, + availableEndpoints: { + health: '/health', + api: '/api/*', + autoRebalancer: '/api/auto-rebalancer/*', + queueHealth: '/api/queue/health' + } + }) +}) + +// Error handler +app.use((error: any, req: express.Request, res: express.Response, next: express.NextFunction) => { + console.error('Server error:', error) + res.status(500).json({ + error: 'Internal server error', + message: error.message || 'Unknown error' + }) +}) + +// Create server +const server = createServer(app) + +// WebSocket setup +const wss = new WebSocketServer({ server }) + +wss.on('connection', (ws) => { + console.log('WebSocket connection established') + ws.send(JSON.stringify({ + type: 'connection', + message: 'Connected', + autoRebalancerStatus: autoRebalancer.getStatus() + })) + + ws.on('error', (error) => { + console.error('WebSocket error:', error) + }) +}) + +// Wire wss into autoRebalancer so it can push real-time portfolio events to clients +autoRebalancer.setWss(wss) + +// Start server +server.listen(port, async () => { + console.log(`🚀 Server running on port ${port}`) + console.log(`Environment: ${process.env.NODE_ENV || 'development'}`) + console.log(`CoinGecko API Key: ${!!process.env.COINGECKO_API_KEY ? 'SET' : 'NOT SET'}`) + + // Warn if ADMIN_PUBLIC_KEYS is not set + if (!process.env.ADMIN_PUBLIC_KEYS) { + logger.warn( + '⚠️ ADMIN_PUBLIC_KEYS is not set — admin routes (/api/auto-rebalancer/*, ' + + '/api/rebalance/history/sync-onchain) will return 503. ' + + 'Set ADMIN_PUBLIC_KEYS in .env to enable admin functionality.' + ) + } + + // ── BullMQ / Redis setup ──────────────────────────────────────────────── + const redisAvailable = await isRedisAvailable() + logQueueStartup(redisAvailable) + + if (redisAvailable) { + // Start all three workers + startPortfolioCheckWorker() + startRebalanceWorker() + startAnalyticsSnapshotWorker() + + // Register repeatable jobs (scheduler) + try { + await startQueueScheduler() + console.log('[SCHEDULER] ✅ Queue scheduler registered') + } catch (err) { + console.error('[SCHEDULER] ❌ Failed to register scheduler:', err) + } + } + + // ── Auto-rebalancer (queue-backed) ────────────────────────────────────── + const shouldStartAutoRebalancer = + process.env.NODE_ENV === 'production' || + process.env.ENABLE_AUTO_REBALANCER === 'true' + + if (shouldStartAutoRebalancer) { + try { + console.log('[AUTO-REBALANCER] Starting automatic rebalancing service...') + await autoRebalancer.start() + console.log('[AUTO-REBALANCER] ✅ Automatic rebalancing service started successfully') + + // Broadcast to WebSocket clients + wss.clients.forEach(client => { + if (client.readyState === client.OPEN) { + client.send(JSON.stringify({ + type: 'autoRebalancerStarted', + status: autoRebalancer.getStatus(), + timestamp: new Date().toISOString() + })) + } + }) + } catch (error) { + console.error('[AUTO-REBALANCER] ❌ Failed to start automatic rebalancing service:', error) + } + } else { + console.log('[AUTO-REBALANCER] Automatic rebalancing disabled in development mode') + console.log('[AUTO-REBALANCER] Set ENABLE_AUTO_REBALANCER=true to enable in development') + } + + // Contract event indexer (on-chain source-of-truth history) + try { + await contractEventIndexerService.start() + } catch (error) { + console.error('[CHAIN-INDEXER] Failed to start:', error) + } + + console.log('Available endpoints:') + console.log(` Health: http://localhost:${port}/health`) + console.log(` CORS Test: http://localhost:${port}/test/cors`) + console.log(` CoinGecko Test: http://localhost:${port}/test/coingecko`) + console.log(` Auto-Rebalancer Status: http://localhost:${port}/api/auto-rebalancer/status`) + console.log(` Queue Health: http://localhost:${port}/api/queue/health`) +}) + +// Graceful shutdown +const gracefulShutdown = async (signal: string) => { + console.log(`\n[SHUTDOWN] ${signal} received, shutting down gracefully...`) + + // Stop auto-rebalancer + try { + autoRebalancer.stop() + console.log('[SHUTDOWN] Auto-rebalancer stopped') + } catch (error) { + console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error) + } + + // Stop BullMQ workers + try { + await Promise.all([ + stopPortfolioCheckWorker(), + stopRebalanceWorker(), + stopAnalyticsSnapshotWorker(), + ]) + console.log('[SHUTDOWN] BullMQ workers stopped') + } catch (error) { + console.error('[SHUTDOWN] Error stopping BullMQ workers:', error) + } + + // Close BullMQ queues + try { + await closeAllQueues() + console.log('[SHUTDOWN] BullMQ queues closed') + } catch (error) { + console.error('[SHUTDOWN] Error closing queues:', error) + } + + // Close database connection + try { + await contractEventIndexerService.stop() + console.log('[SHUTDOWN] Contract event indexer stopped') + } catch (error) { + console.error('[SHUTDOWN] Error stopping contract event indexer:', error) + } + + try { + databaseService.close() + console.log('[SHUTDOWN] Database connection closed') + } catch (error) { + console.error('[SHUTDOWN] Error closing database:', error) + } + + // Close WebSocket connections + wss.clients.forEach(client => { + client.send(JSON.stringify({ + type: 'serverShutdown', + message: 'Server is shutting down', + timestamp: new Date().toISOString() + })) + client.close() + }) + + // Close server + server.close((err) => { + if (err) { + console.error('[SHUTDOWN] Error closing server:', err) + process.exit(1) + } + console.log('[SHUTDOWN] Server closed successfully') + process.exit(0) + }) + + // Force exit after 10 seconds + setTimeout(() => { + console.log('[SHUTDOWN] Force exit after timeout') + process.exit(1) + }, 10000) +} + +process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) +process.on('SIGINT', () => gracefulShutdown('SIGINT')) + +// Handle uncaught exceptions +process.on('uncaughtException', (error) => { + console.error('[UNCAUGHT-EXCEPTION] Uncaught exception:', error) + gracefulShutdown('UNCAUGHT_EXCEPTION') +}) + +process.on('unhandledRejection', (reason, promise) => { + console.error('[UNHANDLED-REJECTION] Unhandled promise rejection:', reason) + console.error('Promise:', promise) +}) + +// Export instances for use in routes +export { autoRebalancer } +export default app From fed68fcee5ca03ada65903ffaab5f252a936d91e Mon Sep 17 00:00:00 2001 From: Jaydbrown Date: Wed, 24 Jun 2026 14:04:42 +0100 Subject: [PATCH 32/32] feat: migrate WebSocket broadcast methods into AutoRebalancerService --- backend/src/services/autoRebalancer.ts | 354 +++++++++++++------------ 1 file changed, 178 insertions(+), 176 deletions(-) diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 8cc663c..d1a1bd3 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -3,189 +3,191 @@ import { StellarService } from './stellar.js' import { ReflectorService } from './reflector.js' import { rebalanceHistoryService } from './serviceContainer.js' import { portfolioStorage } from './portfolioStorage.js' -import { CircuitBreakers } from './circuitBreakers.js' -import { notificationService } from './notificationService.js' -import { logger } from '../utils/logger.js' -import { getPortfolioCheckQueue } from '../queue/queues.js' -import { isRedisAvailable } from '../queue/connection.js' - +import { CircuitBreakers } from './circuitBreakers.js' +import { notificationService } from './notificationService.js' +import { logger } from '../utils/logger.js' +import { getPortfolioCheckQueue } from '../queue/queues.js' +import { isRedisAvailable } from '../queue/connection.js' + export class AutoRebalancerService { private stellarService: StellarService private reflectorService: ReflectorService private isRunning = false private wss: WebSocketServer | null = null - - // Configuration (kept for getStatus() compatibility) - private readonly CHECK_INTERVAL = 30 * 60 * 1000 // 30 minutes - private readonly MIN_REBALANCE_INTERVAL = 24 * 60 * 60 * 1000 - private readonly MAX_AUTO_REBALANCES_PER_DAY = 3 - + + // Configuration (kept for getStatus() compatibility) + private readonly CHECK_INTERVAL = 30 * 60 * 1000 // 30 minutes + private readonly MIN_REBALANCE_INTERVAL = 24 * 60 * 60 * 1000 + private readonly MAX_AUTO_REBALANCES_PER_DAY = 3 + constructor() { this.stellarService = new StellarService() this.reflectorService = new ReflectorService() } - - /** - * Wire up the WebSocket server so this service can push portfolio events - * to connected clients. Called once from index.ts after wss is created. - */ - setWss(wss: WebSocketServer): void { - this.wss = wss - } - - /** Returns true if a WebSocket server has been wired in via setWss(). */ - hasWss(): boolean { - return this.wss !== null - } - - /** - * Start the automatic monitoring service. - * With BullMQ, this just flags the service as running – the scheduler - * already registered the repeatable job. We also enqueue an immediate - * check so the first run happens without waiting 30 min. - */ - async start(): Promise { - if (this.isRunning) { - logger.warn('[AUTO-REBALANCER] Already running') - return - } - - this.isRunning = true - logger.info('[AUTO-REBALANCER] Service started (queue-backed)') - - const redisUp = await isRedisAvailable() - if (redisUp) { - const queue = getPortfolioCheckQueue() - if (queue) { - await queue.add( - 'startup-portfolio-check', - { triggeredBy: 'startup' }, - { priority: 1 } - ) - logger.info('[AUTO-REBALANCER] Enqueued startup portfolio-check job') - } - } else { - logger.warn('[AUTO-REBALANCER] Redis not available – startup check skipped') - } - } - - /** - * Stop the service flag (workers are stopped separately by index.ts). - */ - stop(): void { - if (!this.isRunning) return - this.isRunning = false - logger.info('[AUTO-REBALANCER] Service stopped') - } - - /** - * Force an immediate check of all portfolios. - */ - async forceCheck(): Promise { - const queue = getPortfolioCheckQueue() - if (!queue) throw new Error('Redis unavailable – cannot force check') - - await queue.add( - 'force-portfolio-check', - { triggeredBy: 'manual' }, - { priority: 1 } - ) - logger.info('[AUTO-REBALANCER] Force check job enqueued') - } - - /** - * Get service status - */ - getStatus(): { - isRunning: boolean - checkInterval: number - minRebalanceInterval: number - maxRebalancesPerDay: number - backend: string - } { - return { - isRunning: this.isRunning, - checkInterval: this.CHECK_INTERVAL, - minRebalanceInterval: this.MIN_REBALANCE_INTERVAL, - maxRebalancesPerDay: this.MAX_AUTO_REBALANCES_PER_DAY, - backend: 'bullmq', - } - } - - /** - * Get statistics about auto-rebalancing activity - */ - async getStatistics(): Promise<{ - totalAutoRebalances: number - rebalancesToday: number - lastCheckTime: string | null - averageRebalancesPerDay: number - }> { - try { + + /** + * Start the automatic monitoring service. + * With BullMQ, this just flags the service as running – the scheduler + * already registered the repeatable job. We also enqueue an immediate + * check so the first run happens without waiting 30 min. + */ + async start(): Promise { + if (this.isRunning) { + logger.warn('[AUTO-REBALANCER] Already running') + return + } + + this.isRunning = true + logger.info('[AUTO-REBALANCER] Service started (queue-backed)') + + const redisUp = await isRedisAvailable() + if (redisUp) { + const queue = getPortfolioCheckQueue() + if (queue) { + await queue.add( + 'startup-portfolio-check', + { triggeredBy: 'startup' }, + { priority: 1 } + ) + logger.info('[AUTO-REBALANCER] Enqueued startup portfolio-check job') + } + } else { + logger.warn('[AUTO-REBALANCER] Redis not available – startup check skipped') + } + } + + /** + * Stop the service flag (workers are stopped separately by index.ts). + */ + stop(): void { + if (!this.isRunning) return + this.isRunning = false + logger.info('[AUTO-REBALANCER] Service stopped') + } + + /** + * Force an immediate check of all portfolios. + */ + async forceCheck(): Promise { + const queue = getPortfolioCheckQueue() + if (!queue) throw new Error('Redis unavailable – cannot force check') + + await queue.add( + 'force-portfolio-check', + { triggeredBy: 'manual' }, + { priority: 1 } + ) + logger.info('[AUTO-REBALANCER] Force check job enqueued') + } + + /** + * Get service status + */ + getStatus(): { + isRunning: boolean + checkInterval: number + minRebalanceInterval: number + maxRebalancesPerDay: number + backend: string + } { + return { + isRunning: this.isRunning, + checkInterval: this.CHECK_INTERVAL, + minRebalanceInterval: this.MIN_REBALANCE_INTERVAL, + maxRebalancesPerDay: this.MAX_AUTO_REBALANCES_PER_DAY, + backend: 'bullmq', + } + } + + /** + * Get statistics about auto-rebalancing activity + */ + async getStatistics(): Promise<{ + totalAutoRebalances: number + rebalancesToday: number + lastCheckTime: string | null + averageRebalancesPerDay: number + }> { + try { const allAutoRebalances = await rebalanceHistoryService.getAllAutoRebalances() - - const today = new Date() - today.setHours(0, 0, 0, 0) - const todayRebalances = allAutoRebalances.filter( - r => new Date(r.timestamp) >= today - ) - - const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) - const recentRebalances = allAutoRebalances.filter( - r => new Date(r.timestamp) >= thirtyDaysAgo - ) - - return { - totalAutoRebalances: allAutoRebalances.length, - rebalancesToday: todayRebalances.length, - lastCheckTime: new Date().toISOString(), - averageRebalancesPerDay: recentRebalances.length / 30, - } - } catch (error) { - logger.error('[AUTO-REBALANCER] Error getting statistics', { error }) - return { - totalAutoRebalances: 0, - rebalancesToday: 0, - lastCheckTime: null, - averageRebalancesPerDay: 0, - } - } - } - - // ─── WebSocket broadcasting (migrated from legacy RebalancingService) ──── - - /** - * Push a portfolio-specific event to all connected WebSocket clients. - */ - notifyClients(portfolioId: string, event: string, data: any = {}): void { - if (!this.wss) return - const message = JSON.stringify({ - type: 'portfolio_update', - portfolioId, - event, - data, - timestamp: new Date().toISOString(), - }) - this.wss.clients.forEach(client => { - if (client.readyState === 1) client.send(message) - }) - logger.info(`[AUTO-REBALANCER] Notification sent: ${event} for portfolio ${portfolioId}`) - } - - /** - * Broadcast a market-level event to all connected WebSocket clients. - */ - broadcastToAllClients(event: string, data: any = {}): void { - if (!this.wss) return - const message = JSON.stringify({ - type: 'market_update', - event, - data, - timestamp: new Date().toISOString(), - }) - this.wss.clients.forEach(client => { - if (client.readyState === 1) client.send(message) - }) - logger.info(`[AUTO-REBALANCER] Market broadcast sent: ${event}`) - } -} + + const today = new Date() + today.setHours(0, 0, 0, 0) + const todayRebalances = allAutoRebalances.filter( + r => new Date(r.timestamp) >= today + ) + + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) + const recentRebalances = allAutoRebalances.filter( + r => new Date(r.timestamp) >= thirtyDaysAgo + ) + + return { + totalAutoRebalances: allAutoRebalances.length, + rebalancesToday: todayRebalances.length, + lastCheckTime: new Date().toISOString(), + averageRebalancesPerDay: recentRebalances.length / 30, + } + } catch (error) { + logger.error('[AUTO-REBALANCER] Error getting statistics', { error }) + return { + totalAutoRebalances: 0, + rebalancesToday: 0, + lastCheckTime: null, + averageRebalancesPerDay: 0, + } + } + } + + /** + * Inject the WebSocket server so portfolio events can be pushed to clients. + * Called from index.ts once wss is available. + */ + setWss(wss: WebSocketServer): void { + this.wss = wss + } + + /** + * Returns true once setWss() has been called. + */ + hasWss(): boolean { + return this.wss !== null + } + + // ─── WebSocket broadcasting ────────────────────────────────────────────── + + /** + * Push a portfolio-specific event to all connected WebSocket clients. + */ + notifyClients(portfolioId: string, event: string, data: Record = {}): void { + if (!this.wss) return + const message = JSON.stringify({ + type: 'portfolio_update', + portfolioId, + event, + data, + timestamp: new Date().toISOString(), + }) + this.wss.clients.forEach(client => { + if (client.readyState === 1) client.send(message) + }) + logger.info(`[AUTO-REBALANCER] Pushed "${event}" event to WebSocket clients`, { portfolioId }) + } + + /** + * Broadcast a market-wide event to all connected WebSocket clients. + */ + broadcastToAllClients(event: string, data: Record = {}): void { + if (!this.wss) return + const message = JSON.stringify({ + type: 'market_update', + event, + data, + timestamp: new Date().toISOString(), + }) + this.wss.clients.forEach(client => { + if (client.readyState === 1) client.send(message) + }) + logger.info(`[AUTO-REBALANCER] Broadcast "${event}" to all WebSocket clients`) + } +}