Skip to content

Commit e40c4cd

Browse files
committed
refactor: ProductMetrics 낙관적 락 및 타임스탬프 검증 제거
- ProductMetrics @Version 필드 제거 (단일 컨슈머 환경에서 불필요) - 타임스탬프 기반 out-of-order 이벤트 체크 로직 제거 - Kafka 파티션 순서 보장으로 충분하므로 복잡한 검증 로직 단순화 - 메트릭 업데이트 메서드에서 타임스탬프 파라미터 제거 - event_handled 테이블 기반 멱등성만 유지
1 parent 3174797 commit e40c4cd

3 files changed

Lines changed: 30 additions & 125 deletions

File tree

apps/commerce-streamer/src/main/java/com/loopers/application/metrics/MetricsAggregationService.java

Lines changed: 16 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@
1313
import org.springframework.stereotype.Service;
1414
import org.springframework.transaction.annotation.Transactional;
1515

16-
import java.time.ZoneId;
17-
import java.time.ZonedDateTime;
18-
1916
/**
2017
* 메트릭 집계 서비스
2118
* - Kafka 이벤트를 받아서 ProductMetrics 테이블 업데이트
@@ -44,28 +41,16 @@ public void handleProductLiked(String eventId, ProductLikedEvent payload) {
4441
ProductMetrics metrics = metricsRepository.findById(payload.productId())
4542
.orElse(ProductMetrics.create(payload.productId()));
4643

47-
ZonedDateTime eventOccurredAt = toZonedDateTime(payload.likedAt());
48-
boolean updated = metrics.incrementLikeCount(eventOccurredAt);
49-
50-
if (!updated) {
51-
log.warn("[Metrics] Out-of-order event ignored - eventId: {}, productId: {}, eventTime: {}, lastUpdated: {}",
52-
eventId, payload.productId(), eventOccurredAt, metrics.getLastUpdated());
53-
// 오래된 이벤트도 처리 완료 기록 (재처리 방지)
54-
eventHandledRepository.save(
55-
EventHandled.create(eventId, "PRODUCT_LIKED", String.valueOf(payload.productId()))
56-
);
57-
return;
58-
}
59-
44+
metrics.incrementLikeCount();
6045
metricsRepository.save(metrics);
6146

6247
// 처리 완료 기록
6348
eventHandledRepository.save(
6449
EventHandled.create(eventId, "PRODUCT_LIKED", String.valueOf(payload.productId()))
6550
);
6651

67-
log.info("[Metrics] Like count incremented - productId: {}, count: {}, eventTime: {}",
68-
payload.productId(), metrics.getLikeCount(), eventOccurredAt);
52+
log.info("[Metrics] Like count incremented - productId: {}, count: {}",
53+
payload.productId(), metrics.getLikeCount());
6954
}
7055

7156
/**
@@ -82,28 +67,16 @@ public void handleProductUnliked(String eventId, ProductUnlikedEvent payload) {
8267
ProductMetrics metrics = metricsRepository.findById(payload.productId())
8368
.orElse(ProductMetrics.create(payload.productId()));
8469

85-
ZonedDateTime eventOccurredAt = toZonedDateTime(payload.unlikedAt());
86-
boolean updated = metrics.decrementLikeCount(eventOccurredAt);
87-
88-
if (!updated) {
89-
log.warn("[Metrics] Out-of-order event ignored - eventId: {}, productId: {}, eventTime: {}, lastUpdated: {}",
90-
eventId, payload.productId(), eventOccurredAt, metrics.getLastUpdated());
91-
// 오래된 이벤트도 처리 완료 기록 (재처리 방지)
92-
eventHandledRepository.save(
93-
EventHandled.create(eventId, "PRODUCT_UNLIKED", String.valueOf(payload.productId()))
94-
);
95-
return;
96-
}
97-
70+
metrics.decrementLikeCount();
9871
metricsRepository.save(metrics);
9972

10073
// 처리 완료 기록
10174
eventHandledRepository.save(
10275
EventHandled.create(eventId, "PRODUCT_UNLIKED", String.valueOf(payload.productId()))
10376
);
10477

105-
log.info("[Metrics] Like count decremented - productId: {}, count: {}, eventTime: {}",
106-
payload.productId(), metrics.getLikeCount(), eventOccurredAt);
78+
log.info("[Metrics] Like count decremented - productId: {}, count: {}",
79+
payload.productId(), metrics.getLikeCount());
10780
}
10881

10982
/**
@@ -116,38 +89,27 @@ public void handleOrderCompleted(String eventId, OrderCompletedEvent payload) {
11689
return;
11790
}
11891

119-
ZonedDateTime eventOccurredAt = toZonedDateTime(payload.completedAt());
120-
int updatedCount = 0;
121-
int ignoredCount = 0;
122-
12392
// 각 주문 아이템별로 ProductMetrics 업데이트
12493
for (var item : payload.items()) {
12594
ProductMetrics metrics = metricsRepository.findById(item.productId())
12695
.orElse(ProductMetrics.create(item.productId()));
12796

12897
// 판매 금액 = 수량 * 단가
12998
long totalAmount = item.quantity() * item.price().longValue();
130-
boolean updated = metrics.addSales(item.quantity(), totalAmount, eventOccurredAt);
131-
132-
if (updated) {
133-
metricsRepository.save(metrics);
134-
updatedCount++;
135-
log.debug("[Metrics] Sales updated - productId: {}, quantity: {}, amount: {}, eventTime: {}",
136-
item.productId(), item.quantity(), totalAmount, eventOccurredAt);
137-
} else {
138-
ignoredCount++;
139-
log.warn("[Metrics] Out-of-order sales event ignored - productId: {}, eventTime: {}, lastUpdated: {}",
140-
item.productId(), eventOccurredAt, metrics.getLastUpdated());
141-
}
99+
metrics.addSales(item.quantity(), totalAmount);
100+
metricsRepository.save(metrics);
101+
102+
log.debug("[Metrics] Sales updated - productId: {}, quantity: {}, amount: {}",
103+
item.productId(), item.quantity(), totalAmount);
142104
}
143105

144106
// 처리 완료 기록
145107
eventHandledRepository.save(
146108
EventHandled.create(eventId, "ORDER_COMPLETED", payload.orderNo())
147109
);
148110

149-
log.info("[Metrics] Sales aggregated for order: {} ({} items updated, {} ignored)",
150-
payload.orderNo(), updatedCount, ignoredCount);
111+
log.info("[Metrics] Sales aggregated for order: {} ({} items)",
112+
payload.orderNo(), payload.items().size());
151113
}
152114

153115
/**
@@ -164,34 +126,15 @@ public void handleProductViewed(String eventId, ProductViewedEvent payload) {
164126
ProductMetrics metrics = metricsRepository.findById(payload.productId())
165127
.orElse(ProductMetrics.create(payload.productId()));
166128

167-
ZonedDateTime eventOccurredAt = toZonedDateTime(payload.viewedAt());
168-
boolean updated = metrics.incrementViewCount(eventOccurredAt);
169-
170-
if (!updated) {
171-
log.warn("[Metrics] Out-of-order event ignored - eventId: {}, productId: {}, eventTime: {}, lastUpdated: {}",
172-
eventId, payload.productId(), eventOccurredAt, metrics.getLastUpdated());
173-
// 오래된 이벤트도 처리 완료 기록 (재처리 방지)
174-
eventHandledRepository.save(
175-
EventHandled.create(eventId, "PRODUCT_VIEWED", String.valueOf(payload.productId()))
176-
);
177-
return;
178-
}
179-
129+
metrics.incrementViewCount();
180130
metricsRepository.save(metrics);
181131

182132
// 처리 완료 기록
183133
eventHandledRepository.save(
184134
EventHandled.create(eventId, "PRODUCT_VIEWED", String.valueOf(payload.productId()))
185135
);
186136

187-
log.info("[Metrics] View count incremented - productId: {}, count: {}, eventTime: {}",
188-
payload.productId(), metrics.getViewCount(), eventOccurredAt);
189-
}
190-
191-
/**
192-
* LocalDateTime을 ZonedDateTime으로 변환 (Asia/Seoul)
193-
*/
194-
private ZonedDateTime toZonedDateTime(java.time.LocalDateTime localDateTime) {
195-
return localDateTime.atZone(ZoneId.of("Asia/Seoul"));
137+
log.info("[Metrics] View count incremented - productId: {}, count: {}",
138+
payload.productId(), metrics.getViewCount());
196139
}
197140
}

apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java

Lines changed: 13 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* 상품별 집계 메트릭
1212
* - 좋아요 수, 조회 수, 판매량 등을 실시간 집계
1313
* - Kafka Consumer가 이벤트를 받아서 업데이트
14+
* - event_handled 테이블로 멱등성 보장 (중복 처리 방지)
1415
*/
1516
@Entity
1617
@Table(name = "product_metrics", indexes = {
@@ -59,12 +60,6 @@ public class ProductMetrics {
5960
@Column(name = "last_updated", nullable = false)
6061
private ZonedDateTime lastUpdated;
6162

62-
/**
63-
* 낙관적 락 (동시성 제어)
64-
*/
65-
@Version
66-
private int version;
67-
6863
/**
6964
* 상품 메트릭 초기 생성
7065
*/
@@ -80,71 +75,38 @@ public static ProductMetrics create(Long productId) {
8075
}
8176

8277
/**
83-
* 좋아요 수 증가 (타임스탬프 체크)
84-
* @param eventOccurredAt 이벤트 발생 시각
85-
* @return 업데이트 성공 여부 (false = 오래된 이벤트로 무시됨)
78+
* 좋아요 수 증가
8679
*/
87-
public boolean incrementLikeCount(ZonedDateTime eventOccurredAt) {
88-
if (isEventOutdated(eventOccurredAt)) {
89-
return false;
90-
}
80+
public void incrementLikeCount() {
9181
this.likeCount++;
92-
this.lastUpdated = eventOccurredAt;
93-
return true;
82+
this.lastUpdated = ZonedDateTime.now();
9483
}
9584

9685
/**
97-
* 좋아요 수 감소 (타임스탬프 체크)
98-
* @param eventOccurredAt 이벤트 발생 시각
99-
* @return 업데이트 성공 여부 (false = 오래된 이벤트로 무시됨)
86+
* 좋아요 수 감소
10087
*/
101-
public boolean decrementLikeCount(ZonedDateTime eventOccurredAt) {
102-
if (isEventOutdated(eventOccurredAt)) {
103-
return false;
104-
}
88+
public void decrementLikeCount() {
10589
this.likeCount = Math.max(0, this.likeCount - 1);
106-
this.lastUpdated = eventOccurredAt;
107-
return true;
90+
this.lastUpdated = ZonedDateTime.now();
10891
}
10992

11093
/**
111-
* 조회 수 증가 (타임스탬프 체크)
112-
* @param eventOccurredAt 이벤트 발생 시각
113-
* @return 업데이트 성공 여부 (false = 오래된 이벤트로 무시됨)
94+
* 조회 수 증가
11495
*/
115-
public boolean incrementViewCount(ZonedDateTime eventOccurredAt) {
116-
if (isEventOutdated(eventOccurredAt)) {
117-
return false;
118-
}
96+
public void incrementViewCount() {
11997
this.viewCount++;
120-
this.lastUpdated = eventOccurredAt;
121-
return true;
98+
this.lastUpdated = ZonedDateTime.now();
12299
}
123100

124101
/**
125-
* 판매 데이터 추가 (타임스탬프 체크)
102+
* 판매 데이터 추가
126103
* @param quantity 판매 수량
127104
* @param amount 판매 금액
128-
* @param eventOccurredAt 이벤트 발생 시각
129-
* @return 업데이트 성공 여부 (false = 오래된 이벤트로 무시됨)
130105
*/
131-
public boolean addSales(int quantity, long amount, ZonedDateTime eventOccurredAt) {
132-
if (isEventOutdated(eventOccurredAt)) {
133-
return false;
134-
}
106+
public void addSales(int quantity, long amount) {
135107
this.salesCount += quantity;
136108
this.salesAmount += amount;
137-
this.lastUpdated = eventOccurredAt;
138-
return true;
139-
}
140-
141-
/**
142-
* 이벤트가 현재 상태보다 오래된 것인지 체크
143-
* @param eventOccurredAt 이벤트 발생 시각
144-
* @return true = 오래된 이벤트 (무시해야 함), false = 최신 이벤트 (처리해야 함)
145-
*/
146-
private boolean isEventOutdated(ZonedDateTime eventOccurredAt) {
147-
return this.lastUpdated != null && eventOccurredAt.isBefore(this.lastUpdated);
109+
this.lastUpdated = ZonedDateTime.now();
148110
}
149111

150112
/**

apps/commerce-streamer/src/test/java/com/loopers/application/metrics/MetricsAggregationServiceIdempotencyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void shouldIgnoreDuplicateProductLikedEvent() {
8585
void shouldIgnoreDuplicateProductUnlikedEvent() {
8686
// given - 좋아요 수를 먼저 1로 설정
8787
ProductMetrics metrics = metricsRepository.findById(PRODUCT_ID).orElseThrow();
88-
metrics.incrementLikeCount(LocalDateTime.now().atZone(java.time.ZoneId.of("Asia/Seoul")));
88+
metrics.incrementLikeCount();
8989
metricsRepository.save(metrics);
9090

9191
String eventId = "event-unliked-001";

0 commit comments

Comments
 (0)