Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit ec2e832

Browse files
committed
fix trident wordcount
1 parent 9569446 commit ec2e832

1 file changed

Lines changed: 12 additions & 29 deletions

File tree

  • src/streambench/stormbench/src/main/java/com/intel/hibench/streambench/storm/trident

src/streambench/stormbench/src/main/java/com/intel/hibench/streambench/storm/trident/TridentWordcount.java

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,12 @@
2323
import com.intel.hibench.streambench.storm.topologies.SingleTridentSpoutTops;
2424
import com.intel.hibench.streambench.storm.trident.functions.Parser;
2525
import com.intel.hibench.streambench.storm.util.StormBenchConfig;
26-
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
2726
import org.apache.storm.trident.TridentTopology;
28-
import org.apache.storm.trident.operation.Aggregator;
29-
import org.apache.storm.trident.operation.TridentCollector;
30-
import org.apache.storm.trident.operation.TridentOperationContext;
27+
import org.apache.storm.trident.operation.ReducerAggregator;
3128
import org.apache.storm.trident.spout.ITridentDataSource;
29+
import org.apache.storm.trident.testing.MemoryMapState;
3230
import org.apache.storm.trident.tuple.TridentTuple;
3331
import org.apache.storm.tuple.Fields;
34-
import org.apache.storm.tuple.Values;
35-
36-
import java.util.Map;
3732

3833
public class TridentWordcount extends SingleTridentSpoutTops {
3934

@@ -51,50 +46,38 @@ public TridentTopology createTopology() {
5146
.project(new Fields("ip", "time"))
5247
.parallelismHint(config.spoutThreads)
5348
.groupBy(new Fields("ip"))
54-
.aggregate(new Fields("ip", "time"), new Count(config), new Fields("word", "count"))
49+
.persistentAggregate(new MemoryMapState.Factory(), new Fields("ip", "time"), new Count(config),
50+
new Fields("word", "count"))
5551
.parallelismHint(config.boltThreads);
5652
return topology;
5753
}
5854

59-
private static class Count implements Aggregator<Count.State> {
55+
private static class Count implements ReducerAggregator<Count.State> {
6056

6157
private final StormBenchConfig config;
6258
private LatencyReporter reporter = null;
6359

64-
static class State {
65-
String ip;
66-
long count = 0;
67-
68-
}
69-
7060
Count(StormBenchConfig config) {
7161
this.config = config;
7262
}
7363

7464
@Override
75-
public void prepare(Map conf, TridentOperationContext context) {
65+
public State init() {
7666
this.reporter = new KafkaReporter(config.reporterTopic, config.brokerList);
77-
}
78-
79-
@Override
80-
public State init(Object o, TridentCollector tridentCollector) {
8167
return new State();
8268
}
8369

8470
@Override
85-
public void aggregate(State state, TridentTuple tridentTuple, TridentCollector tridentCollector) {
71+
public State reduce(State state, TridentTuple tridentTuple) {
8672
state.ip = tridentTuple.getString(0);
8773
state.count++;
88-
tridentCollector.emit(new Values(state.ip, state.count));
89-
reporter.report(tridentTuple.getLong(0), System.currentTimeMillis());
90-
}
91-
92-
@Override
93-
public void complete(State state, TridentCollector tridentCollector) {
74+
reporter.report(tridentTuple.getLong(1), System.currentTimeMillis());
75+
return state;
9476
}
9577

96-
@Override
97-
public void cleanup() {
78+
static class State {
79+
String ip;
80+
long count = 0;
9881
}
9982
}
10083
}

0 commit comments

Comments
 (0)