Skip to content

Commit 4759cf9

Browse files
authored
카프카 모듈 추가 (#8)
* gradle settings sync * :apps:commerce-streamer 모듈 추가 * :modules:kafka 모듈 추가 * infra-compose.yml // 카프카 docker 추가
1 parent 17b0da2 commit 4759cf9

10 files changed

Lines changed: 311 additions & 3 deletions

File tree

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ docker-compose -f ./docker/monitoring-compose.yml up
2626
```
2727
Root
2828
├── apps ( spring-applications )
29-
│ └── 📦 commerce-api
29+
│ ├── 📦 commerce-api
30+
│ └── 📦 commerce-streamer
3031
├── modules ( reusable-configurations )
3132
│ ├── 📦 jpa
32-
│ └── 📦 redis
33+
│ ├── 📦 redis
34+
│ └── 📦 kafka
3335
└── supports ( add-ons )
36+
├── 📦 jackson
3437
├── 📦 monitoring
3538
└── 📦 logging
3639
```
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
dependencies {
2+
// add-ons
3+
implementation(project(":modules:jpa"))
4+
implementation(project(":modules:redis"))
5+
implementation(project(":modules:kafka"))
6+
implementation(project(":supports:jackson"))
7+
implementation(project(":supports:logging"))
8+
implementation(project(":supports:monitoring"))
9+
10+
// web
11+
implementation("org.springframework.boot:spring-boot-starter-web")
12+
implementation("org.springframework.boot:spring-boot-starter-actuator")
13+
14+
// querydsl
15+
annotationProcessor("com.querydsl:querydsl-apt::jakarta")
16+
annotationProcessor("jakarta.persistence:jakarta.persistence-api")
17+
annotationProcessor("jakarta.annotation:jakarta.annotation-api")
18+
19+
// test-fixtures
20+
testImplementation(testFixtures(project(":modules:jpa")))
21+
testImplementation(testFixtures(project(":modules:redis")))
22+
testImplementation(testFixtures(project(":modules:kafka")))
23+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.loopers;
2+
3+
import jakarta.annotation.PostConstruct;
4+
import org.springframework.boot.SpringApplication;
5+
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
7+
8+
import java.util.TimeZone;
9+
10+
@ConfigurationPropertiesScan
11+
@SpringBootApplication
12+
public class CommerceStreamerApplication {
13+
@PostConstruct
14+
public void started() {
15+
// set timezone
16+
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Seoul"));
17+
}
18+
19+
public static void main(String[] args) {
20+
SpringApplication.run(CommerceStreamerApplication.class, args);
21+
}
22+
}
23+
24+
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.loopers.interfaces.consumer;
2+
3+
import com.loopers.confg.kafka.KafkaConfig;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.kafka.support.Acknowledgment;
7+
import org.springframework.stereotype.Component;
8+
9+
import java.util.List;
10+
11+
@Component
12+
public class DemoKafkaConsumer {
13+
@KafkaListener(
14+
topics = {"${demo-kafka.test.topic-name}"},
15+
containerFactory = KafkaConfig.BATCH_LISTENER
16+
)
17+
public void demoListener(
18+
List<ConsumerRecord<Object,Object>> messages,
19+
Acknowledgment acknowledgment
20+
){
21+
System.out.println(messages);
22+
acknowledgment.acknowledge();
23+
}
24+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
server:
2+
shutdown: graceful
3+
tomcat:
4+
threads:
5+
max: 200 # 최대 워커 스레드 수 (default : 200)
6+
min-spare: 10 # 최소 유지 스레드 수 (default : 10)
7+
connection-timeout: 1m # 연결 타임아웃 (ms) (default : 60000ms = 1m)
8+
max-connections: 8192 # 최대 동시 연결 수 (default : 8192)
9+
accept-count: 100 # 대기 큐 크기 (default : 100)
10+
keep-alive-timeout: 60s # 60s
11+
max-http-request-header-size: 8KB
12+
13+
spring:
14+
main:
15+
web-application-type: servlet
16+
application:
17+
name: commerce-api
18+
profiles:
19+
active: local
20+
config:
21+
import:
22+
- jpa.yml
23+
- redis.yml
24+
- kafka.yml
25+
- logging.yml
26+
- monitoring.yml
27+
28+
demo-kafka:
29+
test:
30+
topic-name: demo.internal.topic-v1
31+
32+
---
33+
spring:
34+
config:
35+
activate:
36+
on-profile: local, test
37+
38+
---
39+
spring:
40+
config:
41+
activate:
42+
on-profile: dev
43+
44+
---
45+
spring:
46+
config:
47+
activate:
48+
on-profile: qa
49+
50+
---
51+
spring:
52+
config:
53+
activate:
54+
on-profile: prd
55+
56+
springdoc:
57+
api-docs:
58+
enabled: false

docker/infra-compose.yml

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,57 @@ services:
5454
"--latency-monitor-threshold", "100",
5555
]
5656
healthcheck:
57-
test: ["CMD", "redis-cli", "-p", "6380", "PING"]
57+
test: ["CMD", "redis-cli", "-p", "6379", "PING"]
5858
interval: 5s
5959
timeout: 2s
6060
retries: 10
6161

62+
kafka:
63+
image: bitnami/kafka:3.5.1
64+
container_name: kafka
65+
ports:
66+
- "9092:9092" # 카프카 브로커 PORT
67+
- "19092:19092" # 호스트 리스너 얘 떄문인가
68+
environment:
69+
- KAFKA_CFG_NODE_ID=1 # 브로커 고유 ID
70+
- KAFKA_CFG_PROCESS_ROLES=broker,controller # KRaft 모드여서, broker / controller 역할 모두 부여
71+
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:19092,CONTROLLER://:9093
72+
# 브로커 클라이언트 (PLAINTEXT), 브로커 호스트 (PLAINTEXT) 내부 컨트롤러 (CONTROLLER)
73+
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:19092
74+
# 외부 클라이언트 접속 호스트 (localhost:9092), 브로커 접속 호스트 (localhost:19092)
75+
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
76+
# 각 리스너별 보안 프로토콜 설정
77+
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
78+
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 컨트롤러 담당 리스너 지정
79+
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 # 컨트롤러 후보 노드 정의 (단일 브로커라 자기 자신만 있음)
80+
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 # consumer offset 복제 갯수 (현재는 1 - 로컬용이라서)
81+
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 # transaction log 토픽 복제 갯수 (현재는 1 - 로컬용이라서)
82+
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1 # In-Sync-Replica 최소 수 (현재는 1 - 로컬용이라서)
83+
volumes:
84+
- kafka-data:/bitnami/kafka
85+
healthcheck:
86+
test: ["CMD", "bash", "-c", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
87+
interval: 10s
88+
timeout: 5s
89+
retries: 10
90+
91+
kafka-ui:
92+
image: provectuslabs/kafka-ui:latest
93+
container_name: kafka-ui
94+
depends_on:
95+
kafka:
96+
condition: service_healthy
97+
ports:
98+
- "9090:8080"
99+
environment:
100+
KAFKA_CLUSTERS_0_NAME: local # kafka-ui 에서 보이는 클러스터명
101+
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 # kafka-ui 가 연겷할 브로커 주소
102+
62103
volumes:
63104
mysql-8-data:
64105
redis_master_data:
65106
redis_readonly_data:
107+
kafka-data:
66108

67109
networks:
68110
default:

modules/kafka/build.gradle.kts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
plugins {
2+
`java-library`
3+
`java-test-fixtures`
4+
}
5+
6+
dependencies {
7+
api("org.springframework.kafka:spring-kafka")
8+
9+
testImplementation("org.springframework.kafka:spring-kafka-test")
10+
testImplementation("org.testcontainers:kafka")
11+
12+
testFixturesImplementation("org.testcontainers:kafka")
13+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.loopers.confg.kafka;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import org.apache.kafka.clients.consumer.ConsumerConfig;
5+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
6+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.kafka.annotation.EnableKafka;
10+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
11+
import org.springframework.kafka.core.*;
12+
import org.springframework.kafka.listener.ContainerProperties;
13+
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
14+
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
@EnableKafka
20+
@Configuration
21+
@EnableConfigurationProperties(KafkaProperties.class)
22+
public class KafkaConfig {
23+
public static final String BATCH_LISTENER = "BATCH_LISTENER_DEFAULT";
24+
25+
public static final int MAX_POLLING_SIZE = 3000; // read 3000 msg
26+
public static final int FETCH_MIN_BYTES = (1024 * 1024); // 1mb
27+
public static final int FETCH_MAX_WAIT_MS = 5 * 1000; // broker waiting time = 5s
28+
public static final int SESSION_TIMEOUT_MS = 60 * 1000; // session timeout = 1m
29+
public static final int HEARTBEAT_INTERVAL_MS = 20 * 1000; // heartbeat interval = 20s ( 1/3 of session_timeout )
30+
public static final int MAX_POLL_INTERVAL_MS = 2 * 60 * 1000; // max poll interval = 2m
31+
32+
@Bean
33+
public ProducerFactory<Object, Object> producerFactory(KafkaProperties kafkaProperties) {
34+
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
35+
return new DefaultKafkaProducerFactory<>(props);
36+
}
37+
38+
@Bean
39+
public ConsumerFactory<Object, Object> consumerFactory(KafkaProperties kafkaProperties) {
40+
Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
41+
return new DefaultKafkaConsumerFactory<>(props);
42+
}
43+
44+
@Bean
45+
public KafkaTemplate<Object, Object> kafkaTemplate(ProducerFactory<Object, Object> producerFactory) {
46+
return new KafkaTemplate<>(producerFactory);
47+
}
48+
49+
@Bean
50+
public ByteArrayJsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
51+
return new ByteArrayJsonMessageConverter(objectMapper);
52+
}
53+
54+
@Bean(name = BATCH_LISTENER)
55+
public ConcurrentKafkaListenerContainerFactory<Object, Object> defaultBatchListenerContainerFactory(
56+
KafkaProperties kafkaProperties,
57+
ByteArrayJsonMessageConverter converter
58+
) {
59+
Map<String, Object> consumerConfig = new HashMap<>(kafkaProperties.buildConsumerProperties());
60+
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLLING_SIZE);
61+
consumerConfig.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, FETCH_MIN_BYTES);
62+
consumerConfig.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, FETCH_MAX_WAIT_MS);
63+
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS);
64+
consumerConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, HEARTBEAT_INTERVAL_MS);
65+
consumerConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL_MS);
66+
67+
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
68+
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig));
69+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // 수동 커밋
70+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter));
71+
factory.setConcurrency(3);
72+
factory.setBatchListener(true);
73+
return factory;
74+
}
75+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
spring:
2+
kafka:
3+
bootstrap-servers: ${BOOTSTRAP_SERVERS}
4+
client-id: ${spring.application.name}
5+
properties:
6+
spring.json.add.type.headers: false # json serialize off
7+
request.timeout.ms: 20000
8+
retry.backoff.ms: 500
9+
auto:
10+
create.topics.enable: false
11+
register.schemas: false
12+
offset.reset: latest
13+
use.latest.version: true
14+
producer:
15+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
16+
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
17+
retries: 3
18+
consumer:
19+
group-id: loopers-default-consumer
20+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
21+
value-serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
22+
properties:
23+
enable-auto-commit: false
24+
listener:
25+
ack-mode: manual
26+
27+
---
28+
spring.config.activate.on-profile: local, test
29+
30+
spring:
31+
kafka:
32+
bootstrap-servers: localhost:19092
33+
admin:
34+
properties:
35+
bootstrap.servers: kafka:9092
36+
37+
---
38+
spring.config.activate.on-profile: dev
39+
40+
---
41+
spring.config.activate.on-profile: qa
42+
43+
---
44+
spring.config.activate.on-profile: prd

settings.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ rootProject.name = "loopers-java-spring-template"
22

33
include(
44
":apps:commerce-api",
5+
":apps:commerce-streamer",
56
":modules:jpa",
67
":modules:redis",
8+
":modules:kafka",
79
":supports:jackson",
810
":supports:logging",
911
":supports:monitoring",

0 commit comments

Comments
 (0)