Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
node_modules/
dist/
.env
.atl/
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,5 @@ Your PR description must include:
**There is no time limit stated intentionally.** A focused solution delivered in four hours tells us more than an exhaustive one delivered in two days. Prioritise depth of reasoning over breadth of features.

If you have questions, open an issue on this repository. We respond to issues within one business day.


43 changes: 43 additions & 0 deletions apps/api/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { createDatabaseConfig } from '@app/shared/config/database.config';
import { ConsumerAckEntity } from './consumers/consumer-ack.entity';
import { ConsumerProcessingService } from './consumers/consumer-processing.service';
import { ConsumerReceiptEntity } from './consumers/consumer-receipt.entity';
import { FraudConsumer } from './consumers/fraud.consumer';
import { KafkaConsumerRunnerService } from './consumers/kafka-consumer-runner.service';
import { LedgerConsumer } from './consumers/ledger.consumer';
import { NotifyConsumer } from './consumers/notify.consumer';
import { DltPublisher } from './dlt/dlt.publisher';
import { OutboxEventEntity } from './outbox/outbox.entity';
import { PaymentController } from './payments/payment.controller';
import { PaymentEntity } from './payments/payment.entity';
import { PaymentService } from './payments/payment.service';
import { StatusQueryService } from './payments/status-query.service';

const entities = [PaymentEntity, OutboxEventEntity, ConsumerReceiptEntity, ConsumerAckEntity];

@Module({
controllers: [PaymentController],
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
TypeOrmModule.forRootAsync({
useFactory: () => createDatabaseConfig(entities),
}),
TypeOrmModule.forFeature(entities),
],
providers: [
PaymentService,
StatusQueryService,
DltPublisher,
ConsumerProcessingService,
KafkaConsumerRunnerService,
FraudConsumer,
LedgerConsumer,
NotifyConsumer,
],
})
export class AppModule {}
54 changes: 54 additions & 0 deletions apps/api/src/consumers/consumer-ack.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {
Column,
CreateDateColumn,
Entity,
Index,
JoinColumn,
ManyToOne,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from 'typeorm';
import { PaymentEntity } from '../payments/payment.entity';

export const CONSUMER_ACK_PENDING = 'pending';
export const CONSUMER_ACK_SUCCEEDED = 'succeeded';
export const CONSUMER_ACK_FAILED = 'failed';

@Entity({ name: 'consumer_acknowledgements' })
@Index('uq_consumer_ack_payment_consumer', ['paymentId', 'consumerName'], { unique: true })
@Index('idx_consumer_ack_status', ['status'])
export class ConsumerAckEntity {
@PrimaryGeneratedColumn('uuid')
id!: string;

@Column({ name: 'payment_id', type: 'uuid' })
paymentId!: string;

@Column({ name: 'consumer_name', type: 'varchar', length: 32 })
consumerName!: string;

@Column({ name: 'event_id', type: 'uuid' })
eventId!: string;

@Column({ type: 'varchar', length: 16, default: CONSUMER_ACK_PENDING })
status!: string;

@Column({ name: 'error_code', type: 'varchar', length: 64, nullable: true })
errorCode!: string | null;

@Column({ name: 'error_detail', type: 'text', nullable: true })
errorDetail!: string | null;

@Column({ name: 'acknowledged_at', type: 'timestamptz', nullable: true })
acknowledgedAt!: Date | null;

@CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
createdAt!: Date;

@UpdateDateColumn({ name: 'updated_at', type: 'timestamptz' })
updatedAt!: Date;

@ManyToOne(() => PaymentEntity, (payment) => payment.acknowledgements, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'payment_id' })
payment!: PaymentEntity;
}
98 changes: 98 additions & 0 deletions apps/api/src/consumers/consumer-processing.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { PaymentConsumerEvent } from '@app/shared/events/payment-created.event';
import { Repository } from 'typeorm';
import { DltPublisher } from '../dlt/dlt.publisher';
import {
CONSUMER_ACK_FAILED,
CONSUMER_ACK_SUCCEEDED,
ConsumerAckEntity,
} from './consumer-ack.entity';
import { ConsumerReceiptEntity } from './consumer-receipt.entity';

