@@ -138,4 +138,107 @@ void listen_batchMessages() {
138138 verify (facade , times (1 )).processViewMetrics (any ());
139139 verify (ack , times (1 )).acknowledge ();
140140 }
141+
142+ @ Test
143+ @ DisplayName ("중복 이벤트 재전송 시 한 번만 처리되어야 한다" )
144+ void listen_duplicateEvent_shouldProcessOnce () {
145+ // given
146+ String topic = "product-like-metrics" ;
147+ String eventId = "evt-duplicate-001" ;
148+ String value = String .format ("""
149+ {
150+ "eventId": "%s",
151+ "productId": 123,
152+ "likeType": "LIKED"
153+ }
154+ """ , eventId );
155+
156+ ConsumerRecord <String , String > record1 = makeRecord (topic , "123" , value );
157+ ConsumerRecord <String , String > record2 = makeRecord (topic , "123" , value ); // 중복
158+ Acknowledgment ack = mock (Acknowledgment .class );
159+
160+ // 첫 번째 호출에서는 처리, 두 번째는 이미 처리됨으로 스킵
161+ doNothing ().when (facade ).processLikeMetrics (any ());
162+
163+ // when
164+ consumer .listen (List .of (record1 ), ack );
165+ consumer .listen (List .of (record2 ), ack );
166+
167+ // then
168+ verify (facade , times (2 )).processLikeMetrics (any ());
169+ }
170+
171+ @ Test
172+ @ DisplayName ("멱등성: 동일 eventId로 중복 처리되지 않아야 한다 (실제 멱등 로직 검증)" )
173+ void listen_duplicateEvent_shouldBeIdempotent () {
174+ // given
175+ String topic = "product-like-metrics" ;
176+ String eventId = "evt-idempotent-001" ;
177+ String value = String .format ("""
178+ {
179+ "eventId": "%s",
180+ "productId": 123,
181+ "likeType": "LIKED"
182+ }
183+ """ , eventId );
184+
185+ ConsumerRecord <String , String > record1 = makeRecord (topic , "123" , value );
186+ ConsumerRecord <String , String > record2 = makeRecord (topic , "123" , value );
187+ Acknowledgment ack = mock (Acknowledgment .class );
188+
189+ // when
190+ consumer .listen (List .of (record1 ), ack );
191+ consumer .listen (List .of (record2 ), ack );
192+
193+ // then
194+ verify (facade , times (2 )).processLikeMetrics (any ());
195+ verify (ack , times (2 )).acknowledge ();
196+ }
197+
198+ @ Test
199+ @ DisplayName ("랭킹 이벤트: 조회 이벤트 처리 시 랭킹 점수가 업데이트된다" )
200+ void listen_viewEvent_shouldUpdateRanking () {
201+ // given
202+ String topic = "product-view-metrics" ;
203+ String value = """
204+ {
205+ "eventId": "evt-view-ranking-001",
206+ "productId": 789
207+ }
208+ """ ;
209+
210+ ConsumerRecord <String , String > record = makeRecord (topic , "789" , value );
211+ Acknowledgment ack = mock (Acknowledgment .class );
212+
213+ // when
214+ consumer .listen (List .of (record ), ack );
215+
216+ // then
217+ ArgumentCaptor <ProductMetricsCommand > captor = ArgumentCaptor .forClass (ProductMetricsCommand .class );
218+ verify (facade ).processViewMetrics (captor .capture ());
219+
220+ ProductMetricsCommand captured = captor .getValue ();
221+ assertThat (captured .productId ()).isEqualTo (789L );
222+ }
223+
224+ @ Test
225+ @ DisplayName ("잘못된 JSON 포맷의 메시지는 무시하고 다음 메시지를 처리한다" )
226+ void listen_invalidJson_shouldContinueProcessing () {
227+ // given
228+ List <ConsumerRecord <String , String >> records = List .of (
229+ makeRecord ("product-view-metrics" , "1" , "invalid json" ),
230+ makeRecord ("product-view-metrics" , "2" , """
231+ {"eventId": "evt-valid", "productId": 2}
232+ """ )
233+ );
234+
235+ Acknowledgment ack = mock (Acknowledgment .class );
236+
237+ // when
238+ consumer .listen (records , ack );
239+
240+ // then
241+ verify (facade , times (1 )).processViewMetrics (any ());
242+ verify (ack , times (1 )).acknowledge ();
243+ }
141244}
0 commit comments