
Commit 1f49a4f

Merge pull request #2 from carsonwang/revert-1-mergeStreamingWithMaster
Revert "Merge streaming with master"
2 parents 6634fe9 + 38c8a27 commit 1f49a4f

177 files changed

Lines changed: 43376 additions & 4746 deletions

bin/functions/hibench_prop_env_mapping.py

Lines changed: 22 additions & 27 deletions
@@ -123,36 +123,31 @@
 MODEL="hibench.nweight.model",

 # For streaming bench
-STREAMING_TESTCASE="hibench.streambench.testCase",
-
+# zkHelper
+STREAMING_ZKHELPER_JAR="hibench.streamingbench.zkhelper.jar",
 # prepare
-STREAMING_TOPIC_NAME="hibench.streambench.kafka.topic",
-STREAMING_KAFKA_HOME="hibench.streambench.kafka.home",
-STREAMING_ZKADDR="hibench.streambench.zkHost",
-STREAMING_CONSUMER_GROUP="hibench.streambench.kafka.consumerGroup",
-STREAMING_DATA_DIR="hibench.streambench.datagen.dir",
-STREAMING_DATA1_NAME="hibench.streambench.datagen.data1.name",
-STREAMING_DATA1_DIR="hibench.streambench.datagen.data1.dir",
-STREAMING_DATA1_LENGTH="hibench.streambench.datagen.recordLength",
-STREAMING_DATA2_SAMPLE_DIR="hibench.streambench.datagen.data2_samples.dir",
-STREAMING_DATA2_CLUSTER_DIR="hibench.streambench.datagen.data2_cluster.dir",
-STREAMING_PARTITIONS="hibench.streambench.kafka.topicPartitions",
-DATA_GEN_JAR="hibench.streambench.datagen.jar",
+STREAMING_TOPIC_NAME="hibench.streamingbench.topic_name",
+STREAMING_KAFKA_HOME="hibench.streamingbench.kafka.home",
+STREAMING_ZKADDR="hibench.streamingbench.zookeeper.host",
+STREAMING_CONSUMER_GROUP="hibench.streamingbench.consumer_group",
+STREAMING_DATA_SCALE_FACTOR="hibench.streamingbench.datagen.scale_factor",
+STREAMING_DATA_DIR="hibench.streamingbench.datagen.dir",
+STREAMING_DATA1_NAME="hibench.streamingbench.datagen.data1.name",
+STREAMING_DATA1_DIR="hibench.streamingbench.datagen.data1.dir",
+STREAMING_DATA1_LENGTH="hibench.streamingbench.datagen.data1.length",
+STREAMING_DATA2_SAMPLE_DIR="hibench.streamingbench.datagen.data2_samples.dir",
+STREAMING_DATA2_CLUSTER_DIR="hibench.streamingbench.datagen.data2_cluster.dir",
+STREAMING_PARTITIONS="hibench.streamingbench.partitions",
+DATA_GEN_JAR="hibench.streamingbench.datagen.jar",
+
+STREAMING_DATAGEN_MODE="hibench.streamingbench.prepare.mode",
+STREAMING_DATAGEN_RECORDS="hibench.streamingbench.prepare.push.records",

 # sparkstreaming
-STREAMBENCH_SPARK_JAR="hibench.streambench.sparkbench.jar",
-STREAMBENCH_STORM_JAR="hibench.streambench.stormbench.jar",
-
-# gearpump
-GEARPUMP_HOME="hibench.streambench.gearpump.home",
-STREAMBENCH_GEARPUMP_JAR="hibench.streambench.gearpump.jar",
-STREAMBENCH_GEARPUMP_EXECUTORS="hibench.streambench.gearpump.executors",
-
-# flinkstreaming
-HIBENCH_FLINK_MASTER="hibench.flink.master",
-FLINK_HOME="hibench.streambench.flink.home",
-STREAMBENCH_FLINK_JAR="hibench.streambench.flinkbench.jar",
-STREAMBENCH_FLINK_PARALLELISM="hibench.streambench.flink.parallelism",
+STREAMINGBENCH_JARS="hibench.streamingbench.jars",
+STREAMBENCH_STORM_JAR="hibench.streamingbench.stormbench.jar",
+STORM_BIN_HOME="hibench.streamingbench.storm.bin",
+STREAMING_BENCHNAME="hibench.streamingbench.benchname",

 # samza
 STREAMING_SAMZA_WORDCOUNT_INTERNAL_TOPIC="samza_internal.wordcount.kafka.input.name",
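The hunk above retargets the shell-facing variable names at the older hibench.streamingbench.* property keys and drops the Gearpump and Flink entries. Below is a minimal, illustrative sketch of how such a variable-to-property table can be consumed to produce shell exports; the dict contents and the emit_exports helper are assumptions for illustration, not code from hibench_prop_env_mapping.py.

# Illustrative sketch only: shows how a VAR -> property-key table like the
# one in hibench_prop_env_mapping.py could be turned into shell exports.
# The dict contents and helper name are assumptions, not HiBench's real code.
mapping = {
    "STREAMING_TOPIC_NAME": "hibench.streamingbench.topic_name",
    "STREAMING_KAFKA_HOME": "hibench.streamingbench.kafka.home",
    "STORM_BIN_HOME": "hibench.streamingbench.storm.bin",
}

conf = {
    "hibench.streamingbench.topic_name": "identity",
    "hibench.streamingbench.kafka.home": "/opt/kafka",
    "hibench.streamingbench.storm.bin": "/opt/storm/bin",
}

def emit_exports(mapping, conf):
    """Yield 'export VAR=value' lines for every property present in conf."""
    for var, prop_key in mapping.items():
        if prop_key in conf:
            yield "export {}={}".format(var, conf[prop_key])

if __name__ == "__main__":
    for line in emit_exports(mapping, conf):
        print(line)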

bin/functions/load-config.py

Lines changed: 7 additions & 7 deletions
@@ -239,9 +239,9 @@ def wildcard_replacement(key, value):
 finish = False


-# wildcard_rules = [(key, HibenchConf[key]) for key in HibenchConf if "*" in key]
+wildcard_rules = [(key, HibenchConf[key]) for key in HibenchConf if "*" in key]
 # now, let's check wildcard replacement rules
-# for key, value in wildcard_rules:
+for key, value in wildcard_rules:
 # check if we found a rule like: aaa.*.ccc.*.ddd -> bbb.*.*

 # wildcard replacement is useful for samza conf, which
@@ -253,12 +253,12 @@ def wildcard_replacement(key, value):
 # switch the order of two wildcards, something like the
 # first wildcard in key to match the second wildcard in
 # value. I just don't think it'll be needed.
-# if not wildcard_replacement(key, value): # not wildcard rules? re-add
+if not wildcard_replacement(key, value): # not wildcard rules? re-add
 HibenchConf[key] = value
-# if wildcard_rules: # need try again
-# wildcard_rules = []
-# else: break
-break
+if wildcard_rules: # need try again
+wildcard_rules = []
+else: break
+
 # all finished, remove values contains no_value_sign
 for key in [x for x in HibenchConf if no_value_sign in HibenchConf[x]]:
 del HibenchConf[key]
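This change re-enables the previously commented-out wildcard pass in load-config.py: keys containing "*" are collected as rules, each rule is passed to wildcard_replacement(key, value), rules that do not match are re-added to HibenchConf, and the loop retries once before breaking. The sketch below illustrates the general idea of a wildcard rule such as aaa.*.ccc -> bbb.* with a self-contained toy function; it is an assumption-laden illustration, not the actual wildcard_replacement implementation.

import re

# Illustrative sketch, not HiBench's load-config.py: expand a wildcard rule
# such as "samza.*.topic -> kafka.*.input" against concrete config keys.
def apply_wildcard_rule(rule_key, rule_value, conf):
    """For every concrete key matching rule_key, add a derived entry.

    The '*' in rule_key captures a segment; the captured text is substituted
    for the '*' in rule_value. Returns True if anything matched.
    """
    pattern = re.compile("^" + re.escape(rule_key).replace(r"\*", "([^.]+)") + "$")
    matched = False
    for key in list(conf):
        m = pattern.match(key)
        if m:
            conf[rule_value.replace("*", m.group(1), 1)] = conf[key]
            matched = True
    return matched

conf = {
    "samza.wordcount.topic": "words_in",
    "samza.grep.topic": "lines_in",
}
# Hypothetical rule: derive kafka.<bench>.input from samza.<bench>.topic
apply_wildcard_rule("samza.*.topic", "kafka.*.input", conf)
print(conf)
# adds 'kafka.wordcount.input': 'words_in' and 'kafka.grep.input': 'lines_in'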

bin/functions/workload-functions.sh

Lines changed: 2 additions & 14 deletions
@@ -271,27 +271,15 @@ function run-spark-job() {
 }

 function run-streaming-job (){
-run-spark-job --jars ${STREAMBENCH_SPARK_JAR} $@
+run-spark-job --jars ${STREAMINGBENCH_JARS} $@
 }

 function run-storm-job(){
-CMD="${STORM_HOME}/bin/storm jar ${STREAMBENCH_STORM_JAR} $@"
+CMD="${STORM_BIN_HOME}/storm jar ${STREAMBENCH_STORM_JAR} $@"
 echo -e "${BGreen}Submit Storm Job: ${Green}$CMD${Color_Off}"
 execute_withlog $CMD
 }

-function run-gearpump-app(){
-CMD="${GEARPUMP_HOME}/bin/gear app -executors ${STREAMBENCH_GEARPUMP_EXECUTORS} -jar ${STREAMBENCH_GEARPUMP_JAR} $@"
-echo -e "${BGreen}Submit Gearpump Application: ${Green}$CMD${Color_Off}"
-execute_withlog $CMD
-}
-
-function run-flink-job(){
-CMD="${FLINK_HOME}/bin/flink run -p ${STREAMBENCH_FLINK_PARALLELISM} -m ${HIBENCH_FLINK_MASTER} $@ ${STREAMBENCH_FLINK_JAR} ${SPARKBENCH_PROPERTIES_FILES}"
-echo -e "${BGreen}Submit Flink Job: ${Green}$CMD${Color_Off}"
-execute_withlog $CMD
-}
-
 function run-hadoop-job(){
 ENABLE_MONITOR=1
 if [ "$1" = "--without-monitor" ]; then

conf/00-default-properties.conf

Lines changed: 0 additions & 6 deletions
@@ -257,9 +257,3 @@ spark.default.parallelism ${hibench.default.map.parallelism}

 # set spark sql's default shuffle partitions according to hibench's parallelism value
 spark.sql.shuffle.partitions ${hibench.default.map.parallelism}
-
-
-#=======================================================
-# Flink
-#=======================================================
-hibench.flink.master FLINK_JM_HOST:PORT
Lines changed: 98 additions & 104 deletions
@@ -1,152 +1,146 @@
-#########################################################
-# General Stream Config
-#########################################################
+# Two data sets(text and numeric) are available, app argument indicates to use which
+#app=micro-sketch #use text dataset, avg record size: 60 bytes
+#app=micro-statistics #use numeric dataset, avg record size: 200 bytes
+hibench.streamingbench.app micro-sketch

-# Note to ensure benchName to be consistent with datagen type. Numeric data for statistics and text data for others
-# (available benchname: identity, repartition) TDB: sample project grep wordcount distinctcount statistics
-hibench.streambench.testCase identity
+# Text dataset can be scaled in terms of record size
+hibench.streamingbench.prepare.textdataset_recordsize_factor

-# zookeeper address for Kakfa serverce, (default: HOSTNAME:HOSTPORT)
-hibench.streambench.zkHost HOSTNAME:HOSTPORT
+# Two modes of generator: push,periodic
+# Push means to send data to kafka cluster as fast as it could
+# Periodic means sending data according to sending rate specification
+#hibench.streamingbench.prepare.mode push
+hibench.streamingbench.prepare.mode periodic

-# Probability used in sample test case
-hibench.streambench.sampleProbability 0.1
+# Under push mode: number of total records that will be generated
+hibench.streamingbench.prepare.push.records 900000000

-# Indicate whether in debug mode for correctness verfication (default: false)
-hibench.streambench.debugMode false
+# Following three params are under periodic mode
+# Bytes to push per interval
+hibench.streamingbench.prepare.periodic.recordPerInterval 600000

-# JARS
-hibench.streambench.datagen.jar ${hibench.home}/src/streambench/datagen/target/streaming-bench-datagen-5.0-SNAPSHOT-jar-with-dependencies.jar
-hibench.streambench.sparkbench.jar ${hibench.home}/src/streambench/sparkbench/target/streaming-bench-spark-5.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
-hibench.streambench.stormbench.jar ${hibench.home}/src/streambench/stormbench/target/streaming-bench-storm-5.0-SNAPSHOT.jar
-hibench.streambench.gearpump.jar ${hibench.home}/src/streambench/gearpumpbench/target/streaming-bench-gearpump-5.0-SNAPSHOT-jar-with-dependencies.jar
-hibench.streambench.flinkbench.jar ${hibench.home}/src/streambench/flinkbench/target/streaming-bench-flink-5.0-SNAPSHOT-jar-with-dependencies.jar
+# Interval time (in ms)
+hibench.streamingbench.prepare.periodic.intervalSpan 5000

-#########################################################
-# Kafka Config
-#########################################################
+# Total round count of data send
+hibench.streamingbench.prepare.periodic.totalRound 100

-# Kafka home
-hibench.streambench.kafka.home /PATH/TO/KAFKA/HOME
-
-# the topic that spark will receive input data (default: ${hibench.streambench.testCase})
-hibench.streambench.kafka.topic ${hibench.streambench.testCase}
-
-# number of partitions of generated topic (default 20)
-hibench.streambench.kafka.topicPartitions 20
+# zookeeper host:port of kafka cluster

-# consumer group of the consumer for kafka (default: HiBench)
-hibench.streambench.kafka.consumerGroup HiBench
+#example: hostname:9092
+hibench.streamingbench.zookeeper.host HOSTNAME:HOSTPORT

-# Kafka broker lists, written in mode "host:port,host:port,..." (default: HOSTNAME:HOSTPORT)
-hibench.streambench.kafka.brokerList HOSTNAME:HOSTPORT
+#Parallel config
+# number of nodes that will receive kafka input
+hibench.streamingbench.receiver_nodes 4

-# Set the starting offset of kafkaConsumer (default: largest)
-hibench.streambench.kafka.offsetReset largest
-#########################################################
-# Data Generator Config
-#########################################################
+###############
+#Benchmark args
+#Note to ensure benchName to be consistent with datagen type. Numeric data for statistics and text data for others
+# available benchname: identity sample project grep wordcount distinctcount statistics

-# Interval span in millisecond (default: 50)
-hibench.streambench.datagen.intervalSpan 50
+hibench.streamingbench.benchname identity

-# Number of records to generate per interval span (default: 5)
-hibench.streambench.datagen.recordsPerInterval 5
+#common args
+# the topic that spark will receive input data
+hibench.streamingbench.topic_name ${hibench.streamingbench.benchname}

-# Number of total records that will be generated (default: -1 means infinity)
-hibench.streambench.datagen.totalRecords -1
+# Spark stream batch interval (in seconds)
+hibench.streamingbench.batch_interval 10

-# Total round count of data send (default: -1 means infinity)
-hibench.streambench.datagen.totalRounds -1
+# consumer group of the spark consumer for kafka
+hibench.streamingbench.consumer_group HiBench

-# default path to store seed files (default: ${hibench.hdfs.data.dir}/Streaming)
-hibench.streambench.datagen.dir ${hibench.hdfs.data.dir}/Streaming
+# expected number of records to be processed
+hibench.streamingbench.record_count 900000000

-# fixed length of record (default: 200)
-hibench.streambench.datagen.recordLength 200
+#sketch/distinctcount/statistics arg
+# the field index of the record that will be extracted
+hibench.streamingbench.field_index 1

-# Number of KafkaProducer running on different thread (default: 1)
-# The limitation of a single KafkaProducer is about 100Mb/s
-hibench.streambench.datagen.producerNumber 1
+#sketch/wordcount/distinctcount/statistics arg
+# the seperator between fields of a single record
+hibench.streamingbench.separator \\s+

-hibench.streambench.fixWindowDuration 30000
+#sample arg
+# probability that a record will be taken as a sample
+hibench.streamingbench.prob 0.1

-hibench.streambench.fixWindowSlideStep 30000
-#########################################################
-# Spark Streaming Config
-#########################################################
+#grep arg
+# the substring that will be checked to see if contained in a record
+hibench.streamingbench.pattern the

-# Number of nodes that will receive kafka input (default: 4)
-hibench.streambench.spark.receiverNumber 4
+#common arg
+# indicate RDD storage level.
+# 1 for memory only 1 copy. Others for default mem_disk_ser 2 copies
+hibench.streamingbench.copies 2

-# Spark streaming Batchnterval in millisecond (default 100)
-hibench.streambench.spark.batchInterval 100
+# indicate whether to test the write ahead log new feature
+# set true to test WAL feature
+hibench.streamingbench.testWAL false

-# Indicate RDD storage level. (default: 2)
-# 0 = StorageLevel.MEMORY_ONLY
-# 1 = StorageLevel.MEMORY_AND_DISK_SER
-# other = StorageLevel.MEMORY_AND_DISK_SER_2
-hibench.streambench.spark.storageLevel 2
+# if testWAL is true, this path to store stream context in hdfs shall be specified. If false, it can be empty
+hibench.streamingbench.checkpoint_path

-# indicate whether to test the write ahead log new feature (default: false)
-hibench.streambench.spark.enableWAL false
+#common arg
+# indicate whether in debug mode for correctness verfication
+hibench.streamingbench.debug false

-# if testWAL is true, this path to store stream context in hdfs shall be specified. If false, it can be empty (default: /var/tmp)
-hibench.streambench.spark.checkpointPath /var/tmp
+# whether to use direct approach or not ( sparkstreaming only )
+hibench.streamingbench.direct_mode true

-# whether to use direct approach or not (dafault: true)
-hibench.streambench.spark.useDirectMode true
+# Kafka broker lists, used for direct mode, written in mode "host:port,host:port,..."

-#########################################################
-# Flink Config
-#########################################################
-hibench.streambench.flink.home /PATH/TO/FLINK/HOME
+# example: hostname:9092
+hibench.streamingbench.brokerList HOSTNAME:HOSTPORT

-# default parallelism of flink job
-hibench.streambench.flink.parallelism 20
+hibench.streamingbench.broker_list_with_quote "${hibench.streamingbench.brokerList}"

-hibench.streambench.flink.bufferTimeout 5
+# storm bench conf

-hibench.streambench.flink.checkpointDuration 1000
+# STORM_BIN_HOME
+hibench.streamingbench.storm.home /PATH/TO/STORM/HOME

-#########################################################
-# Storm Config
-#########################################################
+# Kafka home
+hibench.streamingbench.kafka.home /PATH/TO/KAFKA/HOME

-# STORM_BIN_HOME
-hibench.streambench.storm.home /PATH/TO/STORM/HOME

+#Cluster config
 # nimbus of storm cluster
-hibench.streambench.storm.nimbus HOSTNAME_OF_STORM_NIMBUS
-hibench.streambench.storm.nimbusAPIPort 6627
+hibench.streamingbench.storm.nimbus HOSTNAME_OF_STORM
+hibench.streamingbench.storm.nimbusAPIPort 6627

 # time interval to contact nimbus to judge if finished
-hibench.streambench.storm.nimbusContactInterval 10
+hibench.streamingbench.storm.nimbusContactInterval 10
+
+
+#Parallel config

 # number of workers of Storm. Number of most bolt threads is also equal to this param.
-hibench.streambench.storm.worker_count 12
+hibench.streamingbench.storm.worker_count 12

 # number of kafka spout threads of Storm
-hibench.streambench.storm.spout_threads 12
+hibench.streamingbench.storm.spout_threads 12

 # number of bolt threads altogether
-hibench.streambench.storm.bolt_threads 12
+hibench.streamingbench.storm.bolt_threads 12

 # kafka arg indicating whether to read data from kafka from the start or go on to read from last position
-hibench.streambench.storm.read_from_start true
+hibench.streamingbench.storm.read_from_start true

 # whether to turn on ack
-hibench.streambench.storm.ackon true
-
-#########################################################
-# Gearpump Config
-#########################################################
-
-hibench.streambench.gearpump.home /PATH/TO/GEARPUMP/HOME
-
-hibench.streambench.gearpump.executors 1
-
-hibench.streambench.gearpump.parallelism 1
+hibench.streamingbench.storm.ackon true

+# Added for default rules:
+hibench.streamingbench.jars ${hibench.streamingbench.sparkbench.jar}
+hibench.streamingbench.sparkbench.jar ${hibench.home}/src/streambench/sparkbench/target/streaming-bench-spark_0.1-5.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
+hibench.streamingbench.stormbench.jar ${hibench.home}/src/streambench/stormbench/target/streaming-bench-storm-0.1-SNAPSHOT-jar-with-dependencies.jar
+hibench.streamingbench.datagen.jar ${hibench.home}/src/streambench/datagen/target/datagen-0.0.1-jar-with-dependencies.jar
+hibench.streamingbench.storm.bin ${hibench.streamingbench.storm.home}/bin
+hibench.streamingbench.zkhelper.jar ${hibench.home}/src/streambench/zkHelper/target/streaming-bench-zkhelper-0.1-SNAPSHOT-jar-with-dependencies.jar

+# default path setting for store of data1 & data2
+hibench.streamingbench.datagen.dir ${hibench.hdfs.data.dir}/Streaming

+# partition size settings
+hibench.streamingbench.partitions 1
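The restored configuration uses the same whitespace-separated "key value" layout and ${...} references as the rest of the HiBench conf files (for example, hibench.streamingbench.jars resolving to the sparkbench jar path, or the topic name defaulting to the bench name). The following sketch parses that layout and expands references; it illustrates the file format under stated assumptions and is not HiBench's load-config.py.

import re

# Illustrative parser for the whitespace-separated "key value" format used
# by HiBench conf files, with simple ${...} reference expansion.
# This is a sketch of the format only, not HiBench's real loader.
SAMPLE = """
# consumer group of the spark consumer for kafka
hibench.streamingbench.consumer_group HiBench
hibench.streamingbench.benchname identity
hibench.streamingbench.topic_name ${hibench.streamingbench.benchname}
"""

def parse_conf(text):
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue                        # skip blanks and comments
        key, _, value = line.partition(" ")
        conf[key] = value.strip()
    return conf

def expand(conf, max_passes=10):
    ref = re.compile(r"\$\{([^}]+)\}")
    for _ in range(max_passes):             # repeat until no ${...} changes
        changed = False
        for key, value in conf.items():
            new = ref.sub(lambda m: conf.get(m.group(1), m.group(0)), value)
            if new != value:
                conf[key] = new
                changed = True
        if not changed:
            break
    return conf

print(expand(parse_conf(SAMPLE))["hibench.streamingbench.topic_name"])  # identity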
