Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/cache/cache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,20 @@ export class CacheService {
return false;
}
}

/**
* Atomic SET NX EX — sets key only if it does not exist.
* Returns true if the lock was acquired, false if it was already held.
* ttlSeconds: lock expiry to prevent stale locks on crash.
*/
async setNx(key: string, value: string, ttlSeconds: number): Promise<boolean> {
try {
const client = (this.cacheManager as any).store.getClient();
const result = await client.set(key, value, 'EX', ttlSeconds, 'NX');
return result === 'OK';
} catch (error) {
this.logger.error(`Error in setNx for key ${key}:`, error);
return false;
}
}
}
172 changes: 172 additions & 0 deletions src/transactions/transaction-reminders.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { TransactionRemindersService } from './transaction-reminders.service';

const mockMilestone = {
id: 'ms-1',
title: 'Inspection',
expectedDate: new Date('2026-07-01'),
transaction: { buyerId: 'buyer-1', sellerId: 'seller-1' },
};

const mockPrisma = {
transactionMilestone: {
findMany: jest.fn(),
update: jest.fn(),
},
userPreferences: {
findUnique: jest.fn(),
},
};

const mockNotifications = {
sendNotification: jest.fn(),
};

const mockCache = {
setNx: jest.fn(),
del: jest.fn(),
};

function makeService(): TransactionRemindersService {
return new TransactionRemindersService(
mockPrisma as any,
mockNotifications as any,
mockCache as any,
);
}

describe('TransactionRemindersService', () => {
beforeEach(() => {
jest.clearAllMocks();
mockNotifications.sendNotification.mockResolvedValue(undefined);
mockPrisma.transactionMilestone.update.mockResolvedValue({});
mockPrisma.userPreferences.findUnique.mockResolvedValue(null);
});

describe('distributed lock — lock acquired', () => {
it('processes reminders when lock is successfully acquired', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockResolvedValue([mockMilestone]);

const result = await makeService().sendDeadlineReminders();

expect(mockCache.setNx).toHaveBeenCalledWith('lock:transaction-reminders', '1', 300);
expect(mockPrisma.transactionMilestone.findMany).toHaveBeenCalled();
expect(result.sent).toBeGreaterThan(0);
});
});

describe('distributed lock — lock unavailable', () => {
it('skips processing when lock is already held by another instance', async () => {
mockCache.setNx.mockResolvedValue(false);

const result = await makeService().sendDeadlineReminders();

expect(mockPrisma.transactionMilestone.findMany).not.toHaveBeenCalled();
expect(mockNotifications.sendNotification).not.toHaveBeenCalled();
expect(result.sent).toBe(0);
});
});

describe('distributed lock — lock release', () => {
it('releases lock after successful processing', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockResolvedValue([]);

await makeService().sendDeadlineReminders();

expect(mockCache.del).toHaveBeenCalledWith('lock:transaction-reminders');
});

it('releases lock even when processing throws an error', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockRejectedValue(new Error('DB error'));

await expect(makeService().sendDeadlineReminders()).rejects.toThrow('DB error');

expect(mockCache.del).toHaveBeenCalledWith('lock:transaction-reminders');
});
});

describe('regression — existing reminder behavior', () => {
it('sends notifications to buyer and seller for a pending milestone', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockResolvedValue([mockMilestone]);

const result = await makeService().sendDeadlineReminders(3);

expect(mockNotifications.sendNotification).toHaveBeenCalledWith(
'buyer-1',
'Transaction Deadline Reminder',
expect.stringContaining('Inspection'),
'TRANSACTION_UPDATE',
{ milestoneId: 'ms-1' },
);
expect(mockNotifications.sendNotification).toHaveBeenCalledWith(
'seller-1',
'Transaction Deadline Reminder',
expect.stringContaining('Inspection'),
'TRANSACTION_UPDATE',
{ milestoneId: 'ms-1' },
);
expect(result.sent).toBe(2);
});

it('marks milestone as reminded after sending', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockResolvedValue([mockMilestone]);

await makeService().sendDeadlineReminders();

expect(mockPrisma.transactionMilestone.update).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: 'ms-1' },
data: expect.objectContaining({ reminderSentAt: expect.any(Date) }),
}),
);
});

it('does not send duplicate notification when buyer and seller are the same user', async () => {
mockCache.setNx.mockResolvedValue(true);
const sameUserMilestone = {
...mockMilestone,
transaction: { buyerId: 'user-1', sellerId: 'user-1' },
};
mockPrisma.transactionMilestone.findMany.mockResolvedValue([sameUserMilestone]);

const result = await makeService().sendDeadlineReminders();

expect(mockNotifications.sendNotification).toHaveBeenCalledTimes(1);
expect(result.sent).toBe(1);
});

it('respects buyer opt-out preference', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockResolvedValue([mockMilestone]);
mockPrisma.userPreferences.findUnique.mockImplementation(({ where }: any) =>
where.userId === 'buyer-1' ? { optOutReminders: true } : null,
);

const result = await makeService().sendDeadlineReminders();

expect(mockNotifications.sendNotification).toHaveBeenCalledTimes(1);
expect(mockNotifications.sendNotification).toHaveBeenCalledWith(
'seller-1',
expect.any(String),
expect.any(String),
expect.any(String),
expect.any(Object),
);
expect(result.sent).toBe(1);
});

it('returns sent: 0 when no milestones are due', async () => {
mockCache.setNx.mockResolvedValue(true);
mockPrisma.transactionMilestone.findMany.mockResolvedValue([]);

const result = await makeService().sendDeadlineReminders();

expect(result.sent).toBe(0);
expect(mockNotifications.sendNotification).not.toHaveBeenCalled();
});
});
});
19 changes: 19 additions & 0 deletions src/transactions/transaction-reminders.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../database/prisma.service';
import { NotificationsService } from '../notifications/notifications.service';
import { CacheService } from '../cache/cache.service';

const LOCK_KEY = 'lock:transaction-reminders';
const LOCK_TTL_SECONDS = 300; // 5 minutes — exceeds expected job runtime

@Injectable()
export class TransactionRemindersService {
Expand All @@ -11,9 +15,24 @@ export class TransactionRemindersService {
constructor(
private readonly prisma: PrismaService,
private readonly notificationsService: NotificationsService,
private readonly cacheService: CacheService,
) {}

async sendDeadlineReminders(daysAhead: number = 3): Promise<{ sent: number }> {
const acquired = await this.cacheService.setNx(LOCK_KEY, '1', LOCK_TTL_SECONDS);
if (!acquired) {
this.logger.log('Transaction reminders lock already held — skipping execution');
return { sent: 0 };
}

try {
return await this.processReminders(daysAhead);
} finally {
await this.cacheService.del(LOCK_KEY);
}
}

private async processReminders(daysAhead: number): Promise<{ sent: number }> {
const now = new Date();
const cutoff = new Date(now.getTime() + daysAhead * 24 * 60 * 60 * 1000);

Expand Down
9 changes: 8 additions & 1 deletion src/transactions/transactions.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@ import { PrismaModule } from '../database/prisma.module';
import { BlockchainModule } from '../blockchain/blockchain.module';
import { NotificationsModule } from '../notifications/notifications.module';
import { CommissionsModule } from '../commissions/commissions.module';
import { CacheModuleConfig } from '../cache/cache.module';

@Module({
imports: [PrismaModule, BlockchainModule, NotificationsModule, CommissionsModule],
imports: [
PrismaModule,
BlockchainModule,
NotificationsModule,
CommissionsModule,
CacheModuleConfig,
],
providers: [
TransactionsService,
DisputesService,
Expand Down
Loading