diff --git a/backend/src/index.ts b/backend/src/index.ts index c80b57a..aac0d31 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -8,7 +8,6 @@ import { v1Router } from './api/v1Router.js' import { errorHandler, notFound } from './middleware/errorHandler.js' import { legacyApiDeprecation } from './middleware/legacyApiDeprecation.js' import { globalRateLimiter } from './middleware/rateLimit.js' -import { RebalancingService } from './monitoring/rebalancer.js' import { AutoRebalancerService } from './services/autoRebalancer.js' import { logger } from './utils/logger.js' import { databaseService } from './services/databaseService.js' @@ -226,14 +225,8 @@ wss.on('connection', (ws) => { }) }) -// Start existing rebalancing service (now queue-backed, no cron) -try { - const rebalancingService = new RebalancingService(wss) - rebalancingService.start() - console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)') -} catch (error) { - console.error('Failed to start rebalancing service:', error) -} +// Wire wss into autoRebalancer so it can push real-time portfolio events to clients +autoRebalancer.setWss(wss) // Start server server.listen(port, async () => { diff --git a/backend/src/monitoring/rebalancer.ts b/backend/src/monitoring/rebalancer.ts deleted file mode 100644 index 7b6606e..0000000 --- a/backend/src/monitoring/rebalancer.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { WebSocketServer } from 'ws' -import { StellarService } from '../services/stellar.js' -import { ReflectorService } from '../services/reflector.js' -import { rebalanceHistoryService, riskManagementService } from '../services/serviceContainer.js' -import { portfolioStorage } from '../services/portfolioStorage.js' -import { getPortfolioCheckQueue } from '../queue/queues.js' -import { logger } from '../utils/logger.js' -import type { Portfolio, RiskAlert } from '../types/index.js' - -export class RebalancingService { - private stellarService: StellarService - private reflectorService: ReflectorService - private wss: WebSocketServer - - constructor(wss: WebSocketServer) { - this.stellarService = new StellarService() - this.reflectorService = new ReflectorService() - this.wss = wss - } - - /** - * Start the monitoring service. - * Recurring portfolio checks and risk metric updates are now handled by - * the BullMQ portfolio-check worker. This method sets up WebSocket - * broadcasting hooks only. - * - * NOTE: node-cron schedules have been removed – replaced by the queue - * scheduler in src/queue/scheduler.ts. - */ - start() { - logger.info('[REBALANCING-SERVICE] Monitoring service started (queue-backed). WebSocket broadcasting active.') - } - - /** - * Manually check a specific portfolio and broadcast results via WebSocket. - */ - async forceCheckPortfolio(portfolioId: string): Promise { - try { - await this.checkPortfolioForRebalancing(portfolioId) - return { success: true, message: 'Portfolio check completed' } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error) - logger.error(`Force check failed for portfolio ${portfolioId}:`, { error: errorMessage }) - return { success: false, error: errorMessage } - } - } - - async getStatus(): Promise { - const stats = await rebalanceHistoryService.getHistoryStats() - const circuitBreakers = riskManagementService.getCircuitBreakerStatus() - const active = await this.getActivePortfolios() - return { - activePortfolios: active.length, - rebalanceHistory: stats, - circuitBreakers, - riskManagement: { enabled: true, lastUpdate: new Date().toISOString() }, - } - } - - // ─── Internal helpers (used by forceCheckPortfolio) ────────────────────── - - private async checkPortfolioForRebalancing(portfolioId: string) { - try { - const prices = await this.reflectorService.getCurrentPrices() - const portfolio = await this.stellarService.getPortfolio(portfolioId) - - const riskAlerts = riskManagementService.updatePriceData(prices) - const needsRebalance = await this.stellarService.checkRebalanceNeeded(portfolioId) - - if (needsRebalance) { - logger.info(`Portfolio ${portfolioId} needs rebalancing – enqueueing job`) - const riskCheck = riskManagementService.shouldAllowRebalance(portfolio, prices) - - if (riskCheck.allowed) { - // Enqueue a rebalance job rather than executing inline - const queue = getPortfolioCheckQueue() - if (queue) { - await queue.add( - `manual-check-${portfolioId}`, - { triggeredBy: 'manual' }, - { priority: 1 } - ) - this.notifyClients(portfolioId, 'rebalance_queued', { - message: 'Rebalance job enqueued', - }) - } - } else { - logger.warn(`Rebalancing blocked for ${portfolioId}: ${riskCheck.reason}`) - this.notifyClients(portfolioId, 'rebalance_blocked', { - message: 'Rebalancing temporarily blocked by safety systems', - reason: riskCheck.reason, - alerts: riskCheck.alerts, - }) - - await rebalanceHistoryService.recordRebalanceEvent({ - portfolioId, - trigger: 'Automatic Check – Blocked', - trades: 0, - gasUsed: '0 XLM', - status: 'failed', - prices, - portfolio, - }) - } - } - - if (riskAlerts.length > 0) { - const criticalAlerts = riskAlerts.filter((a: RiskAlert) => a.severity === 'critical') - if (criticalAlerts.length > 0) { - this.notifyClients(portfolioId, 'risk_alert', { - message: 'Critical risk conditions detected', - alerts: criticalAlerts, - }) - } - } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error) - logger.error(`Failed to check portfolio ${portfolioId} for rebalancing:`, { - error: errorMessage, - portfolioId, - }) - } - } - - private async getActivePortfolios(): Promise> { - const allPortfolios = await portfolioStorage.getAllPortfolios() - return allPortfolios - .filter((p: Portfolio) => p.threshold > 0) - .map((p: Portfolio) => ({ id: p.id, autoRebalance: true })) - } - - private notifyClients(portfolioId: string, event: string, data: any = {}) { - const message = JSON.stringify({ - type: 'portfolio_update', - portfolioId, - event, - data, - timestamp: new Date().toISOString(), - }) - - this.wss.clients.forEach(client => { - if (client.readyState === 1) client.send(message) - }) - - logger.info(`Notification sent: ${event} for portfolio ${portfolioId}`) - } - - private broadcastToAllClients(event: string, data: any = {}) { - const message = JSON.stringify({ - type: 'market_update', - event, - data, - timestamp: new Date().toISOString(), - }) - - this.wss.clients.forEach(client => { - if (client.readyState === 1) client.send(message) - }) - - logger.info(`Market broadcast sent: ${event}`) - } -} diff --git a/backend/src/services/autoRebalancer.ts b/backend/src/services/autoRebalancer.ts index 4e1197f..d1a1bd3 100644 --- a/backend/src/services/autoRebalancer.ts +++ b/backend/src/services/autoRebalancer.ts @@ -1,3 +1,4 @@ +import { WebSocketServer } from 'ws' import { StellarService } from './stellar.js' import { ReflectorService } from './reflector.js' import { rebalanceHistoryService } from './serviceContainer.js' @@ -12,6 +13,7 @@ export class AutoRebalancerService { private stellarService: StellarService private reflectorService: ReflectorService private isRunning = false + private wss: WebSocketServer | null = null // Configuration (kept for getStatus() compatibility) private readonly CHECK_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -136,4 +138,56 @@ export class AutoRebalancerService { } } } + + /** + * Inject the WebSocket server so portfolio events can be pushed to clients. + * Called from index.ts once wss is available. + */ + setWss(wss: WebSocketServer): void { + this.wss = wss + } + + /** + * Returns true once setWss() has been called. + */ + hasWss(): boolean { + return this.wss !== null + } + + // ─── WebSocket broadcasting ────────────────────────────────────────────── + + /** + * Push a portfolio-specific event to all connected WebSocket clients. + */ + notifyClients(portfolioId: string, event: string, data: Record = {}): void { + if (!this.wss) return + const message = JSON.stringify({ + type: 'portfolio_update', + portfolioId, + event, + data, + timestamp: new Date().toISOString(), + }) + this.wss.clients.forEach(client => { + if (client.readyState === 1) client.send(message) + }) + logger.info(`[AUTO-REBALANCER] Pushed "${event}" event to WebSocket clients`, { portfolioId }) + } + + /** + * Broadcast a market-wide event to all connected WebSocket clients. + */ + broadcastToAllClients(event: string, data: Record = {}): void { + if (!this.wss) return + const message = JSON.stringify({ + type: 'market_update', + event, + data, + timestamp: new Date().toISOString(), + }) + this.wss.clients.forEach(client => { + if (client.readyState === 1) client.send(message) + }) + logger.info(`[AUTO-REBALANCER] Broadcast "${event}" to all WebSocket clients`) + } }