-
Notifications
You must be signed in to change notification settings - Fork 3
Prod - Kafka resiliency and better dropped connection handling #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,8 @@ import { | |
| PhaseTransitionPayload, | ||
| } from '../interfaces/autopilot.interface'; | ||
| import { FinanceApiService } from '../../finance/finance-api.service'; | ||
| import { First2FinishService } from './first2finish.service'; | ||
| import { ITERATIVE_REVIEW_PHASE_NAME } from '../constants/review.constants'; | ||
|
|
||
| type MockedMethod<T extends (...args: any[]) => any> = jest.Mock< | ||
| ReturnType<T>, | ||
|
|
@@ -99,6 +101,7 @@ describe('SchedulerService (review phase deferral)', () => { | |
| let resourcesService: jest.Mocked<ResourcesService>; | ||
| let phaseChangeNotificationService: jest.Mocked<PhaseChangeNotificationService>; | ||
| let configService: jest.Mocked<ConfigService>; | ||
| let first2FinishService: jest.Mocked<First2FinishService>; | ||
|
|
||
| beforeEach(() => { | ||
| kafkaService = { | ||
|
|
@@ -182,6 +185,10 @@ describe('SchedulerService (review phase deferral)', () => { | |
| get: jest.fn().mockReturnValue(undefined), | ||
| } as unknown as jest.Mocked<ConfigService>; | ||
|
|
||
| first2FinishService = { | ||
| handleIterativePhaseClosed: jest.fn().mockResolvedValue(undefined), | ||
| } as unknown as jest.Mocked<First2FinishService>; | ||
|
|
||
| scheduler = new SchedulerService( | ||
| kafkaService as unknown as KafkaService, | ||
| challengeApiService as unknown as ChallengeApiService, | ||
|
|
@@ -192,6 +199,7 @@ describe('SchedulerService (review phase deferral)', () => { | |
| resourcesService, | ||
| phaseChangeNotificationService, | ||
| configService, | ||
| first2FinishService, | ||
| ); | ||
| }); | ||
|
|
||
|
|
@@ -274,6 +282,32 @@ describe('SchedulerService (review phase deferral)', () => { | |
| ); | ||
| }); | ||
|
|
||
| it('refreshes submissions when iterative review closes', async () => { | ||
| const payload = createPayload({ | ||
| phaseTypeName: ITERATIVE_REVIEW_PHASE_NAME, | ||
| }); | ||
| const phaseDetails = createPhase({ | ||
| id: payload.phaseId, | ||
| phaseId: payload.phaseId, | ||
| name: ITERATIVE_REVIEW_PHASE_NAME, | ||
| isOpen: true, | ||
| }); | ||
|
|
||
| challengeApiService.getPhaseDetails.mockResolvedValue(phaseDetails); | ||
| reviewService.getPendingReviewCount.mockResolvedValue(0); | ||
| challengeApiService.advancePhase.mockResolvedValue({ | ||
| success: true, | ||
| message: 'closed iterative review', | ||
| updatedPhases: [], | ||
| }); | ||
|
|
||
| await scheduler.advancePhase(payload); | ||
|
|
||
| expect( | ||
| first2FinishService.handleIterativePhaseClosed, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [ |
||
| ).toHaveBeenCalledWith(payload.challengeId); | ||
| }); | ||
|
|
||
| it('assigns checkpoint winners after closing checkpoint review', async () => { | ||
| const payload = createPayload({ | ||
| phaseId: 'checkpoint-phase', | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ import { | |
| APPROVAL_PHASE_NAMES, | ||
| SUBMISSION_PHASE_NAME, | ||
| TOPGEAR_SUBMISSION_PHASE_NAME, | ||
| ITERATIVE_REVIEW_PHASE_NAME, | ||
| getRoleNamesForPhase, | ||
| isPostMortemPhaseName, | ||
| } from '../constants/review.constants'; | ||
|
|
@@ -45,6 +46,7 @@ import { | |
| getMemberReviewerConfigs, | ||
| getReviewerConfigsForPhase, | ||
| } from '../utils/reviewer.utils'; | ||
| import { First2FinishService } from './first2finish.service'; | ||
|
|
||
| const PHASE_QUEUE_NAME = 'autopilot-phase-transitions'; | ||
| const PHASE_QUEUE_PREFIX = '{autopilot-phase-transitions}'; | ||
|
|
@@ -108,6 +110,7 @@ export class SchedulerService implements OnModuleInit, OnModuleDestroy { | |
| private readonly resourcesService: ResourcesService, | ||
| private readonly phaseChangeNotificationService: PhaseChangeNotificationService, | ||
| private readonly configService: ConfigService, | ||
| private readonly first2FinishService: First2FinishService, | ||
| ) { | ||
| this.submitterRoles = getNormalizedStringArray( | ||
| this.configService.get('autopilot.submitterRoles'), | ||
|
|
@@ -861,6 +864,20 @@ export class SchedulerService implements OnModuleInit, OnModuleDestroy { | |
| ); | ||
| } | ||
|
|
||
| if (operation === 'close' && phaseName === ITERATIVE_REVIEW_PHASE_NAME) { | ||
| try { | ||
| await this.first2FinishService.handleIterativePhaseClosed( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [ |
||
| data.challengeId, | ||
| ); | ||
| } catch (error) { | ||
| const err = error as Error; | ||
| this.logger.error( | ||
| `Failed to refresh iterative submissions for challenge ${data.challengeId} after closing phase ${data.phaseId}: ${err.message}`, | ||
| err.stack, | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| if (operation === 'open') { | ||
| try { | ||
| await this.phaseReviewService.handlePhaseOpened( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| import { HealthCheckError } from '@nestjs/terminus'; | ||
| import { KafkaHealthIndicator } from './kafka.health'; | ||
| import { | ||
| KafkaConnectionState, | ||
| KafkaService, | ||
| } from '../kafka/kafka.service'; | ||
|
|
||
| jest.mock('../kafka/kafka.service', () => { | ||
| const KafkaConnectionStateMock = { | ||
| initializing: 'initializing', | ||
| ready: 'ready', | ||
| reconnecting: 'reconnecting', | ||
| failed: 'failed', | ||
| disabled: 'disabled', | ||
| } as const; | ||
|
|
||
| class KafkaServiceMock { | ||
| isConnected = jest.fn(); | ||
| getKafkaStatus = jest.fn(); | ||
| } | ||
|
|
||
| return { | ||
| KafkaConnectionState: KafkaConnectionStateMock, | ||
| KafkaService: KafkaServiceMock, | ||
| }; | ||
| }); | ||
|
|
||
| jest.mock('@platformatic/kafka', () => { | ||
| class MockConsumer { | ||
| consume = jest.fn(); | ||
| isConnected = jest.fn().mockReturnValue(true); | ||
| close = jest.fn(); | ||
| on = jest.fn(); | ||
| } | ||
|
|
||
| class MockProducer { | ||
| metadata = jest.fn(); | ||
| send = jest.fn(); | ||
| close = jest.fn(); | ||
| isConnected = jest.fn().mockReturnValue(true); | ||
| } | ||
|
|
||
| return { | ||
| Consumer: MockConsumer, | ||
| Producer: MockProducer, | ||
| MessagesStream: class {}, | ||
| ProduceAcks: { ALL: 'all' }, | ||
| jsonDeserializer: jest.fn(), | ||
| jsonSerializer: jest.fn(), | ||
| stringDeserializer: jest.fn(), | ||
| stringSerializer: jest.fn(), | ||
| }; | ||
| }); | ||
|
|
||
| describe('KafkaHealthIndicator', () => { | ||
| let kafkaService: jest.Mocked< | ||
| Pick<KafkaService, 'isConnected' | 'getKafkaStatus'> | ||
| >; | ||
| let indicator: KafkaHealthIndicator; | ||
|
|
||
| beforeEach(() => { | ||
| kafkaService = { | ||
| isConnected: jest.fn(), | ||
| getKafkaStatus: jest.fn(), | ||
| } as unknown as jest.Mocked< | ||
| Pick<KafkaService, 'isConnected' | 'getKafkaStatus'> | ||
| >; | ||
|
|
||
| indicator = new KafkaHealthIndicator( | ||
| kafkaService as unknown as KafkaService, | ||
| ); | ||
| }); | ||
|
|
||
| it('returns a healthy status when Kafka is connected', async () => { | ||
| kafkaService.getKafkaStatus.mockReturnValue({ | ||
| state: KafkaConnectionState.ready, | ||
| reconnectAttempts: 0, | ||
| }); | ||
| kafkaService.isConnected.mockResolvedValue(true); | ||
|
|
||
| const result = await indicator.isHealthy('kafka'); | ||
|
|
||
| expect(result.kafka.status).toBe('up'); | ||
| expect(result.kafka.state).toBe(KafkaConnectionState.ready); | ||
| expect(result.kafka.reconnectAttempts).toBe(0); | ||
| }); | ||
|
|
||
| it('throws when Kafka state is failed', async () => { | ||
| kafkaService.getKafkaStatus.mockReturnValue({ | ||
| state: KafkaConnectionState.failed, | ||
| reconnectAttempts: 3, | ||
| reason: 'Kafka reconnection attempts exhausted', | ||
| }); | ||
|
|
||
| await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [💡 |
||
| HealthCheckError, | ||
| ); | ||
| expect(kafkaService.isConnected).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it('throws when Kafka connections are not ready', async () => { | ||
| kafkaService.getKafkaStatus.mockReturnValue({ | ||
| state: KafkaConnectionState.ready, | ||
| reconnectAttempts: 1, | ||
| reason: 'Kafka is not connected', | ||
| }); | ||
| kafkaService.isConnected.mockResolvedValue(false); | ||
|
|
||
| await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf( | ||
| HealthCheckError, | ||
| ); | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,9 @@ | ||
| import { Injectable } from '@nestjs/common'; | ||
| import { HealthIndicator, HealthCheckError } from '@nestjs/terminus'; | ||
| import { KafkaService } from '../kafka/kafka.service'; | ||
| import { | ||
| KafkaConnectionState, | ||
| KafkaService, | ||
| } from '../kafka/kafka.service'; | ||
| import { LoggerService } from '../common/services/logger.service'; | ||
|
|
||
| @Injectable() | ||
|
|
@@ -13,22 +16,40 @@ export class KafkaHealthIndicator extends HealthIndicator { | |
|
|
||
| async isHealthy(key: string) { | ||
| try { | ||
| const status = this.kafkaService.getKafkaStatus(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [ |
||
| const timestamp = new Date().toISOString(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [💡 |
||
|
|
||
| if (status.state === KafkaConnectionState.failed) { | ||
| throw new HealthCheckError( | ||
| 'KafkaHealthCheck failed', | ||
| this.getStatus(key, false, { | ||
| state: status.state, | ||
| reconnectAttempts: status.reconnectAttempts, | ||
| reason: | ||
| status.reason || 'Kafka reconnection attempts exhausted', | ||
| timestamp, | ||
| }), | ||
| ); | ||
| } | ||
|
|
||
| const isConnected = await this.kafkaService.isConnected(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [💡 |
||
|
|
||
| if (!isConnected) { | ||
| throw new HealthCheckError( | ||
| 'KafkaHealthCheck failed', | ||
| this.getStatus(key, false, { error: 'Kafka is not connected' }), | ||
| this.getStatus(key, false, { | ||
| state: status.state, | ||
| reconnectAttempts: status.reconnectAttempts, | ||
| reason: status.reason || 'Kafka is not connected', | ||
| timestamp, | ||
| }), | ||
| ); | ||
| } | ||
|
|
||
| return this.getStatus(key, true, { | ||
| status: 'up', | ||
| timestamp: new Date().toISOString(), | ||
| details: { | ||
| producer: 'connected', | ||
| consumers: 'active', | ||
| }, | ||
| state: status.state, | ||
| reconnectAttempts: status.reconnectAttempts, | ||
| timestamp, | ||
| }); | ||
| } catch (error: unknown) { | ||
| const err = error as Error; | ||
|
|
@@ -41,7 +62,7 @@ export class KafkaHealthIndicator extends HealthIndicator { | |
| throw new HealthCheckError( | ||
| 'KafkaHealthCheck failed', | ||
| this.getStatus(key, false, { | ||
| error: err.message, | ||
| reason: err.message, | ||
| timestamp: new Date().toISOString(), | ||
| }), | ||
| ); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[💡
maintainability]Consider adding more context to the error logging. While the error message and stack trace are logged, additional context such as the function name or parameters could aid in debugging.