1414import org .apache .bcel .util .Repository ;
1515import org .apache .logging .log4j .LogManager ;
1616import org .apache .logging .log4j .Logger ;
17+ import org .apache .logging .log4j .message .ParameterizedMessage ;
1718
1819import java .util .*;
1920import java .util .concurrent .*;
@@ -32,18 +33,23 @@ public Class apply(Object metric) {
3233 return metric .getClass ();
3334 }
3435 };
35- private final PipelineExecutor executionPlan ;
36+ private final int executionId = UNIQUE_EXECUTION_ID . incrementAndGet () ;
3637 private final ExecutorService executorPool = Executors .newFixedThreadPool (
3738 Runtime .getRuntime ().availableProcessors (),
3839 new ThreadFactory () {
3940 private final AtomicInteger executionId = new AtomicInteger (0 );
4041
4142 @ Override
4243 public Thread newThread (Runnable r ) {
43- return new Thread (r , String .format ("JSM Execution Thread #%d" , executionId .getAndIncrement ()));
44+ return new Thread (r , String .format (
45+ "JSM Execution Thread #%d/%d" ,
46+ ControllerThread .this .executionId ,
47+ executionId .getAndIncrement ()
48+ ));
4449 }
4550 }
4651 );
52+ private final PipelineExecutor executionPlan ;
4753 private final Map <String , EventBus > stateContainers = Maps .newHashMap ();
4854 private final Table <MetricScope , String , List <Object >> dataForFutureScope = Tables .newCustomTable (
4955 Maps .<MetricScope , Map <String , List <Object >>>newEnumMap (MetricScope .class ),
@@ -63,7 +69,9 @@ public boolean apply(List<MetricResult> metricResults) {
6369 };
6470
6571 public ControllerThread (final PipelineExecutor executionPlan , final Set <String > classNames ) {
66- super (String .format ("JSM Controller Thread #%d" , ControllerThread .UNIQUE_EXECUTION_ID .getAndIncrement ()));
72+ super ();
73+ this .setName (String .format ("JSM Controller Thread #%d" , this .executionId ));
74+
6775 this .executionPlan = executionPlan ;
6876
6977 final HandlerMap classHandlerMap = this .executionPlan .getHandlerMap (MetricScope .CLASS );
@@ -107,8 +115,7 @@ public void run() {
107115 cvFactory .createClassVisitor (jc , entry .getValue ())
108116 ));
109117 } catch (ClassNotFoundException e ) {
110- //TODO: proper handling
111- logger .debug (e );
118+ logger .error ("Failed to load '{}', it will not be evaluated." , e );
112119 }
113120 }
114121
@@ -125,8 +132,7 @@ public void run() {
125132 try {
126133 performCalculationStage (taskQueue , currentFrame .getIsolatedMetrics ());
127134 } catch (InterruptedException e ) {
128- //TODO: handle interruption correctly.
129- logger .debug (e );
135+ logger .warn ("Execution interrupted, stopping evaluation." , e );
130136 break ;
131137 }
132138
@@ -145,8 +151,7 @@ public void run() {
145151 performCollectionStage (dataMap , currentFrame , produceList );
146152 }
147153 } catch (InterruptedException e ) {
148- //TODO: handle interruption correctly.
149- logger .debug (e );
154+ logger .warn ("Execution interrupted, stopping evaluation." , e );
150155 break ;
151156 }
152157
@@ -182,7 +187,11 @@ public void run() {
182187 logger .debug ("State containers and tasks added for next scope." );
183188 }
184189
185- prepareCalculatorsForNextFrame (taskQueue , nextFrameExecutionData );
190+ prepareCalculatorsForNextFrame (
191+ taskQueue ,
192+ nextFrameExecutionData ,
193+ this .executionPlan .getHandlerMap (currentScope )
194+ );
186195
187196 logger .debug ("Tasks for next frame prepared." );
188197 }
@@ -194,11 +203,23 @@ public void run() {
194203
195204 private void prepareCalculatorsForNextFrame (
196205 final Queue <Pair <EventBus , Runnable >> taskQueue ,
197- final Map <String , List <Object >> executionData
206+ final Map <String , List <Object >> executionData ,
207+ final HandlerMap hMap
198208 ) {
199209 for (final Map .Entry <String , List <Object >> entry : executionData .entrySet ()) {
200- final EventBus eBus = this .stateContainers .get (entry .getKey ());
201- //TODO: if eBus == null, try to fix it....
210+ final EventBus eBus ;
211+ if (this .stateContainers .containsKey (entry .getKey ())) {
212+ eBus = this .stateContainers .get (entry .getKey ());
213+ } else {
214+ logger .warn (
215+ "Request for undefined identifier, might indicate that " +
216+ "a producer is returning wrong data: '{}'" ,
217+ entry .getKey ()
218+ );
219+
220+ eBus = new EventBus (entry .getKey (), hMap );
221+ this .stateContainers .put (entry .getKey (), eBus );
222+ }
202223 taskQueue .add (new Pair <EventBus , Runnable >(eBus , new DataListDispatcher (eBus , entry .getValue ())));
203224 }
204225 }
@@ -238,36 +259,44 @@ private void performCollectionStage(
238259
239260 final CountDownLatch latch = createCountdownLatch (sharedMetrics .size () + producerMetrics .size ());
240261
241- //TODO: use Pair to link Futures to the metrics, for logging purposes
242- final List <Future <List <MetricResult >>> futureResults = Lists .newLinkedList ();
243- final List <Future <List <ProducerMetric .Produce >>> futureProduce = Lists .newLinkedList ();
262+ final List <Pair <SharedMetric , Future <List <MetricResult >>>> futureResults = Lists .newLinkedList ();
263+ final List <Pair <ProducerMetric , Future <List <ProducerMetric .Produce >>>> futureProduce = Lists .newLinkedList ();
244264
245265 for (final SharedMetric metric : sharedMetrics ) {
246266 final Map <String , MetricState > data = dataMap .get (metric .getClass ());
247- futureResults .add (this .executorPool .submit (CollectionStageTask .forSharedMetric (metric , data , latch )));
267+ futureResults .add (new Pair <SharedMetric , Future <List <MetricResult >>>(
268+ metric ,
269+ this .executorPool .submit (CollectionStageTask .forSharedMetric (metric , data , latch ))
270+ ));
248271 }
249272
250273 for (final ProducerMetric metric : producerMetrics ) {
251274 final Map <String , MetricState > data = dataMap .get (metric .getClass ());
252- futureProduce .add (this .executorPool .submit (CollectionStageTask .forProducer (metric , data , latch )));
275+ futureProduce .add (new Pair <ProducerMetric , Future <List <ProducerMetric .Produce >>>(
276+ metric ,
277+ this .executorPool .submit (CollectionStageTask .forProducer (metric , data , latch )))
278+ );
253279 }
254280
255281 //Await completion of async collection.
256282 latch .await ();
257283
258284 //Collection Results
259285 final List <MetricResult > results = Lists .newLinkedList ();
260- for (final Future <List <MetricResult >> fResult : futureResults ) {
286+ for (final Pair < SharedMetric , Future <List <MetricResult > >> fResult : futureResults ) {
261287 try {
262- final List <MetricResult > resList = fResult .get ();
288+ final List <MetricResult > resList = fResult .second . get ();
263289 if (resList != null ) {
264290 results .addAll (resList );
265291 } else {
266- //TODO: log invalid metric
292+ logger . warn ( "'{}' returned null as a list of results." , fResult . first . getClass (). getName ());
267293 }
268294 } catch (ExecutionException e ) {
269- //TODO: how to handle errors here (do they even happen?)
270- logger .warn ("Exception getting results" , e );
295+ logger .warn (new ParameterizedMessage (
296+ "Exception occurred whilst calculating results for '{}': {}" ,
297+ fResult .first .getClass ().getName (),
298+ e .getCause ().getMessage ()
299+ ), e );
271300 }
272301 }
273302
@@ -276,17 +305,20 @@ private void performCollectionStage(
276305 this .resultsCallback .apply (results );
277306
278307 //Collect and return produce.
279- for (final Future <List <ProducerMetric .Produce >> fProduce : futureProduce ) {
308+ for (final Pair < ProducerMetric , Future <List <ProducerMetric .Produce > >> fProduce : futureProduce ) {
280309 try {
281- final List <ProducerMetric .Produce > prodList = fProduce .get ();
310+ final List <ProducerMetric .Produce > prodList = fProduce .second . get ();
282311 if (prodList != null ) {
283312 produceOutput .addAll (prodList );
284313 } else {
285- //TODO: log invalid metric
314+ logger . warn ( "'{}' returned null as a list of produce." , fProduce . first . getClass (). getName ());
286315 }
287316 } catch (ExecutionException e ) {
288- //TODO: how to handle errors here (do they even happen?)
289- logger .warn ("Exception getting results" , e );
317+ logger .warn (new ParameterizedMessage (
318+ "Exception occurred whilst calculating produce for '{}': {}" ,
319+ fProduce .first .getClass ().getName (),
320+ e .getCause ().getMessage ()
321+ ), e );
290322 }
291323 }
292324 }
0 commit comments