Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import { DeepLinkModule } from './deep-link/deep-link.module';
import { InvoicesModule } from './payments/invoices/invoices.module';
import { ReportingModule } from './payments/reporting/reporting.module';
import { HealthModule } from './health/health.module';
import { QueueModule } from './queues/queue.module';
import { WorkersBridgeModule } from './workers/bridge/workers-bridge.module';

// ✅ keep BOTH modules
import { ReadReplicaModule } from './database/read-replica';
Expand Down Expand Up @@ -54,6 +56,8 @@ const featureFlags = loadFeatureFlags();
InvoicesModule,
ReportingModule,
HealthModule,
QueueModule,
WorkersBridgeModule,

// ✅ always include read replicas (or wrap if needed)
ReadReplicaModule,
Expand Down
1 change: 1 addition & 0 deletions src/common/constants/queue.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const QUEUE_NAMES = {
USER_DATA_EXPORT: 'user-data-export',
SUBSCRIPTIONS: 'subscriptions',
WEBHOOKS: 'webhooks',
DEAD_LETTER: 'dead-letter',
} as const;
export const JOB_NAMES = {
// Email queue
Expand Down
3 changes: 0 additions & 3 deletions src/messaging/messaging.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import { TracingService } from './tracing/tracing.service';
@Module({
imports: [
TypeOrmModule.forFeature([Message]),
BullModule.forRoot({
redis: process.env.QUEUE_REDIS_URL || process.env.REDIS_URL || 'redis://127.0.0.1:6379',
}),
BullModule.registerQueue({ name: QUEUE_NAMES.MESSAGE_QUEUE }),
],
providers: [
Expand Down
42 changes: 42 additions & 0 deletions src/monitoring/metrics/metrics-collection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ export class MetricsCollectionService implements OnModuleInit {
/** Queue job processing duration, labelled by queue_name and job_type */
public queueProcessingTime: Histogram;

/** Current number of waiting jobs per queue */
public queueWaitingJobs: Gauge;

/** Current number of active jobs per queue */
public queueActiveJobs: Gauge;

/** Total number of failed jobs per queue */
public queueFailedJobs: Gauge;

// ── Business Metrics – Email ───────────────────────────────────────────────

/** Total email campaigns sent, labelled by campaign_type and status */
Expand Down Expand Up @@ -209,6 +218,18 @@ export class MetricsCollectionService implements OnModuleInit {
this.queueProcessingTime.observe({ queue_name: queueName, job_type: jobType }, duration);
}

updateQueueWaitingJobs(queueName: string, count: number): void {
this.queueWaitingJobs.set({ queue_name: queueName }, count);
}

updateQueueActiveJobs(queueName: string, count: number): void {
this.queueActiveJobs.set({ queue_name: queueName }, count);
}

updateQueueFailedJobs(queueName: string, count: number): void {
this.queueFailedJobs.set({ queue_name: queueName }, count);
}

// ── Recording helpers – Email ─────────────────────────────────────────────

recordEmailCampaignSent(campaignType: string, status: string): void {
Expand Down Expand Up @@ -388,6 +409,27 @@ export class MetricsCollectionService implements OnModuleInit {
registers: [this.registry],
});

this.queueWaitingJobs = new Gauge({
name: 'queue_waiting_jobs',
help: 'Current number of waiting jobs per queue',
labelNames: ['queue_name'],
registers: [this.registry],
});

this.queueActiveJobs = new Gauge({
name: 'queue_active_jobs',
help: 'Current number of active jobs per queue',
labelNames: ['queue_name'],
registers: [this.registry],
});

this.queueFailedJobs = new Gauge({
name: 'queue_failed_jobs_total',
help: 'Total number of failed jobs per queue',
labelNames: ['queue_name'],
registers: [this.registry],
});

// Email
this.emailCampaignsSent = new Counter({
name: 'email_campaigns_sent_total',
Expand Down
65 changes: 65 additions & 0 deletions src/queues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,71 @@ curl http://localhost:3000/queues/metrics
curl http://localhost:3000/queues/health
```

## Optimization Architecture

### Centralized Queue Config (`QueueModule`)

`src/queues/queue.module.ts` is a `@Global()` module that registers all 11 Bull queues in one place. It provides `QueueService`, `PrioritizationService`, `RetryStrategyService`, and `QueueMetricsService` globally.

### Workers Bridge (`WorkersBridgeService`)

`src/workers/bridge/workers-bridge.service.ts` bridges Bull queue consumers to the existing worker classes. On `onModuleInit`, it:
1. Binds each queue to its worker's `.handle()` method
2. Wraps processing with Prometheus metric recording (`queue_processing_duration_seconds`)
3. Registers `failed` event handlers that forward permanently failed jobs to the dead-letter queue

### Priority Queue

`PrioritizationService` maps `JobPriority` enum (CRITICAL=1, HIGH=2, NORMAL=3, LOW=4, BACKGROUND=5) to Bull's native priority (0-4, lower=higher). `QueueService.addJob()` defaults to `NORMAL` if no priority is specified, ensuring all jobs participate in Bull's priority sorting.

### Dead Letter Queue

`DeadLetterService` receives failed jobs from Bull's `failed` event and re-queues them to the `DEAD_LETTER` queue with the original job metadata, error reason, and stack trace. This replaces the in-process-only failure tracking.

### Retry Strategies

`RetryStrategyService` exposes `RETRY_STRATEGIES` (EMAIL, PAYMENT, NOTIFICATION, BACKUP, REPORT, DEFAULT) as injectable config. Pass a strategy key to `QueueService.addJob()` to apply automatic backoff and max attempts.

### Monitoring

- `MetricsCollectionService` records `queue_processing_duration_seconds` (histogram), `queue_waiting_jobs`, `queue_active_jobs`, `queue_failed_jobs_total` (gauges)
- `QueueMetricsService` polls all queues every 30s and updates the Prometheus gauges
- All queue metrics are available at `/metrics`

### Using `QueueService`

```ts
// Basic — default priority (NORMAL)
await queueService.addJob(QUEUE_NAMES.EMAIL, 'send-email', { to, subject });

// With explicit priority
await queueService.addJob(QUEUE_NAMES.WEBHOOKS, 'process-webhook', payload, {
priority: JobPriority.CRITICAL,
});

// With retry strategy
await queueService.addJob(QUEUE_NAMES.EMAIL, 'send-campaign', template, {}, 'EMAIL');
```

### Environment Variables

| Variable | Default | Description |
|---|---|---|
| `REDIS_URL` / `QUEUE_REDIS_URL` | `redis://127.0.0.1:6379` | Redis connection |
| `QUEUE_CONCURRENCY_EMAIL` | `5` | Email worker concurrency |
| `QUEUE_CONCURRENCY_MEDIA` | `3` | Media processing concurrency |
| `QUEUE_CONCURRENCY_SYNC` | `4` | Data sync concurrency |
| `QUEUE_CONCURRENCY_BACKUP` | `1` | Backup concurrency |
| `QUEUE_CONCURRENCY_WEBHOOKS` | `10` | Webhooks concurrency |
| `QUEUE_CONCURRENCY_SUBSCRIPTIONS` | `5` | Subscriptions concurrency |

### Load Testing

```bash
# Queue throughput benchmark (requires Redis localhost)
npx ts-node tests/load/queue-throughput.benchmark.ts
```

## Production Considerations

1. **Redis High Availability**: Use Redis Sentinel or Cluster
Expand Down
10 changes: 10 additions & 0 deletions src/queues/dead-letter/dead-letter.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { QueueModule } from '../queue.module';
import { DeadLetterService } from './dead-letter.service';

@Module({
imports: [QueueModule],
providers: [DeadLetterService],
exports: [DeadLetterService],
})
export class DeadLetterModule {}
45 changes: 45 additions & 0 deletions src/queues/dead-letter/dead-letter.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue, Job } from 'bull';
import { QUEUE_NAMES } from '../../common/constants/queue.constants';

export interface DeadLetterJobData {
originalQueue: string;
originalJobId: string | number;
originalJobName: string;
originalData: any;
failedReason: string;
failedAt: string;
attemptsMade: number;
stackTrace?: string;
}

@Injectable()
export class DeadLetterService {
private readonly logger = new Logger(DeadLetterService.name);

constructor(@InjectQueue(QUEUE_NAMES.DEAD_LETTER) private readonly deadLetterQueue: Queue) {}

async sendToDeadLetter(job: Job, queueName: string): Promise<void> {
const data: DeadLetterJobData = {
originalQueue: queueName,
originalJobId: job.id,
originalJobName: job.name,
originalData: job.data,
failedReason: job.failedReason ?? 'Unknown error',
failedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade,
stackTrace: job.stacktrace?.[0],
};

await this.deadLetterQueue.add(`${queueName}:${job.name || 'unknown'}`, data, {
attempts: 1,
removeOnComplete: true,
removeOnFail: true,
});

this.logger.warn(
`[DEAD-LETTER] Job ${job.id} from "${queueName}" moved to dead-letter queue (reason: ${data.failedReason})`,
);
}
}
56 changes: 56 additions & 0 deletions src/queues/metrics/queue-metrics.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { QUEUE_NAMES } from '../../common/constants/queue.constants';
import { MetricsCollectionService } from '../../monitoring/metrics/metrics-collection.service';

@Injectable()
export class QueueMetricsService {
private readonly logger = new Logger(QueueMetricsService.name);

private readonly queueList: { name: string; queue: Queue }[] = [];

constructor(
@InjectQueue(QUEUE_NAMES.EMAIL) emailQueue: Queue,
@InjectQueue(QUEUE_NAMES.EMAIL_MARKETING) emailMarketingQueue: Queue,
@InjectQueue(QUEUE_NAMES.SYNC_TASKS) syncTasksQueue: Queue,
@InjectQueue(QUEUE_NAMES.BACKUP_PROCESSING) backupProcessingQueue: Queue,
@InjectQueue(QUEUE_NAMES.MESSAGE_QUEUE) messageQueue: Queue,
@InjectQueue(QUEUE_NAMES.MEDIA_PROCESSING) mediaProcessingQueue: Queue,
@InjectQueue(QUEUE_NAMES.DEFAULT) defaultQueue: Queue,
@InjectQueue(QUEUE_NAMES.USER_DATA_EXPORT) userDataExportQueue: Queue,
@InjectQueue(QUEUE_NAMES.SUBSCRIPTIONS) subscriptionsQueue: Queue,
@InjectQueue(QUEUE_NAMES.WEBHOOKS) webhooksQueue: Queue,
@InjectQueue(QUEUE_NAMES.DEAD_LETTER) deadLetterQueue: Queue,
private readonly metrics: MetricsCollectionService,
) {
this.queueList = [
{ name: QUEUE_NAMES.EMAIL, queue: emailQueue },
{ name: QUEUE_NAMES.EMAIL_MARKETING, queue: emailMarketingQueue },
{ name: QUEUE_NAMES.SYNC_TASKS, queue: syncTasksQueue },
{ name: QUEUE_NAMES.BACKUP_PROCESSING, queue: backupProcessingQueue },
{ name: QUEUE_NAMES.MESSAGE_QUEUE, queue: messageQueue },
{ name: QUEUE_NAMES.MEDIA_PROCESSING, queue: mediaProcessingQueue },
{ name: QUEUE_NAMES.DEFAULT, queue: defaultQueue },
{ name: QUEUE_NAMES.USER_DATA_EXPORT, queue: userDataExportQueue },
{ name: QUEUE_NAMES.SUBSCRIPTIONS, queue: subscriptionsQueue },
{ name: QUEUE_NAMES.WEBHOOKS, queue: webhooksQueue },
{ name: QUEUE_NAMES.DEAD_LETTER, queue: deadLetterQueue },
];
}

@Interval(30_000)
async recordQueueMetrics(): Promise<void> {
for (const entry of this.queueList) {
try {
const counts = await entry.queue.getJobCounts();
this.metrics.updateQueueWaitingJobs(entry.name, counts.waiting || 0);
this.metrics.updateQueueActiveJobs(entry.name, counts.active || 0);
this.metrics.updateQueueFailedJobs(entry.name, counts.failed || 0);
} catch (err) {
this.logger.warn(`Failed to record metrics for queue "${entry.name}": ${err}`);
}
}
}
}
4 changes: 4 additions & 0 deletions src/queues/prioritization/prioritization.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ export class PrioritizationService {
return optionsMap[priority];
}

toBullPriority(priority: JobPriority): number {
return Math.max(0, priority - 1);
}

/**
* Adjust priority dynamically based on job age
*/
Expand Down
34 changes: 34 additions & 0 deletions src/queues/queue.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Module, Global } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { QUEUE_NAMES } from '../common/constants/queue.constants';
import { QueueService } from './queue.service';
import { PrioritizationService } from './prioritization/prioritization.service';
import { RetryStrategyService } from './retry/retry-strategy.service';
import { QueueMetricsService } from './metrics/queue-metrics.service';
import { MonitoringModule } from '../monitoring/monitoring.module';

@Global()
@Module({
imports: [
MonitoringModule,
BullModule.forRoot({
redis: process.env.QUEUE_REDIS_URL || process.env.REDIS_URL || 'redis://127.0.0.1:6379',
}),
BullModule.registerQueue(
{ name: QUEUE_NAMES.EMAIL },
{ name: QUEUE_NAMES.EMAIL_MARKETING },
{ name: QUEUE_NAMES.SYNC_TASKS },
{ name: QUEUE_NAMES.BACKUP_PROCESSING },
{ name: QUEUE_NAMES.MESSAGE_QUEUE },
{ name: QUEUE_NAMES.MEDIA_PROCESSING },
{ name: QUEUE_NAMES.DEFAULT },
{ name: QUEUE_NAMES.USER_DATA_EXPORT },
{ name: QUEUE_NAMES.SUBSCRIPTIONS },
{ name: QUEUE_NAMES.WEBHOOKS },
{ name: QUEUE_NAMES.DEAD_LETTER },
),
],
providers: [QueueService, PrioritizationService, RetryStrategyService, QueueMetricsService],
exports: [BullModule, QueueService, PrioritizationService, RetryStrategyService],
})
export class QueueModule {}
Loading
Loading