Skip to content

Commit b5920db

Browse files
committed
refactor: RetryTracker 제거 및 Kafka 기본 재시도 메커니즘 활용
- ConcurrentHashMap 기반 수동 재시도 카운터 제거 (메모리 누수 위험 해소) - DlqMessage retryCount 필드 제거 - DlqPublisher.publishToDlq() retryCount 파라미터 제거 - ProductMetricsConsumer에서 RetryTracker 의존성 제거 - 에러 처리 단순화: 복구 불가능한 에러는 즉시 DLQ, 일시적 에러는 Kafka 자동 재시도 - 관련 테스트 코드 업데이트
1 parent e40c4cd commit b5920db

6 files changed

Lines changed: 12 additions & 243 deletions

File tree

apps/commerce-streamer/src/main/java/com/loopers/domain/dlq/DlqMessage.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ public class DlqMessage {
8080
@Column(name = "failed_at", nullable = false)
8181
private ZonedDateTime failedAt;
8282

83-
/**
84-
* 재처리 시도 횟수
85-
*/
86-
@Column(name = "retry_count", nullable = false)
87-
private int retryCount = 0;
88-
8983
/**
9084
* 재처리 완료 여부
9185
*/
@@ -101,8 +95,7 @@ public static DlqMessage create(
10195
Long originalOffset,
10296
String messageKey,
10397
String messageValue,
104-
Exception exception,
105-
int retryCount
98+
Exception exception
10699
) {
107100
DlqMessage dlqMessage = new DlqMessage();
108101
dlqMessage.originalTopic = originalTopic;
@@ -114,7 +107,6 @@ public static DlqMessage create(
114107
dlqMessage.errorMessage = exception.getMessage();
115108
dlqMessage.stackTrace = getStackTraceAsString(exception);
116109
dlqMessage.failedAt = ZonedDateTime.now();
117-
dlqMessage.retryCount = retryCount;
118110
dlqMessage.resolved = false;
119111
return dlqMessage;
120112
}

apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/DlqPublisher.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,17 @@ public class DlqPublisher {
2525
*
2626
* @param record 실패한 Kafka 메시지
2727
* @param exception 발생한 예외
28-
* @param retryCount 재시도 횟수
2928
*/
3029
@Transactional
31-
public void publishToDlq(ConsumerRecord<String, String> record, Exception exception, int retryCount) {
30+
public void publishToDlq(ConsumerRecord<String, String> record, Exception exception) {
3231
try {
3332
DlqMessage dlqMessage = DlqMessage.create(
3433
record.topic(),
3534
record.partition(),
3635
record.offset(),
3736
record.key(),
3837
record.value(),
39-
exception,
40-
retryCount
38+
exception
4139
);
4240

4341
dlqMessageRepository.save(dlqMessage);

apps/commerce-streamer/src/main/java/com/loopers/infrastructure/kafka/RetryTracker.java

Lines changed: 0 additions & 86 deletions
This file was deleted.

apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductMetricsConsumer.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.loopers.application.metrics.MetricsAggregationService;
1010
import com.loopers.confg.kafka.KafkaConfig;
1111
import com.loopers.infrastructure.kafka.DlqPublisher;
12-
import com.loopers.infrastructure.kafka.RetryTracker;
1312
import com.loopers.infrastructure.kafka.event.KafkaEventEnvelope;
1413
import lombok.RequiredArgsConstructor;
1514
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +25,7 @@
2625
* - Product 관련 이벤트를 수신하여 메트릭 집계
2726
* - Manual Ack: 처리 성공 후에만 offset commit
2827
* - Batch Listener: 한 번에 여러 메시지 처리
28+
* - 에러 처리: 복구 불가능한 에러는 DLQ로 전송, 일시적 에러는 Kafka 재시도
2929
*/
3030
@Slf4j
3131
@Component
@@ -35,7 +35,6 @@ public class ProductMetricsConsumer {
3535
private final MetricsAggregationService aggregationService;
3636
private final ObjectMapper objectMapper;
3737
private final DlqPublisher dlqPublisher;
38-
private final RetryTracker retryTracker;
3938

4039
/**
4140
* 상품 좋아요 이벤트 Consumer
@@ -66,9 +65,6 @@ public void consumeProductLiked(
6665
envelope.payload()
6766
);
6867

69-
// 성공 시 재시도 카운터 제거
70-
retryTracker.clearRetryCount(record.topic(), record.partition(), record.offset());
71-
7268
} catch (Exception e) {
7369
log.error("[Consumer] Failed to process product-liked event - offset: {}, key: {}",
7470
record.offset(), record.key(), e);
@@ -117,8 +113,6 @@ public void consumeProductUnliked(
117113
envelope.payload()
118114
);
119115

120-
retryTracker.clearRetryCount(record.topic(), record.partition(), record.offset());
121-
122116
} catch (Exception e) {
123117
log.error("[Consumer] Failed to process product-unliked event - offset: {}, key: {}",
124118
record.offset(), record.key(), e);
@@ -166,8 +160,6 @@ public void consumeOrderCompleted(
166160
envelope.payload()
167161
);
168162

169-
retryTracker.clearRetryCount(record.topic(), record.partition(), record.offset());
170-
171163
} catch (Exception e) {
172164
log.error("[Consumer] Failed to process order-completed event - offset: {}, key: {}",
173165
record.offset(), record.key(), e);
@@ -215,8 +207,6 @@ public void consumeProductViewed(
215207
envelope.payload()
216208
);
217209

218-
retryTracker.clearRetryCount(record.topic(), record.partition(), record.offset());
219-
220210
} catch (Exception e) {
221211
log.error("[Consumer] Failed to process product-viewed event - offset: {}, key: {}",
222212
record.offset(), record.key(), e);
@@ -237,28 +227,20 @@ public void consumeProductViewed(
237227

238228
/**
239229
* 실패한 레코드 처리 공통 메서드
230+
* - 복구 불가능한 에러: DLQ로 전송
231+
* - 일시적 에러: 배치 재처리 (Kafka가 자동으로 재시도)
240232
*/
241233
private void handleFailedRecord(
242234
ConsumerRecord<String, String> record,
243235
Exception exception,
244236
List<ConsumerRecord<String, String>> failedRecords
245237
) {
246-
// DLQ 전송 여부 결정
247238
if (dlqPublisher.shouldSendToDlq(exception)) {
248-
// 복구 불가능한 에러 → DLQ로 전송
249-
int retryCount = retryTracker.getRetryCount(record.topic(), record.partition(), record.offset());
250-
dlqPublisher.publishToDlq(record, exception, retryCount);
251-
retryTracker.clearRetryCount(record.topic(), record.partition(), record.offset());
239+
// 복구 불가능한 에러 → 즉시 DLQ로 전송
240+
dlqPublisher.publishToDlq(record, exception);
252241
} else {
253-
// 일시적 에러 → 재시도 가능 여부 확인
254-
if (retryTracker.canRetry(record.topic(), record.partition(), record.offset())) {
255-
failedRecords.add(record);
256-
} else {
257-
// 최대 재시도 횟수 초과 → DLQ로 전송
258-
int retryCount = retryTracker.getRetryCount(record.topic(), record.partition(), record.offset());
259-
dlqPublisher.publishToDlq(record, exception, retryCount);
260-
retryTracker.clearRetryCount(record.topic(), record.partition(), record.offset());
261-
}
242+
// 일시적 에러 → 재시도 대상에 추가
243+
failedRecords.add(record);
262244
}
263245
}
264246
}

apps/commerce-streamer/src/test/java/com/loopers/infrastructure/kafka/DlqPublisherTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,9 @@ void shouldSaveFailedMessageToDlq() {
7878
"{\"invalid\":\"json}"
7979
);
8080
JsonProcessingException exception = new JsonProcessingException("Parse error") {};
81-
int retryCount = 3;
8281

8382
// when
84-
dlqPublisher.publishToDlq(record, exception, retryCount);
83+
dlqPublisher.publishToDlq(record, exception);
8584

8685
// then
8786
DlqMessage savedMessage = dlqMessageRepository.findAll().get(0);
@@ -91,7 +90,6 @@ void shouldSaveFailedMessageToDlq() {
9190
assertThat(savedMessage.getMessageKey()).isEqualTo("product-123");
9291
assertThat(savedMessage.getMessageValue()).isEqualTo("{\"invalid\":\"json}");
9392
assertThat(savedMessage.getErrorType()).isEqualTo("JsonProcessingException");
94-
assertThat(savedMessage.getRetryCount()).isEqualTo(3);
9593
assertThat(savedMessage.isResolved()).isFalse();
9694
}
9795

@@ -109,7 +107,7 @@ void shouldIncludeStackTrace() {
109107
Exception exception = new RuntimeException("Test error");
110108

111109
// when
112-
dlqPublisher.publishToDlq(record, exception, 1);
110+
dlqPublisher.publishToDlq(record, exception);
113111

114112
// then
115113
DlqMessage savedMessage = dlqMessageRepository.findAll().get(0);

apps/commerce-streamer/src/test/java/com/loopers/infrastructure/kafka/RetryTrackerTest.java

Lines changed: 0 additions & 115 deletions
This file was deleted.

0 commit comments

Comments (0)