@Injectable()
export class ConsumerProcessingService {
private readonly logger = new Logger(ConsumerProcessingService.name);

constructor(
@InjectRepository(ConsumerReceiptEntity)
private readonly receiptRepository: Repository<ConsumerReceiptEntity>,
@InjectRepository(ConsumerAckEntity)
private readonly ackRepository: Repository<ConsumerAckEntity>,
private readonly dltPublisher: DltPublisher,
) {}

async process(
consumerName: string,
event: PaymentConsumerEvent,
sideEffect: () => Promise<void>,
): Promise<'processed' | 'duplicate'> {
const existing = await this.receiptRepository.findOne({
where: { consumerName, eventId: event.eventId },
});

if (existing) {
existing.deliveryCount += 1;
await this.receiptRepository.save(existing);
return 'duplicate';
}

await this.receiptRepository.save(
this.receiptRepository.create({
consumerName,
eventId: event.eventId,
paymentId: event.paymentId,
}),
);

try {
await sideEffect();

await this.ackRepository.save(
this.ackRepository.create({
paymentId: event.paymentId,
consumerName,
eventId: event.eventId,
status: CONSUMER_ACK_SUCCEEDED,
acknowledgedAt: new Date(),
errorCode: null,
errorDetail: null,
}),
);

return 'processed';
} catch (error) {
const attempt = event.deliveryAttempt ?? 1;
const maxRetries = event.maxRetries ?? 3;
const exhausted = attempt >= maxRetries;
const detail = error instanceof Error ? error.message : String(error);

await this.ackRepository.save(
this.ackRepository.create({
paymentId: event.paymentId,
consumerName,
eventId: event.eventId,
status: CONSUMER_ACK_FAILED,
acknowledgedAt: new Date(),
errorCode: exhausted ? 'RETRY_EXHAUSTED' : 'CONSUMER_ERROR',
errorDetail: detail,
}),
);

if (exhausted) {
await this.dltPublisher.publishPaymentFailed({
paymentId: event.paymentId,
eventId: event.eventId,
consumerName,
reason: detail,
});
}

this.logger.warn(
`Consumer ${consumerName} failed for event ${event.eventId}. exhausted=${exhausted}`,
);

throw error;
}
}
}
31 changes: 31 additions & 0 deletions apps/api/src/consumers/consumer-receipt.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';

export const CONSUMER_NAME_FRAUD = 'fraud';
export const CONSUMER_NAME_LEDGER = 'ledger';
export const CONSUMER_NAME_NOTIFY = 'notify';

@Entity({ name: 'consumer_receipts' })
@Index('uq_consumer_receipts_consumer_event', ['consumerName', 'eventId'], { unique: true })
@Index('idx_consumer_receipts_payment_id', ['paymentId'])
export class ConsumerReceiptEntity {
@PrimaryGeneratedColumn('uuid')
id!: string;

@Column({ name: 'consumer_name', type: 'varchar', length: 32 })
consumerName!: string;

@Column({ name: 'event_id', type: 'uuid' })
eventId!: string;

@Column({ name: 'payment_id', type: 'uuid' })
paymentId!: string;

@Column({ name: 'delivery_count', type: 'integer', default: 1 })
deliveryCount!: number;

@CreateDateColumn({ name: 'first_seen_at', type: 'timestamptz' })
firstSeenAt!: Date;

@UpdateDateColumn({ name: 'last_seen_at', type: 'timestamptz' })
lastSeenAt!: Date;
}
22 changes: 22 additions & 0 deletions apps/api/src/consumers/fraud.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Injectable, Logger } from '@nestjs/common';
import { PaymentConsumerEvent } from '@app/shared/events/payment-created.event';
import { CONSUMER_NAME_FRAUD } from './consumer-receipt.entity';
import { ConsumerProcessingService } from './consumer-processing.service';

@Injectable()
export class FraudConsumer {
private readonly logger = new Logger(FraudConsumer.name);

constructor(private readonly processingService: ConsumerProcessingService) {}

async handle(event: PaymentConsumerEvent): Promise<'processed' | 'duplicate'> {
return this.processingService.process(CONSUMER_NAME_FRAUD, event, async () => {
const parsedAmount = Number(event.amount);
if (Number.isFinite(parsedAmount) && parsedAmount.toFixed(2) === '13.37') {
throw new Error('Fraud rule rejected payment amount 13.37');
}

this.logger.log(`Fraud scoring payment ${event.paymentId}`);
});
}
}
Loading