Skip to content

Commit 9566e5c

Browse files
author
Hideki Itakura
committed
Batcher.inbox is accessed from multiple Threads, WorkExecutor and ChangeTracker thread. Also, in the method of Batcher access inbox multiple times in it. This caused threading problem.
By fixing the issue, methods in `Batcher` accesses inbox more than two times should be synchronized by inbox.
1 parent 61d0214 commit 9566e5c

1 file changed

Lines changed: 72 additions & 73 deletions

File tree

src/main/java/com/couchbase/lite/support/Batcher.java

Lines changed: 72 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -80,30 +80,32 @@ private boolean isCurrentlyProcessing() {
8080
*/
8181
public void queueObjects(List<T> objects) {
8282

83-
Log.v(Log.TAG_BATCHER, "%s: queueObjects called with %d objects. Thread: %s", this, objects.size(), Thread.currentThread());
84-
if (objects.size() == 0) {
85-
return;
86-
}
83+
synchronized (inbox) {
84+
Log.v(Log.TAG_BATCHER, "%s: queueObjects called with %d objects. Thread: %s", this, objects.size(), Thread.currentThread());
85+
if (objects.size() == 0) {
86+
return;
87+
}
8788

88-
Log.v(Log.TAG_BATCHER, "%s: inbox size before adding objects: %d", this, inbox.size());
89+
Log.v(Log.TAG_BATCHER, "%s: inbox size before adding objects: %d", this, inbox.size());
8990

90-
inbox.addAll(objects);
91+
inbox.addAll(objects);
9192

92-
if (inbox.size() >= capacity) {
93-
Log.v(Log.TAG_BATCHER, "%s: calling scheduleWithDelay(0)", this);
94-
if (!isCurrentlyProcessing()) {
95-
// we only want to unschedule and re-schedule immediately if we
96-
// aren't currently processing, otherwise we'll end up in a situation where
97-
// the things currently processed get interrupted and re-scheduled,
98-
// and nothing ever gets done. at the end of the processNow() method,
99-
// it will schedule tasks if more are still pending.
100-
unscheduleAllPending();
101-
scheduleWithDelay(0);
93+
if (inbox.size() >= capacity) {
94+
Log.v(Log.TAG_BATCHER, "%s: calling scheduleWithDelay(0)", this);
95+
if (!isCurrentlyProcessing()) {
96+
// we only want to unschedule and re-schedule immediately if we
97+
// aren't currently processing, otherwise we'll end up in a situation where
98+
// the things currently processed get interrupted and re-scheduled,
99+
// and nothing ever gets done. at the end of the processNow() method,
100+
// it will schedule tasks if more are still pending.
101+
unscheduleAllPending();
102+
scheduleWithDelay(0);
103+
}
104+
} else {
105+
int suggestedDelay = delayToUse();
106+
Log.v(Log.TAG_BATCHER, "%s: calling scheduleWithDelay(%d)", this, suggestedDelay);
107+
scheduleWithDelay(suggestedDelay);
102108
}
103-
} else {
104-
int suggestedDelay = delayToUse();
105-
Log.v(Log.TAG_BATCHER, "%s: calling scheduleWithDelay(%d)", this, suggestedDelay);
106-
scheduleWithDelay(suggestedDelay);
107109
}
108110
}
109111

@@ -146,10 +148,7 @@ public void flush() {
146148
}
147149

148150
public int count() {
149-
synchronized(this) {
150-
if(inbox == null) {
151-
return 0;
152-
}
151+
synchronized (inbox) {
153152
return inbox.size();
154153
}
155154
}
@@ -158,65 +157,65 @@ public int count() {
158157
* this is mainly for debugging
159158
*/
160159
public int sizeOfPendingFutures() {
161-
synchronized(this) {
162-
if(pendingFutures == null) {
163-
return 0;
164-
}
160+
synchronized (pendingFutures) {
165161
return pendingFutures.size();
166162
}
167163
}
168164

169165
private void processNow() {
170-
Log.v(Log.TAG_BATCHER, this + ": processNow() called");
171-
172-
List<T> toProcess = new ArrayList<T>();
173166

174-
if (inbox == null || inbox.size() == 0) {
175-
Log.v(Log.TAG_BATCHER, this + ": processNow() called, but inbox is empty");
176-
return;
177-
} else if (inbox.size() <= capacity) {
178-
Log.v(Log.TAG_BATCHER, "%s: inbox.size() <= capacity, adding %d items from inbox -> toProcess", this, inbox.size());
179-
while (inbox.size() > 0) {
180-
try {
181-
T t = inbox.take();
182-
toProcess.add(t);
183-
} catch (InterruptedException e) {
184-
Log.w(Log.TAG_BATCHER, "%s: processNow(): %s", this, e.getMessage());
167+
synchronized (inbox) {
168+
Log.v(Log.TAG_BATCHER, this + ": processNow() called");
169+
170+
List<T> toProcess = new ArrayList<T>();
171+
172+
if (inbox.size() == 0) {
173+
Log.v(Log.TAG_BATCHER, this + ": processNow() called, but inbox is empty");
174+
return;
175+
} else if (inbox.size() <= capacity) {
176+
Log.v(Log.TAG_BATCHER, "%s: inbox.size() <= capacity, adding %d items from inbox -> toProcess", this, inbox.size());
177+
while (inbox.size() > 0) {
178+
try {
179+
T t = inbox.take();
180+
toProcess.add(t);
181+
} catch (InterruptedException e) {
182+
Log.w(Log.TAG_BATCHER, "%s: processNow(): %s", this, e.getMessage());
183+
}
185184
}
186-
}
187-
} else {
188-
Log.v(Log.TAG_BATCHER, "%s: processNow() called, inbox size: %d", this, inbox.size());
189-
int i = 0;
190-
while (inbox.size() > 0 && i < capacity) {
191-
try {
192-
T t = inbox.take();
193-
toProcess.add(t);
194-
} catch (InterruptedException e) {
195-
Log.w(Log.TAG_BATCHER, "%s: processNow(): %s", this, e.getMessage());
185+
} else {
186+
Log.v(Log.TAG_BATCHER, "%s: processNow() called, inbox size: %d", this, inbox.size());
187+
int i = 0;
188+
while (inbox.size() > 0 && i < capacity) {
189+
try {
190+
T t = inbox.take();
191+
toProcess.add(t);
192+
} catch (InterruptedException e) {
193+
Log.w(Log.TAG_BATCHER, "%s: processNow(): %s", this, e.getMessage());
194+
}
195+
i += 1;
196196
}
197-
i += 1;
198-
}
199197

200-
Log.v(Log.TAG_BATCHER, "%s: inbox.size() > capacity, moving %d items from inbox -> toProcess array", this, toProcess.size());
201-
}
198+
Log.v(Log.TAG_BATCHER, "%s: inbox.size() > capacity, moving %d items from inbox -> toProcess array", this, toProcess.size());
199+
}
202200

203-
if (toProcess != null && toProcess.size() > 0) {
204-
Log.v(Log.TAG_BATCHER, "%s: invoking processor %s with %d items ", this, processor, toProcess.size());
205-
processor.process(toProcess);
206-
} else {
207-
Log.v(Log.TAG_BATCHER, "%s: nothing to process", this);
208-
}
209-
lastProcessedTime = System.currentTimeMillis();
210-
211-
// in case we ignored any schedule requests while processing, if
212-
// we have more items in inbox, lets schedule another processing attempt
213-
if (inbox.size() > 0) {
214-
Log.v(Log.TAG_BATCHER, "%s: finished processing a batch, but inbox size > 0: %d", this, inbox.size());
215-
//int delayToUse = delayToUse();
216-
int delayToUse = 0;
217-
Log.v(Log.TAG_BATCHER, "%s: going to process with delay: %d", this, delayToUse);
218-
ScheduledFuture pendingFuture = workExecutor.schedule(processNowRunnable, delayToUse, TimeUnit.MILLISECONDS);
219-
pendingFutures.add(pendingFuture);
201+
if (toProcess != null && toProcess.size() > 0) {
202+
Log.v(Log.TAG_BATCHER, "%s: invoking processor %s with %d items ", this, processor, toProcess.size());
203+
processor.process(toProcess);
204+
} else {
205+
Log.v(Log.TAG_BATCHER, "%s: nothing to process", this);
206+
}
207+
lastProcessedTime = System.currentTimeMillis();
208+
209+
// in case we ignored any schedule requests while processing, if
210+
// we have more items in inbox, lets schedule another processing attempt
211+
if (inbox.size() > 0) {
212+
Log.v(Log.TAG_BATCHER, "%s: finished processing a batch, but inbox size > 0: %d", this, inbox.size());
213+
//int delayToUse = delayToUse();
214+
int delayToUse = 0;
215+
Log.v(Log.TAG_BATCHER, "%s: going to process with delay: %d", this, delayToUse);
216+
ScheduledFuture pendingFuture = workExecutor.schedule(processNowRunnable, delayToUse, TimeUnit.MILLISECONDS);
217+
pendingFutures.add(pendingFuture);
218+
}
220219
}
221220
}
222221

0 commit comments

Comments
 (0)