Skip to content

Commit c680697

Browse files
committed
feat: 이벤트 발행 로직 작성
1분에 한 번씩 outbox 테이블 내 row 중 status가 PENDING인 데이터 최대 2개 조회해서 메시지로 발행
1 parent 406fccf commit c680697

4 files changed

Lines changed: 32 additions & 13 deletions

File tree

apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
public enum OutboxType {
44
PRODUCT_VIEWED,
55
PRODUCT_LIKED,
6+
PRODUCT_SALES,
67
ORDER_CREATED,
78
PAYMENT_COMPLETED
89
}

apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/KafkaOutboxPublisher.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.loopers.domain.outbox.OutboxEvent;
44
import com.loopers.domain.outbox.OutboxRepository;
5+
import com.loopers.domain.outbox.OutboxType;
56
import lombok.RequiredArgsConstructor;
67
import org.springframework.kafka.core.KafkaTemplate;
78
import org.springframework.scheduling.annotation.Scheduled;
@@ -17,13 +18,21 @@ public class KafkaOutboxPublisher {
1718
private final OutboxRepository outboxRepository;
1819
private final KafkaTemplate<Object, Object> kafkaTemplate;
1920

20-
@Scheduled(fixedDelayString = "1000")
21+
@Scheduled(fixedDelayString = "60000")
2122
@Transactional
22-
public void publish() {
23-
List<OutboxEvent> events = outboxRepository.findPending(5);
23+
public void publishProductViewed() {
24+
List<OutboxEvent> events = outboxRepository.findPending(2);
2425

2526
for (OutboxEvent event : events) {
26-
kafkaTemplate.send("product-viewed", String.valueOf(event.getAggregateId()), event);
27+
OutboxType type = event.getEventType();
28+
29+
if (type.equals(OutboxType.PRODUCT_VIEWED)) {
30+
kafkaTemplate.send("product-viewed", String.valueOf(event.getAggregateId()), event);
31+
}
32+
33+
else if (type.equals(OutboxType.PRODUCT_LIKED)) {
34+
kafkaTemplate.send("product-liked", String.valueOf(event.getAggregateId()), event);
35+
}
2736

2837
event.markAsProcessed();
2938
}

apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxJpaRepository.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,8 @@
22

33
import com.loopers.domain.outbox.OutboxEvent;
44
import org.springframework.data.jpa.repository.JpaRepository;
5-
import org.springframework.data.jpa.repository.Modifying;
6-
import org.springframework.data.jpa.repository.Query;
7-
import org.springframework.data.repository.query.Param;
85

9-
import java.util.List;
106

117
public interface OutboxJpaRepository extends JpaRepository<OutboxEvent, Long> {
12-
@Query("SELECT o FROM OutboxEvent o WHERE o.status = 'PENDING' ORDER BY o.createdAt ASC")
13-
List<OutboxEvent> findPending(int limit);
148

159
}

apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRepositoryImpl.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,25 @@
22

33
import com.loopers.domain.outbox.OutboxEvent;
44
import com.loopers.domain.outbox.OutboxRepository;
5-
import lombok.RequiredArgsConstructor;
5+
import com.loopers.domain.outbox.OutboxStatus;
6+
import com.loopers.domain.outbox.QOutboxEvent;
7+
import com.querydsl.jpa.impl.JPAQueryFactory;
8+
import jakarta.persistence.EntityManager;
69
import org.springframework.stereotype.Component;
710

811
import java.util.List;
912

13+
import static com.loopers.domain.outbox.QOutboxEvent.outboxEvent;
14+
1015
@Component
11-
@RequiredArgsConstructor
1216
public class OutboxRepositoryImpl implements OutboxRepository {
1317
private final OutboxJpaRepository outBoxJpaRepository;
18+
private final JPAQueryFactory queryFactory;
19+
20+
public OutboxRepositoryImpl(OutboxJpaRepository outBoxJpaRepository, EntityManager entityManager) {
21+
this.outBoxJpaRepository = outBoxJpaRepository;
22+
this.queryFactory = new JPAQueryFactory(entityManager);
23+
}
1424

1525
@Override
1626
public OutboxEvent save(OutboxEvent outBoxEvent) {
@@ -19,7 +29,12 @@ public OutboxEvent save(OutboxEvent outBoxEvent) {
1929

2030
@Override
2131
public List<OutboxEvent> findPending(int limit) {
22-
return outBoxJpaRepository.findPending(limit);
32+
return queryFactory
33+
.selectFrom(outboxEvent)
34+
.where(outboxEvent.status.eq(OutboxStatus.PENDING))
35+
.orderBy(outboxEvent.createdAt.asc())
36+
.limit(limit)
37+
.fetch();
2338
}
2439

2540
}

0 commit comments

Comments
 (0)