diff --git a/src/app.module.ts b/src/app.module.ts index 81dd098b..870a9311 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -25,6 +25,8 @@ import { DeepLinkModule } from './deep-link/deep-link.module'; import { InvoicesModule } from './payments/invoices/invoices.module'; import { ReportingModule } from './payments/reporting/reporting.module'; import { HealthModule } from './health/health.module'; +import { QueueModule } from './queues/queue.module'; +import { WorkersBridgeModule } from './workers/bridge/workers-bridge.module'; // ✅ keep BOTH modules import { ReadReplicaModule } from './database/read-replica'; @@ -54,6 +56,8 @@ const featureFlags = loadFeatureFlags(); InvoicesModule, ReportingModule, HealthModule, + QueueModule, + WorkersBridgeModule, // ✅ always include read replicas (or wrap if needed) ReadReplicaModule, diff --git a/src/common/constants/queue.constants.ts b/src/common/constants/queue.constants.ts index 21e3ca63..3217a2cb 100644 --- a/src/common/constants/queue.constants.ts +++ b/src/common/constants/queue.constants.ts @@ -9,6 +9,7 @@ export const QUEUE_NAMES = { USER_DATA_EXPORT: 'user-data-export', SUBSCRIPTIONS: 'subscriptions', WEBHOOKS: 'webhooks', + DEAD_LETTER: 'dead-letter', } as const; export const JOB_NAMES = { // Email queue diff --git a/src/messaging/messaging.module.ts b/src/messaging/messaging.module.ts index 8918e28d..e355fb17 100644 --- a/src/messaging/messaging.module.ts +++ b/src/messaging/messaging.module.ts @@ -13,9 +13,6 @@ import { TracingService } from './tracing/tracing.service'; @Module({ imports: [ TypeOrmModule.forFeature([Message]), - BullModule.forRoot({ - redis: process.env.QUEUE_REDIS_URL || process.env.REDIS_URL || 'redis://127.0.0.1:6379', - }), BullModule.registerQueue({ name: QUEUE_NAMES.MESSAGE_QUEUE }), ], providers: [ diff --git a/src/monitoring/metrics/metrics-collection.service.ts b/src/monitoring/metrics/metrics-collection.service.ts index f37f692a..106f1c4b 100644 --- a/src/monitoring/metrics/metrics-collection.service.ts +++ b/src/monitoring/metrics/metrics-collection.service.ts @@ -87,6 +87,15 @@ export class MetricsCollectionService implements OnModuleInit { /** Queue job processing duration, labelled by queue_name and job_type */ public queueProcessingTime: Histogram; + /** Current number of waiting jobs per queue */ + public queueWaitingJobs: Gauge; + + /** Current number of active jobs per queue */ + public queueActiveJobs: Gauge; + + /** Total number of failed jobs per queue */ + public queueFailedJobs: Gauge; + // ── Business Metrics – Email ─────────────────────────────────────────────── /** Total email campaigns sent, labelled by campaign_type and status */ @@ -209,6 +218,18 @@ export class MetricsCollectionService implements OnModuleInit { this.queueProcessingTime.observe({ queue_name: queueName, job_type: jobType }, duration); } + updateQueueWaitingJobs(queueName: string, count: number): void { + this.queueWaitingJobs.set({ queue_name: queueName }, count); + } + + updateQueueActiveJobs(queueName: string, count: number): void { + this.queueActiveJobs.set({ queue_name: queueName }, count); + } + + updateQueueFailedJobs(queueName: string, count: number): void { + this.queueFailedJobs.set({ queue_name: queueName }, count); + } + // ── Recording helpers – Email ───────────────────────────────────────────── recordEmailCampaignSent(campaignType: string, status: string): void { @@ -388,6 +409,27 @@ export class MetricsCollectionService implements OnModuleInit { registers: [this.registry], }); + this.queueWaitingJobs = new Gauge({ + name: 'queue_waiting_jobs', + help: 'Current number of waiting jobs per queue', + labelNames: ['queue_name'], + registers: [this.registry], + }); + + this.queueActiveJobs = new Gauge({ + name: 'queue_active_jobs', + help: 'Current number of active jobs per queue', + labelNames: ['queue_name'], + registers: [this.registry], + }); + + this.queueFailedJobs = new Gauge({ + name: 'queue_failed_jobs_total', + help: 'Total number of failed jobs per queue', + labelNames: ['queue_name'], + registers: [this.registry], + }); + // Email this.emailCampaignsSent = new Counter({ name: 'email_campaigns_sent_total', diff --git a/src/queues/README.md b/src/queues/README.md index b2386418..6ca735fa 100644 --- a/src/queues/README.md +++ b/src/queues/README.md @@ -465,6 +465,71 @@ curl http://localhost:3000/queues/metrics curl http://localhost:3000/queues/health ``` +## Optimization Architecture + +### Centralized Queue Config (`QueueModule`) + +`src/queues/queue.module.ts` is a `@Global()` module that registers all 11 Bull queues in one place. It provides `QueueService`, `PrioritizationService`, `RetryStrategyService`, and `QueueMetricsService` globally. + +### Workers Bridge (`WorkersBridgeService`) + +`src/workers/bridge/workers-bridge.service.ts` bridges Bull queue consumers to the existing worker classes. On `onModuleInit`, it: +1. Binds each queue to its worker's `.handle()` method +2. Wraps processing with Prometheus metric recording (`queue_processing_duration_seconds`) +3. Registers `failed` event handlers that forward permanently failed jobs to the dead-letter queue + +### Priority Queue + +`PrioritizationService` maps `JobPriority` enum (CRITICAL=1, HIGH=2, NORMAL=3, LOW=4, BACKGROUND=5) to Bull's native priority (0-4, lower=higher). `QueueService.addJob()` defaults to `NORMAL` if no priority is specified, ensuring all jobs participate in Bull's priority sorting. + +### Dead Letter Queue + +`DeadLetterService` receives failed jobs from Bull's `failed` event and re-queues them to the `DEAD_LETTER` queue with the original job metadata, error reason, and stack trace. This replaces the in-process-only failure tracking. + +### Retry Strategies + +`RetryStrategyService` exposes `RETRY_STRATEGIES` (EMAIL, PAYMENT, NOTIFICATION, BACKUP, REPORT, DEFAULT) as injectable config. Pass a strategy key to `QueueService.addJob()` to apply automatic backoff and max attempts. + +### Monitoring + +- `MetricsCollectionService` records `queue_processing_duration_seconds` (histogram), `queue_waiting_jobs`, `queue_active_jobs`, `queue_failed_jobs_total` (gauges) +- `QueueMetricsService` polls all queues every 30s and updates the Prometheus gauges +- All queue metrics are available at `/metrics` + +### Using `QueueService` + +```ts +// Basic — default priority (NORMAL) +await queueService.addJob(QUEUE_NAMES.EMAIL, 'send-email', { to, subject }); + +// With explicit priority +await queueService.addJob(QUEUE_NAMES.WEBHOOKS, 'process-webhook', payload, { + priority: JobPriority.CRITICAL, +}); + +// With retry strategy +await queueService.addJob(QUEUE_NAMES.EMAIL, 'send-campaign', template, {}, 'EMAIL'); +``` + +### Environment Variables + +| Variable | Default | Description | +|---|---|---| +| `REDIS_URL` / `QUEUE_REDIS_URL` | `redis://127.0.0.1:6379` | Redis connection | +| `QUEUE_CONCURRENCY_EMAIL` | `5` | Email worker concurrency | +| `QUEUE_CONCURRENCY_MEDIA` | `3` | Media processing concurrency | +| `QUEUE_CONCURRENCY_SYNC` | `4` | Data sync concurrency | +| `QUEUE_CONCURRENCY_BACKUP` | `1` | Backup concurrency | +| `QUEUE_CONCURRENCY_WEBHOOKS` | `10` | Webhooks concurrency | +| `QUEUE_CONCURRENCY_SUBSCRIPTIONS` | `5` | Subscriptions concurrency | + +### Load Testing + +```bash +# Queue throughput benchmark (requires Redis localhost) +npx ts-node tests/load/queue-throughput.benchmark.ts +``` + ## Production Considerations 1. **Redis High Availability**: Use Redis Sentinel or Cluster diff --git a/src/queues/dead-letter/dead-letter.module.ts b/src/queues/dead-letter/dead-letter.module.ts new file mode 100644 index 00000000..eea71938 --- /dev/null +++ b/src/queues/dead-letter/dead-letter.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { QueueModule } from '../queue.module'; +import { DeadLetterService } from './dead-letter.service'; + +@Module({ + imports: [QueueModule], + providers: [DeadLetterService], + exports: [DeadLetterService], +}) +export class DeadLetterModule {} diff --git a/src/queues/dead-letter/dead-letter.service.ts b/src/queues/dead-letter/dead-letter.service.ts new file mode 100644 index 00000000..b83a6024 --- /dev/null +++ b/src/queues/dead-letter/dead-letter.service.ts @@ -0,0 +1,45 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bull'; +import { Queue, Job } from 'bull'; +import { QUEUE_NAMES } from '../../common/constants/queue.constants'; + +export interface DeadLetterJobData { + originalQueue: string; + originalJobId: string | number; + originalJobName: string; + originalData: any; + failedReason: string; + failedAt: string; + attemptsMade: number; + stackTrace?: string; +} + +@Injectable() +export class DeadLetterService { + private readonly logger = new Logger(DeadLetterService.name); + + constructor(@InjectQueue(QUEUE_NAMES.DEAD_LETTER) private readonly deadLetterQueue: Queue) {} + + async sendToDeadLetter(job: Job, queueName: string): Promise { + const data: DeadLetterJobData = { + originalQueue: queueName, + originalJobId: job.id, + originalJobName: job.name, + originalData: job.data, + failedReason: job.failedReason ?? 'Unknown error', + failedAt: new Date().toISOString(), + attemptsMade: job.attemptsMade, + stackTrace: job.stacktrace?.[0], + }; + + await this.deadLetterQueue.add(`${queueName}:${job.name || 'unknown'}`, data, { + attempts: 1, + removeOnComplete: true, + removeOnFail: true, + }); + + this.logger.warn( + `[DEAD-LETTER] Job ${job.id} from "${queueName}" moved to dead-letter queue (reason: ${data.failedReason})`, + ); + } +} diff --git a/src/queues/metrics/queue-metrics.service.ts b/src/queues/metrics/queue-metrics.service.ts new file mode 100644 index 00000000..5c7ea868 --- /dev/null +++ b/src/queues/metrics/queue-metrics.service.ts @@ -0,0 +1,56 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Interval } from '@nestjs/schedule'; +import { InjectQueue } from '@nestjs/bull'; +import { Queue } from 'bull'; +import { QUEUE_NAMES } from '../../common/constants/queue.constants'; +import { MetricsCollectionService } from '../../monitoring/metrics/metrics-collection.service'; + +@Injectable() +export class QueueMetricsService { + private readonly logger = new Logger(QueueMetricsService.name); + + private readonly queueList: { name: string; queue: Queue }[] = []; + + constructor( + @InjectQueue(QUEUE_NAMES.EMAIL) emailQueue: Queue, + @InjectQueue(QUEUE_NAMES.EMAIL_MARKETING) emailMarketingQueue: Queue, + @InjectQueue(QUEUE_NAMES.SYNC_TASKS) syncTasksQueue: Queue, + @InjectQueue(QUEUE_NAMES.BACKUP_PROCESSING) backupProcessingQueue: Queue, + @InjectQueue(QUEUE_NAMES.MESSAGE_QUEUE) messageQueue: Queue, + @InjectQueue(QUEUE_NAMES.MEDIA_PROCESSING) mediaProcessingQueue: Queue, + @InjectQueue(QUEUE_NAMES.DEFAULT) defaultQueue: Queue, + @InjectQueue(QUEUE_NAMES.USER_DATA_EXPORT) userDataExportQueue: Queue, + @InjectQueue(QUEUE_NAMES.SUBSCRIPTIONS) subscriptionsQueue: Queue, + @InjectQueue(QUEUE_NAMES.WEBHOOKS) webhooksQueue: Queue, + @InjectQueue(QUEUE_NAMES.DEAD_LETTER) deadLetterQueue: Queue, + private readonly metrics: MetricsCollectionService, + ) { + this.queueList = [ + { name: QUEUE_NAMES.EMAIL, queue: emailQueue }, + { name: QUEUE_NAMES.EMAIL_MARKETING, queue: emailMarketingQueue }, + { name: QUEUE_NAMES.SYNC_TASKS, queue: syncTasksQueue }, + { name: QUEUE_NAMES.BACKUP_PROCESSING, queue: backupProcessingQueue }, + { name: QUEUE_NAMES.MESSAGE_QUEUE, queue: messageQueue }, + { name: QUEUE_NAMES.MEDIA_PROCESSING, queue: mediaProcessingQueue }, + { name: QUEUE_NAMES.DEFAULT, queue: defaultQueue }, + { name: QUEUE_NAMES.USER_DATA_EXPORT, queue: userDataExportQueue }, + { name: QUEUE_NAMES.SUBSCRIPTIONS, queue: subscriptionsQueue }, + { name: QUEUE_NAMES.WEBHOOKS, queue: webhooksQueue }, + { name: QUEUE_NAMES.DEAD_LETTER, queue: deadLetterQueue }, + ]; + } + + @Interval(30_000) + async recordQueueMetrics(): Promise { + for (const entry of this.queueList) { + try { + const counts = await entry.queue.getJobCounts(); + this.metrics.updateQueueWaitingJobs(entry.name, counts.waiting || 0); + this.metrics.updateQueueActiveJobs(entry.name, counts.active || 0); + this.metrics.updateQueueFailedJobs(entry.name, counts.failed || 0); + } catch (err) { + this.logger.warn(`Failed to record metrics for queue "${entry.name}": ${err}`); + } + } + } +} diff --git a/src/queues/prioritization/prioritization.service.ts b/src/queues/prioritization/prioritization.service.ts index 2f76cff1..907fdcc1 100644 --- a/src/queues/prioritization/prioritization.service.ts +++ b/src/queues/prioritization/prioritization.service.ts @@ -148,6 +148,10 @@ export class PrioritizationService { return optionsMap[priority]; } + toBullPriority(priority: JobPriority): number { + return Math.max(0, priority - 1); + } + /** * Adjust priority dynamically based on job age */ diff --git a/src/queues/queue.module.ts b/src/queues/queue.module.ts new file mode 100644 index 00000000..0ca7c137 --- /dev/null +++ b/src/queues/queue.module.ts @@ -0,0 +1,34 @@ +import { Module, Global } from '@nestjs/common'; +import { BullModule } from '@nestjs/bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; +import { QueueService } from './queue.service'; +import { PrioritizationService } from './prioritization/prioritization.service'; +import { RetryStrategyService } from './retry/retry-strategy.service'; +import { QueueMetricsService } from './metrics/queue-metrics.service'; +import { MonitoringModule } from '../monitoring/monitoring.module'; + +@Global() +@Module({ + imports: [ + MonitoringModule, + BullModule.forRoot({ + redis: process.env.QUEUE_REDIS_URL || process.env.REDIS_URL || 'redis://127.0.0.1:6379', + }), + BullModule.registerQueue( + { name: QUEUE_NAMES.EMAIL }, + { name: QUEUE_NAMES.EMAIL_MARKETING }, + { name: QUEUE_NAMES.SYNC_TASKS }, + { name: QUEUE_NAMES.BACKUP_PROCESSING }, + { name: QUEUE_NAMES.MESSAGE_QUEUE }, + { name: QUEUE_NAMES.MEDIA_PROCESSING }, + { name: QUEUE_NAMES.DEFAULT }, + { name: QUEUE_NAMES.USER_DATA_EXPORT }, + { name: QUEUE_NAMES.SUBSCRIPTIONS }, + { name: QUEUE_NAMES.WEBHOOKS }, + { name: QUEUE_NAMES.DEAD_LETTER }, + ), + ], + providers: [QueueService, PrioritizationService, RetryStrategyService, QueueMetricsService], + exports: [BullModule, QueueService, PrioritizationService, RetryStrategyService], +}) +export class QueueModule {} diff --git a/src/queues/queue.service.ts b/src/queues/queue.service.ts new file mode 100644 index 00000000..1e3fc1ee --- /dev/null +++ b/src/queues/queue.service.ts @@ -0,0 +1,111 @@ +import { Injectable, Logger, NotFoundException } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bull'; +import { Queue, Job, JobOptions as BullJobOptions } from 'bull'; +import { QUEUE_NAMES } from '../common/constants/queue.constants'; +import { JobPriority } from './enums/job-priority.enum'; +import { IJobOptions } from './interfaces/queue.interfaces'; +import { PrioritizationService } from './prioritization/prioritization.service'; +import { RetryStrategyService, RetryStrategyKey } from './retry/retry-strategy.service'; + +export interface AddJobResult { + jobId: string | number; + queue: string; + name: string; +} + +@Injectable() +export class QueueService { + private readonly logger = new Logger(QueueService.name); + + constructor( + @InjectQueue(QUEUE_NAMES.EMAIL) private readonly emailQueue: Queue, + @InjectQueue(QUEUE_NAMES.EMAIL_MARKETING) private readonly emailMarketingQueue: Queue, + @InjectQueue(QUEUE_NAMES.SYNC_TASKS) private readonly syncTasksQueue: Queue, + @InjectQueue(QUEUE_NAMES.BACKUP_PROCESSING) private readonly backupProcessingQueue: Queue, + @InjectQueue(QUEUE_NAMES.MESSAGE_QUEUE) private readonly messageQueue: Queue, + @InjectQueue(QUEUE_NAMES.MEDIA_PROCESSING) private readonly mediaProcessingQueue: Queue, + @InjectQueue(QUEUE_NAMES.DEFAULT) private readonly defaultQueue: Queue, + @InjectQueue(QUEUE_NAMES.USER_DATA_EXPORT) private readonly userDataExportQueue: Queue, + @InjectQueue(QUEUE_NAMES.SUBSCRIPTIONS) private readonly subscriptionsQueue: Queue, + @InjectQueue(QUEUE_NAMES.WEBHOOKS) private readonly webhooksQueue: Queue, + @InjectQueue(QUEUE_NAMES.DEAD_LETTER) private readonly deadLetterQueue: Queue, + private readonly prioritizationService: PrioritizationService, + private readonly retryStrategyService: RetryStrategyService, + ) {} + + private readonly queueMap = new Map(); + + private getQueue(queueName: string): Queue { + let queue = this.queueMap.get(queueName); + if (!queue) { + const map: Record = { + [QUEUE_NAMES.EMAIL]: this.emailQueue, + [QUEUE_NAMES.EMAIL_MARKETING]: this.emailMarketingQueue, + [QUEUE_NAMES.SYNC_TASKS]: this.syncTasksQueue, + [QUEUE_NAMES.BACKUP_PROCESSING]: this.backupProcessingQueue, + [QUEUE_NAMES.MESSAGE_QUEUE]: this.messageQueue, + [QUEUE_NAMES.MEDIA_PROCESSING]: this.mediaProcessingQueue, + [QUEUE_NAMES.DEFAULT]: this.defaultQueue, + [QUEUE_NAMES.USER_DATA_EXPORT]: this.userDataExportQueue, + [QUEUE_NAMES.SUBSCRIPTIONS]: this.subscriptionsQueue, + [QUEUE_NAMES.WEBHOOKS]: this.webhooksQueue, + [QUEUE_NAMES.DEAD_LETTER]: this.deadLetterQueue, + }; + queue = map[queueName]; + if (!queue) { + throw new NotFoundException(`Queue "${queueName}" not found`); + } + this.queueMap.set(queueName, queue); + } + return queue; + } + + async addJob( + queueName: string, + jobName: string, + data: Record, + options?: Partial, + retryStrategy?: RetryStrategyKey, + ): Promise { + const queue = this.getQueue(queueName); + const priorityLevel = options?.priority ?? JobPriority.NORMAL; + const bullPriority = this.prioritizationService.toBullPriority(priorityLevel); + + const { priority: _, ...restOptions } = options ?? {}; + + let retryOpts: Record = {}; + if (retryStrategy) { + retryOpts = { + attempts: this.retryStrategyService.getBullAttempts(retryStrategy), + backoff: this.retryStrategyService.getBullBackoff(retryStrategy), + }; + } + + const jobOptions: BullJobOptions = { + ...restOptions, + ...retryOpts, + priority: bullPriority, + }; + + const job = await queue.add(jobName, data, jobOptions); + this.logger.debug(`Job ${job.id} added to "${queueName}" (name: ${jobName})`); + return { jobId: job.id, queue: queueName, name: jobName }; + } + + async getJob(queueName: string, jobId: string): Promise { + const queue = this.getQueue(queueName); + return queue.getJob(jobId); + } + + async getQueueCounts(queueName: string): Promise<{ + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + }> { + const queue = this.getQueue(queueName); + const counts = await queue.getJobCounts(); + return counts; + } +} diff --git a/src/queues/retry/retry-strategy.module.ts b/src/queues/retry/retry-strategy.module.ts new file mode 100644 index 00000000..7304c914 --- /dev/null +++ b/src/queues/retry/retry-strategy.module.ts @@ -0,0 +1,8 @@ +import { Module } from '@nestjs/common'; +import { RetryStrategyService } from './retry-strategy.service'; + +@Module({ + providers: [RetryStrategyService], + exports: [RetryStrategyService], +}) +export class RetryStrategyModule {} diff --git a/src/queues/retry/retry-strategy.service.ts b/src/queues/retry/retry-strategy.service.ts new file mode 100644 index 00000000..9425165a --- /dev/null +++ b/src/queues/retry/retry-strategy.service.ts @@ -0,0 +1,27 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { RETRY_STRATEGIES } from '../queues.constants'; +import { IRetryStrategy } from '../interfaces/queue.interfaces'; + +export type RetryStrategyKey = keyof typeof RETRY_STRATEGIES; + +@Injectable() +export class RetryStrategyService { + private readonly logger = new Logger(RetryStrategyService.name); + + getStrategy(key: RetryStrategyKey): IRetryStrategy { + return RETRY_STRATEGIES[key]; + } + + getBullBackoff(key: RetryStrategyKey): { type: 'fixed' | 'exponential'; delay: number } { + const strategy = this.getStrategy(key); + return { type: strategy.backoffType, delay: strategy.initialDelay }; + } + + getBullAttempts(key: RetryStrategyKey): number { + return this.getStrategy(key).maxAttempts; + } + + getAllStrategies(): Record { + return { ...RETRY_STRATEGIES }; + } +} diff --git a/src/workers/bridge/workers-bridge.module.ts b/src/workers/bridge/workers-bridge.module.ts new file mode 100644 index 00000000..5c87e99d --- /dev/null +++ b/src/workers/bridge/workers-bridge.module.ts @@ -0,0 +1,36 @@ +import { Module } from '@nestjs/common'; +import { EmailModule } from '../../email-marketing/email.module'; +import { WebhooksDeliveryModule } from '../../webhooks/webhooks-delivery.module'; +import { MonitoringModule } from '../../monitoring/monitoring.module'; +import { QueueModule } from '../../queues/queue.module'; +import { DeadLetterModule } from '../../queues/dead-letter/dead-letter.module'; +import { MessagingModule } from '../../messaging/messaging.module'; +import { EmailWorker } from '../processors/email.worker'; +import { MediaProcessingWorker } from '../processors/media-processing.worker'; +import { DataSyncWorker } from '../processors/data-sync.worker'; +import { BackupProcessingWorker } from '../processors/backup-processing.worker'; +import { WebhooksWorker } from '../processors/webhooks.worker'; +import { SubscriptionsWorker } from '../processors/subscriptions.worker'; +import { WorkersBridgeService } from './workers-bridge.service'; + +@Module({ + imports: [ + QueueModule, + DeadLetterModule, + EmailModule, + WebhooksDeliveryModule, + MonitoringModule, + MessagingModule, + ], + providers: [ + EmailWorker, + MediaProcessingWorker, + DataSyncWorker, + BackupProcessingWorker, + WebhooksWorker, + SubscriptionsWorker, + WorkersBridgeService, + ], + exports: [WorkersBridgeService], +}) +export class WorkersBridgeModule {} diff --git a/src/workers/bridge/workers-bridge.service.ts b/src/workers/bridge/workers-bridge.service.ts new file mode 100644 index 00000000..7eb52e5a --- /dev/null +++ b/src/workers/bridge/workers-bridge.service.ts @@ -0,0 +1,160 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bull'; +import { Queue, Job } from 'bull'; +import { QUEUE_NAMES } from '../../common/constants/queue.constants'; +import { EmailWorker } from '../processors/email.worker'; +import { MediaProcessingWorker } from '../processors/media-processing.worker'; +import { DataSyncWorker } from '../processors/data-sync.worker'; +import { BackupProcessingWorker } from '../processors/backup-processing.worker'; +import { WebhooksWorker } from '../processors/webhooks.worker'; +import { SubscriptionsWorker } from '../processors/subscriptions.worker'; +import { MetricsCollectionService } from '../../monitoring/metrics/metrics-collection.service'; +import { DeadLetterService } from '../../queues/dead-letter/dead-letter.service'; +import { MessagingService } from '../../messaging/messaging.service'; + +interface QueueWorkerBinding { + queue: Queue; + handler: (job: Job) => Promise; + concurrency: number; + name: string; +} + +@Injectable() +export class WorkersBridgeService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(WorkersBridgeService.name); + private readonly bindings: QueueWorkerBinding[] = []; + + constructor( + @InjectQueue(QUEUE_NAMES.EMAIL) private readonly emailQueue: Queue, + @InjectQueue(QUEUE_NAMES.MEDIA_PROCESSING) private readonly mediaQueue: Queue, + @InjectQueue(QUEUE_NAMES.SYNC_TASKS) private readonly syncQueue: Queue, + @InjectQueue(QUEUE_NAMES.BACKUP_PROCESSING) private readonly backupQueue: Queue, + @InjectQueue(QUEUE_NAMES.WEBHOOKS) private readonly webhooksQueue: Queue, + @InjectQueue(QUEUE_NAMES.SUBSCRIPTIONS) private readonly subscriptionsQueue: Queue, + private readonly emailWorker: EmailWorker, + private readonly mediaWorker: MediaProcessingWorker, + private readonly dataSyncWorker: DataSyncWorker, + private readonly backupWorker: BackupProcessingWorker, + private readonly webhooksWorker: WebhooksWorker, + private readonly subscriptionsWorker: SubscriptionsWorker, + private readonly metrics?: MetricsCollectionService, + private readonly deadLetterService?: DeadLetterService, + private readonly messagingService?: MessagingService, + ) {} + + async onModuleInit(): Promise { + this.registerBinding( + this.emailQueue, + (job) => this.emailWorker.handle(job), + this.getConcurrency('email', 5), + 'email', + ); + this.registerBinding( + this.mediaQueue, + (job) => this.mediaWorker.handle(job), + this.getConcurrency('media', 3), + 'media-processing', + ); + this.registerBinding( + this.syncQueue, + (job) => this.dataSyncWorker.handle(job), + this.getConcurrency('sync', 4), + 'sync-tasks', + ); + this.registerBinding( + this.backupQueue, + (job) => this.backupWorker.handle(job), + this.getConcurrency('backup', 1), + 'backup-processing', + ); + this.registerBinding( + this.webhooksQueue, + (job) => this.webhooksWorker.handle(job), + this.getConcurrency('webhooks', 10), + 'webhooks', + ); + this.registerBinding( + this.subscriptionsQueue, + (job) => this.subscriptionsWorker.handle(job), + this.getConcurrency('subscriptions', 5), + 'subscriptions', + ); + + if (this.messagingService) { + await this.messagingService.processMessages(); + this.logger.log('Messaging queue processor registered via MessagingService'); + } + + await Promise.all(this.bindings.map((b) => this.startBinding(b))); + this.registerDeadLetterHandlers(); + this.logger.log(`Workers bridge initialized with ${this.bindings.length} queue bindings`); + } + + async onModuleDestroy(): Promise { + this.logger.log('Closing worker queue connections...'); + for (const binding of this.bindings) { + try { + await binding.queue.close(); + } catch (err) { + this.logger.warn(`Error closing queue "${binding.name}": ${err}`); + } + } + } + + private registerBinding( + queue: Queue, + handler: (job: Job) => Promise, + concurrency: number, + name: string, + ): void { + this.bindings.push({ queue, handler, concurrency, name }); + } + + private async startBinding(binding: QueueWorkerBinding): Promise { + const wrapped = async (job: Job): Promise => { + const start = Date.now(); + try { + this.logger.debug(`[${binding.name}] Processing job ${job.id} (${job.name})`); + return await binding.handler(job); + } finally { + const durationMs = Date.now() - start; + this.metrics?.recordQueueProcessingTime( + binding.name, + job.name || 'unknown', + durationMs / 1000, + ); + this.logger.debug(`[${binding.name}] Job ${job.id} completed in ${durationMs}ms`); + } + }; + + binding.queue.process(binding.concurrency, wrapped); + this.logger.log(`Queue "${binding.name}" bound with concurrency ${binding.concurrency}`); + } + + private registerDeadLetterHandlers(): void { + for (const binding of this.bindings) { + binding.queue.on('failed', async (job: Job, err: Error) => { + this.logger.warn(`[${binding.name}] Job ${job.id} failed: ${err.message}`); + if (this.deadLetterService) { + try { + await this.deadLetterService.sendToDeadLetter(job, binding.name); + } catch (dlqErr) { + this.logger.error( + `[DEAD-LETTER] Failed to forward job ${job.id} to dead-letter queue: ${dlqErr}`, + ); + } + } + }); + } + } + + private getConcurrency(key: string, fallback: number): number { + const envKey = `QUEUE_CONCURRENCY_${key.toUpperCase()}`; + const val = process.env[envKey]; + if (val !== undefined) { + const parsed = parseInt(val, 10); + if (Number.isFinite(parsed) && parsed > 0) return parsed; + } + return fallback; + } +} diff --git a/tests/load/queue-throughput.benchmark.ts b/tests/load/queue-throughput.benchmark.ts new file mode 100644 index 00000000..76a55013 --- /dev/null +++ b/tests/load/queue-throughput.benchmark.ts @@ -0,0 +1,156 @@ +/** + * Queue Throughput Benchmark + * + * Measures Bull queue throughput with and without priority under load. + * + * Usage: + * npx ts-node tests/load/queue-throughput.benchmark.ts + * + * Prerequisites: + * - Redis running on REDIS_URL (default: redis://127.0.0.1:6379) + */ + +import Bull from 'bull'; +import { performance } from 'perf_hooks'; + +const REDIS_URL = process.env.REDIS_URL || 'redis://127.0.0.1:6379'; +const JOB_COUNT = 5_000; +const CONCURRENCY = 10; + +interface BenchmarkResult { + label: string; + totalJobs: number; + durationMs: number; + throughput: number; + p50Ms: number; + p95Ms: number; + p99Ms: number; +} + +function createQueue(name: string): Bull.Queue { + return new Bull(name, REDIS_URL, { + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: true, + }, + }); +} + +async function runBenchmark( + label: string, + queue: Bull.Queue, + jobs: { name: string; data: any; opts?: Bull.JobOptions }[], + concurrency: number, +): Promise { + const latencies: number[] = []; + + const processPromise = new Promise((resolve, reject) => { + queue.process(concurrency, async (job: Bull.Job) => { + const start = performance.now(); + await job.progress(100); + latencies.push(performance.now() - start); + }); + + queue.on('completed', (job: Bull.Job) => { + if (latencies.length >= jobs.length) { + resolve(); + } + }); + + queue.on('failed', (job: Bull.Job, err: Error) => { + console.error(`Job ${job.id} failed: ${err.message}`); + }); + }); + + const addStart = performance.now(); + for (const job of jobs) { + await queue.add(job.name, job.data, job.opts); + } + const addDuration = performance.now() - addStart; + console.log(` Added ${jobs.length} jobs in ${addDuration.toFixed(0)}ms`); + + const processStart = performance.now(); + await processPromise; + const processDuration = performance.now() - processStart; + + latencies.sort((a, b) => a - b); + const p50 = latencies[Math.floor(latencies.length * 0.5)]; + const p95 = latencies[Math.floor(latencies.length * 0.95)]; + const p99 = latencies[Math.floor(latencies.length * 0.99)]; + + return { + label, + totalJobs: jobs.length, + durationMs: processDuration, + throughput: Math.round((jobs.length / processDuration) * 1000), + p50Ms: Math.round(p50 * 100) / 100, + p95Ms: Math.round(p95 * 100) / 100, + p99Ms: Math.round(p99 * 100) / 100, + }; +} + +function printResults(results: BenchmarkResult[]): void { + console.log('\n========== BENCHMARK RESULTS ==========\n'); + console.log( + `${'LABEL'.padEnd(30)} ${'JOBS'.padEnd(8)} ${'DURATION'.padEnd(12)} ${'THROUGHPUT'.padEnd(12)} ${'P50'.padEnd(10)} ${'P95'.padEnd(10)} ${'P99'.padEnd(10)}`, + ); + console.log('-'.repeat(92)); + for (const r of results) { + console.log( + `${r.label.padEnd(30)} ${String(r.totalJobs).padEnd(8)} ${`${r.durationMs.toFixed(0)}ms`.padEnd(12)} ${`${r.throughput} jobs/s`.padEnd(12)} ${`${r.p50Ms}ms`.padEnd(10)} ${`${r.p95Ms}ms`.padEnd(10)} ${`${r.p99Ms}ms`.padEnd(10)}`, + ); + } + console.log('\n======================================\n'); +} + +async function main(): Promise { + console.log(`Queue Throughput Benchmark`); + console.log(` Redis: ${REDIS_URL}`); + console.log(` Jobs per test: ${JOB_COUNT}`); + console.log(` Concurrency: ${CONCURRENCY}\n`); + + const jobs = Array.from({ length: JOB_COUNT }, (_, i) => ({ + name: 'benchmark', + data: { index: i, timestamp: Date.now() }, + })); + + const priorityJobs = Array.from({ length: JOB_COUNT }, (_, i) => ({ + name: 'benchmark', + data: { index: i, timestamp: Date.now() }, + opts: { priority: i % 5 } as Bull.JobOptions, + })); + + const results: BenchmarkResult[] = []; + + // Test 1: Without priority + const q1 = createQueue('benchmark-default'); + await q1.empty(); + console.log('Test 1: Without priority...'); + results.push(await runBenchmark('Without priority', q1, jobs, CONCURRENCY)); + await q1.close(); + + // Test 2: With priority + const q2 = createQueue('benchmark-priority'); + await q2.empty(); + console.log('Test 2: With priority...'); + results.push(await runBenchmark('With priority', q2, priorityJobs, CONCURRENCY)); + await q2.close(); + + // Test 3: Higher concurrency + const q3 = createQueue('benchmark-high-concurrency'); + await q3.empty(); + console.log('Test 3: Concurrency x2...'); + results.push( + await runBenchmark('Concurrency x2', q3, jobs, CONCURRENCY * 2), + ); + await q3.close(); + + printResults(results); + console.log('Benchmark complete. Clean up queues manually via Redis CLI:\n'); + console.log(' redis-cli KEYS "bull:benchmark-*" | xargs redis-cli DEL'); +} + +main().catch((err) => { + console.error('Benchmark failed:', err); + process.exit(1); +});