Skip to content

Commit d6fb171

Browse files
committed
feat: Kafka Consumer 구현 및 ProductMetrics 실시간 집계
Consumer 구현: - CatalogEventConsumer: 좋아요/조회수 이벤트 처리 - OrderEventConsumer: 주문/결제 이벤트 처리 - Inbox 패턴으로 멱등성 보장 (중복 메시지 방지) - 배치 처리로 성능 최적화 - DLQ(Dead Letter Queue)로 실패 메시지 처리 ProductMetrics 집계: - 좋아요 수, 조회수, 주문 수, 판매 금액 실시간 집계 - 별도 테이블로 조회 성능 최적화
1 parent 649be8b commit d6fb171

17 files changed

Lines changed: 970 additions & 0 deletions
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.loopers.application.dlq;
2+
3+
import com.loopers.domain.dlq.DeadLetterQueue;
4+
import com.loopers.domain.dlq.DeadLetterQueueRepository;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.stereotype.Service;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
/**
11+
* DeadLetterQueue Service
12+
* - 처리 실패한 메시지 저장
13+
*/
14+
@Slf4j
15+
@Service
16+
@RequiredArgsConstructor
17+
public class DeadLetterQueueService {
18+
19+
private final DeadLetterQueueRepository deadLetterQueueRepository;
20+
21+
/**
22+
* DLQ에 실패 메시지 저장
23+
*
24+
* @param originalTopic 원본 Topic
25+
* @param partitionKey Partition Key
26+
* @param eventId 이벤트 ID
27+
* @param payload 원본 메시지
28+
* @param errorMessage 에러 메시지
29+
* @param retryCount 재시도 횟수
30+
*/
31+
@Transactional
32+
public void save(String originalTopic, String partitionKey, String eventId,
33+
String payload, String errorMessage, int retryCount) {
34+
35+
DeadLetterQueue dlq = DeadLetterQueue.builder()
36+
.originalTopic(originalTopic)
37+
.partitionKey(partitionKey)
38+
.eventId(eventId)
39+
.payload(payload)
40+
.errorMessage(errorMessage)
41+
.retryCount(retryCount)
42+
.build();
43+
44+
deadLetterQueueRepository.save(dlq);
45+
46+
log.error("⚠️ DLQ에 메시지 저장 - topic: {}, eventId: {}, retryCount: {}, error: {}",
47+
originalTopic, eventId, retryCount, errorMessage);
48+
}
49+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.loopers.application.inbox;
2+
3+
import com.loopers.domain.inbox.EventInbox;
4+
import com.loopers.domain.inbox.EventInboxRepository;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.stereotype.Service;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
/**
11+
* EventInbox Service
12+
* - Consumer의 멱등성 보장
13+
*/
14+
@Slf4j
15+
@Service
16+
@RequiredArgsConstructor
17+
public class EventInboxService {
18+
19+
private final EventInboxRepository eventInboxRepository;
20+
21+
/**
22+
* 중복 이벤트 체크
23+
*
24+
* @param eventId 이벤트 ID
25+
* @return 중복 여부
26+
*/
27+
public boolean isDuplicate(String eventId) {
28+
boolean exists = eventInboxRepository.existsByEventId(eventId);
29+
30+
if (exists) {
31+
log.info("중복 이벤트 감지 - eventId: {}", eventId);
32+
}
33+
34+
return exists;
35+
}
36+
37+
/**
38+
* Inbox에 이벤트 저장 (처리 완료 마킹)
39+
*
40+
* @param eventId 이벤트 ID
41+
* @param aggregateType Aggregate Type
42+
* @param aggregateId Aggregate ID
43+
* @param eventType Event Type
44+
*/
45+
@Transactional
46+
public void save(String eventId, String aggregateType, String aggregateId, String eventType) {
47+
EventInbox inbox = EventInbox.builder()
48+
.eventId(eventId)
49+
.aggregateType(aggregateType)
50+
.aggregateId(aggregateId)
51+
.eventType(eventType)
52+
.build();
53+
54+
eventInboxRepository.save(inbox);
55+
56+
log.info("Inbox에 이벤트 저장 - eventId: {}, eventType: {}, aggregateId: {}",
57+
eventId, eventType, aggregateId);
58+
}
59+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package com.loopers.application.metrics;
2+
3+
import com.loopers.domain.metrics.ProductMetrics;
4+
import com.loopers.domain.metrics.ProductMetricsRepository;
5+
import java.math.BigDecimal;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.stereotype.Service;
9+
import org.springframework.transaction.annotation.Transactional;
10+
11+
/**
12+
* ProductMetrics Service
13+
* - 상품별 집계 데이터 관리
14+
*/
15+
@Slf4j
16+
@Service
17+
@RequiredArgsConstructor
18+
public class ProductMetricsService {
19+
20+
private final ProductMetricsRepository productMetricsRepository;
21+
22+
/**
23+
* 좋아요 수 증가
24+
*
25+
* @param productId 상품 ID
26+
*/
27+
@Transactional
28+
public void incrementLikeCount(Long productId) {
29+
ProductMetrics metrics = getOrCreate(productId);
30+
metrics.incrementLikeCount();
31+
productMetricsRepository.save(metrics);
32+
33+
log.info("좋아요 수 증가 - productId: {}, likeCount: {}", productId, metrics.getLikeCount());
34+
}
35+
36+
/**
37+
* 좋아요 수 감소
38+
*
39+
* @param productId 상품 ID
40+
*/
41+
@Transactional
42+
public void decrementLikeCount(Long productId) {
43+
ProductMetrics metrics = getOrCreate(productId);
44+
metrics.decrementLikeCount();
45+
productMetricsRepository.save(metrics);
46+
47+
log.info("좋아요 수 감소 - productId: {}, likeCount: {}", productId, metrics.getLikeCount());
48+
}
49+
50+
/**
51+
* 조회 수 증가
52+
*
53+
* @param productId 상품 ID
54+
*/
55+
@Transactional
56+
public void incrementViewCount(Long productId) {
57+
ProductMetrics metrics = getOrCreate(productId);
58+
metrics.incrementViewCount();
59+
productMetricsRepository.save(metrics);
60+
61+
log.info("조회 수 증가 - productId: {}, viewCount: {}", productId, metrics.getViewCount());
62+
}
63+
64+
/**
65+
* 주문 수 및 판매 금액 증가
66+
*
67+
* @param productId 상품 ID
68+
* @param quantity 수량
69+
* @param amount 금액
70+
*/
71+
@Transactional
72+
public void incrementOrderCount(Long productId, int quantity, BigDecimal amount) {
73+
ProductMetrics metrics = getOrCreate(productId);
74+
metrics.incrementOrderCount(quantity, amount);
75+
productMetricsRepository.save(metrics);
76+
77+
log.info("주문 수 증가 - productId: {}, orderCount: {}, salesAmount: {}",
78+
productId, metrics.getOrderCount(), metrics.getSalesAmount());
79+
}
80+
81+
/**
82+
* ProductMetrics 조회 또는 생성
83+
*/
84+
private ProductMetrics getOrCreate(Long productId) {
85+
return productMetricsRepository.findByProductId(productId)
86+
.orElseGet(() -> ProductMetrics.builder()
87+
.productId(productId)
88+
.build());
89+
}
90+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.loopers.domain.dlq;
2+
3+
import com.loopers.domain.BaseEntity;
4+
import jakarta.persistence.Column;
5+
import jakarta.persistence.Entity;
6+
import jakarta.persistence.Index;
7+
import jakarta.persistence.Table;
8+
import lombok.AccessLevel;
9+
import lombok.Builder;
10+
import lombok.Getter;
11+
import lombok.NoArgsConstructor;
12+
13+
/**
14+
* Dead Letter Queue (DLQ)
15+
* - 처리 실패한 메시지를 저장
16+
* - 수동 재처리 또는 분석을 위한 테이블
17+
*/
18+
@Getter
19+
@Entity
20+
@Table(
21+
name = "dead_letter_queue",
22+
indexes = {
23+
@Index(name = "idx_failed_at", columnList = "failed_at"),
24+
@Index(name = "idx_event_id", columnList = "event_id"),
25+
@Index(name = "idx_topic", columnList = "original_topic")
26+
}
27+
)
28+
@NoArgsConstructor(access = AccessLevel.PROTECTED)
29+
public class DeadLetterQueue extends BaseEntity {
30+
31+
@Column(name = "original_topic", nullable = false, length = 100)
32+
private String originalTopic;
33+
34+
@Column(name = "partition_key", length = 100)
35+
private String partitionKey;
36+
37+
@Column(name = "event_id", length = 50)
38+
private String eventId;
39+
40+
@Column(name = "payload", columnDefinition = "TEXT", nullable = false)
41+
private String payload;
42+
43+
@Column(name = "error_message", columnDefinition = "TEXT")
44+
private String errorMessage;
45+
46+
@Column(name = "retry_count", nullable = false)
47+
private Integer retryCount = 0;
48+
49+
@Column(name = "failed_at", nullable = false)
50+
private java.time.ZonedDateTime failedAt;
51+
52+
@Builder
53+
private DeadLetterQueue(String originalTopic, String partitionKey, String eventId,
54+
String payload, String errorMessage, Integer retryCount,
55+
java.time.ZonedDateTime failedAt) {
56+
this.originalTopic = originalTopic;
57+
this.partitionKey = partitionKey;
58+
this.eventId = eventId;
59+
this.payload = payload;
60+
this.errorMessage = errorMessage;
61+
this.retryCount = retryCount != null ? retryCount : 0;
62+
this.failedAt = failedAt != null ? failedAt : java.time.ZonedDateTime.now();
63+
}
64+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.loopers.domain.dlq;
2+
3+
/**
4+
* DeadLetterQueue Repository
5+
*/
6+
public interface DeadLetterQueueRepository {
7+
8+
/**
9+
* DLQ 저장
10+
*/
11+
DeadLetterQueue save(DeadLetterQueue deadLetterQueue);
12+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.loopers.domain.inbox;
2+
3+
import com.loopers.domain.BaseEntity;
4+
import jakarta.persistence.Column;
5+
import jakarta.persistence.Entity;
6+
import jakarta.persistence.Index;
7+
import jakarta.persistence.Table;
8+
import lombok.AccessLevel;
9+
import lombok.Builder;
10+
import lombok.Getter;
11+
import lombok.NoArgsConstructor;
12+
13+
/**
14+
* Event Inbox 패턴 구현
15+
* - Consumer의 멱등성 보장을 위한 이벤트 수신 기록
16+
* - eventId를 Unique Key로 사용하여 중복 처리 방지
17+
*/
18+
@Getter
19+
@Entity
20+
@Table(
21+
name = "event_inbox",
22+
indexes = {
23+
@Index(name = "idx_event_id", columnList = "event_id", unique = true),
24+
@Index(name = "idx_aggregate", columnList = "aggregate_type, aggregate_id"),
25+
@Index(name = "idx_processed_at", columnList = "processed_at")
26+
}
27+
)
28+
@NoArgsConstructor(access = AccessLevel.PROTECTED)
29+
public class EventInbox extends BaseEntity {
30+
31+
@Column(name = "event_id", nullable = false, length = 50)
32+
private String eventId; // Outbox의 ID (멱등키)
33+
34+
@Column(name = "aggregate_type", nullable = false, length = 50)
35+
private String aggregateType; // ORDER, PRODUCT, LIKE, PAYMENT
36+
37+
@Column(name = "aggregate_id", nullable = false, length = 50)
38+
private String aggregateId; // orderId, productId 등
39+
40+
@Column(name = "event_type", nullable = false, length = 100)
41+
private String eventType; // OrderCreatedEvent, LikeCreatedEvent 등
42+
43+
@Column(name = "processed_at", nullable = false)
44+
private java.time.ZonedDateTime processedAt;
45+
46+
@Builder
47+
private EventInbox(String eventId, String aggregateType, String aggregateId,
48+
String eventType, java.time.ZonedDateTime processedAt) {
49+
this.eventId = eventId;
50+
this.aggregateType = aggregateType;
51+
this.aggregateId = aggregateId;
52+
this.eventType = eventType;
53+
this.processedAt = processedAt != null ? processedAt : java.time.ZonedDateTime.now();
54+
}
55+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.loopers.domain.inbox;
2+
3+
/**
4+
* EventInbox Repository
5+
*/
6+
public interface EventInboxRepository {
7+
8+
/**
9+
* eventId로 중복 체크
10+
*/
11+
boolean existsByEventId(String eventId);
12+
13+
/**
14+
* EventInbox 저장
15+
*/
16+
EventInbox save(EventInbox eventInbox);
17+
18+
/**
19+
* 모든 EventInbox 삭제 (테스트용)
20+
*/
21+
void deleteAll();
22+
}

0 commit comments

Comments
 (0)