Skip to content

Commit 5532bb8

Browse files
committed
feat: kafka consumer 정의
1 parent 08430d9 commit 5532bb8

7 files changed

Lines changed: 182 additions & 0 deletions

File tree

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.loopers.domain;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
import java.math.BigDecimal;
6+
7+
@JsonIgnoreProperties(ignoreUnknown = true)
8+
public record OutboxEvent(
9+
Long id,
10+
String aggregateType,
11+
Long aggregateId,
12+
String eventType,
13+
String status,
14+
BigDecimal createdAt,
15+
BigDecimal updatedAt
16+
) {
17+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.loopers.domain;
2+
3+
public enum ProductEventType {
4+
PRODUCT_VIEWED,
5+
PRODUCT_LIKED,
6+
PRODUCT_SALES
7+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.loopers.domain;
2+
3+
import jakarta.persistence.*;
4+
import lombok.AccessLevel;
5+
import lombok.Getter;
6+
import lombok.NoArgsConstructor;
7+
8+
@Entity
9+
@Table(name = "product_metrics")
10+
@NoArgsConstructor(access = AccessLevel.PROTECTED)
11+
@Getter
12+
public class ProductMetric extends BaseEntity {
13+
@Column(name = "ref_product_id", nullable = false)
14+
private Long productId;
15+
16+
@Column(name = "view_count", nullable = false)
17+
private Long viewCount;
18+
19+
@Column(name = "like_count", nullable = false)
20+
private Long likeCount;
21+
22+
@Column(name = "sales_volume", nullable = false)
23+
private Long salesVolume;
24+
25+
@Enumerated(EnumType.STRING)
26+
@Column(name = "event_type", nullable = false)
27+
private ProductEventType eventType;
28+
29+
public ProductMetric(Long productId, Long viewCount, Long likeCount, Long salesVolume, ProductEventType eventType) {
30+
this.productId = productId;
31+
this.viewCount = viewCount;
32+
this.likeCount = likeCount;
33+
this.salesVolume = salesVolume;
34+
this.eventType = eventType;
35+
}
36+
37+
public static ProductMetric of(Long productId, Long viewCount, Long likeCount, Long salesVolume, ProductEventType eventType) {
38+
return new ProductMetric(productId, viewCount, likeCount, salesVolume, eventType);
39+
}
40+
41+
public void increaseProductMetric(ProductEventType eventType) {
42+
switch (eventType) {
43+
case PRODUCT_VIEWED -> increaseViewCount();
44+
case PRODUCT_LIKED -> increaseLikeCount();
45+
case PRODUCT_SALES -> increaseSalesVolume();
46+
}
47+
}
48+
49+
public void increaseViewCount() {
50+
this.viewCount++;
51+
}
52+
53+
public void increaseLikeCount() {
54+
this.likeCount++;
55+
}
56+
57+
public void increaseSalesVolume() {
58+
this.salesVolume++;
59+
}
60+
61+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.loopers.domain;
2+
3+
4+
public interface ProductMetricRepository {
5+
ProductMetric findByProductId(Long productId);
6+
7+
ProductMetric save(ProductMetric of);
8+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.loopers.infrastructure;
2+
3+
import com.loopers.domain.ProductMetric;
4+
import org.springframework.data.jpa.repository.JpaRepository;
5+
6+
7+
public interface ProductMetricJpaRepository extends JpaRepository<ProductMetric, Long> {
8+
ProductMetric findByProductId(Long productId);
9+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.loopers.infrastructure;
2+
3+
import com.loopers.domain.ProductMetric;
4+
import com.loopers.domain.ProductMetricRepository;
5+
import lombok.RequiredArgsConstructor;
6+
import org.springframework.stereotype.Component;
7+
8+
9+
@Component
10+
@RequiredArgsConstructor
11+
public class ProductMetricRepositoryImpl implements ProductMetricRepository {
12+
private final ProductMetricJpaRepository productMetricJpaRepository;
13+
14+
@Override
15+
public ProductMetric findByProductId(Long productId) {
16+
return productMetricJpaRepository.findByProductId(productId);
17+
}
18+
19+
@Override
20+
public ProductMetric save(ProductMetric productMetric) {
21+
return productMetricJpaRepository.save(productMetric);
22+
}
23+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.loopers.interfaces.consumer;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.loopers.confg.kafka.KafkaConfig;
6+
import com.loopers.domain.OutboxEvent;
7+
import com.loopers.domain.ProductEventType;
8+
import com.loopers.domain.ProductMetric;
9+
import com.loopers.domain.ProductMetricRepository;
10+
import lombok.RequiredArgsConstructor;
11+
import org.apache.kafka.clients.consumer.ConsumerRecord;
12+
import org.springframework.kafka.annotation.KafkaListener;
13+
import org.springframework.kafka.support.Acknowledgment;
14+
import org.springframework.stereotype.Component;
15+
16+
import java.util.List;
17+
18+
@Component
19+
@RequiredArgsConstructor
20+
public class KafkaOutboxConsumer {
21+
private final ProductMetricRepository productMetricRepository;
22+
private final ObjectMapper objectMapper;
23+
24+
@KafkaListener(
25+
topics = {"product-viewed"},
26+
containerFactory = KafkaConfig.BATCH_LISTENER
27+
)
28+
public void demoListener(
29+
List<ConsumerRecord<String, String>> messages,
30+
Acknowledgment acknowledgment
31+
) throws JsonProcessingException {
32+
for (var record : messages) {
33+
OutboxEvent value = objectMapper.readValue(record.value(), OutboxEvent.class);
34+
Long productId = value.aggregateId();
35+
String eventType = value.eventType();
36+
37+
ProductMetric productMetric = productMetricRepository.findByProductId(productId);
38+
39+
if (productMetric == null) {
40+
productMetricRepository.save(
41+
ProductMetric.of(
42+
productId,
43+
0L,
44+
0L,
45+
0L,
46+
ProductEventType.valueOf(eventType)
47+
)
48+
);
49+
}
50+
else {
51+
productMetric.increaseProductMetric(ProductEventType.valueOf(eventType));
52+
}
53+
54+
}
55+
acknowledgment.acknowledge();
56+
}
57+
}

0 commit comments

Comments
 (0)