Skip to content

Commit 5dbf88c

Browse files
committed
GH-5807 fix: avoid aborting live LMDB read transactions
1 parent 04570b0 commit 5dbf88c

2 files changed

Lines changed: 132 additions & 40 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ private long startReadTxn() throws IOException {
7474
private int retryStartReadTxn(MemoryStack stack, PointerBuffer pp) throws IOException {
7575
int rc = MDB_READERS_FULL;
7676
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
77-
closeInactiveReaders();
77+
closePooledReaders();
7878
checkForDeadReaders(stack);
79-
waitForActiveReaderToClose();
79+
waitForTrackedReaderToClose(null);
8080
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
8181
}
8282
return rc;
@@ -87,19 +87,6 @@ private void checkForDeadReaders(MemoryStack stack) throws IOException {
8787
E(mdb_reader_check(env, dead));
8888
}
8989

90-
private void closeInactiveReaders() {
91-
closeInactiveReaders(null);
92-
}
93-
94-
private void closeInactiveReaders(Txn excludedTxn) {
95-
closePooledReaders();
96-
for (Txn txn : inactiveTransactions()) {
97-
if (txn != excludedTxn) {
98-
txn.closeInactive();
99-
}
100-
}
101-
}
102-
10390
private void closePooledReaders() {
10491
if (mode == Mode.RESET) {
10592
synchronized (pool) {
@@ -112,9 +99,9 @@ private void closePooledReaders() {
11299
}
113100
}
114101

115-
private void waitForActiveReaderToClose() throws IOException {
102+
private void waitForTrackedReaderToClose(Txn excludedTxn) throws IOException {
116103
synchronized (active) {
117-
if (active.containsValue(Boolean.TRUE)) {
104+
if (hasTrackedReaders(excludedTxn)) {
118105
try {
119106
active.wait(READERS_FULL_WAIT_MILLIS);
120107
} catch (InterruptedException e) {
@@ -125,6 +112,15 @@ private void waitForActiveReaderToClose() throws IOException {
125112
}
126113
}
127114

115+
private boolean hasTrackedReaders(Txn excludedTxn) {
116+
for (Txn txn : active.keySet()) {
117+
if (txn != excludedTxn) {
118+
return true;
119+
}
120+
}
121+
return false;
122+
}
123+
128124
/**
129125
* Wraps an existing transaction into a txn reference object.
130126
*
@@ -227,18 +223,6 @@ private List<Txn> activeTransactions() {
227223
}
228224
}
229225

230-
private List<Txn> inactiveTransactions() {
231-
List<Txn> inactiveTransactions = new ArrayList<>();
232-
synchronized (active) {
233-
for (var entry : active.entrySet()) {
234-
if (!entry.getValue()) {
235-
inactiveTransactions.add(entry.getKey());
236-
}
237-
}
238-
}
239-
return inactiveTransactions;
240-
}
241-
242226
private void updateActiveState(Txn txn, boolean isActive) {
243227
synchronized (active) {
244228
if (active.containsKey(txn)) {
@@ -332,8 +316,6 @@ synchronized void reset() throws IOException {
332316
txnActive = false;
333317
updateActiveState(this, false);
334318
activate();
335-
} else {
336-
closeInactive();
337319
}
338320
version++;
339321
}
@@ -373,13 +355,6 @@ private void deactivate() {
373355
updateActiveState(this, false);
374356
}
375357

376-
private synchronized void closeInactive() {
377-
if (!closed && !txnActive && txn != 0) {
378-
mdb_txn_abort(txn);
379-
txn = 0;
380-
}
381-
}
382-
383358
long version() {
384359
return version;
385360
}
@@ -399,9 +374,9 @@ private int retryRenewReadTxn(long txn, Txn excludedTxn) throws IOException {
399374
int rc = MDB_READERS_FULL;
400375
try (MemoryStack stack = stackPush()) {
401376
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
402-
closeInactiveReaders(excludedTxn);
377+
closePooledReaders();
403378
checkForDeadReaders(stack);
404-
waitForActiveReaderToClose();
379+
waitForTrackedReaderToClose(excludedTxn);
405380
rc = mdb_txn_renew(txn);
406381
}
407382
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
package org.eclipse.rdf4j.sail.lmdb;
13+
14+
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertTrue;
17+
import static org.lwjgl.system.MemoryStack.stackPush;
18+
import static org.lwjgl.system.MemoryUtil.NULL;
19+
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
20+
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
21+
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
22+
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
23+
import static org.lwjgl.util.lmdb.LMDB.mdb_env_close;
24+
import static org.lwjgl.util.lmdb.LMDB.mdb_env_create;
25+
import static org.lwjgl.util.lmdb.LMDB.mdb_env_open;
26+
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxreaders;
27+
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
28+
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
29+
30+
import java.io.IOException;
31+
import java.nio.file.Path;
32+
33+
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.api.io.TempDir;
35+
import org.lwjgl.PointerBuffer;
36+
import org.lwjgl.system.MemoryStack;
37+
38+
public class TxnManagerTest {
39+
40+
@Test
41+
public void readersFullRetryDoesNotAbortTrackedInactiveTxn(@TempDir Path dataDir) throws Exception {
42+
long env = openEnv(dataDir, 2);
43+
long rawTxn1 = 0;
44+
TxnManager.Txn trackedTxn = null;
45+
TxnManager.Txn extraTxn = null;
46+
47+
try {
48+
TxnManager txnManager = new TxnManager(env, TxnManager.Mode.RESET);
49+
trackedTxn = txnManager.createReadTxn();
50+
long trackedHandle = trackedTxn.get();
51+
52+
txnManager.deactivate();
53+
rawTxn1 = beginReadTxn(env);
54+
55+
try {
56+
extraTxn = txnManager.createReadTxn();
57+
} catch (IOException exception) {
58+
assertTrue(exception.getMessage().contains("MDB_READERS_FULL"), exception.getMessage());
59+
}
60+
assertEquals(trackedHandle, trackedTxn.get(),
61+
"Reader recovery must not abort an inactive transaction that is still owned by a live caller");
62+
} finally {
63+
if (extraTxn != null) {
64+
extraTxn.close();
65+
}
66+
if (trackedTxn != null) {
67+
trackedTxn.close();
68+
}
69+
if (rawTxn1 != 0) {
70+
mdb_txn_abort(rawTxn1);
71+
}
72+
mdb_env_close(env);
73+
}
74+
}
75+
76+
@Test
77+
public void resetDoesNotAbortTrackedInactiveTxn(@TempDir Path dataDir) throws Exception {
78+
long env = openEnv(dataDir, 2);
79+
TxnManager.Txn trackedTxn = null;
80+
81+
try {
82+
TxnManager txnManager = new TxnManager(env, TxnManager.Mode.RESET);
83+
trackedTxn = txnManager.createReadTxn();
84+
long trackedHandle = trackedTxn.get();
85+
86+
txnManager.deactivate();
87+
txnManager.reset();
88+
89+
assertEquals(trackedHandle, trackedTxn.get(),
90+
"Reset must not abort an inactive transaction that is still owned by a live caller");
91+
} finally {
92+
if (trackedTxn != null) {
93+
trackedTxn.close();
94+
}
95+
mdb_env_close(env);
96+
}
97+
}
98+
99+
private static long openEnv(Path dataDir, int maxReaders) throws IOException {
100+
try (MemoryStack stack = stackPush()) {
101+
PointerBuffer pp = stack.mallocPointer(1);
102+
E(mdb_env_create(pp));
103+
long env = pp.get(0);
104+
E(mdb_env_set_maxreaders(env, maxReaders));
105+
E(mdb_env_open(env, dataDir.toAbsolutePath().toString(), MDB_NOTLS | MDB_NOSYNC | MDB_NOMETASYNC, 0664));
106+
return env;
107+
}
108+
}
109+
110+
private static long beginReadTxn(long env) throws IOException {
111+
try (MemoryStack stack = stackPush()) {
112+
PointerBuffer pp = stack.mallocPointer(1);
113+
E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
114+
return pp.get(0);
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)