@@ -80,30 +80,32 @@ private boolean isCurrentlyProcessing() {
8080 */
8181 public void queueObjects (List <T > objects ) {
8282
83- Log .v (Log .TAG_BATCHER , "%s: queueObjects called with %d objects. Thread: %s" , this , objects .size (), Thread .currentThread ());
84- if (objects .size () == 0 ) {
85- return ;
86- }
83+ synchronized (inbox ) {
84+ Log .v (Log .TAG_BATCHER , "%s: queueObjects called with %d objects. Thread: %s" , this , objects .size (), Thread .currentThread ());
85+ if (objects .size () == 0 ) {
86+ return ;
87+ }
8788
88- Log .v (Log .TAG_BATCHER , "%s: inbox size before adding objects: %d" , this , inbox .size ());
89+ Log .v (Log .TAG_BATCHER , "%s: inbox size before adding objects: %d" , this , inbox .size ());
8990
90- inbox .addAll (objects );
91+ inbox .addAll (objects );
9192
92- if (inbox .size () >= capacity ) {
93- Log .v (Log .TAG_BATCHER , "%s: calling scheduleWithDelay(0)" , this );
94- if (!isCurrentlyProcessing ()) {
95- // we only want to unschedule and re-schedule immediately if we
96- // aren't currently processing, otherwise we'll end up in a situation where
97- // the things currently processed get interrupted and re-scheduled,
98- // and nothing ever gets done. at the end of the processNow() method,
99- // it will schedule tasks if more are still pending.
100- unscheduleAllPending ();
101- scheduleWithDelay (0 );
93+ if (inbox .size () >= capacity ) {
94+ Log .v (Log .TAG_BATCHER , "%s: calling scheduleWithDelay(0)" , this );
95+ if (!isCurrentlyProcessing ()) {
96+ // we only want to unschedule and re-schedule immediately if we
97+ // aren't currently processing, otherwise we'll end up in a situation where
98+ // the things currently processed get interrupted and re-scheduled,
99+ // and nothing ever gets done. at the end of the processNow() method,
100+ // it will schedule tasks if more are still pending.
101+ unscheduleAllPending ();
102+ scheduleWithDelay (0 );
103+ }
104+ } else {
105+ int suggestedDelay = delayToUse ();
106+ Log .v (Log .TAG_BATCHER , "%s: calling scheduleWithDelay(%d)" , this , suggestedDelay );
107+ scheduleWithDelay (suggestedDelay );
102108 }
103- } else {
104- int suggestedDelay = delayToUse ();
105- Log .v (Log .TAG_BATCHER , "%s: calling scheduleWithDelay(%d)" , this , suggestedDelay );
106- scheduleWithDelay (suggestedDelay );
107109 }
108110 }
109111
@@ -146,10 +148,7 @@ public void flush() {
146148 }
147149
148150 public int count () {
149- synchronized (this ) {
150- if (inbox == null ) {
151- return 0 ;
152- }
151+ synchronized (inbox ) {
153152 return inbox .size ();
154153 }
155154 }
@@ -158,65 +157,65 @@ public int count() {
158157 * this is mainly for debugging
159158 */
160159 public int sizeOfPendingFutures () {
161- synchronized (this ) {
162- if (pendingFutures == null ) {
163- return 0 ;
164- }
160+ synchronized (pendingFutures ) {
165161 return pendingFutures .size ();
166162 }
167163 }
168164
169165 private void processNow () {
170- Log .v (Log .TAG_BATCHER , this + ": processNow() called" );
171-
172- List <T > toProcess = new ArrayList <T >();
173166
174- if (inbox == null || inbox .size () == 0 ) {
175- Log .v (Log .TAG_BATCHER , this + ": processNow() called, but inbox is empty" );
176- return ;
177- } else if (inbox .size () <= capacity ) {
178- Log .v (Log .TAG_BATCHER , "%s: inbox.size() <= capacity, adding %d items from inbox -> toProcess" , this , inbox .size ());
179- while (inbox .size () > 0 ) {
180- try {
181- T t = inbox .take ();
182- toProcess .add (t );
183- } catch (InterruptedException e ) {
184- Log .w (Log .TAG_BATCHER , "%s: processNow(): %s" , this , e .getMessage ());
167+ synchronized (inbox ) {
168+ Log .v (Log .TAG_BATCHER , this + ": processNow() called" );
169+
170+ List <T > toProcess = new ArrayList <T >();
171+
172+ if (inbox .size () == 0 ) {
173+ Log .v (Log .TAG_BATCHER , this + ": processNow() called, but inbox is empty" );
174+ return ;
175+ } else if (inbox .size () <= capacity ) {
176+ Log .v (Log .TAG_BATCHER , "%s: inbox.size() <= capacity, adding %d items from inbox -> toProcess" , this , inbox .size ());
177+ while (inbox .size () > 0 ) {
178+ try {
179+ T t = inbox .take ();
180+ toProcess .add (t );
181+ } catch (InterruptedException e ) {
182+ Log .w (Log .TAG_BATCHER , "%s: processNow(): %s" , this , e .getMessage ());
183+ }
185184 }
186- }
187- } else {
188- Log .v (Log .TAG_BATCHER , "%s: processNow() called, inbox size: %d" , this , inbox .size ());
189- int i = 0 ;
190- while (inbox .size () > 0 && i < capacity ) {
191- try {
192- T t = inbox .take ();
193- toProcess .add (t );
194- } catch (InterruptedException e ) {
195- Log .w (Log .TAG_BATCHER , "%s: processNow(): %s" , this , e .getMessage ());
185+ } else {
186+ Log .v (Log .TAG_BATCHER , "%s: processNow() called, inbox size: %d" , this , inbox .size ());
187+ int i = 0 ;
188+ while (inbox .size () > 0 && i < capacity ) {
189+ try {
190+ T t = inbox .take ();
191+ toProcess .add (t );
192+ } catch (InterruptedException e ) {
193+ Log .w (Log .TAG_BATCHER , "%s: processNow(): %s" , this , e .getMessage ());
194+ }
195+ i += 1 ;
196196 }
197- i += 1 ;
198- }
199197
200- Log .v (Log .TAG_BATCHER , "%s: inbox.size() > capacity, moving %d items from inbox -> toProcess array" , this , toProcess .size ());
201- }
198+ Log .v (Log .TAG_BATCHER , "%s: inbox.size() > capacity, moving %d items from inbox -> toProcess array" , this , toProcess .size ());
199+ }
202200
203- if (toProcess != null && toProcess .size () > 0 ) {
204- Log .v (Log .TAG_BATCHER , "%s: invoking processor %s with %d items " , this , processor , toProcess .size ());
205- processor .process (toProcess );
206- } else {
207- Log .v (Log .TAG_BATCHER , "%s: nothing to process" , this );
208- }
209- lastProcessedTime = System .currentTimeMillis ();
210-
211- // in case we ignored any schedule requests while processing, if
212- // we have more items in inbox, lets schedule another processing attempt
213- if (inbox .size () > 0 ) {
214- Log .v (Log .TAG_BATCHER , "%s: finished processing a batch, but inbox size > 0: %d" , this , inbox .size ());
215- //int delayToUse = delayToUse();
216- int delayToUse = 0 ;
217- Log .v (Log .TAG_BATCHER , "%s: going to process with delay: %d" , this , delayToUse );
218- ScheduledFuture pendingFuture = workExecutor .schedule (processNowRunnable , delayToUse , TimeUnit .MILLISECONDS );
219- pendingFutures .add (pendingFuture );
201+ if (toProcess != null && toProcess .size () > 0 ) {
202+ Log .v (Log .TAG_BATCHER , "%s: invoking processor %s with %d items " , this , processor , toProcess .size ());
203+ processor .process (toProcess );
204+ } else {
205+ Log .v (Log .TAG_BATCHER , "%s: nothing to process" , this );
206+ }
207+ lastProcessedTime = System .currentTimeMillis ();
208+
209+ // in case we ignored any schedule requests while processing, if
210+ // we have more items in inbox, lets schedule another processing attempt
211+ if (inbox .size () > 0 ) {
212+ Log .v (Log .TAG_BATCHER , "%s: finished processing a batch, but inbox size > 0: %d" , this , inbox .size ());
213+ //int delayToUse = delayToUse();
214+ int delayToUse = 0 ;
215+ Log .v (Log .TAG_BATCHER , "%s: going to process with delay: %d" , this , delayToUse );
216+ ScheduledFuture pendingFuture = workExecutor .schedule (processNowRunnable , delayToUse , TimeUnit .MILLISECONDS );
217+ pendingFutures .add (pendingFuture );
218+ }
220219 }
221220 }
222221
0 commit comments