Skip to content

Commit 13bc86a

Browse files
authored
RecordRecoverableProcessor and DltAwareProcessor example (#254)
* example implementation with tests update tests and README Add implementation of junit * Added fixes for comments Made the test much more deterministic * Fixed compilation error from IDE vs CLI invocation * Removed unnecessary annotation Handled by spring boot, not needed for Kafka autoconfiguration.
1 parent b6462f4 commit 13bc86a

14 files changed

Lines changed: 853 additions & 2 deletions

File tree

kafka-batch-sample/src/main/java/com/example/demo/KafkaBatchSampleApplication.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class BatchProduce extends Base {
100100
@Bean
101101
Function<List<String>, List<Message<String>>> consumer() {
102102
return list -> list.stream()
103-
.map(string -> string.toUpperCase())
103+
.map(String::toUpperCase)
104104
.map(uppercasedString -> MessageBuilder.withPayload(uppercasedString).build())
105105
.collect(Collectors.toList());
106106
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
target/
2+
!.mvn/wrapper/maven-wrapper.jar
3+
4+
### STS ###
5+
.apt_generated
6+
.classpath
7+
.factorypath
8+
.project
9+
.settings
10+
.springBeans
11+
12+
### IntelliJ IDEA ###
13+
.idea
14+
*.iws
15+
*.iml
16+
*.ipr
17+
18+
### NetBeans ###
19+
nbproject/private/
20+
build/
21+
nbbuild/
22+
dist/
23+
nbdist/
24+
.nb-gradle/
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
== What is this app?
2+
3+
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
4+
5+
This is a demonstration of utilizing the RecordRecoverableProcessor and DltAwareProcessor.
6+
7+
The example increments a java.util.Suppler integer by 1 every 2 seconds, and publishes
8+
to the `topic-in` kafka topic.
9+
10+
There's then two processors, both of which are listening on `topic-in`.
11+
12+
The first processor throws an exception in the workflow when the integer is divisible by 5.
13+
The second processor throws an exception in the workflow when the integer is divisible by 4.
14+
15+
This sample uses lambda expressions and thus requires Java 8+.
16+
17+
=== Running the app:
18+
19+
`docker-compose up -d`
20+
21+
Go to the root of the repository and do: `./mvnw clean package`
22+
23+
`java -jar target/kafka-streams-recoverable-0.0.1-SNAPSHOT.jar`
24+
25+
The Supplier will automatically publish messages and the consumers
26+
will write messages to the application log.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
version: '3'
2+
services:
3+
kafka:
4+
image: wurstmeister/kafka
5+
container_name: kafka-dlq
6+
ports:
7+
- "9092:9092"
8+
environment:
9+
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
10+
- KAFKA_ADVERTISED_PORT=9092
11+
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
12+
depends_on:
13+
- zookeeper
14+
zookeeper:
15+
image: wurstmeister/zookeeper
16+
ports:
17+
- "2181:2181"
18+
environment:
19+
- KAFKA_ADVERTISED_HOST_NAME=zookeeper

kafka-streams-samples/kafka-streams-recoverable/mvnw

Lines changed: 225 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)