Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **Bulk-sent messages are now recorded, their errors no longer leak internal addresses, and a running
batch can be cancelled across instances.** Messages sent via a bulk batch went straight to the engine
and were never written to the messages table, so they were invisible to chat history and statistics; a
blocked-destination (SSRF) failure stored the refused internal address verbatim in the batch result
(readable via the batch-status endpoint); and a cancellation was only honoured by the process that
created the batch. Bulk sends now persist like single sends, a blocked-destination error is reported as a
generic code, and the batch is re-checked against the database as it runs so a cancel issued by another
instance (or after a restart) stops it.

## [0.4.2] - 2026-06-19

Bug-fix and hardening release: access-control tightening, session-lifecycle resilience, data-migration
Expand Down
108 changes: 107 additions & 1 deletion src/modules/message/bulk-message.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { BulkMessageService, resolveFinalBatchStatus } from './bulk-message.service';
import { BulkMessageService, resolveFinalBatchStatus, sanitizeBatchError } from './bulk-message.service';
import { MessageBatch, BatchStatus } from './entities/message-batch.entity';
import { MessageStatus } from './entities/message.entity';
import { SessionService } from '../session/session.service';
import { MessageService } from './message.service';
import { SsrfBlockedError } from '../../common/security/ssrf-guard';

/** Regression lock for the terminal-status decision (cancel-clobber + stopOnError overwrite bugs). */
describe('resolveFinalBatchStatus', () => {
Expand Down Expand Up @@ -43,6 +46,7 @@ describe('BulkMessageService.onApplicationBootstrap', () => {
BulkMessageService,
{ provide: getRepositoryToken(MessageBatch, 'data'), useValue: repo },
{ provide: SessionService, useValue: { getEngine: jest.fn() } },
{ provide: MessageService, useValue: { saveOutgoingMessage: jest.fn() } },
],
}).compile();
service = module.get<BulkMessageService>(BulkMessageService);
Expand All @@ -65,3 +69,105 @@ describe('BulkMessageService.onApplicationBootstrap', () => {
expect(repo.save).not.toHaveBeenCalled();
});
});

/** Regression lock: an SSRF block (which names the internal host/IP) must not be stored verbatim. */
describe('sanitizeBatchError', () => {
it('replaces an SSRF block message with a generic one (no internal address leak)', () => {
const result = sanitizeBatchError(
new SsrfBlockedError('Host evil.example resolves to a blocked internal address: 169.254.169.254'),
);
expect(result.message).not.toContain('169.254.169.254');
expect(result.code).toBe('SEND_BLOCKED');
});

it('passes through an ordinary error message under SEND_FAILED', () => {
const result = sanitizeBatchError(new Error('Session is not active'));
expect(result).toEqual({ code: 'SEND_FAILED', message: 'Session is not active' });
});
});

