Skip to content

Commit e7bf613

Browse files
authored
Merge pull request #196 from rnqhstmd/round8
[volume - 8] Decoupling with Kafka
2 parents 1cda434 + 5515b9b commit e7bf613

68 files changed

Lines changed: 2697 additions & 39 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/commerce-api/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ dependencies {
22
// add-ons
33
implementation(project(":modules:jpa"))
44
implementation(project(":modules:redis"))
5+
implementation(project(":modules:kafka"))
56
implementation(project(":supports:jackson"))
67
implementation(project(":supports:logging"))
78
implementation(project(":supports:monitoring"))
@@ -22,6 +23,10 @@ dependencies {
2223

2324
testImplementation("net.datafaker:datafaker:2.0.2")
2425

26+
// Kafka
27+
testImplementation(testFixtures(project(":modules:kafka")))
28+
testImplementation("org.springframework.kafka:spring-kafka-test")
29+
2530
// Resilience4j
2631
implementation("io.github.resilience4j:resilience4j-spring-boot3")
2732
implementation("org.springframework.boot:spring-boot-starter-aop")

apps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ public OrderInfo createOrder(OrderPlaceCommand command) {
9090
public void useCoupon(Long couponId) {
9191
log.info("쿠폰 사용 처리: couponId={}", couponId);
9292
Coupon coupon = couponService.getCouponWithOptimisticLock(couponId);
93+
if (!coupon.canUse()) {
94+
log.info("이미 사용된 쿠폰입니다 (멱등성 처리): couponId={}", couponId);
95+
return;
96+
}
97+
9398
coupon.use();
9499
couponService.save(coupon);
95100
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package com.loopers.domain.outbox;
2+
3+
import jakarta.persistence.*;
4+
import lombok.AccessLevel;
5+
import lombok.Getter;
6+
import lombok.NoArgsConstructor;
7+
8+
import java.time.LocalDateTime;
9+
import java.util.UUID;
10+
11+
@Entity
12+
@Table(name = "outbox_event")
13+
@Getter
14+
@NoArgsConstructor(access = AccessLevel.PROTECTED)
15+
public class OutboxEvent {
16+
17+
@Id
18+
private String id;
19+
20+
@Column(name = "aggregate_type", nullable = false)
21+
private String aggregateType;
22+
23+
@Column(name = "aggregate_id", nullable = false)
24+
private String aggregateId;
25+
26+
@Column(name = "event_type", nullable = false)
27+
private String eventType;
28+
29+
@Column(name = "topic", nullable = false)
30+
private String topic;
31+
32+
@Column(name = "partition_key")
33+
private String partitionKey;
34+
35+
@Column(name = "payload", columnDefinition = "TEXT", nullable = false)
36+
private String payload;
37+
38+
@Column(name = "status", nullable = false)
39+
@Enumerated(EnumType.STRING)
40+
private OutboxStatus status;
41+
42+
@Column(name = "created_at", nullable = false)
43+
private LocalDateTime createdAt;
44+
45+
@Column(name = "processed_at")
46+
private LocalDateTime processedAt;
47+
48+
@Column(name = "retry_count")
49+
private int retryCount;
50+
51+
@Column(name = "last_error")
52+
private String lastError;
53+
54+
public static OutboxEvent create(
55+
String aggregateType,
56+
String aggregateId,
57+
String eventType,
58+
String topic,
59+
String partitionKey,
60+
String payload
61+
) {
62+
OutboxEvent event = new OutboxEvent();
63+
event.id = UUID.randomUUID().toString();
64+
event.aggregateType = aggregateType;
65+
event.aggregateId = aggregateId;
66+
event.eventType = eventType;
67+
event.topic = topic;
68+
event.partitionKey = partitionKey;
69+
event.payload = payload;
70+
event.status = OutboxStatus.PENDING;
71+
event.createdAt = LocalDateTime.now();
72+
event.retryCount = 0;
73+
return event;
74+
}
75+
76+
public void markAsProcessed() {
77+
this.status = OutboxStatus.PROCESSED;
78+
this.processedAt = LocalDateTime.now();
79+
}
80+
81+
public void markAsFailed(String error) {
82+
this.status = OutboxStatus.FAILED;
83+
this.lastError = error;
84+
this.retryCount++;
85+
}
86+
87+
public void markForRetry() {
88+
this.status = OutboxStatus.PENDING;
89+
}
90+
91+
public enum OutboxStatus {
92+
PENDING,
93+
PROCESSED,
94+
FAILED
95+
}
96+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.loopers.domain.outbox;
2+
3+
import java.util.List;
4+
5+
public interface OutboxEventRepository {
6+
OutboxEvent save(OutboxEvent event);
7+
List<OutboxEvent> findPendingEvents(int limit);
8+
List<OutboxEvent> findFailedEventsForRetry(int maxRetryCount, int limit);
9+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.loopers.domain.outbox;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.stereotype.Service;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
@Slf4j
11+
@Service
12+
@RequiredArgsConstructor
13+
public class OutboxService {
14+
15+
private final OutboxEventRepository outboxEventRepository;
16+
private final ObjectMapper objectMapper;
17+
18+
@Transactional
19+
public void saveEvent(
20+
String aggregateType,
21+
String aggregateId,
22+
String eventType,
23+
String topic,
24+
String partitionKey,
25+
Object payload
26+
) {
27+
try {
28+
String payloadJson = objectMapper.writeValueAsString(payload);
29+
OutboxEvent event = OutboxEvent.create(
30+
aggregateType,
31+
aggregateId,
32+
eventType,
33+
topic,
34+
partitionKey,
35+
payloadJson
36+
);
37+
outboxEventRepository.save(event);
38+
log.debug("Outbox 이벤트 저장: aggregateType={}, aggregateId={}, eventType={}",
39+
aggregateType, aggregateId, eventType);
40+
} catch (JsonProcessingException e) {
41+
log.error("Outbox 이벤트 직렬화 실패: aggregateType={}, aggregateId={}",
42+
aggregateType, aggregateId, e);
43+
throw new RuntimeException("이벤트 직렬화 실패", e);
44+
}
45+
}
46+
}

apps/commerce-api/src/main/java/com/loopers/domain/payment/event/PaymentSucceededEvent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public record PaymentSucceededEvent(
1212
Long couponId,
1313
Long amount,
1414
PaymentMethod paymentMethod,
15+
String transactionId,
1516
ZonedDateTime paidAt
1617
) {
1718
public static PaymentSucceededEvent of(Payment payment, Long couponId, ZonedDateTime paidAt) {
@@ -22,6 +23,7 @@ public static PaymentSucceededEvent of(Payment payment, Long couponId, ZonedDate
2223
couponId,
2324
payment.getAmountValue(),
2425
payment.getPaymentMethod(),
26+
payment.getTransactionId(),
2527
paidAt != null ? paidAt : ZonedDateTime.now()
2628
);
2729
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.loopers.infrastructure.kafka.dto;
2+
3+
import java.util.UUID;
4+
5+
public record LikeChangedDto(
6+
String eventId,
7+
Long productId,
8+
String likeType
9+
) {
10+
public static LikeChangedDto of(Long productId, String likeType) {
11+
return new LikeChangedDto(
12+
UUID.randomUUID().toString(),
13+
productId,
14+
likeType
15+
);
16+
}
17+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.loopers.infrastructure.kafka.dto;
2+
3+
import java.time.LocalDateTime;
4+
import java.util.List;
5+
import java.util.UUID;
6+
7+
public record OrderEventDto(
8+
String eventId,
9+
Long orderId,
10+
Long userId,
11+
String orderStatus,
12+
Long totalAmount,
13+
Long discountAmount,
14+
List<OrderItemDto> items,
15+
LocalDateTime occurredAt
16+
) {
17+
public static OrderEventDto created(
18+
Long orderId,
19+
Long userId,
20+
Long totalAmount,
21+
Long discountAmount,
22+
List<OrderItemDto> items
23+
) {
24+
return new OrderEventDto(
25+
UUID.randomUUID().toString(),
26+
orderId,
27+
userId,
28+
"CREATED",
29+
totalAmount,
30+
discountAmount,
31+
items,
32+
LocalDateTime.now()
33+
);
34+
}
35+
36+
public static OrderEventDto completed(Long orderId, Long userId) {
37+
return new OrderEventDto(
38+
UUID.randomUUID().toString(),
39+
orderId,
40+
userId,
41+
"COMPLETED",
42+
null,
43+
null,
44+
null,
45+
LocalDateTime.now()
46+
);
47+
}
48+
49+
public static OrderEventDto failed(Long orderId, Long userId) {
50+
return new OrderEventDto(
51+
UUID.randomUUID().toString(),
52+
orderId,
53+
userId,
54+
"FAILED",
55+
null,
56+
null,
57+
null,
58+
LocalDateTime.now()
59+
);
60+
}
61+
62+
public record OrderItemDto(
63+
Long productId,
64+
int quantity,
65+
Long unitPrice
66+
) {}
67+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.loopers.infrastructure.kafka.dto;
2+
3+
import java.time.LocalDateTime;
4+
import java.util.UUID;
5+
6+
public record PaymentEventDto(
7+
String eventId,
8+
Long orderId,
9+
Long userId,
10+
String paymentStatus,
11+
String transactionId,
12+
Long amount,
13+
String failureReason,
14+
LocalDateTime occurredAt
15+
) {
16+
public static PaymentEventDto success(Long orderId, Long userId, String transactionId, Long amount) {
17+
return new PaymentEventDto(
18+
UUID.randomUUID().toString(),
19+
orderId,
20+
userId,
21+
"SUCCESS",
22+
transactionId,
23+
amount,
24+
null,
25+
LocalDateTime.now()
26+
);
27+
}
28+
29+
public static PaymentEventDto failed(Long orderId, Long userId, String failureReason) {
30+
return new PaymentEventDto(
31+
UUID.randomUUID().toString(),
32+
orderId,
33+
userId,
34+
"FAILED",
35+
null,
36+
null,
37+
failureReason,
38+
LocalDateTime.now()
39+
);
40+
}
41+
42+
public static PaymentEventDto pending(Long orderId, Long userId, String transactionId) {
43+
return new PaymentEventDto(
44+
UUID.randomUUID().toString(),
45+
orderId,
46+
userId,
47+
"PENDING",
48+
transactionId,
49+
null,
50+
null,
51+
LocalDateTime.now()
52+
);
53+
}
54+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.loopers.infrastructure.kafka.dto;
2+
3+
import java.util.UUID;
4+
5+
public record ProductViewedDto(
6+
String eventId,
7+
Long productId
8+
) {
9+
public static ProductViewedDto of(Long productId) {
10+
return new ProductViewedDto(
11+
UUID.randomUUID().toString(),
12+
productId
13+
);
14+
}
15+
}

0 commit comments

Comments
 (0)