Skip to content

Commit 0c79c4f

Browse files
committed
feat: Transactional Outbox 패턴 적용
1 parent 008e8a8 commit 0c79c4f

9 files changed

Lines changed: 163 additions & 11 deletions

File tree

apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,31 @@
11
package com.loopers.application.product;
22

3-
import com.loopers.domain.actionlog.ActionType;
43
import com.loopers.domain.brand.BrandRepository;
4+
import com.loopers.domain.outbox.AggregateType;
5+
import com.loopers.domain.outbox.OutboxEvent;
6+
import com.loopers.domain.outbox.OutboxType;
7+
import com.loopers.domain.outbox.OutboxRepository;
58
import com.loopers.domain.product.Product;
69
import com.loopers.domain.product.ProductRepository;
7-
import com.loopers.domain.product.ProductViewedMessage;
810
import com.loopers.infrastructure.product.ProductViewKafkaProducer;
911
import com.loopers.interfaces.api.product.ProductV1Dto;
1012
import com.loopers.support.error.CoreException;
1113
import com.loopers.support.error.ErrorType;
1214
import lombok.RequiredArgsConstructor;
1315
import org.springframework.cache.annotation.CacheEvict;
1416
import org.springframework.cache.annotation.Cacheable;
15-
import org.springframework.context.ApplicationEventPublisher;
1617
import org.springframework.stereotype.Component;
1718
import org.springframework.transaction.annotation.Transactional;
1819

1920
import java.math.BigDecimal;
2021
import java.util.List;
21-
import java.util.UUID;
2222

