Skip to content

Commit 72d81d9

Browse files
committed
feat: Outbox 이벤트 실패 처리 및 재발행 로직 추가
- `OutboxEvent`에 `OutboxStatus` 및 `retryCount` 필드 추가로 상태 관리 지원. - 실패 이벤트를 처리하기 위한 `markFailed` 메서드 구현. - `OutboxRelay` 컴포넌트 추가로 실패 이벤트 재발행 스케줄링 및 처리 로직 구현. - 관련 서비스 및 핸들러에 실패 상태 업데이트 로직 통합 (`markFailed` 호출 추가).
1 parent 47844a0 commit 72d81d9

9 files changed

Lines changed: 85 additions & 8 deletions

File tree

apps/commerce-api/src/main/java/com/loopers/application/event/DeadLetterQueueProcessor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@
44
import com.loopers.domain.event.DomainEvent;
55
import com.loopers.domain.event.FailedEvent;
66
import com.loopers.infrastructure.event.FailedEventRepository;
7+
import java.util.List;
78
import lombok.RequiredArgsConstructor;
8-
import lombok.extern.slf4j.Slf4j;
99
import org.springframework.context.ApplicationEventPublisher;
1010
import org.springframework.scheduling.annotation.Scheduled;
1111
import org.springframework.stereotype.Component;
1212
import org.springframework.transaction.annotation.Transactional;
1313

14-
import java.util.List;
15-
1614
@Component
1715
@RequiredArgsConstructor
1816
public class DeadLetterQueueProcessor {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.loopers.application.event;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.loopers.domain.event.OutboxEvent;
5+
import com.loopers.domain.event.OutboxStatus;
6+
import com.loopers.infrastructure.event.OutboxRepository;
7+
import java.util.List;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.springframework.context.ApplicationEventPublisher;
11+
import org.springframework.scheduling.annotation.Scheduled;
12+
import org.springframework.stereotype.Component;
13+
import org.springframework.transaction.annotation.Transactional;
14+
15+
@Component
16+
@RequiredArgsConstructor
17+
@Slf4j
18+
public class OutboxRelay {
19+
20+
private final OutboxRepository outboxRepository;
21+
private final ApplicationEventPublisher eventPublisher;
22+
private final ObjectMapper objectMapper;
23+
24+
@Scheduled(fixedDelay = 60000)
25+
@Transactional
26+
public void resendPendingEvents() {
27+
List<OutboxEvent> pendingEvents = outboxRepository.findTop10ByStatusInAndRetryCountLessThanOrderByCreatedAtAsc(
28+
List.of(OutboxStatus.INIT, OutboxStatus.FAILED), 5
29+
);
30+
31+
if (pendingEvents.isEmpty()) return;
32+
33+
for (OutboxEvent outbox : pendingEvents) {
34+
try {
35+
Class<?> eventClass = Class.forName(outbox.getEventType());
36+
Object originalEvent = objectMapper.readValue(outbox.getPayload(), eventClass);
37+
38+
eventPublisher.publishEvent(originalEvent);
39+
40+
log.info("[Outbox Relay] 이벤트 재발행 성공: {}", outbox.getEventId());
41+
outbox.markPublished();
42+
43+
} catch (Exception e) {
44+
log.error("[Outbox Relay] 재발행 실패: {}", outbox.getEventId());
45+
outbox.markFailed();
46+
}
47+
}
48+
}
49+
}

apps/commerce-api/src/main/java/com/loopers/application/like/event/LikeEventOutboxHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@
22

33
import com.loopers.domain.event.OutboxService;
44
import lombok.RequiredArgsConstructor;
5-
import lombok.extern.slf4j.Slf4j;
65
import org.springframework.kafka.core.KafkaTemplate;
76
import org.springframework.stereotype.Component;
87
import org.springframework.transaction.event.TransactionPhase;
98
import org.springframework.transaction.event.TransactionalEventListener;
109

1110
@Component
1211
@RequiredArgsConstructor
13-
@Slf4j
1412
public class LikeEventOutboxHandler {
13+
1514
private final KafkaTemplate<Object, Object> kafkaTemplate;
1615
private final OutboxService outboxService;
1716

@@ -24,7 +23,7 @@ public void handle(LikeCreatedEvent event) {
2423
if (ex == null) {
2524
outboxService.markPublished(event.eventId());
2625
} else {
27-
log.error("카프카 전송 실패: {}", ex.getMessage());
26+
outboxService.markFailed(event.eventId());
2827
}
2928
});
3029
}

