Skip to content

Commit dadc0cc

Browse files
committed
feat: consumer 로직 작성
1 parent c680697 commit dadc0cc

2 files changed

Lines changed: 30 additions & 13 deletions

File tree

apps/commerce-streamer/src/main/java/com/loopers/domain/ProductMetric.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,31 @@ public ProductMetric(Long productId, Long viewCount, Long likeCount, Long salesV
3434
this.eventType = eventType;
3535
}
3636

37-
public static ProductMetric of(Long productId, Long viewCount, Long likeCount, Long salesVolume, ProductEventType eventType) {
38-
return new ProductMetric(productId, viewCount, likeCount, salesVolume, eventType);
37+
/*
38+
- [ ] 리팩토링 예정
39+
*/
40+
public static ProductMetric of(Long productId, ProductEventType eventType) {
41+
if (eventType == null) {
42+
throw new IllegalArgumentException("정의되지 않은 event type입니다.");
43+
}
44+
45+
return switch (eventType) {
46+
case PRODUCT_VIEWED -> ofProductViewed(productId);
47+
case PRODUCT_LIKED -> ofProductLiked(productId);
48+
case PRODUCT_SALES -> ofProductSales(productId);
49+
};
50+
}
51+
52+
public static ProductMetric ofProductViewed(Long productId) {
53+
return new ProductMetric(productId, 1L, 0L, 0L, ProductEventType.PRODUCT_VIEWED);
54+
}
55+
56+
public static ProductMetric ofProductLiked(Long productId) {
57+
return new ProductMetric(productId, 0L, 1L, 0L, ProductEventType.PRODUCT_LIKED);
58+
}
59+
60+
public static ProductMetric ofProductSales(Long productId) {
61+
return new ProductMetric(productId, 0L, 0L, 1L, ProductEventType.PRODUCT_SALES);
3962
}
4063

4164
public void increaseProductMetric(ProductEventType eventType) {

apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/KafkaOutboxConsumer.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ public class KafkaOutboxConsumer {
2323
private final ObjectMapper objectMapper;
2424

2525
@KafkaListener(
26-
topics = {"product-viewed"},
26+
topics = {"product-viewed", "product-liked"},
2727
containerFactory = KafkaConfig.BATCH_LISTENER
2828
)
2929
@Transactional
30-
public void demoListener(
30+
public void productViewedListener(
3131
List<ConsumerRecord<String, String>> messages,
3232
Acknowledgment acknowledgment
3333
) throws JsonProcessingException {
@@ -39,15 +39,9 @@ public void demoListener(
3939
ProductMetric productMetric = productMetricRepository.findByProductId(productId);
4040

4141
if (productMetric == null) {
42-
productMetricRepository.save(
43-
ProductMetric.of(
44-
productId,
45-
0L,
46-
0L,
47-
0L,
48-
ProductEventType.valueOf(eventType)
49-
)
50-
);
42+
ProductMetric newProductMetric = ProductMetric.of(productId, ProductEventType.valueOf(eventType));
43+
44+
productMetricRepository.save(newProductMetric);
5145
}
5246
else {
5347
productMetric.increaseProductMetric(ProductEventType.valueOf(eventType));

0 commit comments

Comments
 (0)