2323
@Component
2424
@RequiredArgsConstructor
2525
public class ProductFacade {
2626
private final ProductRepository productRepository;
2727
private final BrandRepository brandRepository;
28+
private final OutboxRepository outBoxRepository;
2829
private final ProductViewKafkaProducer kafkaProducer;
2930

3031
@Transactional
@@ -51,21 +52,20 @@ public List<ProductInfo> findAllProducts() {
5152
.toList();
5253
}
5354

54-
@Transactional(readOnly = true)
55+
@Transactional
5556
@Cacheable(value = "product", key = "#id")
5657
public ProductInfo findProductById(Long id) {
5758
Product product = productRepository.findById(id).orElseThrow(
5859
() -> new CoreException(ErrorType.NOT_FOUND, "찾고자 하는 상품이 존재하지 않습니다.")
5960
);
6061

61-
// 유저 ID는 임시로 하드 코딩했습니다. 추후 인증/인가 기능이 추가되면 수정할 예정입니다.
62-
ProductViewedMessage message = new ProductViewedMessage(
63-
UUID.randomUUID().toString(),
64-
1L,
65-
product.getId()
62+
OutboxEvent outBoxEvent = OutboxEvent.of(
63+
AggregateType.PRODUCT,
64+
product.getId(),
65+
OutboxType.PRODUCT_VIEWED
6666
);
6767

68-
kafkaProducer.sendProductViewedMessage(message);
68+
outBoxRepository.save(outBoxEvent);
6969

7070
return ProductInfo.from(product);
7171
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.loopers.domain.outbox;
2+
3+
public enum AggregateType {
4+
PRODUCT,
5+
ORDER,
6+
USER
7+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.loopers.domain.outbox;
2+
3+
import com.loopers.domain.BaseEntity;
4+
import jakarta.persistence.Column;
5+
import jakarta.persistence.Entity;
6+
import jakarta.persistence.EnumType;
7+
import jakarta.persistence.Enumerated;
8+
import jakarta.persistence.Table;
9+
import lombok.AccessLevel;
10+
import lombok.NoArgsConstructor;
11+
12+
@Entity
13+
@Table(name = "outbox")
14+
@NoArgsConstructor(access = AccessLevel.PROTECTED)
15+
public class OutboxEvent extends BaseEntity {
16+
@Enumerated(EnumType.STRING)
17+
@Column(name = "aggregate_type", nullable = false)
18+
private AggregateType aggregateType;
19+
20+
@Column(name = "aggregate_id", nullable = false)
21+
private Long aggregateId;
22+
23+
// @Enumerated(EnumType.STRING)
24+
@Column(name = "event_type", nullable = false)
25+
private OutboxType eventType;
26+
27+
@Enumerated(EnumType.STRING)
28+
@Column(name = "status", nullable = false)
29+
private OutboxStatus status;
30+
31+
public OutboxEvent(AggregateType aggregateType, Long aggregateId, OutboxType eventType, OutboxStatus status) {
32+
this.aggregateType = aggregateType;
33+
this.aggregateId = aggregateId;
34+
this.eventType = eventType;
35+
this.status = status;
36+
}
37+
38+
public static OutboxEvent of(AggregateType aggregateType, Long aggregateId, OutboxType eventType) {
39+
return new OutboxEvent(aggregateType, aggregateId, eventType, OutboxStatus.PENDING);
40+
}
41+
42+
public void markAsProcessed() {
43+
this.status = OutboxStatus.PROCESSED;
44+
}
45+
46+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.loopers.domain.outbox;
2+
3+
import java.util.List;
4+
5+
public interface OutboxRepository {
6+
OutboxEvent save(OutboxEvent outBoxEvent);
7+
8+
List<OutboxEvent> findPending(int limit);
9+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.loopers.domain.outbox;
2+
3+
public enum OutboxStatus {
4+
PENDING,
5+
PROCESSED,
6+
FAILED
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.loopers.domain.outbox;
2+
3+
public enum OutboxType {
4+
PRODUCT_VIEWED,
5+
ORDER_CREATED,
6+
PAYMENT_COMPLETED
7+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.loopers.infrastructure.outbox;
2+
3+
import com.loopers.domain.outbox.OutboxEvent;
4+
import com.loopers.domain.outbox.OutboxRepository;
5+
import lombok.RequiredArgsConstructor;
6+
import org.springframework.kafka.core.KafkaTemplate;
7+
import org.springframework.scheduling.annotation.Scheduled;
8+
import org.springframework.stereotype.Component;
9+
import org.springframework.transaction.annotation.Transactional;
10+
11+
import java.util.List;
12+
13+
@Component
14+
@RequiredArgsConstructor
15+
public class KafkaOutboxPublisher {
16+
17+
private final OutboxRepository outboxRepository;
18+
private final KafkaTemplate<Object, Object> kafkaTemplate;
19+
20+
@Scheduled(fixedDelayString = "1000")
21+
@Transactional
22+
public void publish() {
23+
// System.out.println("hello");
24+
List<OutboxEvent> events = outboxRepository.findPending(5);
25+
for (OutboxEvent event : events) {
26+
System.out.println("event.toString() = " + event.toString());
27+
}
28+
29+
for (OutboxEvent event : events) {
30+
kafkaTemplate.send("product-viewed", event);
31+
32+
event.markAsProcessed();
33+
}
34+
35+
}
36+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.loopers.infrastructure.outbox;
2+
3+
import com.loopers.domain.outbox.OutboxEvent;
4+
import org.springframework.data.jpa.repository.JpaRepository;
5+
import org.springframework.data.jpa.repository.Modifying;
6+
import org.springframework.data.jpa.repository.Query;
7+
import org.springframework.data.repository.query.Param;
8+
9+
import java.util.List;
10+
11+
public interface OutboxJpaRepository extends JpaRepository<OutboxEvent, Long> {
12+
@Query("SELECT o FROM OutboxEvent o WHERE o.status = 'PENDING' ORDER BY o.createdAt ASC")
13+
List<OutboxEvent> findPending(int limit);
14+
15+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.loopers.infrastructure.outbox;
2+
3+
import com.loopers.domain.outbox.OutboxEvent;
4+
import com.loopers.domain.outbox.OutboxRepository;
5+
import lombok.RequiredArgsConstructor;
6+
import org.springframework.stereotype.Component;
7+
8+
import java.util.List;
9+
10+
@Component
11+
@RequiredArgsConstructor
12+
public class OutboxRepositoryImpl implements OutboxRepository {
13+
private final OutboxJpaRepository outBoxJpaRepository;
14+
15+
@Override
16+
public OutboxEvent save(OutboxEvent outBoxEvent) {
17+
return outBoxJpaRepository.save(outBoxEvent);
18+
}
19+
20+
@Override
21+
public List<OutboxEvent> findPending(int limit) {
22+
return outBoxJpaRepository.findPending(limit);
23+
}
24+
25+
}

0 commit comments

Comments
 (0)