2222
2323import static com .google .common .base .Preconditions .checkState ;
2424
25+ /**
26+ * Implementation of the execution of the {@link nl.rug.jbi.jsm.core.pipeline.Pipeline}.
27+ * The execution is performed by performing the following loop.
28+ * <ul>
29+ * <li>{@link #performCalculationStage(java.util.Queue, java.util.List)}</li>
30+ * <li>{@link #performMetricCollection(nl.rug.jbi.jsm.core.pipeline.PipelineFrame, java.util.Map)}</li>
31+ * <li>{@link #performCollectionStage(java.util.Map, nl.rug.jbi.jsm.core.pipeline.PipelineFrame, java.util.List)}</li>
32+ * <li>{@link #prepareProduceForNextFrame(java.util.Map, java.util.List, nl.rug.jbi.jsm.core.calculator.MetricScope)}</li>
33+ * <li>If end of frame, merge data for the next scope into the deliverables for the next frame.</li>
34+ * <li>{@link #prepareCalculatorsForNextFrame(java.util.Queue, java.util.Map, nl.rug.jbi.jsm.core.pipeline.HandlerMap)}</li>
35+ * <li>If there is no next frame, exit loop.</li>
36+ * </ul>
37+ *
38+ * @author David van Leusen
39+ * @since 2014-06-02
40+ */
2541class ControllerThread extends Thread {
2642 private final static Logger logger = LogManager .getLogger (ControllerThread .class );
2743 private final static AtomicInteger UNIQUE_EXECUTION_ID = new AtomicInteger (0 );
@@ -107,6 +123,7 @@ public void run() {
107123 final Repository repo = this .executionPlan .getRepository ();
108124 final ClassVisitorFactory cvFactory = this .executionPlan .getClassVisitorFactory ();
109125
126+ //Create the base set of modifiers for the first frame, based on the classes that need to be inspected.
110127 for (final Map .Entry <String , EventBus > entry : this .stateContainers .entrySet ()) {
111128 try {
112129 final JavaClass jc = repo .loadClass (entry .getKey ());
@@ -187,6 +204,7 @@ public void run() {
187204 logger .debug ("State containers and tasks added for next scope." );
188205 }
189206
207+ //Prepare tasks containing data for the next frame.
190208 prepareCalculatorsForNextFrame (
191209 taskQueue ,
192210 nextFrameExecutionData ,
@@ -201,6 +219,12 @@ public void run() {
201219 executionPlan .onFinish ();
202220 }
203221
222+ /**
223+ * Step that gets executed at the end of the frame, it will turn the produce produced by the producers into a set of
224+ * {@link nl.rug.jbi.jsm.core.execution.DataListDispatcher} for execution in the next frame.
225+ *
226+ * @see nl.rug.jbi.jsm.core.execution.DataListDispatcher
227+ */
204228 private void prepareCalculatorsForNextFrame (
205229 final Queue <Pair <EventBus , Runnable >> taskQueue ,
206230 final Map <String , List <Object >> executionData ,
@@ -213,7 +237,7 @@ private void prepareCalculatorsForNextFrame(
213237 } else {
214238 logger .warn (
215239 "Request for undefined identifier, might indicate that " +
216- "a producer is returning wrong data : '{}'" ,
240+ "a producer is returning wrong identifiers : '{}'" ,
217241 entry .getKey ()
218242 );
219243
@@ -224,6 +248,13 @@ private void prepareCalculatorsForNextFrame(
224248 }
225249 }
226250
251+ /**
252+ * Processes the produce of {@link #performCollectionStage(java.util.Map, nl.rug.jbi.jsm.core.pipeline.PipelineFrame, java.util.List)}
253+ * by splitting the produce based on whether it needs to be delivered in the current frame or a future frame.
254+ * The produce meant for the current frame gets prepared to be sent to
255+ * {@link #prepareCalculatorsForNextFrame(java.util.Queue, java.util.Map, nl.rug.jbi.jsm.core.pipeline.HandlerMap)}
256+ * while the other produce gets stored based on the scope its meant to be delivered in.
257+ */
227258 private void prepareProduceForNextFrame (
228259 final Map <String , List <Object >> executionMap ,
229260 final List <ProducerMetric .Produce > produceList ,
@@ -249,6 +280,16 @@ private void prepareProduceForNextFrame(
249280 }
250281 }
251282
283+ /**
284+ * Based on the data collected by {@link #performMetricCollection(nl.rug.jbi.jsm.core.pipeline.PipelineFrame, java.util.Map)}
285+ * calculate both results and produce of declared {@link nl.rug.jbi.jsm.core.calculator.SharedMetric} and
286+ * {@link nl.rug.jbi.jsm.core.calculator.ProducerMetric}.
287+ * The results from the shared metrics will be sent off to the frontend while the produce gets returned to the loop
288+ * for further processing.
289+ *
290+ * @see nl.rug.jbi.jsm.core.execution.CollectionStageTask#forSharedMetric(nl.rug.jbi.jsm.core.calculator.SharedMetric, java.util.Map, java.util.concurrent.CountDownLatch)
291+ * @see nl.rug.jbi.jsm.core.execution.CollectionStageTask#forProducer(nl.rug.jbi.jsm.core.calculator.ProducerMetric, java.util.Map, java.util.concurrent.CountDownLatch)
292+ */
252293 private void performCollectionStage (
253294 final Map <Class , Map <String , MetricState >> dataMap ,
254295 final PipelineFrame currentFrame ,
@@ -262,6 +303,7 @@ private void performCollectionStage(
262303 final List <Pair <SharedMetric , Future <List <MetricResult >>>> futureResults = Lists .newLinkedList ();
263304 final List <Pair <ProducerMetric , Future <List <ProducerMetric .Produce >>>> futureProduce = Lists .newLinkedList ();
264305
306+ //Begin execution of Shared Metric collection
265307 for (final SharedMetric metric : sharedMetrics ) {
266308 final Map <String , MetricState > data = dataMap .get (metric .getClass ());
267309 futureResults .add (new Pair <SharedMetric , Future <List <MetricResult >>>(
@@ -270,6 +312,7 @@ private void performCollectionStage(
270312 ));
271313 }
272314
315+ //Begin execution of Producer collection
273316 for (final ProducerMetric metric : producerMetrics ) {
274317 final Map <String , MetricState > data = dataMap .get (metric .getClass ());
275318 futureProduce .add (new Pair <ProducerMetric , Future <List <ProducerMetric .Produce >>>(
@@ -323,9 +366,13 @@ private void performCollectionStage(
323366 }
324367 }
325368
369+ /**
370+ * Collects the states from the individual targets, then transposes the results so they are mapped by metric
371+ * instead of by target.
372+ */
326373 private void performMetricCollection (
327374 final PipelineFrame currentFrame ,
328- final Map <Class , Map <String , MetricState >> dataMap
375+ final Map <Class , Map <String , MetricState >> outputMap
329376 ) {
330377 final Table <String , Class , MetricState > dataTmp = HashBasedTable .create ();
331378
@@ -341,9 +388,16 @@ private void performMetricCollection(
341388 }
342389
343390 //Output the data
344- dataMap .putAll (Tables .transpose (dataTmp ).rowMap ());
391+ outputMap .putAll (Tables .transpose (dataTmp ).rowMap ());
345392 }
346393
394+ /**
395+ * Applies the set of modifiers left by the last frame and calculate the isolated metrics.
396+ * This stage happens in parallel within a {@link java.util.concurrent.ExecutorService}, this thread will block
397+ * until all individual processing is done.
398+ *
399+ * @see nl.rug.jbi.jsm.core.execution.CalculationStageTask
400+ */
347401 private void performCalculationStage (
348402 final Queue <Pair <EventBus , Runnable >> taskQueue ,
349403 final List <IsolatedMetric > isolatedMetrics
@@ -361,6 +415,7 @@ private void performCalculationStage(
361415 ));
362416 }
363417
418+ //Block until individual processes are complete.
364419 calculationStageLatch .await ();
365420 }
366421}
0 commit comments