|
9 | 9 | import com.loopers.domain.ProductMetricRepository; |
10 | 10 | import lombok.RequiredArgsConstructor; |
11 | 11 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| 12 | +import org.springframework.data.redis.core.StringRedisTemplate; |
| 13 | +import org.springframework.data.redis.core.ZSetOperations; |
12 | 14 | import org.springframework.kafka.annotation.KafkaListener; |
13 | 15 | import org.springframework.kafka.support.Acknowledgment; |
14 | 16 | import org.springframework.stereotype.Component; |
15 | 17 | import org.springframework.transaction.annotation.Transactional; |
16 | 18 |
|
| 19 | +import java.time.Duration; |
| 20 | +import java.time.LocalDate; |
| 21 | +import java.time.ZoneId; |
| 22 | +import java.time.format.DateTimeFormatter; |
| 23 | +import java.util.HashMap; |
17 | 24 | import java.util.List; |
| 25 | +import java.util.Map; |
18 | 26 |
|
19 | 27 | @Component |
20 | 28 | @RequiredArgsConstructor |
21 | 29 | public class KafkaOutboxConsumer { |
22 | 30 | private final ProductMetricRepository productMetricRepository; |
23 | 31 | private final ObjectMapper objectMapper; |
| 32 | + private final StringRedisTemplate redisTemplate; |
| 33 | + |
| 34 | + private static final ZoneId KST = ZoneId.of("Asia/Seoul"); |
| 35 | + private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.BASIC_ISO_DATE; |
24 | 36 |
|
25 | 37 | @KafkaListener( |
26 | | - topics = {"product-viewed", "product-liked"}, |
| 38 | + topics = {"product-viewed", "product-liked", "product-sales"}, |
27 | 39 | containerFactory = KafkaConfig.BATCH_LISTENER |
28 | 40 | ) |
29 | 41 | @Transactional |
30 | 42 | public void productViewedListener( |
31 | 43 | List<ConsumerRecord<String, String>> messages, |
32 | 44 | Acknowledgment acknowledgment |
33 | 45 | ) throws JsonProcessingException { |
| 46 | + |
| 47 | + // 1) 배치 내 productId별 점수 누적 |
| 48 | + Map<Long, Double> scoreDelta = new HashMap<>(); |
| 49 | + |
34 | 50 | for (var record : messages) { |
35 | 51 | OutboxEvent value = objectMapper.readValue(record.value(), OutboxEvent.class); |
36 | 52 | Long productId = value.aggregateId(); |
37 | | - String eventType = value.eventType(); |
| 53 | + ProductEventType eventType = ProductEventType.valueOf(value.eventType()); |
| 54 | + |
| 55 | + scoreDelta.merge(productId, weight(eventType), Double::sum); |
38 | 56 |
|
39 | 57 | ProductMetric productMetric = productMetricRepository.findByProductId(productId); |
40 | 58 |
|
41 | 59 | if (productMetric == null) { |
42 | | - ProductMetric newProductMetric = ProductMetric.of(productId, ProductEventType.valueOf(eventType)); |
| 60 | + ProductMetric newProductMetric = ProductMetric.of(productId, eventType); |
43 | 61 |
|
44 | 62 | productMetricRepository.save(newProductMetric); |
45 | 63 | } |
46 | 64 | else { |
47 | | - productMetric.increaseProductMetric(ProductEventType.valueOf(eventType)); |
| 65 | + productMetric.increaseProductMetric(eventType); |
48 | 66 | } |
| 67 | + } |
49 | 68 |
|
| 69 | + String key = "ranking:all:" + LocalDate.now(KST).format(YYYYMMDD); |
| 70 | + ZSetOperations<String, String> zset = redisTemplate.opsForZSet(); |
| 71 | + |
| 72 | + for (var e : scoreDelta.entrySet()) { |
| 73 | + zset.incrementScore(key, String.valueOf(e.getKey()), e.getValue()); // ZINCRBY |
50 | 74 | } |
| 75 | + |
| 76 | + // 3) 일간 키는 TTL 걸어두는 게 운영에 유리 (예: 8일 보관) |
| 77 | + redisTemplate.expire(key, Duration.ofDays(8)); |
| 78 | + |
51 | 79 | acknowledgment.acknowledge(); |
52 | 80 | } |
| 81 | + |
| 82 | + double weight(ProductEventType eventType) { |
| 83 | + return switch (eventType) { |
| 84 | + case PRODUCT_VIEWED -> 0.1; |
| 85 | + case PRODUCT_LIKED -> 0.3; |
| 86 | + case PRODUCT_SALES -> 0.6; |
| 87 | + }; |
| 88 | + } |
53 | 89 | } |
0 commit comments