
Commit 6634fe9

Merge pull request #1 from carsonwang/mergeStreamingWithMaster
Merge streaming with master
2 parents 83992e1 + d695c64 · commit 6634fe9

179 files changed

Lines changed: 4782 additions & 43412 deletions

Note: large commits have some content hidden by default; only a subset of the 179 changed files is shown below.

bin/functions/hibench_prop_env_mapping.py

Lines changed: 27 additions & 22 deletions
@@ -123,31 +123,36 @@
 MODEL="hibench.nweight.model",

 # For streaming bench
-# zkHelper
-STREAMING_ZKHELPER_JAR="hibench.streamingbench.zkhelper.jar",
+STREAMING_TESTCASE="hibench.streambench.testCase",
+
 # prepare
-STREAMING_TOPIC_NAME="hibench.streamingbench.topic_name",
-STREAMING_KAFKA_HOME="hibench.streamingbench.kafka.home",
-STREAMING_ZKADDR="hibench.streamingbench.zookeeper.host",
-STREAMING_CONSUMER_GROUP="hibench.streamingbench.consumer_group",
-STREAMING_DATA_SCALE_FACTOR="hibench.streamingbench.datagen.scale_factor",
-STREAMING_DATA_DIR="hibench.streamingbench.datagen.dir",
-STREAMING_DATA1_NAME="hibench.streamingbench.datagen.data1.name",
-STREAMING_DATA1_DIR="hibench.streamingbench.datagen.data1.dir",
-STREAMING_DATA1_LENGTH="hibench.streamingbench.datagen.data1.length",
-STREAMING_DATA2_SAMPLE_DIR="hibench.streamingbench.datagen.data2_samples.dir",
-STREAMING_DATA2_CLUSTER_DIR="hibench.streamingbench.datagen.data2_cluster.dir",
-STREAMING_PARTITIONS="hibench.streamingbench.partitions",
-DATA_GEN_JAR="hibench.streamingbench.datagen.jar",
-
-STREAMING_DATAGEN_MODE="hibench.streamingbench.prepare.mode",
-STREAMING_DATAGEN_RECORDS="hibench.streamingbench.prepare.push.records",
+STREAMING_TOPIC_NAME="hibench.streambench.kafka.topic",
+STREAMING_KAFKA_HOME="hibench.streambench.kafka.home",
+STREAMING_ZKADDR="hibench.streambench.zkHost",
+STREAMING_CONSUMER_GROUP="hibench.streambench.kafka.consumerGroup",
+STREAMING_DATA_DIR="hibench.streambench.datagen.dir",
+STREAMING_DATA1_NAME="hibench.streambench.datagen.data1.name",
+STREAMING_DATA1_DIR="hibench.streambench.datagen.data1.dir",
+STREAMING_DATA1_LENGTH="hibench.streambench.datagen.recordLength",
+STREAMING_DATA2_SAMPLE_DIR="hibench.streambench.datagen.data2_samples.dir",
+STREAMING_DATA2_CLUSTER_DIR="hibench.streambench.datagen.data2_cluster.dir",
+STREAMING_PARTITIONS="hibench.streambench.kafka.topicPartitions",
+DATA_GEN_JAR="hibench.streambench.datagen.jar",

 # sparkstreaming
-STREAMINGBENCH_JARS="hibench.streamingbench.jars",
-STREAMBENCH_STORM_JAR="hibench.streamingbench.stormbench.jar",
-STORM_BIN_HOME="hibench.streamingbench.storm.bin",
-STREAMING_BENCHNAME="hibench.streamingbench.benchname",
+STREAMBENCH_SPARK_JAR="hibench.streambench.sparkbench.jar",
+STREAMBENCH_STORM_JAR="hibench.streambench.stormbench.jar",
+
+# gearpump
+GEARPUMP_HOME="hibench.streambench.gearpump.home",
+STREAMBENCH_GEARPUMP_JAR="hibench.streambench.gearpump.jar",
+STREAMBENCH_GEARPUMP_EXECUTORS="hibench.streambench.gearpump.executors",
+
+# flinkstreaming
+HIBENCH_FLINK_MASTER="hibench.flink.master",
+FLINK_HOME="hibench.streambench.flink.home",
+STREAMBENCH_FLINK_JAR="hibench.streambench.flinkbench.jar",
+STREAMBENCH_FLINK_PARALLELISM="hibench.streambench.flink.parallelism",

 # samza
 STREAMING_SAMZA_WORDCOUNT_INTERNAL_TOPIC="samza_internal.wordcount.kafka.input.name",
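For orientation, this mapping is what turns HiBench property keys from the .conf files into the shell variables consumed by the workload scripts (for example STREAMBENCH_FLINK_JAR used further below). A minimal, hypothetical sketch of that consumption pattern, not the actual HiBench loader code (the dict name, the conf argument, and the export format are assumptions):

    # Hypothetical excerpt of the env-var -> property-name mapping introduced here
    HiBenchEnvPropMapping = dict(
        STREAMING_TESTCASE="hibench.streambench.testCase",
        STREAMING_TOPIC_NAME="hibench.streambench.kafka.topic",
        STREAMBENCH_FLINK_PARALLELISM="hibench.streambench.flink.parallelism",
    )

    def emit_exports(conf):
        # Print one "export VAR=value" line per mapped property present in conf.
        for env_name, prop_name in HiBenchEnvPropMapping.items():
            if prop_name in conf:
                print("export %s=%s" % (env_name, conf[prop_name]))

    # Example: after the .conf files have been parsed into a plain dict
    emit_exports({"hibench.streambench.flink.parallelism": "20"})
    # -> export STREAMBENCH_FLINK_PARALLELISM=20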

bin/functions/load-config.py

Lines changed: 7 additions & 7 deletions
@@ -239,9 +239,9 @@ def wildcard_replacement(key, value):
 finish = False

-wildcard_rules = [(key, HibenchConf[key]) for key in HibenchConf if "*" in key]
+# wildcard_rules = [(key, HibenchConf[key]) for key in HibenchConf if "*" in key]
 # now, let's check wildcard replacement rules
-for key, value in wildcard_rules:
+# for key, value in wildcard_rules:
 # check if we found a rule like: aaa.*.ccc.*.ddd -> bbb.*.*

 # wildcard replacement is useful for samza conf, which
@@ -253,12 +253,12 @@ def wildcard_replacement(key, value):
 # switch the order of two wildcards, something like the
 # first wildcard in key to match the second wildcard in
 # value. I just don't think it'll be needed.
-if not wildcard_replacement(key, value): # not wildcard rules? re-add
+# if not wildcard_replacement(key, value): # not wildcard rules? re-add
 HibenchConf[key] = value
-if wildcard_rules: # need try again
-wildcard_rules = []
-else: break
+# if wildcard_rules: # need try again
+# wildcard_rules = []
+# else: break
+break
 # all finished, remove values contains no_value_sign
 for key in [x for x in HibenchConf if no_value_sign in HibenchConf[x]]:
 del HibenchConf[key]
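The commented-out block above is the wildcard-rule pass: a configuration key containing "*" (such as the samza rules mentioned in the comments) acts as a template whose matched segments are substituted into the value. A small, self-contained illustration of that idea, using a made-up rule and property name rather than HiBench's real samza keys:

    import re

    def apply_wildcard_rule(rule_key, rule_value, conf):
        # A rule key like "a.*.c" matches concrete keys; each captured segment
        # fills the next "*" in the rule value, producing a new property name.
        pattern = re.compile("^" + rule_key.replace(".", r"\.").replace("*", "([^.]+)") + "$")
        for key in list(conf):
            match = pattern.match(key)
            if match:
                new_key = rule_value
                for segment in match.groups():
                    new_key = new_key.replace("*", segment, 1)
                conf[new_key] = conf[key]

    conf = {"hibench.samza.job1.memory": "1024"}            # hypothetical property
    apply_wildcard_rule("hibench.samza.*.memory",           # hypothetical rule key
                        "samza.*.container.memory.mb", conf)
    print(conf)  # now also contains "samza.job1.container.memory.mb": "1024"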

bin/functions/workload-functions.sh

Lines changed: 14 additions & 2 deletions
@@ -271,15 +271,27 @@ function run-spark-job() {
 }

 function run-streaming-job (){
-run-spark-job --jars ${STREAMINGBENCH_JARS} $@
+run-spark-job --jars ${STREAMBENCH_SPARK_JAR} $@
 }

 function run-storm-job(){
-CMD="${STORM_BIN_HOME}/storm jar ${STREAMBENCH_STORM_JAR} $@"
+CMD="${STORM_HOME}/bin/storm jar ${STREAMBENCH_STORM_JAR} $@"
 echo -e "${BGreen}Submit Storm Job: ${Green}$CMD${Color_Off}"
 execute_withlog $CMD
 }

+function run-gearpump-app(){
+CMD="${GEARPUMP_HOME}/bin/gear app -executors ${STREAMBENCH_GEARPUMP_EXECUTORS} -jar ${STREAMBENCH_GEARPUMP_JAR} $@"
+echo -e "${BGreen}Submit Gearpump Application: ${Green}$CMD${Color_Off}"
+execute_withlog $CMD
+}
+
+function run-flink-job(){
+CMD="${FLINK_HOME}/bin/flink run -p ${STREAMBENCH_FLINK_PARALLELISM} -m ${HIBENCH_FLINK_MASTER} $@ ${STREAMBENCH_FLINK_JAR} ${SPARKBENCH_PROPERTIES_FILES}"
+echo -e "${BGreen}Submit Flink Job: ${Green}$CMD${Color_Off}"
+execute_withlog $CMD
+}
+
 function run-hadoop-job(){
 ENABLE_MONITOR=1
 if [ "$1" = "--without-monitor" ]; then

conf/00-default-properties.conf

Lines changed: 6 additions & 0 deletions
@@ -257,3 +257,9 @@ spark.default.parallelism ${hibench.default.map.parallelism}
 
 # set spark sql's default shuffle partitions according to hibench's parallelism value
 spark.sql.shuffle.partitions ${hibench.default.map.parallelism}
+
+
+#=======================================================
+# Flink
+#=======================================================
+hibench.flink.master FLINK_JM_HOST:PORT

conf/01-default-streamingbench.conf

Lines changed: 104 additions & 98 deletions
@@ -1,146 +1,152 @@
-# Two data sets(text and numeric) are available, app argument indicates to use which
-#app=micro-sketch #use text dataset, avg record size: 60 bytes
-#app=micro-statistics #use numeric dataset, avg record size: 200 bytes
-hibench.streamingbench.app micro-sketch
+#########################################################
+# General Stream Config
+#########################################################

-# Text dataset can be scaled in terms of record size
-hibench.streamingbench.prepare.textdataset_recordsize_factor
+# Note to ensure benchName to be consistent with datagen type. Numeric data for statistics and text data for others
+# (available benchname: identity, repartition) TDB: sample project grep wordcount distinctcount statistics
+hibench.streambench.testCase identity

-# Two modes of generator: push,periodic
-# Push means to send data to kafka cluster as fast as it could
-# Periodic means sending data according to sending rate specification
-#hibench.streamingbench.prepare.mode push
-hibench.streamingbench.prepare.mode periodic
+# zookeeper address for Kakfa serverce, (default: HOSTNAME:HOSTPORT)
+hibench.streambench.zkHost HOSTNAME:HOSTPORT

-# Under push mode: number of total records that will be generated
-hibench.streamingbench.prepare.push.records 900000000
+# Probability used in sample test case
+hibench.streambench.sampleProbability 0.1

-# Following three params are under periodic mode
-# Bytes to push per interval
-hibench.streamingbench.prepare.periodic.recordPerInterval 600000
+# Indicate whether in debug mode for correctness verfication (default: false)
+hibench.streambench.debugMode false

-# Interval time (in ms)
-hibench.streamingbench.prepare.periodic.intervalSpan 5000
+# JARS
+hibench.streambench.datagen.jar ${hibench.home}/src/streambench/datagen/target/streaming-bench-datagen-5.0-SNAPSHOT-jar-with-dependencies.jar
+hibench.streambench.sparkbench.jar ${hibench.home}/src/streambench/sparkbench/target/streaming-bench-spark-5.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
+hibench.streambench.stormbench.jar ${hibench.home}/src/streambench/stormbench/target/streaming-bench-storm-5.0-SNAPSHOT.jar
+hibench.streambench.gearpump.jar ${hibench.home}/src/streambench/gearpumpbench/target/streaming-bench-gearpump-5.0-SNAPSHOT-jar-with-dependencies.jar
+hibench.streambench.flinkbench.jar ${hibench.home}/src/streambench/flinkbench/target/streaming-bench-flink-5.0-SNAPSHOT-jar-with-dependencies.jar

-# Total round count of data send
-hibench.streamingbench.prepare.periodic.totalRound 100
+#########################################################
+# Kafka Config
+#########################################################

-# zookeeper host:port of kafka cluster
+# Kafka home
+hibench.streambench.kafka.home /PATH/TO/KAFKA/HOME

-#example: hostname:9092
-hibench.streamingbench.zookeeper.host HOSTNAME:HOSTPORT
+# the topic that spark will receive input data (default: ${hibench.streambench.testCase})
+hibench.streambench.kafka.topic ${hibench.streambench.testCase}

-#Parallel config
-# number of nodes that will receive kafka input
-hibench.streamingbench.receiver_nodes 4
+# number of partitions of generated topic (default 20)
+hibench.streambench.kafka.topicPartitions 20

-###############
-#Benchmark args
-#Note to ensure benchName to be consistent with datagen type. Numeric data for statistics and text data for others
-# available benchname: identity sample project grep wordcount distinctcount statistics
+# consumer group of the consumer for kafka (default: HiBench)
+hibench.streambench.kafka.consumerGroup HiBench

-hibench.streamingbench.benchname identity
+# Kafka broker lists, written in mode "host:port,host:port,..." (default: HOSTNAME:HOSTPORT)
+hibench.streambench.kafka.brokerList HOSTNAME:HOSTPORT

-#common args
-# the topic that spark will receive input data
-hibench.streamingbench.topic_name ${hibench.streamingbench.benchname}
+# Set the starting offset of kafkaConsumer (default: largest)
+hibench.streambench.kafka.offsetReset largest
+#########################################################
+# Data Generator Config
+#########################################################

-# Spark stream batch interval (in seconds)
-hibench.streamingbench.batch_interval 10
+# Interval span in millisecond (default: 50)
+hibench.streambench.datagen.intervalSpan 50

-# consumer group of the spark consumer for kafka
-hibench.streamingbench.consumer_group HiBench
+# Number of records to generate per interval span (default: 5)
+hibench.streambench.datagen.recordsPerInterval 5

-# expected number of records to be processed
-hibench.streamingbench.record_count 900000000
+# Number of total records that will be generated (default: -1 means infinity)
+hibench.streambench.datagen.totalRecords -1

-#sketch/distinctcount/statistics arg
-# the field index of the record that will be extracted
-hibench.streamingbench.field_index 1
+# Total round count of data send (default: -1 means infinity)
+hibench.streambench.datagen.totalRounds -1

-#sketch/wordcount/distinctcount/statistics arg
-# the seperator between fields of a single record
-hibench.streamingbench.separator \\s+
+# default path to store seed files (default: ${hibench.hdfs.data.dir}/Streaming)
+hibench.streambench.datagen.dir ${hibench.hdfs.data.dir}/Streaming

-#sample arg
-# probability that a record will be taken as a sample
-hibench.streamingbench.prob 0.1
+# fixed length of record (default: 200)
+hibench.streambench.datagen.recordLength 200

-#grep arg
-# the substring that will be checked to see if contained in a record
-hibench.streamingbench.pattern the
+# Number of KafkaProducer running on different thread (default: 1)
+# The limitation of a single KafkaProducer is about 100Mb/s
+hibench.streambench.datagen.producerNumber 1

-#common arg
-# indicate RDD storage level.
-# 1 for memory only 1 copy. Others for default mem_disk_ser 2 copies
-hibench.streamingbench.copies 2
+hibench.streambench.fixWindowDuration 30000

-# indicate whether to test the write ahead log new feature
-# set true to test WAL feature
-hibench.streamingbench.testWAL false
+hibench.streambench.fixWindowSlideStep 30000
+#########################################################
+# Spark Streaming Config
+#########################################################

-# if testWAL is true, this path to store stream context in hdfs shall be specified. If false, it can be empty
-hibench.streamingbench.checkpoint_path
+# Number of nodes that will receive kafka input (default: 4)
+hibench.streambench.spark.receiverNumber 4

-#common arg
-# indicate whether in debug mode for correctness verfication
-hibench.streamingbench.debug false
+# Spark streaming Batchnterval in millisecond (default 100)
+hibench.streambench.spark.batchInterval 100

-# whether to use direct approach or not ( sparkstreaming only )
-hibench.streamingbench.direct_mode true
+# Indicate RDD storage level. (default: 2)
+# 0 = StorageLevel.MEMORY_ONLY
+# 1 = StorageLevel.MEMORY_AND_DISK_SER
+# other = StorageLevel.MEMORY_AND_DISK_SER_2
+hibench.streambench.spark.storageLevel 2

-# Kafka broker lists, used for direct mode, written in mode "host:port,host:port,..."
+# indicate whether to test the write ahead log new feature (default: false)
+hibench.streambench.spark.enableWAL false

-# example: hostname:9092
-hibench.streamingbench.brokerList HOSTNAME:HOSTPORT
+# if testWAL is true, this path to store stream context in hdfs shall be specified. If false, it can be empty (default: /var/tmp)
+hibench.streambench.spark.checkpointPath /var/tmp

-hibench.streamingbench.broker_list_with_quote "${hibench.streamingbench.brokerList}"
+# whether to use direct approach or not (dafault: true)
+hibench.streambench.spark.useDirectMode true

-# storm bench conf
+#########################################################
+# Flink Config
+#########################################################
+hibench.streambench.flink.home /PATH/TO/FLINK/HOME

-# STORM_BIN_HOME
-hibench.streamingbench.storm.home /PATH/TO/STORM/HOME
+# default parallelism of flink job
+hibench.streambench.flink.parallelism 20

-# Kafka home
-hibench.streamingbench.kafka.home /PATH/TO/KAFKA/HOME
+hibench.streambench.flink.bufferTimeout 5

+hibench.streambench.flink.checkpointDuration 1000

-#Cluster config
-# nimbus of storm cluster
-hibench.streamingbench.storm.nimbus HOSTNAME_OF_STORM
-hibench.streamingbench.storm.nimbusAPIPort 6627
+#########################################################
+# Storm Config
+#########################################################

-# time interval to contact nimbus to judge if finished
-hibench.streamingbench.storm.nimbusContactInterval 10
+# STORM_BIN_HOME
+hibench.streambench.storm.home /PATH/TO/STORM/HOME

+# nimbus of storm cluster
+hibench.streambench.storm.nimbus HOSTNAME_OF_STORM_NIMBUS
+hibench.streambench.storm.nimbusAPIPort 6627

-#Parallel config
+# time interval to contact nimbus to judge if finished
+hibench.streambench.storm.nimbusContactInterval 10

 # number of workers of Storm. Number of most bolt threads is also equal to this param.
-hibench.streamingbench.storm.worker_count 12
+hibench.streambench.storm.worker_count 12

 # number of kafka spout threads of Storm
-hibench.streamingbench.storm.spout_threads 12
+hibench.streambench.storm.spout_threads 12

 # number of bolt threads altogether
-hibench.streamingbench.storm.bolt_threads 12
+hibench.streambench.storm.bolt_threads 12

 # kafka arg indicating whether to read data from kafka from the start or go on to read from last position
-hibench.streamingbench.storm.read_from_start true
+hibench.streambench.storm.read_from_start true

 # whether to turn on ack
-hibench.streamingbench.storm.ackon true
+hibench.streambench.storm.ackon true
+
+#########################################################
+# Gearpump Config
+#########################################################
+
+hibench.streambench.gearpump.home /PATH/TO/GEARPUMP/HOME
+
+hibench.streambench.gearpump.executors 1
+
+hibench.streambench.gearpump.parallelism 1

-# Added for default rules:
-hibench.streamingbench.jars ${hibench.streamingbench.sparkbench.jar}
-hibench.streamingbench.sparkbench.jar ${hibench.home}/src/streambench/sparkbench/target/streaming-bench-spark_0.1-5.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
-hibench.streamingbench.stormbench.jar ${hibench.home}/src/streambench/stormbench/target/streaming-bench-storm-0.1-SNAPSHOT-jar-with-dependencies.jar
-hibench.streamingbench.datagen.jar ${hibench.home}/src/streambench/datagen/target/datagen-0.0.1-jar-with-dependencies.jar
-hibench.streamingbench.storm.bin ${hibench.streamingbench.storm.home}/bin
-hibench.streamingbench.zkhelper.jar ${hibench.home}/src/streambench/zkHelper/target/streaming-bench-zkhelper-0.1-SNAPSHOT-jar-with-dependencies.jar

-# default path setting for store of data1 & data2
-hibench.streamingbench.datagen.dir ${hibench.hdfs.data.dir}/Streaming

-# partition size settings
-hibench.streamingbench.partitions 1
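The rewritten generator section above replaces the old push/periodic modes with a single rate-controlled scheme (intervalSpan, recordsPerInterval, totalRecords/totalRounds). With the defaults shown in this diff the implied load per producer is modest; a quick back-of-the-envelope check (not part of the commit):

    # Values as they appear in the new config above
    interval_span_ms = 50        # hibench.streambench.datagen.intervalSpan
    records_per_interval = 5     # hibench.streambench.datagen.recordsPerInterval
    record_length_bytes = 200    # hibench.streambench.datagen.recordLength
    producers = 1                # hibench.streambench.datagen.producerNumber

    records_per_sec = records_per_interval * (1000.0 / interval_span_ms) * producers
    bytes_per_sec = records_per_sec * record_length_bytes
    print(records_per_sec, bytes_per_sec)   # 100.0 records/s, 20000.0 bytes/s (~20 KB/s)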
