1111import lombok .RequiredArgsConstructor ;
1212import lombok .extern .slf4j .Slf4j ;
1313import org .apache .kafka .clients .consumer .ConsumerRecord ;
14+ import org .springframework .beans .factory .annotation .Value ;
1415import org .springframework .kafka .annotation .KafkaListener ;
1516import org .springframework .kafka .support .Acknowledgment ;
1617import org .springframework .stereotype .Component ;
1718
1819import java .util .List ;
20+ import java .util .Set ;
1921
2022@ Slf4j
2123@ Component
@@ -25,6 +27,15 @@ public class ProductMetricsConsumer {
2527 private final ProductMetricsFacade productMetricsFacade ;
2628 private final ObjectMapper objectMapper ;
2729
30+ @ Value ("${kafka.topic.product-like-name}" )
31+ private String productLikeTopic ;
32+
33+ @ Value ("${kafka.topic.product-stock-name}" )
34+ private String productStockTopic ;
35+
36+ @ Value ("${kafka.topic.product-view-name}" )
37+ private String productViewTopic ;
38+
2839 @ KafkaListener (
2940 topics = {"${kafka.topic.product-like-name}" , "${kafka.topic.product-stock-name}" , "${kafka.topic.product-view-name}" },
3041 groupId = "${kafka.consumer.product-metrics-group}" ,
@@ -58,6 +69,13 @@ public void listen(
5869 }
5970
6071 private void processPayload (String topic , String payload ) throws JsonProcessingException {
72+ Set <String > allowedTopics = Set .of (productLikeTopic , productStockTopic , productViewTopic );
73+
74+ if (!allowedTopics .contains (topic )) {
75+ log .warn ("허용되지 않은 토픽: {}" , topic );
76+ return ;
77+ }
78+
6179 if (topic .contains ("product-like" )) {
6280 ProductLikePayload likePayload = objectMapper .readValue (payload , ProductLikePayload .class );
6381 ProductMetricsCommand likeCommand = ProductMetricsCommand .from (likePayload );
0 commit comments