Skip to content

Commit 9a00c2a

Browse files
committed
Changes per review comments
Added the following test-cases: - When preserveEventSourceTimestamp is True - When preserveEventSourceTimestamp is False - When preserveEventSourceTimestamp is not configured
2 parents b470599 + 9fe959a commit 9a00c2a

2 files changed

Lines changed: 24 additions & 8 deletions

File tree

datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public KafkaTransportProvider(DatastreamTask datastreamTask, List<KafkaProducerW
8282
}
8383

8484
_preserveEventSourceTimestamp = Boolean.parseBoolean(datastreamTask.getDatastreams().get(0).getMetadata()
85-
.get(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP));
85+
.getOrDefault(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.FALSE.toString()));
8686

8787
// initialize metrics
8888
_dynamicMetricsManager = DynamicMetricsManager.getInstance();
@@ -101,7 +101,7 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
101101

102102
Optional<Integer> partition = record.getPartition();
103103

104-
byte[] keyValue = new byte[0];
104+
byte[] keyValue = null;
105105
byte[] payloadValue = new byte[0];
106106
if (event instanceof BrooklinEnvelope) {
107107
BrooklinEnvelope envelope = (BrooklinEnvelope) event;
@@ -125,7 +125,7 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
125125
// If the partition is not specified. We use the partitionKey as the key. Kafka will use the hash of that
126126
// to determine the partition. If partitionKey does not exist, use the key value.
127127
keyValue = record.getPartitionKey().isPresent()
128-
? record.getPartitionKey().get().getBytes(StandardCharsets.UTF_8) : null;
128+
? record.getPartitionKey().get().getBytes(StandardCharsets.UTF_8) : keyValue;
129129
return new ProducerRecord<>(topicName, null, recordTimeStamp, keyValue, payloadValue);
130130
}
131131
}

datastream-kafka/src/test/java/com/linkedin/datastream/kafka/TestKafkaTransportProvider.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,20 @@ public void testEventWithoutKeyValueAndPartition() throws Exception {
209209
}
210210

211211
@Test
212-
public void testEventWithTimestamp() throws Exception {
212+
public void testEventWithPreserveTimestampTrue() throws Exception {
213213
testEventSendWithTimestamp(1, 2, -1, false, false, "test", true);
214214
}
215215

216+
@Test
217+
public void testEventWithPreserveTimestampFalse() throws Exception {
218+
testEventSendWithTimestamp(1, 2, -1, false, false, "test", false);
219+
}
220+
221+
@Test
222+
public void testEventWithoutPreservingTimestamp() throws Exception {
223+
testEventSendWithTimestamp(1, 2, -1, false, false, "test", null);
224+
}
225+
216226
@Test
217227
public void testSendMultipleEventsInSingleDatastreamProducerRecord() throws Exception {
218228
String metricsPrefix = "test";
@@ -290,7 +300,7 @@ public void testSendMultipleEventsInSingleDatastreamProducerRecord() throws Exce
290300
}
291301

292302
private void testEventSendWithTimestamp(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
293-
boolean includeValue, String metricsPrefix, boolean preserveSourceEventTimestamp) throws Exception {
303+
boolean includeValue, String metricsPrefix, Boolean preserveSourceEventTimestamp) throws Exception {
294304
String topicName = getUniqueTopicName();
295305

296306
if (metricsPrefix != null) {
@@ -302,7 +312,9 @@ private void testEventSendWithTimestamp(int numberOfEvents, int numberOfPartitio
302312

303313
Datastream ds = DatastreamTestUtils.createDatastream("test", "ds1", "source", destinationUri, numberOfPartitions);
304314

305-
ds.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.TRUE.toString());
315+
if (preserveSourceEventTimestamp != null) {
316+
ds.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, preserveSourceEventTimestamp.toString());
317+
}
306318

307319
DatastreamTask task = new DatastreamTaskImpl(Collections.singletonList(ds));
308320
TransportProvider transportProvider = provider.assignTransportProvider(task);
@@ -322,7 +334,7 @@ private void testEventSendWithTimestamp(int numberOfEvents, int numberOfPartitio
322334
}
323335

324336
List<DatastreamProducerRecord> datastreamEvents =
325-
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, eventSourceTimestamps);
337+
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, eventSourceTimestamps);
326338

327339
LOG.info(String.format("Trying to send %d events to topic %s", datastreamEvents.size(), topicName));
328340

@@ -343,7 +355,11 @@ private void testEventSendWithTimestamp(int numberOfEvents, int numberOfPartitio
343355
return readTimestamps.size() < numberOfEvents;
344356
});
345357

346-
Assert.assertEquals(readTimestamps, eventSourceTimestamps);
358+
if (preserveSourceEventTimestamp != null && preserveSourceEventTimestamp) {
359+
Assert.assertEquals(readTimestamps, eventSourceTimestamps);
360+
} else {
361+
Assert.assertNotEquals(readTimestamps, eventSourceTimestamps);
362+
}
347363

348364
if (metricsPrefix != null) {
349365
// verify that configured metrics prefix was used

0 commit comments

Comments
 (0)