apps/commerce-api/src/main/java/com/loopers/application/order/event/OrderEventOutboxHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public void handle(OrderCreatedEvent event) {
1919
.whenComplete((result, ex) -> {
2020
if (ex == null) {
2121
outboxService.markPublished(event.eventId());
22+
} else {
23+
outboxService.markFailed(event.eventId());
2224
}
2325
});
2426
}

apps/commerce-api/src/main/java/com/loopers/application/product/event/ProductEventOutboxHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public void handle(ProductViewEvent event) {
2525
.whenComplete((result, ex) -> {
2626
if (ex == null) {
2727
outboxService.markPublished(event.eventId());
28+
} else {
29+
outboxService.markFailed(event.eventId());
2830
}
2931
});
3032
}

apps/commerce-api/src/main/java/com/loopers/domain/event/OutboxEvent.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import jakarta.persistence.Column;
44
import jakarta.persistence.Entity;
5+
import jakarta.persistence.EnumType;
6+
import jakarta.persistence.Enumerated;
57
import jakarta.persistence.GeneratedValue;
68
import jakarta.persistence.GenerationType;
79
import jakarta.persistence.Id;
@@ -27,7 +29,10 @@ public class OutboxEvent {
2729
@Column(columnDefinition = "TEXT")
2830
private String payload;
2931

30-
private boolean published = false;
32+
@Enumerated(EnumType.STRING)
33+
private OutboxStatus status = OutboxStatus.INIT;
34+
35+
private int retryCount = 0;
3136
private LocalDateTime createdAt = LocalDateTime.now();
3237

3338
public OutboxEvent(String eventId, String aggregateType, String aggregateId, String eventType, String payload) {
@@ -39,6 +44,11 @@ public OutboxEvent(String eventId, String aggregateType, String aggregateId, Str
3944
}
4045

4146
public void markPublished() {
42-
this.published = true;
47+
this.status = OutboxStatus.PUBLISHED;
48+
}
49+
50+
public void markFailed() {
51+
this.status = OutboxStatus.FAILED;
52+
this.retryCount++;
4353
}
4454
}

apps/commerce-api/src/main/java/com/loopers/domain/event/OutboxService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,11 @@ public void markPublished(String eventId) {
4040
outboxRepository.save(event);
4141
});
4242
}
43+
44+
public void markFailed(String eventId) {
45+
outboxRepository.findByEventId(eventId).ifPresent(event -> {
46+
event.markFailed();
47+
outboxRepository.save(event);
48+
});
49+
}
4350
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.loopers.domain.event;
2+
3+
public enum OutboxStatus {
4+
INIT, PUBLISHED, FAILED
5+
}

apps/commerce-api/src/main/java/com/loopers/infrastructure/event/OutboxRepository.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.loopers.infrastructure.event;
22

33
import com.loopers.domain.event.OutboxEvent;
4+
import com.loopers.domain.event.OutboxStatus;
45
import java.util.List;
56
import java.util.Optional;
67
import org.springframework.data.jpa.repository.JpaRepository;
@@ -11,4 +12,8 @@ public interface OutboxRepository extends JpaRepository<OutboxEvent, Long> {
1112

1213
Optional<OutboxEvent> findByEventId(String eventId);
1314

15+
List<OutboxEvent> findTop10ByStatusInAndRetryCountLessThanOrderByCreatedAtAsc(
16+
List<OutboxStatus> statuses,
17+
int retryCount
18+
);
1419
}

0 commit comments

Comments
 (0)