Skip to content

Commit 1e9627a

Browse files
author
Hideki Itakura
committed
At Line 238 of OLD code's causes Out-Of-Index error. Because RemoteExecutor's Thread pool size is 5, so this method could be called from multiple threads. And it could remove items from bulkRevsToPull concurrently. To avoid this problem, entire while loop is surrounded by synchronization block.
1 parent 9b71b6d commit 1e9627a

1 file changed

Lines changed: 20 additions & 22 deletions

File tree

src/main/java/com/couchbase/lite/replicator/PullerInternal.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -221,37 +221,35 @@ public void pullRemoteRevisions() {
221221
//find the work to be done in a synchronized block
222222
List<RevisionInternal> workToStartNow = new ArrayList<RevisionInternal>();
223223
List<RevisionInternal> bulkWorkToStartNow = new ArrayList<RevisionInternal>();
224-
while (httpConnectionCount + workToStartNow.size() < MAX_OPEN_HTTP_CONNECTIONS) {
225-
int nBulk = (bulkRevsToPull.size() < MAX_REVS_TO_GET_IN_BULK) ? bulkRevsToPull.size() : MAX_REVS_TO_GET_IN_BULK;
226224

227-
if (nBulk == 1) {
228-
// Rather than pulling a single revision in 'bulk', just pull it normally:
229-
queueRemoteRevision(bulkRevsToPull.remove(0));
230-
nBulk = 0;
231-
}
225+
synchronized (bulkRevsToPull) {
226+
while (httpConnectionCount + workToStartNow.size() < MAX_OPEN_HTTP_CONNECTIONS) {
227+
int nBulk = (bulkRevsToPull.size() < MAX_REVS_TO_GET_IN_BULK) ? bulkRevsToPull.size() : MAX_REVS_TO_GET_IN_BULK;
228+
229+
if (nBulk == 1) {
230+
// Rather than pulling a single revision in 'bulk', just pull it normally:
231+
queueRemoteRevision(bulkRevsToPull.remove(0));
232+
nBulk = 0;
233+
}
232234

233-
if (nBulk > 0) {
234-
// Prefer to pull bulk revisions:
235-
// Note: ArrayList.addAll(Collection) iterates parameter collection
236-
// https://github.com/couchbase/couchbase-lite-java-core/issues/361
237-
synchronized (bulkRevsToPull) {
235+
if (nBulk > 0) {
238236
bulkWorkToStartNow.addAll(bulkRevsToPull.subList(0, nBulk));
239237
bulkRevsToPull.subList(0, nBulk).clear();
240-
}
241-
} else {
242-
// Prefer to pull an existing revision over a deleted one:
243-
if (revsToPull.size() == 0 && deletedRevsToPull.size() == 0) {
244-
break; // both queues are empty
245-
} else if (revsToPull.size() > 0) {
246-
workToStartNow.add(revsToPull.remove(0));
247-
} else if (deletedRevsToPull.size() > 0) {
248-
workToStartNow.add(deletedRevsToPull.remove(0));
238+
} else {
239+
// Prefer to pull an existing revision over a deleted one:
240+
if (revsToPull.size() == 0 && deletedRevsToPull.size() == 0) {
241+
break; // both queues are empty
242+
} else if (revsToPull.size() > 0) {
243+
workToStartNow.add(revsToPull.remove(0));
244+
} else if (deletedRevsToPull.size() > 0) {
245+
workToStartNow.add(deletedRevsToPull.remove(0));
246+
}
249247
}
250248
}
251249
}
252250

253251
//actually run it outside the synchronized block
254-
if(bulkWorkToStartNow.size() > 0) {
252+
if (bulkWorkToStartNow.size() > 0) {
255253
pullBulkRevisions(bulkWorkToStartNow);
256254
}
257255

0 commit comments

Comments
 (0)