Skip to content

Commit 5be06f4

Browse files
shrinandthakkarShrinand Thakkar
andauthored
Minor improvements w.r.t #928 & Releasing a newer version (#932)
* Releasing a new version And Minor improvements * Using immutable empty set & keeping SNAPSHOT to accidently not release any version --------- Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
1 parent a00a29f commit 5be06f4

3 files changed

Lines changed: 20 additions & 8 deletions

File tree

datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,20 @@ public class EventProducer implements DatastreamEventProducer {
122122
* @param customCheckpointing decides whether producer should use custom checkpointing or the datastream server
123123
* provided checkpointing.
124124
*/
125+
public EventProducer(DatastreamTask task, TransportProvider transportProvider, CheckpointProvider checkpointProvider,
126+
Properties config, boolean customCheckpointing) {
127+
this(task, transportProvider, checkpointProvider, config, customCheckpointing, (t) -> Collections.emptySet());
128+
}
129+
130+
/**
131+
* Construct an EventProducer instance.
132+
* @param transportProvider the transport provider
133+
* @param checkpointProvider the checkpoint provider
134+
* @param config the config options
135+
* @param customCheckpointing decides whether producer should use custom checkpointing or the datastream server
136+
* provided checkpointing.
137+
* @param throughputViolatingTopicsProvider function parameter per task to find the throughput violating topics
138+
*/
125139
public EventProducer(DatastreamTask task, TransportProvider transportProvider, CheckpointProvider checkpointProvider,
126140
Properties config, boolean customCheckpointing,
127141
Function<DatastreamTask, Set<String>> throughputViolatingTopicsProvider) {

datastream-server/src/test/java/com/linkedin/datastream/server/TestEventProducer.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.lang.reflect.Field;
99
import java.util.Collections;
1010
import java.util.HashMap;
11-
import java.util.HashSet;
1211
import java.util.Properties;
1312
import java.util.concurrent.atomic.AtomicInteger;
1413

@@ -71,8 +70,8 @@ public void send(String destination, DatastreamProducerRecord record, SendCallba
7170
}
7271
};
7372

74-
EventProducer eventProducer = new EventProducer(task, transport,
75-
new NoOpCheckpointProvider(), new Properties(), false, (t) -> new HashSet<>());
73+
EventProducer eventProducer =
74+
new EventProducer(task, transport, new NoOpCheckpointProvider(), new Properties(), false);
7675

7776
int eventCount = 5;
7877
for (int i = 0; i < eventCount; i++) {
@@ -122,8 +121,8 @@ public void send(String destination, DatastreamProducerRecord record, SendCallba
122121
}
123122
};
124123

125-
EventProducer eventProducer = new EventProducer(task, transport,
126-
new NoOpCheckpointProvider(), new Properties(), false, (t) -> new HashSet<>());
124+
EventProducer eventProducer =
125+
new EventProducer(task, transport, new NoOpCheckpointProvider(), new Properties(), false);
127126

128127
int eventCount = 5;
129128
for (int i = 0; i < eventCount; i++) {
@@ -143,8 +142,7 @@ public void testPerDatastreamMetrics() {
143142

144143
Properties props = new Properties();
145144
props.put(EventProducer.CONFIG_ENABLE_PER_TOPIC_METRICS, Boolean.FALSE.toString());
146-
EventProducer eventProducer =
147-
new EventProducer(task, transport, new NoOpCheckpointProvider(), props, false, (t) -> new HashSet<>());
145+
EventProducer eventProducer = new EventProducer(task, transport, new NoOpCheckpointProvider(), props, false);
148146

149147
eventProducer.send(createDatastreamProducerRecord(), (m, e) -> {
150148
});

gradle/maven.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
allprojects {
2-
version = "5.2.8-SNAPSHOT"
2+
version = "5.3.0-SNAPSHOT"
33
}
44

55
subprojects {

0 commit comments

Comments
 (0)