-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathKafkaConfig.java
More file actions
98 lines (84 loc) · 4.66 KB
/
KafkaConfig.java
File metadata and controls
98 lines (84 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.yape.antifraud.config;
import com.yape.antifraud.model.entity.Transaction;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ReactiveKafkaProducerTemplate<String, Transaction> reactiveKafkaProducerTemplate() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); // Disable type info headers
SenderOptions<String, Transaction> senderOptions = SenderOptions.create(configProps);
return new ReactiveKafkaProducerTemplate<>(senderOptions);
}
@Bean
public ReactiveKafkaConsumerTemplate<String, Transaction> reactiveKafkaConsumerTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Transaction.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.yape.transaction.model.entity,com.yape.antifraud.model.entity");
ReceiverOptions<String, Transaction> receiverOptions = ReceiverOptions.create(props);
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
@Bean
public ConsumerFactory<String, Transaction> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.yape.antifraud.model.entity.Transaction");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.yape.antifraud.model.entity");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Transaction> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(Collections.singletonMap("bootstrap.servers", bootstrapServers));
kafkaAdmin.setAutoCreate(true);
return kafkaAdmin;
}
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfig());
}
@Bean
public NewTopic topic() {
return new NewTopic("transactionCreated", 1, (short) 1);
}
}