describe('BulkMessageService.processBatch', () => {
let service: BulkMessageService;
let repo: { findOne: jest.Mock; save: jest.Mock };
let messageService: { saveOutgoingMessage: jest.Mock };
let engine: { sendTextMessage: jest.Mock };
let sessionService: { getEngine: jest.Mock; findOne: jest.Mock };

const makeBatch = (messageCount: number): MessageBatch =>
({
id: 'b1',
batchId: 'bx',
sessionId: 's1',
status: BatchStatus.PENDING,
currentIndex: 0,
messages: Array.from({ length: messageCount }, (_, i) => ({
chatId: `c${i}@c.us`,
type: 'text',
content: { text: 'hi' },
})),
options: { delayBetweenMessages: 0, randomizeDelay: false, stopOnError: false },
progress: { total: messageCount, sent: 0, failed: 0, pending: messageCount, cancelled: 0 },
results: [],
}) as unknown as MessageBatch;

beforeEach(async () => {
engine = { sendTextMessage: jest.fn().mockResolvedValue({ id: 'wa1', timestamp: 111 }) };
sessionService = {
getEngine: jest.fn().mockReturnValue(engine),
findOne: jest.fn().mockResolvedValue({ phone: '628' }),
};
messageService = { saveOutgoingMessage: jest.fn().mockResolvedValue(undefined) };
repo = { findOne: jest.fn(), save: jest.fn().mockImplementation(b => Promise.resolve(b)) };
const module: TestingModule = await Test.createTestingModule({
providers: [
BulkMessageService,
{ provide: getRepositoryToken(MessageBatch, 'data'), useValue: repo },
{ provide: SessionService, useValue: sessionService },
{ provide: MessageService, useValue: messageService },
],
}).compile();
service = module.get<BulkMessageService>(BulkMessageService);
});

const runProcessBatch = (): Promise<void> =>
(service as unknown as { processBatch: (id: string) => Promise<void> }).processBatch('b1');

it('persists every sent message so it appears in chat history / stats', async () => {
repo.findOne.mockResolvedValue(makeBatch(1));

await runProcessBatch();

expect(messageService.saveOutgoingMessage).toHaveBeenCalledWith(
's1',
expect.objectContaining({
waMessageId: 'wa1',
chatId: 'c0@c.us',
type: 'text',
status: MessageStatus.SENT,
}),
);
});

it('stops sending when the batch is cancelled in the DB by another instance/restart', async () => {
// First load is the running batch; the cadence re-read reports a CANCELLED status.
repo.findOne.mockResolvedValueOnce(makeBatch(3)).mockResolvedValue({ status: BatchStatus.CANCELLED });

await runProcessBatch();

// Only the first message (before the cadence re-read saw CANCELLED) was sent.
expect(engine.sendTextMessage).toHaveBeenCalledTimes(1);
});

it('does not clobber a CANCELLED that landed after the last cadence read (final status stays CANCELLED)', async () => {
const batch = makeBatch(1);
repo.findOne
.mockResolvedValueOnce(batch) // processBatch initial load
.mockResolvedValueOnce(batch) // cadence re-read (i=0) — still PROCESSING
.mockResolvedValue({ status: BatchStatus.CANCELLED }); // FINAL pre-save re-read — cancel landed late

await runProcessBatch();

const savedStatuses = (repo.save.mock.calls as [MessageBatch][]).map(c => c[0].status);
expect(savedStatuses[savedStatuses.length - 1]).toBe(BatchStatus.CANCELLED);
});
});
91 changes: 83 additions & 8 deletions src/modules/message/bulk-message.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import {
BatchMessageResult,
} from './entities/message-batch.entity';
import { SendBulkMessageDto } from './dto/bulk-message.dto';
import { MessageStatus } from './entities/message.entity';
import { SessionService } from '../session/session.service';
import { IWhatsAppEngine } from '../../engine/interfaces/whatsapp-engine.interface';
import { MessageService } from './message.service';
import { SsrfBlockedError } from '../../common/security/ssrf-guard';
import { IWhatsAppEngine, MessageResult } from '../../engine/interfaces/whatsapp-engine.interface';

