Skip to content

Commit a00a29f

Browse files
shrinandthakkar (Shrinand Thakkar)
and others authored
Added Support and Handling For Generic Headers (#931)
* Added Support and Handling For Generic Headers
* Added an exception path in case unsupported header type is encountered
* Fix a comment

---------

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
1 parent 6ebb701 commit a00a29f

6 files changed

Lines changed: 79 additions & 16 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ project(':datastream-common') {
154154
compile "com.linkedin.pegasus:restli-server:$pegasusVersion"
155155
compile "com.intellij:annotations:$intellijAnnotationsVersion"
156156
compile "com.google.guava:guava:$guavaVersion"
157-
compile "com.linkedin.kafka:kafka-clients:$kafkaVersion"
158157
compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
159158
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
160159
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
@@ -172,6 +171,7 @@ project(':datastream-server-api') {
172171
project(':datastream-utils') {
173172
dependencies {
174173
compile project(':datastream-common')
174+
compile "com.linkedin.kafka:kafka-clients:$kafkaVersion"
175175
compile "org.apache.helix:zookeeper-api:$helixZkclientVersion"
176176
compile "com.linkedin.zookeeper:zookeeper:$zookeeperVersion"
177177
compile "com.google.guava:guava:$guavaVersion"

datastream-common/src/main/java/com/linkedin/datastream/common/BrooklinEnvelope.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import org.apache.avro.reflect.Nullable;
1313
import org.apache.commons.lang.Validate;
14-
import org.apache.kafka.common.header.Headers;
1514

1615

1716
/**
@@ -30,7 +29,7 @@ public class BrooklinEnvelope {
3029

3130
private Map<String, String> _metadata;
3231

33-
private Headers _headers;
32+
private Object _headers;
3433

3534
/**
3635
* Construct a BrooklinEnvelope using record key, value, and metadata
@@ -59,11 +58,11 @@ public BrooklinEnvelope(@Nullable Object key, @Nullable Object value, @Nullable
5958
* @param key The record key (e.g. primary key)
6059
* @param value The new record value
6160
* @param previousValue The old record value
62-
* @param headers Kafka headers to associate with the change event
61+
* @param headers Generic headers to associate with the event
6362
* @param metadata Additional metadata to associate with the change event
6463
*/
6564
public BrooklinEnvelope(@Nullable Object key, @Nullable Object value, @Nullable Object previousValue,
66-
@Nullable Headers headers, Map<String, String> metadata) {
65+
@Nullable Object headers, Map<String, String> metadata) {
6766
Validate.notNull(metadata, "metadata cannot be null");
6867
setKey(key);
6968
setValue(value);
@@ -93,16 +92,16 @@ public void setPreviousValue(Object previousValue) {
9392
}
9493

9594
/**
96-
* Get the Kafka headers
95+
* Get the Generic headers
9796
*/
98-
public Headers getHeaders() {
97+
public Object getHeaders() {
9998
return _headers;
10099
}
101100

102101
/**
103-
* Set the Kafka headers
102+
* Set the Generic headers
104103
*/
105-
public void setHeaders(Headers headers) {
104+
public void setHeaders(Object headers) {
106105
_headers = headers;
107106
}
108107

datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTestUtils.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.kafka.clients.producer.Producer;
1717
import org.apache.kafka.clients.producer.ProducerConfig;
1818
import org.apache.kafka.clients.producer.ProducerRecord;
19+
import org.apache.kafka.common.header.Headers;
1920
import org.apache.kafka.common.serialization.ByteArraySerializer;
2021
import org.testng.Assert;
2122

@@ -52,19 +53,23 @@ static Properties getKafkaProducerProperties(DatastreamEmbeddedZookeeperKafkaClu
5253

5354
static void produceEvents(String topic, int destinationPartition, int numEvents,
5455
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
55-
produceEventsToPartition(topic, destinationPartition, numEvents, kafkaCluster);
56+
produceEventsToPartition(topic, destinationPartition, numEvents, kafkaCluster, null);
5657
}
5758

5859
static void produceEvents(String topic, int numEvents, DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
59-
produceEventsToPartition(topic, null, numEvents, kafkaCluster);
60+
produceEventsToPartition(topic, null, numEvents, kafkaCluster, null);
61+
}
62+
63+
static void produceEventsWithHeaders(String topic, int numEvents, DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster, Headers headers) {
64+
produceEventsToPartition(topic, null, numEvents, kafkaCluster, headers);
6065
}
6166

6267
static void produceEventsToPartition(String topic, Integer destinationPartition, int numEvents,
63-
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
68+
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster, Headers headers) {
6469
try (Producer<byte[], byte[]> producer = new KafkaProducer<>(getKafkaProducerProperties(kafkaCluster))) {
6570
for (int i = 0; i < numEvents; i++) {
6671
producer.send(new ProducerRecord<>(topic, destinationPartition, ("key-" + i).getBytes(Charsets.UTF_8),
67-
("value-" + i).getBytes(Charsets.UTF_8)), (metadata, exception) -> {
72+
("value-" + i).getBytes(Charsets.UTF_8), headers), (metadata, exception) -> {
6873
if (exception != null) {
6974
throw new RuntimeException("Failed to send message.", exception);
7075
}

datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaConsumerOffsets.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testConsumerOffsetsDiagEndpoint() {
6161

6262
// produce messages to each topic partition
6363
topics.forEach(topic -> IntStream.range(0, PARTITION_COUNT).forEach(partition ->
64-
KafkaMirrorMakerConnectorTestUtils.produceEventsToPartition(topic, partition, PARTITION_MESSAGE_COUNT, _kafkaCluster)));
64+
KafkaMirrorMakerConnectorTestUtils.produceEventsToPartition(topic, partition, PARTITION_MESSAGE_COUNT, _kafkaCluster, null)));
6565

6666
connector.onAssignmentChange(Collections.singletonList(task));
6767

datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.TimeUnit;
2020
import java.util.concurrent.atomic.AtomicReference;
2121

22+
import org.apache.commons.io.Charsets;
2223
import org.apache.commons.lang.StringUtils;
2324
import org.apache.kafka.clients.consumer.Consumer;
2425
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -27,6 +28,8 @@
2728
import org.apache.kafka.common.TopicPartition;
2829
import org.apache.kafka.common.errors.InterruptException;
2930
import org.apache.kafka.common.errors.WakeupException;
31+
import org.apache.kafka.common.header.Headers;
32+
import org.apache.kafka.common.header.internals.RecordHeaders;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
3235
import org.testng.Assert;
@@ -153,6 +156,55 @@ public List<BrooklinMetricInfo> getMetricInfos() {
153156
"did not shut down on time");
154157
}
155158

159+
@Test
160+
public void testConsumeFromSingleTopicWithHeaders() throws Exception {
161+
String connectorName = "testConsumeFromSingleTopicWithHeaders";
162+
163+
String yummyTopic = "YummyPizza";
164+
165+
createTopic(_adminClient, yummyTopic);
166+
167+
// create a datastream to consume from topics ending in "Pizza"
168+
Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream("pizzaStream", _broker, "\\w+Pizza");
169+
170+
DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream));
171+
MockDatastreamEventProducer datastreamProducer = new MockDatastreamEventProducer();
172+
task.setEventProducer(datastreamProducer);
173+
174+
KafkaMirrorMakerConnectorTask connectorTask =
175+
KafkaMirrorMakerConnectorTestUtils.createKafkaMirrorMakerConnectorTask(task, connectorName);
176+
KafkaMirrorMakerConnectorTestUtils.runKafkaMirrorMakerConnectorTask(connectorTask);
177+
178+
Headers genericHeaders = new RecordHeaders().add("headerKey", "headerValue".getBytes(Charsets.UTF_8));
179+
// produce an event to the topic with generic headers
180+
KafkaMirrorMakerConnectorTestUtils.produceEventsWithHeaders(yummyTopic, 10, _kafkaCluster, genericHeaders);
181+
182+
if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == 10, POLL_PERIOD_MS, POLL_TIMEOUT_MS)) {
183+
Assert.fail("did not transfer the msgs within timeout. transferred " + datastreamProducer.getEvents().size());
184+
}
185+
186+
List<DatastreamProducerRecord> records = datastreamProducer.getEvents();
187+
for (DatastreamProducerRecord record : records) {
188+
String destinationTopic = record.getDestination().get();
189+
Assert.assertTrue(destinationTopic.endsWith("Pizza"),
190+
"Unexpected event consumed from Datastream and sent to topic: " + destinationTopic);
191+
Assert.assertEquals(genericHeaders, record.getEvents().get(0).getHeaders());
192+
}
193+
194+
// Verify that metrics created through DynamicMetricsManager match those returned by getMetricInfos() given the
195+
// connector name of interest.
196+
MetricsTestUtils.verifyMetrics(new MetricsAware() {
197+
@Override
198+
public List<BrooklinMetricInfo> getMetricInfos() {
199+
return KafkaMirrorMakerConnectorTask.getMetricInfos(connectorName);
200+
}
201+
}, DynamicMetricsManager.getInstance(), s -> s.startsWith(connectorName));
202+
203+
connectorTask.stop();
204+
Assert.assertTrue(connectorTask.awaitStop(CONNECTOR_AWAIT_STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS),
205+
"did not shut down on time");
206+
}
207+
156208
@Test
157209
public void testConsumeFromMultipleTopicsWithDestinationTopicPrefixMetadata() throws Exception {
158210
String yummyTopic = "YummyPizza";
@@ -227,7 +279,7 @@ public void testIdentityPartitioningEnabled() throws Exception {
227279
// produce an event half of the partitions
228280
Set<Integer> expectedPartitionsWithData = new HashSet<>();
229281
for (int i = 0; i < partitionCount; i += 2) {
230-
KafkaMirrorMakerConnectorTestUtils.produceEventsToPartition(yummyTopic, i, 1, _kafkaCluster);
282+
KafkaMirrorMakerConnectorTestUtils.produceEventsToPartition(yummyTopic, i, 1, _kafkaCluster, null);
231283
expectedPartitionsWithData.add(i);
232284
}
233285

datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProvider.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,14 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
112112
Headers headers = null;
113113
if (event instanceof BrooklinEnvelope) {
114114
BrooklinEnvelope envelope = (BrooklinEnvelope) event;
115-
headers = envelope.getHeaders();
115+
if (envelope.getHeaders() != null) {
116+
if (!(envelope.getHeaders() instanceof Headers)) {
117+
throw new DatastreamRuntimeException(
118+
String.format("Unsupported header encountered %s in kafka transport provider for record %s",
119+
envelope.getHeaders(), record));
120+
}
121+
headers = (Headers) envelope.getHeaders();
122+
}
116123
if (envelope.key().isPresent() && envelope.key().get() instanceof byte[]) {
117124
keyValue = (byte[]) envelope.key().get();
118125
}

0 commit comments

Comments (0)