From 329e7cb564532e3a6737f2c58207fe9b46d4284b Mon Sep 17 00:00:00 2001 From: Justin Gasper Date: Thu, 13 Nov 2025 12:52:35 +1100 Subject: [PATCH 1/2] Failsafe for lack of Kafka connection, still able to create F2F reviews --- .../services/first2finish.service.ts | 12 +++++++ .../services/scheduler.service.spec.ts | 34 +++++++++++++++++++ src/autopilot/services/scheduler.service.ts | 17 ++++++++++ 3 files changed, 63 insertions(+) diff --git a/src/autopilot/services/first2finish.service.ts b/src/autopilot/services/first2finish.service.ts index 4db2647..ad6bcbf 100644 --- a/src/autopilot/services/first2finish.service.ts +++ b/src/autopilot/services/first2finish.service.ts @@ -198,6 +198,18 @@ export class First2FinishService { } } + async handleIterativePhaseClosed(challengeId: string): Promise { + try { + await this.prepareNextIterativeReview(challengeId); + } catch (error) { + const err = error as Error; + this.logger.error( + `Failed to refresh iterative review submissions for challenge ${challengeId} after phase closure: ${err.message}`, + err.stack, + ); + } + } + private async processFirst2FinishSubmission( challenge: IChallenge, submissionId?: string, diff --git a/src/autopilot/services/scheduler.service.spec.ts b/src/autopilot/services/scheduler.service.spec.ts index 2950fc5..5b45883 100644 --- a/src/autopilot/services/scheduler.service.spec.ts +++ b/src/autopilot/services/scheduler.service.spec.ts @@ -23,6 +23,8 @@ import { PhaseTransitionPayload, } from '../interfaces/autopilot.interface'; import { FinanceApiService } from '../../finance/finance-api.service'; +import { First2FinishService } from './first2finish.service'; +import { ITERATIVE_REVIEW_PHASE_NAME } from '../constants/review.constants'; type MockedMethod any> = jest.Mock< ReturnType, @@ -99,6 +101,7 @@ describe('SchedulerService (review phase deferral)', () => { let resourcesService: jest.Mocked; let phaseChangeNotificationService: jest.Mocked; let configService: jest.Mocked; + let first2FinishService: jest.Mocked; beforeEach(() => { kafkaService = { @@ -182,6 +185,10 @@ describe('SchedulerService (review phase deferral)', () => { get: jest.fn().mockReturnValue(undefined), } as unknown as jest.Mocked; + first2FinishService = { + handleIterativePhaseClosed: jest.fn().mockResolvedValue(undefined), + } as unknown as jest.Mocked; + scheduler = new SchedulerService( kafkaService as unknown as KafkaService, challengeApiService as unknown as ChallengeApiService, @@ -192,6 +199,7 @@ describe('SchedulerService (review phase deferral)', () => { resourcesService, phaseChangeNotificationService, configService, + first2FinishService, ); }); @@ -274,6 +282,32 @@ describe('SchedulerService (review phase deferral)', () => { ); }); + it('refreshes submissions when iterative review closes', async () => { + const payload = createPayload({ + phaseTypeName: ITERATIVE_REVIEW_PHASE_NAME, + }); + const phaseDetails = createPhase({ + id: payload.phaseId, + phaseId: payload.phaseId, + name: ITERATIVE_REVIEW_PHASE_NAME, + isOpen: true, + }); + + challengeApiService.getPhaseDetails.mockResolvedValue(phaseDetails); + reviewService.getPendingReviewCount.mockResolvedValue(0); + challengeApiService.advancePhase.mockResolvedValue({ + success: true, + message: 'closed iterative review', + updatedPhases: [], + }); + + await scheduler.advancePhase(payload); + + expect( + first2FinishService.handleIterativePhaseClosed, + ).toHaveBeenCalledWith(payload.challengeId); + }); + it('assigns checkpoint winners after closing checkpoint review', async () => { const payload = createPayload({ phaseId: 'checkpoint-phase', diff --git a/src/autopilot/services/scheduler.service.ts b/src/autopilot/services/scheduler.service.ts index 2828c69..4f2fbeb 100644 --- a/src/autopilot/services/scheduler.service.ts +++ b/src/autopilot/services/scheduler.service.ts @@ -27,6 +27,7 @@ import { APPROVAL_PHASE_NAMES, SUBMISSION_PHASE_NAME, TOPGEAR_SUBMISSION_PHASE_NAME, + ITERATIVE_REVIEW_PHASE_NAME, getRoleNamesForPhase, isPostMortemPhaseName, } from '../constants/review.constants'; @@ -45,6 +46,7 @@ import { getMemberReviewerConfigs, getReviewerConfigsForPhase, } from '../utils/reviewer.utils'; +import { First2FinishService } from './first2finish.service'; const PHASE_QUEUE_NAME = 'autopilot-phase-transitions'; const PHASE_QUEUE_PREFIX = '{autopilot-phase-transitions}'; @@ -108,6 +110,7 @@ export class SchedulerService implements OnModuleInit, OnModuleDestroy { private readonly resourcesService: ResourcesService, private readonly phaseChangeNotificationService: PhaseChangeNotificationService, private readonly configService: ConfigService, + private readonly first2FinishService: First2FinishService, ) { this.submitterRoles = getNormalizedStringArray( this.configService.get('autopilot.submitterRoles'), @@ -861,6 +864,20 @@ export class SchedulerService implements OnModuleInit, OnModuleDestroy { ); } + if (operation === 'close' && phaseName === ITERATIVE_REVIEW_PHASE_NAME) { + try { + await this.first2FinishService.handleIterativePhaseClosed( + data.challengeId, + ); + } catch (error) { + const err = error as Error; + this.logger.error( + `Failed to refresh iterative submissions for challenge ${data.challengeId} after closing phase ${data.phaseId}: ${err.message}`, + err.stack, + ); + } + } + if (operation === 'open') { try { await this.phaseReviewService.handlePhaseOpened( From fd58892b2a6155dd2e31ba67ace8a7b22d37ca8b Mon Sep 17 00:00:00 2001 From: Justin Gasper Date: Thu, 13 Nov 2025 13:20:42 +1100 Subject: [PATCH 2/2] Kafka resiliency changes --- src/health/kafka.health.spec.ts | 113 ++++++++++++ src/health/kafka.health.ts | 39 +++- src/kafka/kafka.service.ts | 312 +++++++++++++++++++++++++++++--- 3 files changed, 434 insertions(+), 30 deletions(-) create mode 100644 src/health/kafka.health.spec.ts diff --git a/src/health/kafka.health.spec.ts b/src/health/kafka.health.spec.ts new file mode 100644 index 0000000..f2fe0c1 --- /dev/null +++ b/src/health/kafka.health.spec.ts @@ -0,0 +1,113 @@ +import { HealthCheckError } from '@nestjs/terminus'; +import { KafkaHealthIndicator } from './kafka.health'; +import { + KafkaConnectionState, + KafkaService, +} from '../kafka/kafka.service'; + +jest.mock('../kafka/kafka.service', () => { + const KafkaConnectionStateMock = { + initializing: 'initializing', + ready: 'ready', + reconnecting: 'reconnecting', + failed: 'failed', + disabled: 'disabled', + } as const; + + class KafkaServiceMock { + isConnected = jest.fn(); + getKafkaStatus = jest.fn(); + } + + return { + KafkaConnectionState: KafkaConnectionStateMock, + KafkaService: KafkaServiceMock, + }; +}); + +jest.mock('@platformatic/kafka', () => { + class MockConsumer { + consume = jest.fn(); + isConnected = jest.fn().mockReturnValue(true); + close = jest.fn(); + on = jest.fn(); + } + + class MockProducer { + metadata = jest.fn(); + send = jest.fn(); + close = jest.fn(); + isConnected = jest.fn().mockReturnValue(true); + } + + return { + Consumer: MockConsumer, + Producer: MockProducer, + MessagesStream: class {}, + ProduceAcks: { ALL: 'all' }, + jsonDeserializer: jest.fn(), + jsonSerializer: jest.fn(), + stringDeserializer: jest.fn(), + stringSerializer: jest.fn(), + }; +}); + +describe('KafkaHealthIndicator', () => { + let kafkaService: jest.Mocked< + Pick + >; + let indicator: KafkaHealthIndicator; + + beforeEach(() => { + kafkaService = { + isConnected: jest.fn(), + getKafkaStatus: jest.fn(), + } as unknown as jest.Mocked< + Pick + >; + + indicator = new KafkaHealthIndicator( + kafkaService as unknown as KafkaService, + ); + }); + + it('returns a healthy status when Kafka is connected', async () => { + kafkaService.getKafkaStatus.mockReturnValue({ + state: KafkaConnectionState.ready, + reconnectAttempts: 0, + }); + kafkaService.isConnected.mockResolvedValue(true); + + const result = await indicator.isHealthy('kafka'); + + expect(result.kafka.status).toBe('up'); + expect(result.kafka.state).toBe(KafkaConnectionState.ready); + expect(result.kafka.reconnectAttempts).toBe(0); + }); + + it('throws when Kafka state is failed', async () => { + kafkaService.getKafkaStatus.mockReturnValue({ + state: KafkaConnectionState.failed, + reconnectAttempts: 3, + reason: 'Kafka reconnection attempts exhausted', + }); + + await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf( + HealthCheckError, + ); + expect(kafkaService.isConnected).not.toHaveBeenCalled(); + }); + + it('throws when Kafka connections are not ready', async () => { + kafkaService.getKafkaStatus.mockReturnValue({ + state: KafkaConnectionState.ready, + reconnectAttempts: 1, + reason: 'Kafka is not connected', + }); + kafkaService.isConnected.mockResolvedValue(false); + + await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf( + HealthCheckError, + ); + }); +}); diff --git a/src/health/kafka.health.ts b/src/health/kafka.health.ts index 1ba6bfc..843e25c 100644 --- a/src/health/kafka.health.ts +++ b/src/health/kafka.health.ts @@ -1,6 +1,9 @@ import { Injectable } from '@nestjs/common'; import { HealthIndicator, HealthCheckError } from '@nestjs/terminus'; -import { KafkaService } from '../kafka/kafka.service'; +import { + KafkaConnectionState, + KafkaService, +} from '../kafka/kafka.service'; import { LoggerService } from '../common/services/logger.service'; @Injectable() @@ -13,22 +16,40 @@ export class KafkaHealthIndicator extends HealthIndicator { async isHealthy(key: string) { try { + const status = this.kafkaService.getKafkaStatus(); + const timestamp = new Date().toISOString(); + + if (status.state === KafkaConnectionState.failed) { + throw new HealthCheckError( + 'KafkaHealthCheck failed', + this.getStatus(key, false, { + state: status.state, + reconnectAttempts: status.reconnectAttempts, + reason: + status.reason || 'Kafka reconnection attempts exhausted', + timestamp, + }), + ); + } + const isConnected = await this.kafkaService.isConnected(); if (!isConnected) { throw new HealthCheckError( 'KafkaHealthCheck failed', - this.getStatus(key, false, { error: 'Kafka is not connected' }), + this.getStatus(key, false, { + state: status.state, + reconnectAttempts: status.reconnectAttempts, + reason: status.reason || 'Kafka is not connected', + timestamp, + }), ); } return this.getStatus(key, true, { - status: 'up', - timestamp: new Date().toISOString(), - details: { - producer: 'connected', - consumers: 'active', - }, + state: status.state, + reconnectAttempts: status.reconnectAttempts, + timestamp, }); } catch (error: unknown) { const err = error as Error; @@ -41,7 +62,7 @@ export class KafkaHealthIndicator extends HealthIndicator { throw new HealthCheckError( 'KafkaHealthCheck failed', this.getStatus(key, false, { - error: err.message, + reason: err.message, timestamp: new Date().toISOString(), }), ); diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts index 197240a..1e1cc2d 100644 --- a/src/kafka/kafka.service.ts +++ b/src/kafka/kafka.service.ts @@ -30,6 +30,25 @@ type KafkaProducer = Producer; type KafkaConsumer = Consumer; type KafkaStream = MessagesStream; +export enum KafkaConnectionState { + initializing = 'initializing', + ready = 'ready', + reconnecting = 'reconnecting', + failed = 'failed', + disabled = 'disabled', +} + +export interface KafkaHealthStatus { + state: KafkaConnectionState; + reconnectAttempts: number; + reason?: string; +} + +interface ConsumerConfig { + topics: string[]; + onMessage: (message: unknown) => Promise; +} + @Injectable() export class KafkaService implements OnApplicationShutdown, OnModuleInit { private readonly logger = new LoggerService(KafkaService.name); @@ -38,10 +57,15 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { resetTimeout: CONFIG.CIRCUIT_BREAKER.DEFAULT_RESET_TIMEOUT, }); private readonly kafkaConfig: IKafkaConfig; - private readonly producer: KafkaProducer; + private producer: KafkaProducer; private readonly consumers = new Map(); private readonly consumerStreams = new Map(); private readonly consumerLoops = new Map>(); + private readonly consumerConfigs = new Map(); + private kafkaState: KafkaConnectionState = KafkaConnectionState.initializing; + private kafkaFailureReason?: string; + private reconnectAttempts = 0; + private reconnectionTask?: Promise; private shuttingDown = false; constructor(private readonly configService: ConfigService) { @@ -78,6 +102,7 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { 'Failed to initialize Kafka service', ); this.logger.error(err.message, { error: err.stack || err.message }); + this.kafkaState = KafkaConnectionState.failed; throw new KafkaConnectionException({ error: err.stack || err.message, }); @@ -88,12 +113,15 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { try { await this.producer.metadata({ topics: [] }); this.logger.info('Kafka service initialized successfully'); + this.kafkaState = KafkaConnectionState.ready; + this.kafkaFailureReason = undefined; } catch (error) { const err = this.normalizeError( error, 'Failed to initialize Kafka producer metadata request', ); this.logger.error(err.message, { error: err.stack || err.message }); + this.kafkaState = KafkaConnectionState.failed; throw new KafkaConnectionException({ error: err.stack || err.message, }); @@ -188,23 +216,10 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { topics: string[], onMessage: (message: unknown) => Promise, ): Promise { - try { - await this.circuitBreaker.execute(async () => { - const consumer = this.getOrCreateConsumer(groupId); - - if (this.consumerStreams.has(groupId)) { - await this.closeStream(groupId); - } - - const stream = await consumer.consume({ - topics, - autocommit: true, - }); + this.consumerConfigs.set(groupId, { topics, onMessage }); - this.consumerStreams.set(groupId, stream); - const loop = this.startConsumerLoop(groupId, topics, stream, onMessage); - this.consumerLoops.set(groupId, loop); - }); + try { + await this.startConsumerSession(groupId); } catch (error) { const err = this.normalizeError( error, @@ -215,6 +230,7 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { topics, error: err.stack || err.message, }); + this.handleKafkaFailure(err.message, err); throw new KafkaConsumerException( `Failed to start consumer for group ${groupId}`, { error: err.stack || err.message }, @@ -222,11 +238,66 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { } } + private async startConsumerSession(groupId: string): Promise { + const config = this.consumerConfigs.get(groupId); + + if (!config) { + this.logger.warn(`No consumer configuration found for group ${groupId}`); + return; + } + + await this.circuitBreaker.execute(async () => { + const consumer = this.getOrCreateConsumer(groupId); + + if (this.consumerStreams.has(groupId)) { + await this.closeStream(groupId); + } + + const stream = await consumer.consume({ + topics: config.topics, + autocommit: true, + }); + + stream.on('error', (error) => { + this.handleKafkaFailure( + `Kafka stream error for group ${groupId}`, + error, + ); + }); + + this.consumerStreams.set(groupId, stream); + const loop = this.startConsumerLoop( + groupId, + config.topics, + stream, + config.onMessage, + ); + this.consumerLoops.set(groupId, loop); + + this.kafkaState = KafkaConnectionState.ready; + this.kafkaFailureReason = undefined; + this.reconnectAttempts = 0; + }); + } + async onApplicationShutdown(signal?: string): Promise { this.logger.info('Starting Kafka graceful shutdown', { signal }); this.shuttingDown = true; try { + if (this.reconnectionTask) { + this.logger.info('Waiting for Kafka reconnection task to finish...'); + try { + await this.reconnectionTask; + } catch (error) { + const err = this.normalizeError( + error, + 'Kafka reconnection task failed during shutdown', + ); + this.logger.warn(err.message, { error: err.stack || err.message }); + } + } + this.logger.info('Closing consumer streams...'); await Promise.all( Array.from(this.consumerStreams.keys()).map((groupId) => @@ -281,17 +352,38 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { this.consumerLoops.clear(); this.consumerStreams.clear(); this.consumers.clear(); + this.consumerConfigs.clear(); + this.kafkaState = KafkaConnectionState.disabled; + this.kafkaFailureReason = undefined; } } - isConnected(): Promise { + async isConnected(): Promise { + if ( + this.kafkaState === KafkaConnectionState.failed || + this.kafkaState === KafkaConnectionState.reconnecting || + this.kafkaState === KafkaConnectionState.initializing || + this.kafkaState === KafkaConnectionState.disabled + ) { + return false; + } + try { - const producerConnected = this.producer.isConnected(); + const producerConnected = this.producer?.isConnected?.() ?? false; const consumersConnected = Array.from(this.consumers.values()).every( (consumer) => consumer.isConnected(), ); - return Promise.resolve(producerConnected && consumersConnected); + const connected = producerConnected && consumersConnected; + + if (!connected && !this.shuttingDown) { + this.handleKafkaFailure( + 'Kafka connection verification failed', + new Error('Kafka producer or consumers are disconnected'), + ); + } + + return connected; } catch (error) { const err = this.normalizeError( error, @@ -301,10 +393,19 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { error: err.stack || err.message, timestamp: new Date().toISOString(), }); - return Promise.resolve(false); + this.handleKafkaFailure('Kafka connection status error', err); + return false; } } + getKafkaStatus(): KafkaHealthStatus { + return { + state: this.kafkaState, + reconnectAttempts: this.reconnectAttempts, + reason: this.kafkaFailureReason, + }; + } + private createProducer(): KafkaProducer { return new Producer({ clientId: this.kafkaConfig.clientId, @@ -362,6 +463,14 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { this.logger.error(`Kafka consumer ${groupId} broker failure`, { details, }); + const normalized = this.normalizeError( + details, + `Kafka consumer ${groupId} broker failure`, + ); + this.handleKafkaFailure( + `Kafka consumer ${groupId} broker failure`, + normalized, + ); }); this.consumers.set(groupId, consumer); @@ -442,6 +551,7 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { topics, error: err.stack || err.message, }); + this.handleKafkaFailure('Kafka consumer loop error', err); } } finally { this.consumerStreams.delete(groupId); @@ -546,6 +656,166 @@ export class KafkaService implements OnApplicationShutdown, OnModuleInit { } } + private handleKafkaFailure(context: string, error: unknown): void { + if (this.shuttingDown) { + return; + } + + const err = + error instanceof Error + ? error + : this.normalizeError(error, context || 'Kafka failure'); + const trace = err.stack || err.message; + + this.logger.error(context, { + error: trace, + timestamp: new Date().toISOString(), + }); + + this.kafkaFailureReason = trace; + + if (this.kafkaState !== KafkaConnectionState.reconnecting) { + this.kafkaState = KafkaConnectionState.reconnecting; + } + + this.scheduleReconnect(); + } + + private scheduleReconnect(): void { + if (this.reconnectionTask || this.shuttingDown) { + return; + } + + this.reconnectionTask = this.performReconnect().finally(() => { + this.reconnectionTask = undefined; + }); + } + + private async performReconnect(): Promise { + if (this.shuttingDown) { + return; + } + + const maxAttempts = Math.max(this.kafkaConfig.retry.retries, 1); + + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + if (this.shuttingDown) { + return; + } + + this.reconnectAttempts = attempt; + + try { + await this.restartConsumers(); + + this.kafkaState = KafkaConnectionState.ready; + this.kafkaFailureReason = undefined; + this.reconnectAttempts = 0; + + this.logger.info('Kafka consumers reconnected successfully', { + attempt, + timestamp: new Date().toISOString(), + }); + + return; + } catch (error) { + const err = this.normalizeError( + error, + 'Kafka reconnection attempt failed', + ); + this.kafkaFailureReason = err.stack || err.message; + this.logger.error(err.message, { + attempt, + maxAttempts, + error: err.stack || err.message, + }); + + if (attempt < maxAttempts && !this.shuttingDown) { + await this.wait(this.getRetryDelay(attempt)); + } + } + } + + if (!this.shuttingDown) { + this.kafkaState = KafkaConnectionState.failed; + this.logger.error('Kafka reconnection attempts exhausted', { + reason: this.kafkaFailureReason, + attempts: this.reconnectAttempts, + }); + } + } + + private async restartConsumers(): Promise { + const groupIds = Array.from(this.consumerConfigs.keys()); + + if (groupIds.length === 0) { + return; + } + + await Promise.all(groupIds.map((groupId) => this.closeConsumer(groupId))); + + for (const groupId of groupIds) { + await this.startConsumerSession(groupId); + } + } + + private async closeConsumer(groupId: string): Promise { + await this.closeStream(groupId).catch((error) => { + const err = this.normalizeError( + error, + `Failed to close Kafka stream for group ${groupId}`, + ); + this.logger.warn(err.message, { error: err.stack || err.message }); + }); + + const loop = this.consumerLoops.get(groupId); + if (loop) { + try { + await loop; + } catch (error) { + const err = this.normalizeError( + error, + `Kafka consumer loop rejected for group ${groupId}`, + ); + this.logger.warn(err.message, { error: err.stack || err.message }); + } finally { + this.consumerLoops.delete(groupId); + } + } + + const consumer = this.consumers.get(groupId); + if (consumer) { + try { + await consumer.close(); + } catch (error) { + const err = this.normalizeError( + error, + `Failed to close Kafka consumer ${groupId}`, + ); + this.logger.warn(err.message, { error: err.stack || err.message }); + } finally { + this.consumers.delete(groupId); + } + } + } + + private async wait(delayMs: number): Promise { + if (delayMs <= 0) { + return; + } + + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + + private getRetryDelay(attempt: number): number { + const baseDelay = this.kafkaConfig.retry.initialRetryTime; + const maxDelay = this.kafkaConfig.retry.maxRetryTime; + const exponent = Math.max(attempt - 1, 0); + const calculatedDelay = baseDelay * Math.pow(2, exponent); + + return Math.min(calculatedDelay, maxDelay); + } + private normalizeError(error: unknown, fallbackMessage: string): Error { if (error instanceof Error) { return error;