Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit a971b4b

Browse files
Merge pull request #304 from mwws/streaming
enable checkpoint for fixwindow and wordcount testcase in Flink
2 parents 31a5505 + 0bf153d commit a971b4b

6 files changed

Lines changed: 8 additions & 0 deletions

File tree

conf/01-default-streamingbench.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ hibench.streambench.flink.parallelism 20
107107

108108
hibench.streambench.flink.bufferTimeout 5
109109

110+
hibench.streambench.flink.checkpointDuration 1000
111+
110112
#########################################################
111113
# Storm Config
112114
#########################################################

src/streambench/common/src/main/java/com/intel/hibench/streambench/common/StreamBenchConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ public class StreamBenchConfig {
8686
// ======================================
8787
public static String FLINK_BUFFERTIMEOUT = "hibench.streambench.flink.bufferTimeout";
8888

89+
public static String FLINK_CHECKPOINTDURATION = "hibench.streambench.flink.checkpointDuration";
90+
8991
// ======================================
9092
// Storm Related Conf
9193
// ======================================

src/streambench/flinkbench/src/main/java/com/intel/flinkbench/RunBench.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static void runAll(String[] args) throws Exception {
5151
conf.windowDuration = cl.getProperty(StreamBenchConfig.FixWINDOW_DURATION);
5252
conf.windowSlideStep = cl.getProperty(StreamBenchConfig.FixWINDOW_SLIDESTEP);
5353

54+
conf.checkpointDuration = Long.parseLong(cl.getProperty(StreamBenchConfig.FLINK_CHECKPOINTDURATION));
5455
int producerNum = Integer.parseInt(cl.getProperty(StreamBenchConfig.DATAGEN_PRODUCER_NUMBER));
5556
long recordsPerInterval = Long.parseLong(cl.getProperty(StreamBenchConfig.DATAGEN_RECORDS_PRE_INTERVAL));
5657
int intervalSpan = Integer.parseInt(cl.getProperty(StreamBenchConfig.DATAGEN_INTERVAL_SPAN));

src/streambench/flinkbench/src/main/java/com/intel/flinkbench/microbench/FixedWindow.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public void processStream(final FlinkBenchConfig config) throws Exception{
1818

1919
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2020
env.setBufferTimeout(config.bufferTimeout);
21+
env.enableCheckpointing(config.checkpointDuration);
2122

2223
createDataStream(config);
2324
DataStream<Tuple2<String, String>> dataStream = env.addSource(getDataStream());

src/streambench/flinkbench/src/main/java/com/intel/flinkbench/microbench/WordCount.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class WordCount extends StreamBase {
3838
public void processStream(final FlinkBenchConfig config) throws Exception {
3939
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4040
env.setBufferTimeout(config.bufferTimeout);
41+
env.enableCheckpointing(config.checkpointDuration);
4142
createDataStream(config);
4243
DataStream<Tuple2<String, String>> dataStream = env.addSource(getDataStream());
4344
dataStream

src/streambench/flinkbench/src/main/java/com/intel/flinkbench/util/FlinkBenchConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,6 @@ public class FlinkBenchConfig implements Serializable{
3939

4040
// Flink related
4141
public long bufferTimeout;
42+
public long checkpointDuration;
4243

4344
}

0 commit comments

Comments
 (0)