Skip to content

Commit 79fc493

Browse files
committed
Merge pull request #1166 from couchbase/feature/issue_1147_postChangeNotifications
FIXED #1147 - [Scenario 4] Missing changes in Database Change Notific…
2 parents 35b59c5 + 59626fd commit 79fc493

1 file changed

Lines changed: 70 additions & 54 deletions

File tree

src/main/java/com/couchbase/lite/Database.java

Lines changed: 70 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import java.util.List;
5959
import java.util.Map;
6060
import java.util.Set;
61-
import java.util.concurrent.CopyOnWriteArrayList;
6261
import java.util.concurrent.Future;
6362
import java.util.concurrent.ScheduledExecutorService;
6463
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,7 +89,7 @@ public class Database implements StoreDelegate {
9089
private Store store = null;
9190
private String path;
9291
private String name;
93-
private boolean open = false;
92+
final private AtomicBoolean open = new AtomicBoolean(false);
9493

9594
private Map<String, View> views;
9695
private Map<String, String> viewDocTypes;
@@ -103,11 +102,12 @@ public class Database implements StoreDelegate {
103102

104103
private BlobStore attachments;
105104
private Manager manager;
106-
final private CopyOnWriteArrayList<ChangeListener> changeListeners;
107-
final private CopyOnWriteArrayList<DatabaseListener> databaseListeners;
105+
final private Set<ChangeListener> changeListeners;
106+
final private Set<DatabaseListener> databaseListeners;
108107
private Cache<String, Document> docCache;
109-
private List<DocumentChange> changesToNotify;
108+
final private List<DocumentChange> changesToNotify;
110109
private boolean postingChangeNotifications;
110+
final private Object lockPostingChangeNotifications = new Object();
111111
private long startTime;
112112

113113
/**
@@ -140,12 +140,13 @@ public Database(String path, String name, Manager manager, boolean readOnly) {
140140
this.name = name != null ? name : FileDirUtils.getDatabaseNameFromPath(path);
141141
this.manager = manager;
142142
this.startTime = System.currentTimeMillis();
143-
this.changeListeners = new CopyOnWriteArrayList<ChangeListener>();
144-
this.databaseListeners = new CopyOnWriteArrayList<DatabaseListener>();
143+
this.changeListeners = Collections.synchronizedSet(new HashSet<ChangeListener>());
144+
this.databaseListeners = Collections.synchronizedSet(new HashSet<DatabaseListener>());
145145
this.docCache = new Cache<String, Document>();
146-
this.changesToNotify = new ArrayList<DocumentChange>();
146+
this.changesToNotify = Collections.synchronizedList(new ArrayList<DocumentChange>());
147147
this.activeReplicators = Collections.synchronizedSet(new HashSet());
148148
this.allReplicators = Collections.synchronizedSet(new HashSet());
149+
this.postingChangeNotifications = false;
149150
}
150151

151152
///////////////////////////////////////////////////////////////////////////
@@ -235,7 +236,7 @@ public String getName() {
235236
*/
236237
@InterfaceAudience.Public
237238
public void addChangeListener(ChangeListener listener) {
238-
changeListeners.addIfAbsent(listener);
239+
changeListeners.add(listener);
239240
}
240241

241242
/**
@@ -332,7 +333,7 @@ public Replication createPushReplication(URL remote) {
332333
*/
333334
@InterfaceAudience.Public
334335
public void delete() throws CouchbaseLiteException {
335-
if (open) {
336+
if (open.get()) {
336337
if (!close()) {
337338
throw new CouchbaseLiteException("The database was open, and could not be closed",
338339
Status.INTERNAL_SERVER_ERROR);
@@ -683,13 +684,14 @@ public String toString() {
683684
public void storageExitedTransaction(boolean committed) {
684685
if (!committed) {
685686
// I already told cached CBLDocuments about these new revisions. Back that out:
686-
for (DocumentChange change : changesToNotify) {
687-
Document doc = cachedDocumentWithID(change.getDocumentId());
688-
if (doc != null) {
689-
doc.forgetCurrentRevision();
687+
synchronized (changesToNotify) {
688+
for (DocumentChange change : changesToNotify) {
689+
Document doc = cachedDocumentWithID(change.getDocumentId());
690+
if (doc != null)
691+
doc.forgetCurrentRevision();
690692
}
693+
changesToNotify.clear();
691694
}
692-
changesToNotify.clear();
693695
}
694696
postChangeNotifications();
695697
}
@@ -701,9 +703,6 @@ public void storageExitedTransaction(boolean committed) {
701703
@InterfaceAudience.Private
702704
public void databaseStorageChanged(DocumentChange change) {
703705
Log.v(Log.TAG_DATABASE, "Added: " + change.getAddedRevision());
704-
if (changesToNotify == null) {
705-
changesToNotify = new ArrayList<DocumentChange>();
706-
}
707706
changesToNotify.add(change);
708707
if (!postChangeNotifications()) {
709708
// The notification wasn't posted yet, probably because a transaction is open.
@@ -719,8 +718,9 @@ public void databaseStorageChanged(DocumentChange change) {
719718
// Squish the change objects if too many of them are piling up:
720719
if (changesToNotify.size() >= MANY_CHANGES_TO_NOTIFY) {
721720
if (changesToNotify.size() == MANY_CHANGES_TO_NOTIFY) {
722-
for (DocumentChange c : changesToNotify) {
723-
c.reduceMemoryUsage();
721+
synchronized (changesToNotify) {
722+
for (DocumentChange c : changesToNotify)
723+
c.reduceMemoryUsage();
724724
}
725725
} else {
726726
change.reduceMemoryUsage();
@@ -1016,7 +1016,7 @@ public interface DatabaseListener {
10161016
// NOTE: router-only
10171017
@InterfaceAudience.Private
10181018
public void addDatabaseListener(DatabaseListener listener) {
1019-
databaseListeners.addIfAbsent(listener);
1019+
databaseListeners.add(listener);
10201020
}
10211021

10221022
// NOTE: router-only
@@ -1080,7 +1080,7 @@ public synchronized void open() throws CouchbaseLiteException {
10801080

10811081
@InterfaceAudience.Private
10821082
public synchronized void open(DatabaseOptions options) throws CouchbaseLiteException {
1083-
if (open)
1083+
if (open.get())
10841084
return;
10851085

10861086
Log.v(TAG, "Opening %s", this);
@@ -1199,7 +1199,7 @@ public synchronized void open(DatabaseOptions options) throws CouchbaseLiteExcep
11991199
Status.INTERNAL_SERVER_ERROR);
12001200
}
12011201

1202-
open = true;
1202+
open.set(true);
12031203

12041204
if (upgrade) {
12051205
Log.i(TAG, "Upgrading to %s ...", storageType);
@@ -1251,14 +1251,16 @@ SymmetricKey createSymmetricKey(Object keyOrPassword) throws CouchbaseLiteExcept
12511251

12521252
@InterfaceAudience.Public
12531253
public boolean close() {
1254-
if (!open) {
1254+
if (!open.get()) {
12551255
// Ensure that the database is forgotten:
12561256
manager.forgetDatabase(this);
12571257
return false;
12581258
}
12591259

1260-
for (DatabaseListener listener : databaseListeners)
1261-
listener.databaseClosing();
1260+
synchronized (databaseListeners) {
1261+
for (DatabaseListener listener : databaseListeners)
1262+
listener.databaseClosing();
1263+
}
12621264

12631265
if (views != null) {
12641266
for (View view : views.values())
@@ -1303,7 +1305,7 @@ public boolean close() {
13031305
// Forget database:
13041306
manager.forgetDatabase(this);
13051307

1306-
open = false;
1308+
open.set(false);
13071309
return true;
13081310
}
13091311

@@ -1846,7 +1848,7 @@ public long getStartTime() {
18461848
*/
18471849
@InterfaceAudience.Private
18481850
public boolean isOpen() {
1849-
return open;
1851+
return open.get();
18501852
}
18511853

18521854
@InterfaceAudience.Private
@@ -2194,48 +2196,62 @@ protected void rememberAttachmentWriter(BlobStoreWriter writer) {
21942196
// Database+Insertion
21952197

21962198
private boolean postChangeNotifications() {
2197-
boolean posted = false;
2198-
// This is a 'while' instead of an 'if' because when we finish posting notifications, there
2199-
// might be new ones that have arrived as a result of notification handlers making document
2200-
// changes of their own (the replicator manager will do this.) So we need to check again.
2201-
while (!store.inTransaction() && isOpen() && !postingChangeNotifications
2202-
&& changesToNotify.size() > 0) {
2203-
2204-
try {
2205-
postingChangeNotifications = true; // Disallow re-entrant calls
2206-
2199+
synchronized (lockPostingChangeNotifications) {
2200+
if (postingChangeNotifications)
2201+
return false;
2202+
postingChangeNotifications = true;
2203+
}
2204+
try {
2205+
boolean posted = false;
2206+
// This is a 'while' instead of an 'if' because when we finish posting notifications, there
2207+
// might be new ones that have arrived as a result of notification handlers making document
2208+
// changes of their own (the replicator manager will do this.) So we need to check again.
2209+
while (!store.inTransaction() && open.get() && changesToNotify.size() > 0) {
22072210
List<DocumentChange> outgoingChanges = new ArrayList<DocumentChange>();
2208-
outgoingChanges.addAll(changesToNotify);
2209-
changesToNotify.clear();
2211+
synchronized (changesToNotify) {
2212+
outgoingChanges.addAll(changesToNotify);
2213+
changesToNotify.clear();
2214+
}
22102215

22112216
// TODO: postPublicChangeNotification in CBLDatabase+Internal.m should replace
22122217
// following lines of code.
2213-
22142218
boolean isExternal = false;
22152219
for (DocumentChange change : outgoingChanges) {
22162220
Document doc = cachedDocumentWithID(change.getDocumentId());
2217-
if (doc != null) {
2221+
if (doc != null)
22182222
doc.revisionAdded(change, true);
2219-
}
2220-
if (change.getSource() != null) {
2223+
if (change.getSource() != null)
22212224
isExternal = true;
2222-
}
22232225
}
22242226

2225-
ChangeEvent changeEvent = new ChangeEvent(this, isExternal, outgoingChanges);
2226-
2227-
for (ChangeListener changeListener : changeListeners) {
2228-
changeListener.changed(changeEvent);
2227+
final ChangeEvent changeEvent = new ChangeEvent(this, isExternal, outgoingChanges);
2228+
synchronized (changeListeners) {
2229+
for (ChangeListener changeListener : changeListeners) {
2230+
if (changeListener != null) {
2231+
try {
2232+
changeListener.changed(changeEvent);
2233+
} catch (Exception ex) {
2234+
// Implementation of ChangeListener might throw RuntimeException,
2235+
// ignore it.
2236+
Log.e(TAG, "%s got exception posting change notification: %s",
2237+
ex, this, changeListener);
2238+
}
2239+
}
2240+
}
22292241
}
2230-
22312242
posted = true;
2232-
} catch (Exception e) {
2233-
Log.e(Database.TAG, this + " got exception posting change notifications", e);
2234-
} finally {
2243+
}
2244+
return posted;
2245+
} catch (Exception e) {
2246+
// In general, non of methods that are used in this method throws Exception.
2247+
// This catch block is just in case RuntimeException is thrown.
2248+
Log.e(TAG, "Unknown Exception: %s got exception posting change notifications", e, this);
2249+
return false;
2250+
} finally {
2251+
synchronized (lockPostingChangeNotifications) {
22352252
postingChangeNotifications = false;
22362253
}
22372254
}
2238-
return posted;
22392255
}
22402256

22412257
// Database+Replication

0 commit comments

Comments
 (0)