2323import com .intel .hibench .streambench .storm .topologies .SingleTridentSpoutTops ;
2424import com .intel .hibench .streambench .storm .trident .functions .Parser ;
2525import com .intel .hibench .streambench .storm .util .StormBenchConfig ;
26- import org .apache .storm .kafka .trident .OpaqueTridentKafkaSpout ;
2726import org .apache .storm .trident .TridentTopology ;
28- import org .apache .storm .trident .operation .Aggregator ;
29- import org .apache .storm .trident .operation .TridentCollector ;
30- import org .apache .storm .trident .operation .TridentOperationContext ;
27+ import org .apache .storm .trident .operation .ReducerAggregator ;
3128import org .apache .storm .trident .spout .ITridentDataSource ;
29+ import org .apache .storm .trident .testing .MemoryMapState ;
3230import org .apache .storm .trident .tuple .TridentTuple ;
3331import org .apache .storm .tuple .Fields ;
34- import org .apache .storm .tuple .Values ;
35-
36- import java .util .Map ;
3732
3833public class TridentWordcount extends SingleTridentSpoutTops {
3934
@@ -51,50 +46,38 @@ public TridentTopology createTopology() {
5146 .project (new Fields ("ip" , "time" ))
5247 .parallelismHint (config .spoutThreads )
5348 .groupBy (new Fields ("ip" ))
54- .aggregate (new Fields ("ip" , "time" ), new Count (config ), new Fields ("word" , "count" ))
49+ .persistentAggregate (new MemoryMapState .Factory (), new Fields ("ip" , "time" ), new Count (config ),
50+ new Fields ("word" , "count" ))
5551 .parallelismHint (config .boltThreads );
5652 return topology ;
5753 }
5854
59- private static class Count implements Aggregator <Count .State > {
55+ private static class Count implements ReducerAggregator <Count .State > {
6056
6157 private final StormBenchConfig config ;
6258 private LatencyReporter reporter = null ;
6359
64- static class State {
65- String ip ;
66- long count = 0 ;
67-
68- }
69-
7060 Count (StormBenchConfig config ) {
7161 this .config = config ;
7262 }
7363
7464 @ Override
75- public void prepare ( Map conf , TridentOperationContext context ) {
65+ public State init ( ) {
7666 this .reporter = new KafkaReporter (config .reporterTopic , config .brokerList );
77- }
78-
79- @ Override
80- public State init (Object o , TridentCollector tridentCollector ) {
8167 return new State ();
8268 }
8369
8470 @ Override
85- public void aggregate (State state , TridentTuple tridentTuple , TridentCollector tridentCollector ) {
71+ public State reduce (State state , TridentTuple tridentTuple ) {
8672 state .ip = tridentTuple .getString (0 );
8773 state .count ++;
88- tridentCollector .emit (new Values (state .ip , state .count ));
89- reporter .report (tridentTuple .getLong (0 ), System .currentTimeMillis ());
90- }
91-
92- @ Override
93- public void complete (State state , TridentCollector tridentCollector ) {
74+ reporter .report (tridentTuple .getLong (1 ), System .currentTimeMillis ());
75+ return state ;
9476 }
9577
96- @ Override
97- public void cleanup () {
78+ static class State {
79+ String ip ;
80+ long count = 0 ;
9881 }
9982 }
10083}
0 commit comments