Skip to content

Commit 5669df9

Browse files
author
anders-wartoft
committed
1.1-5
1 parent 82159ac commit 5669df9

5 files changed

Lines changed: 61 additions & 18 deletions

File tree

README.md

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ java -jar target/LogGenerator{version}.jar -i kafka -ci test3 -t OUTPUT -b 192.1
1818
When running the last command, press Ctrl-C to see the gaps in the received data. Since we started the counter on 100, there should at least be one gap: 1-99.
1919

2020
### Latest Release Notes
21+
22+
### 1.1-5
23+
Added a property --print-keys (-pk) to the KafkaInputItem to print the Kafka key for lines read.
24+
Added a property --start-number (-sn) to the CounterInputItem to alter the start number
25+
2126
#### 1.1-4
2227
Changed kafka-clients dependency version from 3.7.1 to 3.9.1 due to CVE-2025-27817
2328

@@ -162,7 +167,7 @@ Parameters and example, see below in the Q&A section.
162167
#### Fetch from Kafka topics
163168
Connect to a Kafka server and read from a topic
164169

165-
Parameters: `-i kafka -ci {client id} -t {topic name} -b {boostrap server}`
170+
Parameters: `-i kafka -ci {client id} -t {topic name} -b {boostrap server} --print-keys {true/false}`
166171

167172
Example: `-i kafka -ci test -t testtopic -b localhost:9092`
168173

@@ -174,7 +179,7 @@ The main use case is to get one field from an index and send to a receiver, for
174179
Parameters: `-i elastic --hostname {hostname or ip} -p {port number} -i {index name} -f {field to get} -ak {API key} -c {x.509 certificate in cer format} -q {query string in query dsl format}`
175180

176181
Example:
177-
``` properties
182+
```ini
178183
# Elasticsearch
179184
input=elastic
180185
# Elastic instance to connect to
@@ -295,7 +300,7 @@ The main use case is to be able to generate events with _id set to some enumerab
295300

296301
Parameters `-o elastic --hostname {hostname or ip} --port {port number} --index {index} --api-key {API Key} --regex {regex to find an id from input} --id {format for output id} --certificate-path {X.509 certificate for the elastic server in cer format}`
297302

298-
``` properties
303+
```ini
299304
# Elasticsearch
300305
output=elastic
301306
# Elastic instance to connect to
@@ -728,7 +733,7 @@ A combination of parameters from the command line and property file is possible.
728733
The property file can have all short- or long names for configuration. Comment lines start with the hash character '#'.
729734

730735
Example:
731-
```properties
736+
```ini
732737
# Start an udp proxy, listening on port 9999, rewriting dates to the current date.
733738
# Dates are parsed according to a specified format and finally sending the result
734739
# to a syslog server.
@@ -774,15 +779,15 @@ There are three ways to add custom variables:
774779

