Skip to content

Commit 61d0214

Browse files
committed
Merge pull request #640 from couchbase/feature/issue_597_pullrepl_couchdb
Fixed Android 597 - problems with pull request
2 parents f7349de + 13395e8 commit 61d0214

6 files changed

Lines changed: 102 additions & 80 deletions

File tree

src/main/java/com/couchbase/lite/replicator/ChangeTracker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -609,8 +609,10 @@ public Map<String, Object> changesFeedPOSTBodyMap() {
609609
public void setPaused(boolean paused) {
610610
Log.v(Log.TAG, "setPaused: " + paused);
611611
synchronized (pausedObj) {
612-
this.paused = paused;
613-
pausedObj.notifyAll();
612+
if(this.paused != paused) {
613+
this.paused = paused;
614+
pausedObj.notifyAll();
615+
}
614616
}
615617
}
616618

src/main/java/com/couchbase/lite/replicator/PullerInternal.java

Lines changed: 51 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.couchbase.lite.storage.SQLException;
1212
import com.couchbase.lite.support.BatchProcessor;
1313
import com.couchbase.lite.support.Batcher;
14+
import com.couchbase.lite.support.CustomFuture;
1415
import com.couchbase.lite.support.HttpClientFactory;
1516
import com.couchbase.lite.support.RemoteRequestCompletionBlock;
1617
import com.couchbase.lite.support.SequenceMap;
@@ -61,9 +62,9 @@ public class PullerInternal extends ReplicationInternal implements ChangeTracker
6162
private ChangeTracker changeTracker;
6263
protected SequenceMap pendingSequences;
6364
protected Boolean canBulkGet; // Does the server support _bulk_get requests?
64-
protected List<RevisionInternal> revsToPull;
65-
protected List<RevisionInternal> bulkRevsToPull;
66-
protected List<RevisionInternal> deletedRevsToPull;
65+
protected List<RevisionInternal> revsToPull = Collections.synchronizedList(new ArrayList<RevisionInternal>(100));
66+
protected List<RevisionInternal> bulkRevsToPull = Collections.synchronizedList(new ArrayList<RevisionInternal>(100));
67+
protected List<RevisionInternal> deletedRevsToPull = Collections.synchronizedList(new ArrayList<RevisionInternal>(100));
6768
protected int httpConnectionCount;
6869
protected Batcher<RevisionInternal> downloadsToInsert;
6970

@@ -197,12 +198,6 @@ protected void processInbox(RevisionList inbox)
197198

198199
//TODO: add support for rev isConflicted
199200
if (canBulkGet || (rev.getGeneration() == 1 && !rev.isDeleted())) { // &&!rev.isConflicted)
200-
201-
//optimistically pull 1st-gen revs in bulk
202-
if (bulkRevsToPull == null)
203-
// bulkRevsToPull could be accessed from multiple threads
204-
bulkRevsToPull = Collections.synchronizedList(new ArrayList<RevisionInternal>(100));
205-
206201
bulkRevsToPull.add(rev);
207202
}
208203
else {
@@ -226,40 +221,35 @@ public void pullRemoteRevisions() {
226221
//find the work to be done in a synchronized block
227222
List<RevisionInternal> workToStartNow = new ArrayList<RevisionInternal>();
228223
List<RevisionInternal> bulkWorkToStartNow = new ArrayList<RevisionInternal>();
229-
while (httpConnectionCount + workToStartNow.size() < MAX_OPEN_HTTP_CONNECTIONS) {
230-
int nBulk = 0;
231-
if (bulkRevsToPull != null) {
232-
nBulk = (bulkRevsToPull.size() < MAX_REVS_TO_GET_IN_BULK) ? bulkRevsToPull.size() : MAX_REVS_TO_GET_IN_BULK;
233-
}
234-
if (nBulk == 1) {
235-
// Rather than pulling a single revision in 'bulk', just pull it normally:
236-
queueRemoteRevision(bulkRevsToPull.get(0));
237-
bulkRevsToPull.remove(0);
238-
nBulk = 0;
239-
}
240-
if (nBulk > 0) {
241-
// Prefer to pull bulk revisions:
242-
// Note: ArrayList.addAll(Collection) iterates parameter collection
243-
// https://github.com/couchbase/couchbase-lite-java-core/issues/361
244-
synchronized (bulkRevsToPull) {
245-
bulkWorkToStartNow.addAll(bulkRevsToPull.subList(0, nBulk));
224+
225+
synchronized (bulkRevsToPull) {
226+
while (httpConnectionCount + workToStartNow.size() < MAX_OPEN_HTTP_CONNECTIONS) {
227+
int nBulk = (bulkRevsToPull.size() < MAX_REVS_TO_GET_IN_BULK) ? bulkRevsToPull.size() : MAX_REVS_TO_GET_IN_BULK;
228+
229+
if (nBulk == 1) {
230+
// Rather than pulling a single revision in 'bulk', just pull it normally:
231+
queueRemoteRevision(bulkRevsToPull.remove(0));
232+
nBulk = 0;
246233
}
247-
bulkRevsToPull.subList(0, nBulk).clear();
248-
} else {
249-
// Prefer to pull an existing revision over a deleted one:
250-
List<RevisionInternal> queue = revsToPull;
251-
if (queue == null || queue.size() == 0) {
252-
queue = deletedRevsToPull;
253-
if (queue == null || queue.size() == 0)
234+
235+
if (nBulk > 0) {
236+
bulkWorkToStartNow.addAll(bulkRevsToPull.subList(0, nBulk));
237+
bulkRevsToPull.subList(0, nBulk).clear();
238+
} else {
239+
// Prefer to pull an existing revision over a deleted one:
240+
if (revsToPull.size() == 0 && deletedRevsToPull.size() == 0) {
254241
break; // both queues are empty
242+
} else if (revsToPull.size() > 0) {
243+
workToStartNow.add(revsToPull.remove(0));
244+
} else if (deletedRevsToPull.size() > 0) {
245+
workToStartNow.add(deletedRevsToPull.remove(0));
246+
}
255247
}
256-
workToStartNow.add(queue.get(0));
257-
queue.remove(0);
258248
}
259249
}
260250

261251
//actually run it outside the synchronized block
262-
if(bulkWorkToStartNow.size() > 0) {
252+
if (bulkWorkToStartNow.size() > 0) {
263253
pullBulkRevisions(bulkWorkToStartNow);
264254
}
265255

@@ -609,7 +599,7 @@ public void pullRemoteRevision(final RevisionInternal rev) {
609599
//create a final version of this variable for the log statement inside
610600
//FIXME find a way to avoid this
611601
final String pathInside = path.toString();
612-
Future future = sendAsyncMultipartDownloaderRequest("GET", pathInside, null, db, new RemoteRequestCompletionBlock() {
602+
CustomFuture future = sendAsyncMultipartDownloaderRequest("GET", pathInside, null, db, new RemoteRequestCompletionBlock() {
613603

614604
@Override
615605
public void onCompletion(HttpResponse httpResponse, Object result, Throwable e) {
@@ -620,22 +610,24 @@ public void onCompletion(HttpResponse httpResponse, Object result, Throwable e)
620610
Map<String, Object> properties = (Map<String, Object>) result;
621611
PulledRevision gotRev = new PulledRevision(properties);
622612
gotRev.setSequence(rev.getSequence());
623-
624-
//if(gotRev.getBody() != null)
625-
// gotRev.getBody().compact();
626-
613+
627614
Log.d(Log.TAG_SYNC, "%s: pullRemoteRevision add rev: %s to batcher: %s", PullerInternal.this, gotRev, downloadsToInsert);
628615

616+
if(gotRev.getBody() != null)
617+
gotRev.getBody().compact();
618+
629619
// Add to batcher ... eventually it will be fed to -insertRevisions:.
630620
downloadsToInsert.queueObject(gotRev);
631621
}
632622

633623
// Note that we've finished this task:
634624
--httpConnectionCount;
625+
635626
// Start another task if there are still revisions waiting to be pulled:
636627
pullRemoteRevisions();
637628
}
638629
});
630+
future.setQueue(pendingFutures);
639631
pendingFutures.add(future);
640632
}
641633

@@ -659,14 +651,8 @@ public String joinQuotedEscaped(List<String> strings) {
659651
@InterfaceAudience.Private
660652
protected void queueRemoteRevision(RevisionInternal rev) {
661653
if (rev.isDeleted()) {
662-
if (deletedRevsToPull == null) {
663-
deletedRevsToPull = new ArrayList<RevisionInternal>(100);
664-
}
665-
666654
deletedRevsToPull.add(rev);
667655
} else {
668-
if (revsToPull == null)
669-
revsToPull = new ArrayList<RevisionInternal>(100);
670656
revsToPull.add(rev);
671657
}
672658
}
@@ -702,27 +688,22 @@ public HttpClient getHttpClient() {
702688

703689
@Override
704690
public void changeTrackerReceivedChange(final Map<String, Object> change) {
705-
// this callback will be on the changetracker thread, but we need
706-
// to do the work on the replicator thread.
707-
synchronized (workExecutor) {
708-
if (!workExecutor.isShutdown()) {
709-
workExecutor.submit(new Runnable() {
710-
@Override
711-
public void run() {
712-
try {
713-
Log.d(Log.TAG_SYNC, "changeTrackerReceivedChange: %s", change);
714-
processChangeTrackerChange(change);
715-
} catch (Exception e) {
716-
Log.e(Log.TAG_SYNC, "Error processChangeTrackerChange(): %s", e);
717-
e.printStackTrace();
718-
throw new RuntimeException(e);
719-
}
720-
}
721-
});
722-
}
691+
try {
692+
Log.d(Log.TAG_SYNC, "changeTrackerReceivedChange: %s", change);
693+
processChangeTrackerChange(change);
694+
} catch (Exception e) {
695+
Log.e(Log.TAG_SYNC, "Error processChangeTrackerChange(): %s", e);
696+
throw new RuntimeException(e);
723697
}
724698
}
725699

700+
/**
701+
* in CBL_Puller.m
702+
* - (void) changeTrackerReceivedSequence: (id)remoteSequenceID
703+
* docID: (NSString*)docID
704+
* revIDs: (NSArray*)revIDs
705+
* deleted: (BOOL)deleted
706+
*/
726707
protected void processChangeTrackerChange(final Map<String, Object> change)
727708
{
728709
String lastSequence = change.get("seq").toString();
@@ -744,6 +725,11 @@ protected void processChangeTrackerChange(final Map<String, Object> change)
744725
}
745726
PulledRevision rev = new PulledRevision(docID, revID, deleted);
746727
rev.setRemoteSequenceID(lastSequence);
728+
729+
// TODO: Need to do conflict check?
730+
// if (revIDs.count > 1)
731+
// rev.conflicted = true;
732+
747733
Log.d(Log.TAG_SYNC, "%s: adding rev to inbox %s", this, rev);
748734

749735
Log.v(Log.TAG_SYNC, "%s: changeTrackerReceivedChange() incrementing changesCount by 1", this);

src/main/java/com/couchbase/lite/replicator/ReplicationInternal.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.couchbase.lite.support.BatchProcessor;
1212
import com.couchbase.lite.support.Batcher;
1313
import com.couchbase.lite.support.BlockingQueueListener;
14+
import com.couchbase.lite.support.CustomFuture;
1415
import com.couchbase.lite.support.CustomLinkedBlockingQueue;
1516
import com.couchbase.lite.support.HttpClientFactory;
1617
import com.couchbase.lite.support.RemoteRequestCompletionBlock;
@@ -596,7 +597,7 @@ public Future sendAsyncMultipartRequest(String method, String relativePath, Mult
596597
* @exclude
597598
*/
598599
@InterfaceAudience.Private
599-
public Future sendAsyncMultipartDownloaderRequest(String method, String relativePath, Object body, Database db, RemoteRequestCompletionBlock onCompletion) {
600+
public CustomFuture sendAsyncMultipartDownloaderRequest(String method, String relativePath, Object body, Database db, RemoteRequestCompletionBlock onCompletion) {
600601
try {
601602

602603
String urlStr = buildRelativeURLString(relativePath);
@@ -617,7 +618,7 @@ public Future sendAsyncMultipartDownloaderRequest(String method, String relative
617618

618619
request.setAuthenticator(getAuthenticator());
619620

620-
Future future = request.submit();
621+
CustomFuture future = request.submit();
621622
return future;
622623
} catch (MalformedURLException e) {
623624
Log.e(Log.TAG_SYNC, "Malformed URL for async request", e);

src/main/java/com/couchbase/lite/support/Batcher.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Iterator;
88
import java.util.List;
99
import java.util.concurrent.BlockingQueue;
10+
import java.util.concurrent.CancellationException;
1011
import java.util.concurrent.ExecutionException;
1112
import java.util.concurrent.Future;
1213
import java.util.concurrent.LinkedBlockingQueue;
@@ -107,28 +108,25 @@ public void queueObjects(List<T> objects) {
107108
}
108109

109110
public void waitForPendingFutures() {
110-
111111
Log.d(Log.TAG_BATCHER, "%s: waitForPendingFutures", this);
112-
113112
try {
114-
115113
while (!pendingFutures.isEmpty()) {
116114
Future future = pendingFutures.take();
117115
try {
118116
Log.d(Log.TAG_BATCHER, "calling future.get() on %s", future);
119117
future.get();
120118
Log.d(Log.TAG_BATCHER, "done calling future.get() on %s", future);
121-
} catch (InterruptedException e) {
122-
e.printStackTrace();
119+
} catch (CancellationException e) {
120+
Log.i(Log.TAG_BATCHER, "Task was canceled: " + e.getMessage());
123121
} catch (ExecutionException e) {
124-
e.printStackTrace();
122+
Log.e(Log.TAG_BATCHER, "ERROR: Task aborted: " + e.getMessage());
123+
} catch (InterruptedException e) {
124+
Log.w(Log.TAG_BATCHER, e.getMessage());
125125
}
126126
}
127-
128127
} catch (Exception e) {
129128
Log.e(Log.TAG_BATCHER, "Exception waiting for pending futures: %s", e);
130129
}
131-
132130
Log.d(Log.TAG_BATCHER, "%s: /waitForPendingFutures", this);
133131
}
134132

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.couchbase.lite.support;
2+
3+
import java.util.Queue;
4+
import java.util.concurrent.Future;
5+
6+
/**
7+
* Created by hideki on 5/21/15.
8+
*/
9+
public interface CustomFuture<V> extends Future<V> {
10+
void setQueue(Queue queue);
11+
}

src/main/java/com/couchbase/lite/support/RemoteRequestRetry.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.IOException;
1313
import java.net.URL;
1414
import java.util.Map;
15+
import java.util.Queue;
1516
import java.util.concurrent.BlockingQueue;
1617
import java.util.concurrent.ExecutionException;
1718
import java.util.concurrent.ExecutorService;
@@ -31,7 +32,7 @@
3132
* in between the retries.
3233
*
3334
*/
34-
public class RemoteRequestRetry<T> implements Future<T> {
35+
public class RemoteRequestRetry<T> implements CustomFuture<T> {
3536

3637
public static int MAX_RETRIES = 3; // total number of attempts = 4 (1 initial + MAX_RETRIES)
3738
public static int RETRY_DELAY_MS = 4 * 1000; // 4 sec
@@ -64,9 +65,18 @@ public class RemoteRequestRetry<T> implements Future<T> {
6465

6566
private RemoteRequestType requestType;
6667

68+
6769
// for Retry task
6870
ScheduledFuture retryFuture = null;
6971

72+
private Queue queue = null;
73+
74+
@Override
75+
public void setQueue(Queue queue) {
76+
this.queue = queue;
77+
}
78+
79+
7080
/**
7181
* The kind of RemoteRequest that will be created on each retry attempt
7282
*/
@@ -105,14 +115,14 @@ public RemoteRequestRetry(RemoteRequestType requestType,
105115

106116
}
107117

108-
public Future submit() {
118+
public CustomFuture submit() {
109119
return submit(false);
110120
}
111121

112122
/**
113123
* @param gzip true - send gzipped request
114124
*/
115-
public Future submit(boolean gzip) {
125+
public CustomFuture submit(boolean gzip) {
116126

117127
RemoteRequest request = generateRemoteRequest();
118128

@@ -185,6 +195,12 @@ private RemoteRequest generateRemoteRequest() {
185195
return request;
186196
}
187197

198+
void removeFromQueue() {
199+
if (queue != null) {
200+
queue.remove(this);
201+
setQueue(null);
202+
}
203+
}
188204

189205
RemoteRequestCompletionBlock onCompletionInner = new RemoteRequestCompletionBlock() {
190206

@@ -193,7 +209,15 @@ private void completed(HttpResponse httpResponse, Object result, Throwable e) {
193209
requestResult = result;
194210
requestThrowable = e;
195211
completedSuccessfully.set(true);
212+
196213
onCompletionCaller.onCompletion(requestHttpResponse, requestResult, requestThrowable);
214+
215+
// release unnecessary references to reduce memory usage as soon as called onComplete().
216+
requestHttpResponse = null;
217+
requestResult = null;
218+
requestThrowable = null;
219+
220+
removeFromQueue();
197221
}
198222

199223
@Override

0 commit comments

Comments
 (0)