4747import java .util .concurrent .Future ;
4848import java .util .concurrent .TimeUnit ;
4949import java .util .concurrent .atomic .AtomicBoolean ;
50+ import java .util .concurrent .atomic .AtomicLong ;
5051
5152import okhttp3 .OkHttpClient ;
5253import okhttp3 .Response ;
@@ -77,6 +78,8 @@ public class PullerInternal extends ReplicationInternal implements ChangeTracker
7778 private static final int INSERTION_BATCHER_DELAY = 250 ; // 0.25 Seconds
7879 private static final int INSERTION_BATCHER_CAPACITY = 100 ;
7980
81+ private static final long MAX_QUEUE_MEMORY_SIZE = 2 * 1024 * 1024 ; // 2MB
82+
8083 private ChangeTracker changeTracker ;
8184 protected SequenceMap pendingSequences ;
8285 protected Boolean canBulkGet ; // Does the server support _bulk_get requests?
@@ -88,6 +91,7 @@ public class PullerInternal extends ReplicationInternal implements ChangeTracker
8891 new ArrayList <RevisionInternal >(100 ));
8992 protected int httpConnectionCount ;
9093 protected Batcher <RevisionInternal > downloadsToInsert ;
94+ protected AtomicLong queuedMemorySize = new AtomicLong (0 );
9195
9296 private String str = null ;
9397
@@ -125,6 +129,9 @@ private void initDownloadsToInsert() {
125129 @ Override
126130 public void process (List <RevisionInternal > inbox ) {
127131 insertDownloads (inbox );
132+ if (downloadsToInsert .count () == 0 ) {
133+ queuedMemorySize .set (0 );
134+ }
128135 }
129136 });
130137 }
@@ -332,12 +339,12 @@ protected void pullBulkRevisions(List<RevisionInternal> bulkRevs) {
332339 db ,
333340 this .requestHeaders ,
334341 new RemoteBulkDownloaderRequest .BulkDownloaderDocument () {
335- public void onDocument (Map <String , Object > props ) {
342+ public void onDocument (Map <String , Object > props , long size ) {
336343 // Got a revision!
337344 // Find the matching revision in 'remainingRevs' and get its sequence:
338345 RevisionInternal rev ;
339346 if (props .get ("_id" ) != null ) {
340- rev = new RevisionInternal (props );
347+ rev = new RevisionInternal (props , size );
341348 } else {
342349 rev = new RevisionInternal ((String ) props .get ("id" ),
343350 (String ) props .get ("rev" ), false );
@@ -456,12 +463,22 @@ private void queueDownloadedRevision(RevisionInternal rev) {
456463 }
457464 }
458465
459- if (rev != null && rev .getBody () != null )
460- rev .getBody ().compact ();
466+ // NOTE: should not/not necessary to call Body.compact()
467+ // new RevisionInternal(Map<string, Object>) creates Body instance only
468+ // with `object`. Serializing object to json causes two unnecessary
469+ // JSON serializations.
470+
471+ if (rev .getBody () != null )
472+ queuedMemorySize .addAndGet (rev .getBody ().getSize ());
461473
462474 downloadsToInsert .queueObject (rev );
463- }
464475
476+ // if queue memory size is more than maximum, force flush the queue.
477+ if (queuedMemorySize .get () > MAX_QUEUE_MEMORY_SIZE ) {
478+ Log .d (TAG , "Flushing queued memory size at: " + queuedMemorySize );
479+ downloadsToInsert .flushAll (true );
480+ }
481+ }
465482
466483 // Get as many revisions as possible in one _all_docs request.
467484 // This is compatible with CouchDB, but it only works for revs of generation 1 without attachments.
@@ -586,7 +603,9 @@ public boolean run() {
586603 continue ;
587604 }
588605 }
589- if (rev .getBody () != null ) rev .getBody ().compact ();
606+
607+ // NOTE: calling Body.compact() here cause another JSON serialization by Jackson.
608+ // At this point Body.json is null, and Body.object has values.
590609
591610 // Mark this revision's fake sequence as processed:
592611 pendingSequences .removeSequence (fakeSequence );
@@ -700,18 +719,33 @@ public void onCompletion(Response httpResponse, Object result, Throwable e) {
700719 setError (e );
701720 }
702721 } else {
722+
703723 Map <String , Object > properties = (Map <String , Object >) result ;
704- PulledRevision gotRev = new PulledRevision (properties );
724+ long size = 0 ;
725+ if (httpResponse != null && httpResponse .body () != null )
726+ size = httpResponse .body ().contentLength ();
727+ PulledRevision gotRev = new PulledRevision (properties , size );
705728 gotRev .setSequence (rev .getSequence ());
706729
707730 Log .d (TAG , "%s: pullRemoteRevision add rev: %s to batcher: %s" ,
708731 PullerInternal .this , gotRev , downloadsToInsert );
709732
733+ // NOTE: should not/not necessary to call Body.compact()
734+ // new PulledRevision(Map<string, Object>) creates Body instance only
735+ // with `object`. Serializing object to json causes two unnecessary
736+ // JSON serializations.
737+
710738 if (gotRev .getBody () != null )
711- gotRev .getBody ().compact ( );
739+ queuedMemorySize . addAndGet ( gotRev .getBody ().getSize () );
712740
713741 // Add to batcher ... eventually it will be fed to -insertRevisions:.
714742 downloadsToInsert .queueObject (gotRev );
743+
744+ // if queue memory size is more than maximum, force flush the queue.
745+ if (queuedMemorySize .get () > MAX_QUEUE_MEMORY_SIZE ) {
746+ Log .d (TAG , "Flushing queued memory size at: " + queuedMemorySize );
747+ downloadsToInsert .flushAll (true );
748+ }
715749 }
716750
717751 // Note that we've finished this task:
0 commit comments