Skip to content

Commit 3af88f9

Browse files
authored
Feature/kafka (#36) (#191)
* chore:kafka producer 설정 * chore: kafka 토픽 자동 생성 설정 추가 * feat: kafka event publisher, comsumer 추가 * test: 집계 도메인 단위 테스트 코드 추가 * feat: 집계 도메인 domain 레이어 구현 * feat: 집계 도메인 infra 레이어 구현 * chore: kafka 토픽 자동 생성 설정 추가 * chore: kafka 빌드 의존성 추가 * test: 집계 통합 테스트 추가 * feat: 집계 서비스 로직 구현 * test: kafka consumer 테스트 코드 추가 * feat: kafka comsumer 구현 * outbox 패턴 적용위해 기존 kafka 설정 삭제 * test: outboxevent 단위 테스트 추가 * feat: outbox 도메인 구현 * feat: outbox infrastructure repository구현 * metric 오타 수정 * refactor: consumer 관련 로직들은 commerce-streamer 모듈로 이동 * test: outbox 테스트 코드 추가 * test: outbox 구현 * outbox event listener 구현 * feat: 상품 조회 이벤트 추가 * feat: 상품 조회시 이벤트 발행 * chore: kafka 설정 수정 * fix: outbox 처리되지 않는 오류 수정 * chore: 테스트 코드 실행시 kafka 사용할 수 있도록 test container 설정 추가 * test: offset.reset: latest 설정이 제대로 적용되는지 확인하는 테스트 코드 추가 * test: kafka 파티션 키 설정에 대한 테스트 코드 추가 * chore: commerce-api 테스트 환경에서 카프카 사용하도록 설ㄹ정 * test: event id 기준으로 한 번만 publish, consume하는 것을 검증하는 테스트 코드 추가 * chore: 충돌 발생한 테스트 코드 수정 * feat: event id 기준 1회 처리되도록 로직 구현 * test: 버전 기준으로 최신 이벤트만 처리하도록 테스트 코드 수정 * feat: version 기준으로 최신 이벤트만 처리하도록 함 * test: 중복 메시지 재전송 시 한 번만 처리되는지 검증하는 테스트 코드 추가 * feat: kafka 이벤트 publish 할 때 콜백 사용하여 이벤트 유실 방지 * feat: kafka메시지 헤더에 event type 추가 * feat: 버전 조회와 저장 사이의 경쟁 조건 가능성 해결 * feat: 신규 상품 등록시 event 발행에서 발생하는 경합 문제 수정
1 parent fb769b2 commit 3af88f9

41 files changed

Lines changed: 4075 additions & 8 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/commerce-api/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ dependencies {
22
// add-ons
33
implementation(project(":modules:jpa"))
44
implementation(project(":modules:redis"))
5+
implementation(project(":modules:kafka"))
56
implementation(project(":supports:jackson"))
67
implementation(project(":supports:logging"))
78
implementation(project(":supports:monitoring"))
@@ -34,4 +35,5 @@ dependencies {
3435
// test-fixtures
3536
testImplementation(testFixtures(project(":modules:jpa")))
3637
testImplementation(testFixtures(project(":modules:redis")))
38+
testImplementation(testFixtures(project(":modules:kafka")))
3739
}

apps/commerce-api/src/main/java/com/loopers/application/catalog/CatalogFacade.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
import com.loopers.domain.brand.Brand;
77
import com.loopers.domain.product.Product;
88
import com.loopers.domain.product.ProductDetail;
9+
import com.loopers.domain.product.ProductEvent;
10+
import com.loopers.domain.product.ProductEventPublisher;
911
import com.loopers.support.error.CoreException;
1012
import com.loopers.support.error.ErrorType;
1113
import lombok.RequiredArgsConstructor;
1214
import org.springframework.stereotype.Component;
15+
import org.springframework.transaction.annotation.Transactional;
1316

1417
import java.util.List;
1518
import java.util.Map;
@@ -30,6 +33,7 @@ public class CatalogFacade {
3033
private final BrandService brandService;
3134
private final ProductService productService;
3235
private final ProductCacheService productCacheService;
36+
private final ProductEventPublisher productEventPublisher;
3337

3438
/**
3539
* 상품 목록을 조회합니다.
@@ -103,16 +107,20 @@ public ProductInfoList getProducts(Long brandId, String sort, int page, int size
103107
* 상품 정보를 조회합니다.
104108
* <p>
105109
* Redis 캐시를 먼저 확인하고, 캐시에 없으면 DB에서 조회한 후 캐시에 저장합니다.
110+
* 상품 조회 시 ProductViewed 이벤트를 발행하여 메트릭 집계에 사용합니다.
106111
* </p>
107112
*
108113
* @param productId 상품 ID
109114
* @return 상품 정보와 좋아요 수
110115
* @throws CoreException 상품을 찾을 수 없는 경우
111116
*/
117+
@Transactional(readOnly = true)
112118
public ProductInfo getProduct(Long productId) {
113119
// 캐시에서 조회 시도
114120
ProductInfo cachedResult = productCacheService.getCachedProduct(productId);
115121
if (cachedResult != null) {
122+
// 캐시 히트 시에도 조회 수 집계를 위해 이벤트 발행
123+
productEventPublisher.publish(ProductEvent.ProductViewed.from(productId));
116124
return cachedResult;
117125
}
118126

@@ -133,6 +141,9 @@ public ProductInfo getProduct(Long productId) {
133141
// 캐시에 저장
134142
productCacheService.cacheProduct(productId, result);
135143

144+
// ✅ 상품 조회 이벤트 발행 (메트릭 집계용)
145+
productEventPublisher.publish(ProductEvent.ProductViewed.from(productId));
146+
136147
// 로컬 캐시의 좋아요 수 델타 적용 (DB 조회 결과에도 델타 반영)
137148
return productCacheService.applyLikeCountDelta(result);
138149
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package com.loopers.application.outbox;
2+
3+
import com.loopers.domain.like.LikeEvent;
4+
import com.loopers.domain.order.OrderEvent;
5+
import com.loopers.domain.product.ProductEvent;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.scheduling.annotation.Async;
9+
import org.springframework.stereotype.Component;
10+
import org.springframework.transaction.event.TransactionPhase;
11+
import org.springframework.transaction.event.TransactionalEventListener;
12+
13+
/**
14+
* Outbox Bridge Event Listener.
15+
* <p>
16+
* ApplicationEvent를 구독하여 외부 시스템(Kafka)으로 전송해야 하는 이벤트를
17+
* Transactional Outbox Pattern을 통해 Outbox에 저장합니다.
18+
* </p>
19+
* <p>
20+
* <b>표준 패턴:</b>
21+
* <ul>
22+
* <li>EventPublisher는 ApplicationEvent만 발행 (단일 책임)</li>
23+
* <li>이 컴포넌트가 ApplicationEvent를 구독하여 Outbox에 저장 (관심사 분리)</li>
24+
* <li>트랜잭션 커밋 후(AFTER_COMMIT) 처리하여 에러 격리</li>
25+
* </ul>
26+
* </p>
27+
* <p>
28+
* <b>처리 이벤트:</b>
29+
* <ul>
30+
* <li><b>LikeEvent:</b> LikeAdded, LikeRemoved → like-events</li>
31+
* <li><b>OrderEvent:</b> OrderCreated → order-events</li>
32+
* <li><b>ProductEvent:</b> ProductViewed → product-events</li>
33+
* </ul>
34+
* </p>
35+
*
36+
* @author Loopers
37+
* @version 1.0
38+
*/
39+
@Slf4j
40+
@Component
41+
@RequiredArgsConstructor
42+
public class OutboxBridgeEventListener {
43+
44+
private final OutboxEventService outboxEventService;
45+
46+
/**
47+
* LikeAdded 이벤트를 Outbox에 저장합니다.
48+
*
49+
* @param event LikeAdded 이벤트
50+
*/
51+
@Async
52+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
53+
public void handleLikeAdded(LikeEvent.LikeAdded event) {
54+
try {
55+
outboxEventService.saveEvent(
56+
"LikeAdded",
57+
event.productId().toString(),
58+
"Product",
59+
event,
60+
"like-events",
61+
event.productId().toString()
62+
);
63+
log.debug("LikeAdded 이벤트를 Outbox에 저장: productId={}", event.productId());
64+
} catch (Exception e) {
65+
log.error("LikeAdded 이벤트 Outbox 저장 실패: productId={}", event.productId(), e);
66+
// 외부 시스템 전송 실패는 내부 처리에 영향 없음
67+
}
68+
}
69+
70+
/**
71+
* LikeRemoved 이벤트를 Outbox에 저장합니다.
72+
*
73+
* @param event LikeRemoved 이벤트
74+
*/
75+
@Async
76+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
77+
public void handleLikeRemoved(LikeEvent.LikeRemoved event) {
78+
try {
79+
outboxEventService.saveEvent(
80+
"LikeRemoved",
81+
event.productId().toString(),
82+
"Product",
83+
event,
84+
"like-events",
85+
event.productId().toString()
86+
);
87+
log.debug("LikeRemoved 이벤트를 Outbox에 저장: productId={}", event.productId());
88+
} catch (Exception e) {
89+
log.error("LikeRemoved 이벤트 Outbox 저장 실패: productId={}", event.productId(), e);
90+
// 외부 시스템 전송 실패는 내부 처리에 영향 없음
91+
}
92+
}
93+
94+
/**
95+
* OrderCreated 이벤트를 Outbox에 저장합니다.
96+
*
97+
* @param event OrderCreated 이벤트
98+
*/
99+
@Async
100+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
101+
public void handleOrderCreated(OrderEvent.OrderCreated event) {
102+
try {
103+
outboxEventService.saveEvent(
104+
"OrderCreated",
105+
event.orderId().toString(),
106+
"Order",
107+
event,
108+
"order-events",
109+
event.orderId().toString()
110+
);
111+
log.debug("OrderCreated 이벤트를 Outbox에 저장: orderId={}", event.orderId());
112+
} catch (Exception e) {
113+
log.error("OrderCreated 이벤트 Outbox 저장 실패: orderId={}", event.orderId(), e);
114+
// 외부 시스템 전송 실패는 내부 처리에 영향 없음
115+
}
116+
}
117+
118+
/**
119+
* ProductViewed 이벤트를 Outbox에 저장합니다.
120+
*
121+
* @param event ProductViewed 이벤트
122+
*/
123+
@Async
124+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
125+
public void handleProductViewed(ProductEvent.ProductViewed event) {
126+
try {
127+
outboxEventService.saveEvent(
128+
"ProductViewed",
129+
event.productId().toString(),
130+
"Product",
131+
event,
132+
"product-events",
133+
event.productId().toString()
134+
);
135+
log.debug("ProductViewed 이벤트를 Outbox에 저장: productId={}", event.productId());
136+
} catch (Exception e) {
137+
log.error("ProductViewed 이벤트 Outbox 저장 실패: productId={}", event.productId(), e);
138+
// 외부 시스템 전송 실패는 내부 처리에 영향 없음
139+
}
140+
}
141+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.loopers.application.outbox;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.loopers.domain.outbox.OutboxEvent;
5+
import com.loopers.domain.outbox.OutboxEventRepository;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.dao.DataIntegrityViolationException;
9+
import org.springframework.stereotype.Service;
10+
import org.springframework.transaction.annotation.Transactional;
11+
12+
import java.util.UUID;
13+
14+
/**
15+
* Outbox 이벤트 저장 서비스.
16+
* <p>
17+
* 도메인 트랜잭션과 같은 트랜잭션에서 Outbox에 이벤트를 저장합니다.
18+
* Application 레이어에 위치하여 비즈니스 로직(이벤트 저장 결정)을 처리합니다.
19+
* </p>
20+
*
21+
* @author Loopers
22+
* @version 1.0
23+
*/
24+
@Slf4j
25+
@Service
26+
@RequiredArgsConstructor
27+
public class OutboxEventService {
28+
29+
private final OutboxEventRepository outboxEventRepository;
30+
private final ObjectMapper objectMapper;
31+
32+
/**
33+
* Kafka로 전송할 이벤트를 Outbox에 저장합니다.
34+
* <p>
35+
* 도메인 트랜잭션과 같은 트랜잭션에서 실행되어야 합니다.
36+
* 집계 ID별로 순차적인 버전을 자동으로 부여합니다.
37+
* </p>
38+
* <p>
39+
* 버전 충돌 시 최대 3회까지 재시도합니다.
40+
* 유니크 제약 조건을 통해 경쟁 조건을 감지하고 재시도합니다.
41+
* </p>
42+
*
43+
* @param eventType 이벤트 타입 (예: "OrderCreated", "LikeAdded")
44+
* @param aggregateId 집계 ID (예: orderId, productId)
45+
* @param aggregateType 집계 타입 (예: "Order", "Product")
46+
* @param event 이벤트 객체
47+
* @param topic Kafka 토픽 이름
48+
* @param partitionKey 파티션 키
49+
*/
50+
@Transactional
51+
public void saveEvent(
52+
String eventType,
53+
String aggregateId,
54+
String aggregateType,
55+
Object event,
56+
String topic,
57+
String partitionKey
58+
) {
59+
int maxRetries = 3;
60+
for (int i = 0; i < maxRetries; i++) {
61+
try {
62+
String eventId = UUID.randomUUID().toString();
63+
String payload = objectMapper.writeValueAsString(event);
64+
65+
// 집계 ID별 최신 버전 조회 후 +1
66+
Long latestVersion = outboxEventRepository.findLatestVersionByAggregateId(aggregateId, aggregateType);
67+
Long nextVersion = latestVersion + 1L;
68+
69+
OutboxEvent outboxEvent = OutboxEvent.builder()
70+
.eventId(eventId)
71+
.eventType(eventType)
72+
.aggregateId(aggregateId)
73+
.aggregateType(aggregateType)
74+
.payload(payload)
75+
.topic(topic)
76+
.partitionKey(partitionKey)
77+
.version(nextVersion)
78+
.build();
79+
80+
outboxEventRepository.save(outboxEvent);
81+
log.debug("Outbox 이벤트 저장: eventType={}, aggregateId={}, topic={}, version={}",
82+
eventType, aggregateId, topic, nextVersion);
83+
return; // 성공
84+
} catch (DataIntegrityViolationException e) {
85+
// 유니크 제약 조건 위반 (버전 충돌)
86+
if (i == maxRetries - 1) {
87+
log.error("Outbox 이벤트 저장 실패 (최대 재시도 횟수 초과): eventType={}, aggregateId={}, retryCount={}",
88+
eventType, aggregateId, i + 1, e);
89+
throw new RuntimeException("Outbox 이벤트 저장 실패: 버전 충돌", e);
90+
}
91+
log.warn("Outbox 이벤트 저장 재시도: eventType={}, aggregateId={}, retryCount={}",
92+
eventType, aggregateId, i + 1);
93+
} catch (Exception e) {
94+
log.error("Outbox 이벤트 저장 실패: eventType={}, aggregateId={}",
95+
eventType, aggregateId, e);
96+
throw new RuntimeException("Outbox 이벤트 저장 실패", e);
97+
}
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)