775780
Example:
776781
Say you would like to always exchange the text `{company-name}` in a template with `sitia.nu`. Then add the following to a property file (let's call it variables.properties):
777-
```properties
782+
```ini
778783
company-name=sitia.nu
779784
```
780785
and add that file to the property file loaded by `-pf` on the command to LogGenerator:
781-
```properties
786+
```ini
782787
variable-file=variables.properties
783788
```
784789
If you only have a small number of custom variables, you could just load all of them in the property file loaded by `-p`:
785-
```properties
790+
```ini
786791
custom.company-name=sitia.nu
787792
```
788793
Custom variables can contain variables.
@@ -1071,7 +1076,7 @@ Also, you can omit the escape:
10711076
-f gap --regex "<(\d+)>"
10721077
```
10731078
But, in the property file you have to write the regex as is, without extra escape characters, like
1074-
```properties
1079+
```ini
10751080
filter=gap
10761081
regex=<(\d+)>
10771082
```

dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>nu.sitia.LogGenerator</groupId>
55
<artifactId>LogGenerator</artifactId>
66
<name>LogGenerator</name>
7-
<version>1.1-4</version>
7+
<version>1.1-5</version>
88
<url>http://maven.apache.org</url>
99
<build>
1010
<plugins>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>nu.sitia.LogGenerator</groupId>
66
<artifactId>LogGenerator</artifactId>
7-
<version>1.1-4</version>
7+
<version>1.1-5</version>
88
<packaging>jar</packaging>
99

1010
<name>LogGenerator</name>

src/main/java/nu/sitia/loggenerator/inputitems/CounterInputItem.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ public boolean setParameter(String key, String value) {
4646
System.out.println("CounterInputItem. Return a string with an ever-increasing number\n" +
4747
"Parameters:\n" +
4848
"--string <string> (-st <string>)\n" +
49-
" The string to return\n");
50-
super.setParameter(key, value);
49+
" The string to return\n" +
50+
"--start-number <number> (-sn <number>)\n" +
51+
" The number to start with (default 1)\n");
5152
}
5253
if(super.setParameter(key, value)) {
5354
return true;
@@ -56,6 +57,14 @@ public boolean setParameter(String key, String value) {
5657
this.string = value;
5758
return true;
5859
}
60+
if (key != null && (key.equalsIgnoreCase("--start-number") || key.equalsIgnoreCase("-sn"))) {
61+
try {
62+
this.number = Long.parseLong(value);
63+
} catch (NumberFormatException e) {
64+
System.out.println("Invalid number format for --start-number");
65+
}
66+
return true;
67+
}
5968
return false;
6069
}
6170

src/main/java/nu/sitia/loggenerator/inputitems/KafkaInputItem.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import nu.sitia.loggenerator.Configuration;
2121
import org.apache.kafka.clients.consumer.*;
22+
import org.apache.kafka.common.errors.UnknownTopicIdException;
2223

2324
import java.time.Duration;
2425
import java.util.ArrayList;
@@ -45,6 +46,10 @@ public class KafkaInputItem extends AbstractInputItem {
4546
/** The topic name */
4647
private String topicName;
4748

49+
/** Whether to print the keys read from Kafka */
50+
private boolean printKeys = false;
51+
52+
/** The default Kafka batch size */
4853
private final static int DEFAULT_KAFKA_BATCHSIZE = 200;
4954

5055
/**
@@ -65,7 +70,9 @@ public boolean setParameter(String key, String value) {
6570
"--topic <topic> (-t <topic>)\n" +
6671
" The topic to read from\n" +
6772
"--bootstrap-server <hostname:port> (-b <hostname:port>)\n" +
68-
" The hostname:port to connect to\n");
73+
" The hostname:port to connect to\n" +
74+
"--print-keys <true|false> (-pk <true|false>)\n" +
75+
" Whether to print the keys read from Kafka. Default false\n");
6976
super.setParameter(key, value);
7077
}
7178
if (super.setParameter(key, value)) {
@@ -86,6 +93,11 @@ public boolean setParameter(String key, String value) {
8693
logger.fine("bootstrapServer " + value);
8794
return true;
8895
}
96+
if (key != null && (key.equalsIgnoreCase("--print-keys") || key.equalsIgnoreCase("-pk"))) {
97+
this.printKeys = Boolean.parseBoolean(value);
98+
logger.fine("printKeys " + value);
99+
return true;
100+
}
89101
return false;
90102
}
91103

@@ -114,10 +126,12 @@ public void setup() throws RuntimeException {
114126
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSize));
115127
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
116128
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
129+
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
117130

118131
consumer = new KafkaConsumer<>(properties);
119132
logger.info("Connected to kafka " + this.bootstrapServer);
120133
consumer.subscribe(Collections.singletonList(this.topicName));
134+
consumer.poll(Duration.ofSeconds(5)); // fetch metadata and leader info
121135
logger.info("Subscribed to topic " + this.topicName);
122136
}
123137

@@ -136,13 +150,28 @@ public boolean hasNext() {
136150
* @return The read input
137151
*/
138152
public List<String> next() {
139-
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
140153
List<String> result = new ArrayList<>();
141-
for (ConsumerRecord<Integer, String> record: records) {
142-
result.add(record.value());
154+
while (true) {
155+
try {
156+
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
157+
for (ConsumerRecord<Integer, String> record: records) {
158+
if (printKeys) {
159+
result.add(record.key() + ": " + record.value());
160+
} else {
161+
result.add(record.value());
162+
}
163+
}
164+
logger.log(Level.FINEST, result.toString());
165+
return result;
166+
} catch (UnknownTopicIdException e) {
167+
try {
168+
// wait a bit and retry
169+
Thread.sleep(500);
170+
} catch (InterruptedException e1) {
171+
return result;
172+
}
173+
}
143174
}
144-
logger.log(Level.FINEST, result.toString());
145-
return result;
146175
}
147176

148177
/**

0 commit comments

Comments
 (0)