Skip to content

Commit 67344b8

Browse files
committed
feature: 메트릭 이벤트 컨슈머 확장 및 Kafka 기반 지표 관리 강화
- `LikeEventConsumer`를 `MetricsEventConsumer`로 변경하고 상품 지표(좋아요, 조회수, 판매량) 처리 지원. - `ProductMetricsService` 확장: 조회수(`ProductViewEvent`), 판매량(`SalesCountEvent`) 메트릭 처리 로직 추가. - `ProductMetrics` 엔티티에 조회수와 판매량 속성 및 관련 업데이트 메서드 추가.
1 parent 2a38caf commit 67344b8

4 files changed

Lines changed: 115 additions & 45 deletions

File tree

apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ public class ProductMetrics {
1515
@Id
1616
private Long productId;
1717

18-
private int likeCount;
18+
private int likeCount = 0;
19+
private int viewCount = 0;
20+
private int salesCount = 0;
1921

2022
private LocalDateTime updatedAt;
2123

@@ -30,4 +32,14 @@ public void updateLikeCount(int newCount, LocalDateTime eventTime) {
3032
this.likeCount = newCount;
3133
this.updatedAt = eventTime;
3234
}
35+
36+
public void incrementViewCount() {
37+
this.viewCount += 1;
38+
this.updatedAt = LocalDateTime.now();
39+
}
40+
41+
public void addSalesCount(int quantity) {
42+
this.salesCount += quantity;
43+
this.updatedAt = LocalDateTime.now();
44+
}
3345
}
Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.loopers.domain.metrics;
22

33
import com.loopers.domain.event.EventHandled;
4-
import com.loopers.event.LikeKafkaEvent;
4+
import com.loopers.event.LikeCountEvent;
5+
import com.loopers.event.ProductViewEvent;
6+
import com.loopers.event.SalesCountEvent;
57
import com.loopers.infrastructure.EventHandledRepository;
68
import com.loopers.infrastructure.ProductMetricsRepository;
79
import java.time.LocalDateTime;
@@ -17,17 +19,49 @@ public class ProductMetricsService {
1719
private final EventHandledRepository eventHandledRepository;
1820

1921
@Transactional
20-
public void processLikeEvent(LikeKafkaEvent event) {
21-
if (eventHandledRepository.existsById(event.eventId())) {
22-
return;
23-
}
22+
public void processLikeCountEvent(LikeCountEvent event) {
23+
if (isAlreadyHandled(event.eventId())) return;
2424

25-
ProductMetrics metrics = metricsRepository.findById(event.productId())
26-
.orElseGet(() -> new ProductMetrics(event.productId()));
25+
ProductMetrics metrics = getOrCreateMetrics(event.productId());
2726

2827
metrics.updateLikeCount(event.currentLikeCount(), LocalDateTime.now());
29-
metricsRepository.save(metrics);
3028

31-
eventHandledRepository.save(new EventHandled(event.eventId()));
29+
completeProcess(event.eventId(), metrics);
30+
}
31+
32+
@Transactional
33+
public void processProductViewEvent(ProductViewEvent event) {
34+
if (isAlreadyHandled(event.eventId())) return;
35+
36+
ProductMetrics metrics = getOrCreateMetrics(event.productId());
37+
38+
metrics.incrementViewCount();
39+
40+
completeProcess(event.eventId(), metrics);
41+
}
42+
43+
@Transactional
44+
public void processSalesCountEvent(SalesCountEvent event) {
45+
if (isAlreadyHandled(event.eventId())) return;
46+
47+
ProductMetrics metrics = getOrCreateMetrics(event.productId());
48+
49+
metrics.addSalesCount(event.quantity());
50+
51+
completeProcess(event.eventId(), metrics);
52+
}
53+
54+
private boolean isAlreadyHandled(String eventId) {
55+
return eventHandledRepository.existsById(eventId);
56+
}
57+
58+
private ProductMetrics getOrCreateMetrics(Long productId) {
59+
return metricsRepository.findById(productId)
60+
.orElseGet(() -> new ProductMetrics(productId));
61+
}
62+
63+
private void completeProcess(String eventId, ProductMetrics metrics) {
64+
metricsRepository.save(metrics);
65+
eventHandledRepository.save(new EventHandled(eventId));
3266
}
3367
}

apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java

Lines changed: 0 additions & 35 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.loopers.interfaces.consumer;
2+
3+
import com.loopers.domain.metrics.ProductMetricsService;
4+
import com.loopers.event.LikeCountEvent;
5+
import com.loopers.event.ProductViewEvent;
6+
import com.loopers.event.SalesCountEvent;
7+
import lombok.RequiredArgsConstructor;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
import org.springframework.kafka.annotation.KafkaListener;
11+
import org.springframework.kafka.support.Acknowledgment;
12+
import org.springframework.stereotype.Component;
13+
14+
@Component
15+
@RequiredArgsConstructor
16+
@Slf4j
17+
public class MetricsEventConsumer {
18+
19+
private final ProductMetricsService metricsService;
20+
21+
@KafkaListener(
22+
topics = "catalog-events",
23+
groupId = "metrics-group"
24+
)
25+
public void consumeLikeCount(ConsumerRecord<String, LikeCountEvent> record, Acknowledgment ack) {
26+
try {
27+
metricsService.processLikeCountEvent(record.value());
28+
ack.acknowledge();
29+
} catch (Exception e) {
30+
log.error("좋아요 메트릭 처리 실패: {}", e.getMessage());
31+
}
32+
}
33+
34+
@KafkaListener(
35+
topics = "catalog-events",
36+
groupId = "metrics-group"
37+
)
38+
public void consumeProductView(ProductViewEvent event, Acknowledgment ack) {
39+
try {
40+
metricsService.processProductViewEvent(event);
41+
ack.acknowledge();
42+
} catch (Exception e) {
43+
log.error("조회수 메트릭 처리 실패: {}", event.eventId(), e);
44+
}
45+
}
46+
47+
@KafkaListener(
48+
topics = "catalog-events",
49+
groupId = "metrics-group"
50+
)
51+
public void consumeSalesCount(SalesCountEvent event, Acknowledgment ack) {
52+
try {
53+
metricsService.processSalesCountEvent(event);
54+
ack.acknowledge();
55+
} catch (Exception e) {
56+
log.error("판매량 메트릭 처리 실패: {}", event.eventId(), e);
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)