Skip to content

Commit 8ace36d

Browse files
committed
GH-5807 fix: avoid closing active ValueStore readers
1 parent 5dbf88c commit 8ace36d

2 files changed

Lines changed: 93 additions & 1 deletion

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,7 @@ <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {
930930
if (attempt > 0) {
931931
throw e;
932932
}
933-
closeReadTransactions();
933+
closeInactiveReadTransactions();
934934
}
935935
}
936936
}
@@ -1404,6 +1404,18 @@ private void closeReadTransactions() {
14041404
}
14051405
}
14061406

1407+
private void closeInactiveReadTransactions() {
1408+
txnLock.writeLock().lock();
1409+
try {
1410+
ReadTxn.State[] snapshot = activeReadTransactions.toArray(new ReadTxn.State[0]);
1411+
for (ReadTxn.State readTxn : snapshot) {
1412+
readTxn.closeInactiveTxn();
1413+
}
1414+
} finally {
1415+
txnLock.writeLock().unlock();
1416+
}
1417+
}
1418+
14071419
/**
14081420
* Closes the ValueStore, releasing any file references, etc. Once closed, the ValueStore can no longer be used.
14091421
*
@@ -1540,6 +1552,12 @@ synchronized void closeTxn() {
15401552
activeReadTransactions.remove(this);
15411553
}
15421554
}
1555+
1556+
synchronized void closeInactiveTxn() {
1557+
if (initialized && depth == 0) {
1558+
closeTxn();
1559+
}
1560+
}
15431561
}
15441562

15451563
public ReadTxn(Set<State> activeReadTransactions, ConcurrentCleaner cleaner) {

core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import java.util.List;
2929
import java.util.Random;
3030
import java.util.Set;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.concurrent.locks.ReadWriteLock;
3135
import java.util.stream.Collectors;
3236

3337
import org.eclipse.rdf4j.model.IRI;
@@ -407,6 +411,58 @@ public void testStaleBNodeHashCodeIgnoresReusedIdAfterClear() throws Exception {
407411
assertEquals("stale values must keep their original hash after a revision change", firstHash, first.hashCode());
408412
}
409413

414+
@Test
415+
public void testReadersFullCleanupDoesNotCloseInUseReadTxn() throws Exception {
416+
ReadWriteLock txnLock = getField(valueStore, "txnLock", ReadWriteLock.class);
417+
Set<?> activeReadTransactions = getField(valueStore, "activeReadTransactions", Set.class);
418+
long env = getLongField(valueStore, "env");
419+
420+
CountDownLatch readLockReleased = new CountDownLatch(1);
421+
CountDownLatch cleanupAttempted = new CountDownLatch(1);
422+
AtomicReference<Throwable> workerFailure = new AtomicReference<>();
423+
424+
Thread worker = new Thread(() -> {
425+
try {
426+
valueStore.readTransaction(env, (stack, txn) -> {
427+
txnLock.readLock().unlock();
428+
try {
429+
readLockReleased.countDown();
430+
try {
431+
assertTrue("cleanup should run", cleanupAttempted.await(10, TimeUnit.SECONDS));
432+
} catch (InterruptedException e) {
433+
Thread.currentThread().interrupt();
434+
throw new IOException(e);
435+
}
436+
} finally {
437+
txnLock.readLock().lock();
438+
}
439+
return null;
440+
});
441+
} catch (Throwable t) {
442+
workerFailure.set(t);
443+
}
444+
}, "value-store-read-transaction-gap");
445+
worker.start();
446+
447+
assertTrue("read transaction should reach the resize-style read lock gap",
448+
readLockReleased.await(10, TimeUnit.SECONDS));
449+
assertEquals("test setup should have one active read transaction", 1, activeReadTransactions.size());
450+
451+
try {
452+
invokeCloseInactiveReadTransactions(valueStore);
453+
assertEquals("reader cleanup must not close a read transaction that is still executing", 1,
454+
activeReadTransactions.size());
455+
} finally {
456+
cleanupAttempted.countDown();
457+
worker.join(TimeUnit.SECONDS.toMillis(10));
458+
}
459+
460+
assertFalse("worker should finish", worker.isAlive());
461+
if (workerFailure.get() != null) {
462+
throw new AssertionError(workerFailure.get());
463+
}
464+
}
465+
410466
private long storeValueAndReopen(Value value) throws Exception {
411467
return storeValueAndReopen(value, new LmdbStoreConfig());
412468
}
@@ -438,6 +494,24 @@ private boolean isInitialized(Object value) throws Exception {
438494
return initializedField.getBoolean(value);
439495
}
440496

497+
private <T> T getField(Object target, String fieldName, Class<T> fieldType) throws Exception {
498+
Field field = target.getClass().getDeclaredField(fieldName);
499+
field.setAccessible(true);
500+
return fieldType.cast(field.get(target));
501+
}
502+
503+
private long getLongField(Object target, String fieldName) throws Exception {
504+
Field field = target.getClass().getDeclaredField(fieldName);
505+
field.setAccessible(true);
506+
return field.getLong(target);
507+
}
508+
509+
private void invokeCloseInactiveReadTransactions(ValueStore store) throws Exception {
510+
var method = store.getClass().getDeclaredMethod("closeInactiveReadTransactions");
511+
method.setAccessible(true);
512+
method.invoke(store);
513+
}
514+
441515
@AfterEach
442516
public void after() throws Exception {
443517
try {

0 commit comments

Comments
 (0)