@@ -62,9 +62,9 @@ public class PullerInternal extends ReplicationInternal implements ChangeTracker
6262 private ChangeTracker changeTracker ;
6363 protected SequenceMap pendingSequences ;
6464 protected Boolean canBulkGet ; // Does the server support _bulk_get requests?
65- protected List <RevisionInternal > revsToPull ;
66- protected List <RevisionInternal > bulkRevsToPull ;
67- protected List <RevisionInternal > deletedRevsToPull ;
65+ protected List <RevisionInternal > revsToPull = Collections . synchronizedList ( new ArrayList < RevisionInternal >( 100 )) ;
66+ protected List <RevisionInternal > bulkRevsToPull = Collections . synchronizedList ( new ArrayList < RevisionInternal >( 100 )) ;
67+ protected List <RevisionInternal > deletedRevsToPull = Collections . synchronizedList ( new ArrayList < RevisionInternal >( 100 )) ;
6868 protected int httpConnectionCount ;
6969 protected Batcher <RevisionInternal > downloadsToInsert ;
7070
@@ -198,12 +198,6 @@ protected void processInbox(RevisionList inbox)
198198
199199 //TODO: add support for rev isConflicted
200200 if (canBulkGet || (rev .getGeneration () == 1 && !rev .isDeleted ())) { // &&!rev.isConflicted)
201-
202- //optimistically pull 1st-gen revs in bulk
203- if (bulkRevsToPull == null )
204- // bulkRevsToPull could be accessed from multiple threads
205- bulkRevsToPull = Collections .synchronizedList (new ArrayList <RevisionInternal >(100 ));
206-
207201 bulkRevsToPull .add (rev );
208202 }
209203 else {
@@ -228,34 +222,31 @@ public void pullRemoteRevisions() {
228222 List <RevisionInternal > workToStartNow = new ArrayList <RevisionInternal >();
229223 List <RevisionInternal > bulkWorkToStartNow = new ArrayList <RevisionInternal >();
230224 while (httpConnectionCount + workToStartNow .size () < MAX_OPEN_HTTP_CONNECTIONS ) {
231- int nBulk = 0 ;
232- if (bulkRevsToPull != null ) {
233- nBulk = (bulkRevsToPull .size () < MAX_REVS_TO_GET_IN_BULK ) ? bulkRevsToPull .size () : MAX_REVS_TO_GET_IN_BULK ;
234- }
225+ int nBulk = (bulkRevsToPull .size () < MAX_REVS_TO_GET_IN_BULK ) ? bulkRevsToPull .size () : MAX_REVS_TO_GET_IN_BULK ;
226+
235227 if (nBulk == 1 ) {
236228 // Rather than pulling a single revision in 'bulk', just pull it normally:
237- queueRemoteRevision (bulkRevsToPull .get (0 ));
238- bulkRevsToPull .remove (0 );
229+ queueRemoteRevision (bulkRevsToPull .remove (0 ));
239230 nBulk = 0 ;
240231 }
232+
241233 if (nBulk > 0 ) {
242234 // Prefer to pull bulk revisions:
243235 // Note: ArrayList.addAll(Collection) iterates parameter collection
244236 // https://github.com/couchbase/couchbase-lite-java-core/issues/361
245237 synchronized (bulkRevsToPull ) {
246238 bulkWorkToStartNow .addAll (bulkRevsToPull .subList (0 , nBulk ));
239+ bulkRevsToPull .subList (0 , nBulk ).clear ();
247240 }
248- bulkRevsToPull .subList (0 , nBulk ).clear ();
249241 } else {
250242 // Prefer to pull an existing revision over a deleted one:
251- List <RevisionInternal > queue = revsToPull ;
252- if (queue == null || queue .size () == 0 ) {
253- queue = deletedRevsToPull ;
254- if (queue == null || queue .size () == 0 )
255- break ; // both queues are empty
243+ if (revsToPull .size () == 0 && deletedRevsToPull .size () == 0 ) {
244+ break ; // both queues are empty
245+ } else if (revsToPull .size () > 0 ) {
246+ workToStartNow .add (revsToPull .remove (0 ));
247+ } else if (deletedRevsToPull .size () > 0 ) {
248+ workToStartNow .add (deletedRevsToPull .remove (0 ));
256249 }
257- workToStartNow .add (queue .get (0 ));
258- queue .remove (0 );
259250 }
260251 }
261252
@@ -662,14 +653,8 @@ public String joinQuotedEscaped(List<String> strings) {
662653 @ InterfaceAudience .Private
663654 protected void queueRemoteRevision (RevisionInternal rev ) {
664655 if (rev .isDeleted ()) {
665- if (deletedRevsToPull == null ) {
666- deletedRevsToPull = new ArrayList <RevisionInternal >(100 );
667- }
668-
669656 deletedRevsToPull .add (rev );
670657 } else {
671- if (revsToPull == null )
672- revsToPull = new ArrayList <RevisionInternal >(100 );
673658 revsToPull .add (rev );
674659 }
675660 }
@@ -1026,6 +1011,7 @@ protected void goOnline() {
10261011
10271012 protected void pauseOrResume (){
10281013 int pending = batcher .count () + pendingSequences .count ();
1014+ //Log.e(Log.TAG_SYNC, "[pauseOrResume()] batcher.count()="+batcher.count() + ", pendingSequences.count()="+pendingSequences.count()+", pendingFutures.size()=" +pendingFutures.size());
10291015 changeTracker .setPaused (pending >= MAX_PENDING_DOCS );
10301016 }
10311017}
0 commit comments