Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit 19cbe05

Browse files
committed
Update hibench common
* Added profile scala 2.12 to POM. Scala < 2.11 does not compiles on java 1.11 jdk. * Changed in POM org.apache.kafka to 0.10.2.2. * Following scala files has been rewritten to work with kafka_2.12 version 0.10.2.2: * KafkaCollector.scala * KafkaConsumer.scala * MetricsUtil.scala Signed-off-by: Luis Ponce <luis.f.ponce.navarro@linux.intel.com>
1 parent aae754b commit 19cbe05

4 files changed

Lines changed: 38 additions & 17 deletions

File tree

common/pom.xml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<dependency>
4242
<groupId>org.apache.kafka</groupId>
4343
<artifactId>kafka_${scala.binary.version}</artifactId>
44-
<version>0.8.2.1</version>
44+
<version>0.10.2.2</version>
4545
</dependency>
4646
</dependencies>
4747

@@ -134,5 +134,20 @@
134134
</property>
135135
</activation>
136136
</profile>
137+
138+
<profile>
139+
<id>scala2.12</id>
140+
<properties>
141+
<scala.version>2.12.8</scala.version>
142+
<scala.binary.version>2.12</scala.binary.version>
143+
</properties>
144+
<activation>
145+
<property>
146+
<name>scala</name>
147+
<value>2.12</value>
148+
</property>
149+
</activation>
150+
</profile>
151+
137152
</profiles>
138153
</project>

common/src/main/scala/com/intel/hibench/common/streaming/metrics/KafkaCollector.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.Date
2222
import java.util.concurrent.{TimeUnit, Future, Executors}
2323

2424
import com.codahale.metrics.{UniformReservoir, Histogram}
25-
import kafka.utils.{ZKStringSerializer, ZkUtils}
25+
import kafka.utils.{ZKStringSerializer$, ZkUtils}
2626
import org.I0Itec.zkclient.ZkClient
2727

2828
import scala.collection.mutable.ArrayBuffer
@@ -60,9 +60,10 @@ class KafkaCollector(zkConnect: String, metricsTopic: String,
6060
}
6161

6262
private def getPartitions(topic: String, zkConnect: String): Seq[Int] = {
63-
val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
63+
val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer$.MODULE$)
64+
val zkUtils = ZkUtils.apply(zkClient, false);
6465
try {
65-
ZkUtils.getPartitionsForTopics(zkClient, Seq(topic)).flatMap(_._2).toSeq
66+
zkUtils.getPartitionsForTopics(Seq(topic)).flatMap(_._2).toSeq
6667
} finally {
6768
zkClient.close()
6869
}

common/src/main/scala/com/intel/hibench/common/streaming/metrics/KafkaConsumer.scala

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ import kafka.common.ErrorMapping._
2323
import kafka.common.TopicAndPartition
2424
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
2525
import kafka.message.MessageAndOffset
26-
import kafka.utils.{ZKStringSerializer, ZkUtils, Utils}
26+
import kafka.utils.{ZKStringSerializer$, ZkUtils}
2727
import org.I0Itec.zkclient.ZkClient
28+
import org.apache.kafka.common.utils.Utils
29+
import org.apache.kafka.common.protocol.SecurityProtocol
30+
import org.apache.kafka.common.network.ListenerName
2831

2932
class KafkaConsumer(zookeeperConnect: String, topic: String, partition: Int) {
3033

@@ -67,15 +70,15 @@ class KafkaConsumer(zookeeperConnect: String, topic: String, partition: Int) {
6770
}
6871

6972
private def createConsumer: SimpleConsumer = {
70-
val zkClient = new ZkClient(zookeeperConnect, 6000, 6000, ZKStringSerializer)
73+
val zkClient = new ZkClient(zookeeperConnect, 6000, 6000, ZKStringSerializer$.MODULE$)
74+
val zkUtils = ZkUtils.apply(zkClient, false);
7175
try {
72-
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
73-
.getOrElse(throw new RuntimeException(
74-
s"leader not available for TopicAndPartition($topic, $partition)"))
75-
val broker = ZkUtils.getBrokerInfo(zkClient, leader)
76-
.getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
77-
new SimpleConsumer(broker.host, broker.port,
78-
config.socketTimeoutMs, config.socketReceiveBufferBytes, CLIENT_ID)
76+
val leader = zkUtils.getLeaderForPartition(topic, partition)
77+
.getOrElse(throw new RuntimeException(s"leader not available for TopicAndPartition($topic, $partition)"))
78+
val brokerInfo = zkUtils.getBrokerInfo(leader)
79+
.getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
80+
val broker = brokerInfo.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
81+
new SimpleConsumer(broker.host, broker.port , config.socketTimeoutMs, config.socketReceiveBufferBytes, CLIENT_ID)
7982
} catch {
8083
case e: Exception =>
8184
throw e

common/src/main/scala/com/intel/hibench/common/streaming/metrics/MetricsUtil.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ package com.intel.hibench.common.streaming.metrics
1818

1919
import com.intel.hibench.common.streaming.Platform
2020
import kafka.admin.AdminUtils
21-
import kafka.utils.ZKStringSerializer
21+
import kafka.utils.ZKStringSerializer$
22+
import kafka.utils.ZkUtils
2223
import org.I0Itec.zkclient.ZkClient
2324

2425
object MetricsUtil {
@@ -34,10 +35,11 @@ object MetricsUtil {
3435
}
3536

3637
def createTopic(zkConnect: String, topic: String, partitions: Int): Unit = {
37-
val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
38+
val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer$.MODULE$)
3839
try {
39-
AdminUtils.createTopic(zkClient, topic, partitions, 1)
40-
while (!AdminUtils.topicExists(zkClient, topic)) {
40+
val zkUtils = ZkUtils.apply(zkClient, false);
41+
AdminUtils.createTopic(zkUtils, topic, partitions, 1)
42+
while (!AdminUtils.topicExists(zkUtils, topic)) {
4143
Thread.sleep(100)
4244
}
4345
} catch {

0 commit comments

Comments
 (0)