// Type definitions for bulk message content
interface BulkMessageContent {
Expand Down Expand Up @@ -40,6 +43,18 @@ export function resolveFinalBatchStatus(
return progress.failed > 0 && progress.sent === 0 ? BatchStatus.FAILED : BatchStatus.COMPLETED;
}

/**
* Build the error stored on a batch result. An SSRF block names the internal host/IP it refused, so
* it must never be persisted/returned verbatim — it would be readable via GET batch status. Map it to
* a generic, code-tagged message; ordinary errors keep their (non-sensitive) message.
*/
export function sanitizeBatchError(error: unknown): { code: string; message: string } {
if (error instanceof SsrfBlockedError) {
return { code: 'SEND_BLOCKED', message: 'Destination address is not allowed' };
}
return { code: 'SEND_FAILED', message: error instanceof Error ? error.message : String(error) };
}

@Injectable()
export class BulkMessageService implements OnApplicationBootstrap {
private readonly logger = new Logger(BulkMessageService.name);
Expand All @@ -49,6 +64,7 @@ export class BulkMessageService implements OnApplicationBootstrap {
@InjectRepository(MessageBatch, 'data')
private readonly batchRepository: Repository<MessageBatch>,
private readonly sessionService: SessionService,
private readonly messageService: MessageService,
) {}

/**
Expand Down Expand Up @@ -182,6 +198,7 @@ export class BulkMessageService implements OnApplicationBootstrap {

const results: BatchMessageResult[] = batch.results || [];
let stoppedOnError = false;
let cancelledByDb = false;

for (let i = batch.currentIndex; i < batch.messages.length; i++) {
// Check for cancellation
Expand Down Expand Up @@ -209,17 +226,21 @@ export class BulkMessageService implements OnApplicationBootstrap {
batch.progress.sent++;
batch.progress.pending--;

// Persist like a single send so the message shows in chat history + stats. The engine echo
// (onMessageCreate) fires the webhook/WS but does NOT write the DB, so without this the
// bulk-sent message is invisible to the messages table.
await this.persistSentMessage(batch.sessionId, msg.chatId, msg.type, content, messageResult);

this.logger.debug(`Batch ${batch.batchId}: Sent message ${i + 1}/${batch.messages.length} to ${msg.chatId}`);
} catch (error) {
result.status = BatchMessageStatus.FAILED;
result.error = {
code: 'SEND_FAILED',
message: String(error),
};
// Sanitize: an SSRF block names an internal address — never store/return/log it verbatim.
const sanitized = sanitizeBatchError(error);
result.error = sanitized;
batch.progress.failed++;
batch.progress.pending--;

this.logger.warn(`Batch ${batch.batchId}: Failed message ${i + 1} to ${msg.chatId}: ${String(error)}`);
this.logger.warn(`Batch ${batch.batchId}: Failed message ${i + 1} to ${msg.chatId}: ${sanitized.message}`);

if (batch.options.stopOnError) {
batch.status = BatchStatus.FAILED;
Expand All @@ -235,6 +256,15 @@ export class BulkMessageService implements OnApplicationBootstrap {

// Save progress periodically (every 10 messages or last message)
if (i % 10 === 0 || i === batch.messages.length - 1) {
// Honor a cancellation issued by ANOTHER instance / after a restart — the in-memory Map only
// sees same-process cancels. Re-read the status BEFORE saving so we don't clobber a CANCELLED
// back to PROCESSING.
const fresh = await this.batchRepository.findOne({ where: { id: batch.id }, select: ['status'] });
if (fresh?.status === BatchStatus.CANCELLED) {
cancelledByDb = true;
this.logger.log(`Batch ${batch.batchId} cancelled (DB) at index ${i}`);
break;
}
await this.batchRepository.save(batch);
}

Expand All @@ -247,7 +277,16 @@ export class BulkMessageService implements OnApplicationBootstrap {

// Final update. NOTE: `batch` still holds the in-memory PROCESSING status from the start, so a
// cancellation persisted by cancelBatch would be overwritten if we saved without re-deriving it.
const cancelled = !this.processingBatches.get(batch.id);
// A cancel may also have landed AFTER the last cadence re-read (multi-replica / post-restart); the
// unconditional save below would clobber it back to a terminal non-cancelled status, so re-read
// once more here unless we already know the batch was cancelled.
if (!cancelledByDb) {
const fresh = await this.batchRepository.findOne({ where: { id: batch.id }, select: ['status'] });
if (fresh?.status === BatchStatus.CANCELLED) {
cancelledByDb = true;
}
}
const cancelled = cancelledByDb || !this.processingBatches.get(batch.id);
batch.status = resolveFinalBatchStatus(cancelled, stoppedOnError, batch.progress);
if (cancelled) {
// Reconcile the counters the same way cancelBatch does, so the persisted state is consistent.
Expand Down Expand Up @@ -295,12 +334,48 @@ export class BulkMessageService implements OnApplicationBootstrap {
return processValue(content) as BulkMessageContent;
}

/**
* Persist a successfully-sent batch message via the shared single-send persistence path, so it
* shows up in chat history and stats like any other outgoing message. Best-effort: a persistence
* failure must never flip a message that actually went out to FAILED.
*/
private async persistSentMessage(
sessionId: string,
chatId: string,
type: string,
content: BulkMessageContent,
result: MessageResult,
): Promise<void> {
const media = content.image ?? content.video ?? content.audio ?? content.document;
try {
await this.messageService.saveOutgoingMessage(sessionId, {
waMessageId: result.id,
chatId,
body: content.text ?? content.caption ?? '',
type,
timestamp: result.timestamp,
status: MessageStatus.SENT,
metadata: media
? {
media: {
mimetype: media.mimetype,
data: media.url ?? media.base64,
filename: content.document?.filename,
},
}
: undefined,
});
} catch (error) {
this.logger.warn(`Batch message persisted-after-send failed: ${String(error)}`);
}
}

private sendMessage(
engine: IWhatsAppEngine,
chatId: string,
type: string,
content: BulkMessageContent,
): Promise<{ id: string }> {
): Promise<MessageResult> {
switch (type) {
case 'text':
return engine.sendTextMessage(chatId, content.text || '');
Expand Down
5 changes: 3 additions & 2 deletions src/modules/message/message.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,10 @@ export class MessageService {

/**
* Save outgoing message to database.
* When called before sending, creates a record with PENDING status.
* When called before sending, creates a record with PENDING status; bulk send reuses this after a
* successful send (status SENT) so batch messages are persisted like single sends.
*/
private async saveOutgoingMessage(
async saveOutgoingMessage(
sessionId: string,
data: {
waMessageId?: string;
Expand Down
Loading