Skip to content

Commit 93564c0

Browse files
committed
test: Kafka 설정 학습 테스트 추가
Embedded Kafka를 활용한 4가지 학습 테스트 구현: - Experiment1: Auto Commit vs Manual Commit 동작 비교 - Experiment2: auto.offset.reset (earliest/latest) 학습 - Experiment3: max.poll.interval.ms 타임아웃 체험 - Experiment4: max.poll.records 배치 크기 트레이드오프 특징: - @embeddedkafka 사용으로 Docker 불필요 - IntelliJ에서 바로 실행 가능 - 각 테스트는 독립적인 토픽 사용
1 parent c14f898 commit 93564c0

5 files changed

Lines changed: 1278 additions & 0 deletions

File tree

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package com.loopers.kafka.learning;
2+
3+
import java.time.Duration;
4+
import java.util.Collections;
5+
import java.util.Properties;
6+
import org.apache.kafka.clients.consumer.ConsumerConfig;
7+
import org.apache.kafka.clients.consumer.ConsumerRecord;
8+
import org.apache.kafka.clients.consumer.ConsumerRecords;
9+
import org.apache.kafka.clients.consumer.KafkaConsumer;
10+
import org.apache.kafka.clients.producer.KafkaProducer;
11+
import org.apache.kafka.clients.producer.ProducerConfig;
12+
import org.apache.kafka.clients.producer.ProducerRecord;
13+
import org.apache.kafka.common.serialization.StringDeserializer;
14+
import org.apache.kafka.common.serialization.StringSerializer;
15+
import org.junit.jupiter.api.DisplayName;
16+
import org.junit.jupiter.api.Test;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
import org.springframework.kafka.test.context.EmbeddedKafka;
20+
21+
/**
22+
* 학습 테스트 1: Auto Commit vs Manual Commit
23+
*
24+
* 학습 목표:
25+
* - Auto Commit의 동작 방식과 메시지 유실 가능성 이해
26+
* - Manual Commit의 정확한 제어 방식 이해
27+
* - 커밋 타이밍이 메시지 처리에 미치는 영향 체험
28+
*
29+
* 참고: @EmbeddedKafka를 사용하여 테스트 실행 시 자동으로 Kafka가 시작됩니다.
30+
*/
31+
@DisplayName("학습 1: Auto Commit vs Manual Commit")
32+
@EmbeddedKafka(
33+
partitions = 1,
34+
topics = {"learning-auto-commit"},
35+
brokerProperties = {
36+
"listeners=PLAINTEXT://localhost:9092",
37+
"port=9092"
38+
}
39+
)
40+
public class Experiment1_AutoCommitTest {
41+
42+
private static final Logger log = LoggerFactory.getLogger(Experiment1_AutoCommitTest.class);
43+
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
44+
private static final String TOPIC = "learning-auto-commit";
45+
46+
@Test
47+
@DisplayName("Auto Commit: 처리 중 실패 시 메시지 유실")
48+
void autoCommit_MessageLoss() throws Exception {
49+
log.info("\n");
50+
log.info("========================================");
51+
log.info("실험 시작: Auto Commit 메시지 유실 시나리오");
52+
log.info("========================================");
53+
54+
// 1. 메시지 전송
55+
log.info("\n=== 1단계: Producer가 메시지 10개 전송 ===");
56+
produceMessages(10);
57+
Thread.sleep(1000);
58+
59+
// 2. Auto Commit Consumer
60+
log.info("\n=== 2단계: Auto Commit Consumer 시작 ===");
61+
log.info("설정: enable.auto.commit=true, auto.commit.interval=3초");
62+
63+
Properties props = createConsumerProps("auto-commit-group");
64+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
65+
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000");
66+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
67+
68+
try {
69+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
70+
consumer.subscribe(Collections.singletonList(TOPIC));
71+
72+
log.info("\n=== poll() 호출 ===");
73+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
74+
log.info("읽은 메시지 개수: {}", records.count());
75+
76+
int count = 0;
77+
for (ConsumerRecord<String, String> record : records) {
78+
count++;
79+
log.info("처리 중: offset={}, value={}", record.offset(), record.value());
80+
81+
if (count == 5) {
82+
log.error("\n!!! 5번째 메시지에서 에러 발생 !!!");
83+
log.error("처리 완료: 0~4 (5개)");
84+
log.error("처리 실패: 5~9 (5개)");
85+
Thread.sleep(4000); // 4초 대기 (auto commit interval 초과)
86+
throw new RuntimeException("Processing failed at message 5");
87+
}
88+
}
89+
}
90+
} catch (Exception e) {
91+
log.error("\n=== Consumer 종료 (에러로 인한 종료) ===");
92+
}
93+
94+
// 3. 재시작
95+
log.info("\n=== 3단계: Consumer 재시작 ===");
96+
Thread.sleep(2000);
97+
98+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
99+
consumer.subscribe(Collections.singletonList(TOPIC));
100+
101+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
102+
log.info("재시작 후 읽은 메시지 개수: {}", records.count());
103+
104+
if (records.isEmpty()) {
105+
log.error("\n========================================");
106+
log.error("결과: 메시지 유실 발생!");
107+
log.error("========================================");
108+
log.error("- 5~9번 메시지는 처리 안 됐지만 커밋됨");
109+
log.error("- 재시작 시 이미 커밋된 오프셋부터 읽음");
110+
log.error("- 결과: 5~9번 메시지 영구 유실");
111+
log.error("\n학습: Auto Commit은 처리 성공 여부와 무관하게");
112+
log.error(" 시간 기반으로 자동 커밋됨!");
113+
} else {
114+
records.forEach(record -> {
115+
log.info("재시작 후 읽음: offset={}, value={}", record.offset(), record.value());
116+
});
117+
}
118+
}
119+
}
120+
121+
@Test
122+
@DisplayName("Manual Commit: 처리 성공 시만 커밋")
123+
void manualCommit_NoMessageLoss() throws Exception {
124+
log.info("\n");
125+
log.info("========================================");
126+
log.info("실험 시작: Manual Commit 안전한 처리");
127+
log.info("========================================");
128+
129+
// 1. 메시지 전송
130+
log.info("\n=== 1단계: Producer가 메시지 10개 전송 ===");
131+
produceMessages(10);
132+
Thread.sleep(1000);
133+
134+
// 2. Manual Commit Consumer
135+
log.info("\n=== 2단계: Manual Commit Consumer 시작 ===");
136+
log.info("설정: enable.auto.commit=false");
137+
138+
Properties props = createConsumerProps("manual-commit-group");
139+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
140+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
141+
142+
try {
143+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
144+
consumer.subscribe(Collections.singletonList(TOPIC));
145+
146+
log.info("\n=== poll() 호출 ===");
147+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
148+
log.info("읽은 메시지 개수: {}", records.count());
149+
150+
int count = 0;
151+
for (ConsumerRecord<String, String> record : records) {
152+
count++;
153+
log.info("처리 중: offset={}, value={}", record.offset(), record.value());
154+
155+
if (count == 5) {
156+
log.error("\n!!! 5번째 메시지에서 에러 발생 !!!");
157+
log.error("처리 완료: 0~4 (5개)");
158+
log.error("처리 실패: 5~9 (5개)");
159+
log.error("커밋하지 않고 종료");
160+
throw new RuntimeException("Processing failed at message 5");
161+
}
162+
}
163+
164+
consumer.commitSync();
165+
log.info("커밋 완료");
166+
}
167+
} catch (Exception e) {
168+
log.error("\n=== Consumer 종료 (커밋 안 됨!) ===");
169+
}
170+
171+
// 3. 재시작
172+
log.info("\n=== 3단계: Consumer 재시작 ===");
173+
Thread.sleep(2000);
174+
175+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
176+
consumer.subscribe(Collections.singletonList(TOPIC));
177+
178+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
179+
log.info("재시작 후 읽은 메시지 개수: {}", records.count());
180+
181+
log.info("\n========================================");
182+
log.info("결과: 메시지 유실 없음!");
183+
log.info("========================================");
184+
log.info("- 커밋하지 않았으므로 처음부터 다시 읽음");
185+
log.info("- 0~9번 메시지 모두 재처리 가능");
186+
log.info("- 결과: At Least Once 보장");
187+
log.info("\n학습: Manual Commit은 명시적으로 호출해야 커밋");
188+
log.info(" 처리 실패 시 커밋 안 하면 재처리 가능!");
189+
log.info(" 단, 중복 처리 가능 (멱등성 필요)");
190+
191+
records.forEach(record -> {
192+
log.info("재시작 후 읽음: offset={}, value={}", record.offset(), record.value());
193+
});
194+
}
195+
}
196+
197+
// Helper methods
198+
private void produceMessages(int count) {
199+
Properties props = new Properties();
200+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
201+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
202+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
203+
204+
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
205+
for (int i = 0; i < count; i++) {
206+
ProducerRecord<String, String> record = new ProducerRecord<>(
207+
TOPIC,
208+
"key-" + i,
209+
"Message " + i
210+
);
211+
producer.send(record);
212+
}
213+
producer.flush();
214+
log.info("메시지 {}개 전송 완료", count);
215+
}
216+
}
217+
218+
private Properties createConsumerProps(String groupId) {
219+
Properties props = new Properties();
220+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
221+
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
222+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
223+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
224+
return props;
225+
}
226+
}

0 commit comments

Comments
 (0)