Skip to content

Commit 04570b0

Browse files
committed
GH-0000 fix: recover inactive LMDB readers when full
1 parent 4e26cff commit 04570b0

2 files changed

Lines changed: 233 additions & 31 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java

Lines changed: 200 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,27 @@
88
*
99
* SPDX-License-Identifier: BSD-3-Clause
1010
*******************************************************************************/
11+
// Some portions generated by Codex
1112
package org.eclipse.rdf4j.sail.lmdb;
1213

1314
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
1415
import static org.lwjgl.system.MemoryStack.stackPush;
1516
import static org.lwjgl.system.MemoryUtil.NULL;
1617
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
18+
import static org.lwjgl.util.lmdb.LMDB.MDB_READERS_FULL;
19+
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
20+
import static org.lwjgl.util.lmdb.LMDB.mdb_reader_check;
1721
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
1822
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
1923
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_renew;
2024
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_reset;
2125

2226
import java.io.Closeable;
2327
import java.io.IOException;
28+
import java.nio.IntBuffer;
29+
import java.util.ArrayList;
2430
import java.util.IdentityHashMap;
31+
import java.util.List;
2532

2633
import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager;
2734
import org.eclipse.rdf4j.sail.SailException;
@@ -34,6 +41,9 @@
3441
*/
3542
class TxnManager {
3643

44+
private static final int READERS_FULL_RETRIES = 500;
45+
private static final long READERS_FULL_WAIT_MILLIS = 10L;
46+
3747
private final Mode mode;
3848
private final IdentityHashMap<Txn, Boolean> active = new IdentityHashMap<>();
3949
private final long[] pool;
@@ -51,12 +61,70 @@ private long startReadTxn() throws IOException {
5161
long readTxn;
5262
try (MemoryStack stack = stackPush()) {
5363
PointerBuffer pp = stack.mallocPointer(1);
54-
E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
64+
int rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
65+
if (rc == MDB_READERS_FULL) {
66+
rc = retryStartReadTxn(stack, pp);
67+
}
68+
E(rc);
5569
readTxn = pp.get(0);
5670
}
5771
return readTxn;
5872
}
5973

74+
private int retryStartReadTxn(MemoryStack stack, PointerBuffer pp) throws IOException {
75+
int rc = MDB_READERS_FULL;
76+
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
77+
closeInactiveReaders();
78+
checkForDeadReaders(stack);
79+
waitForActiveReaderToClose();
80+
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
81+
}
82+
return rc;
83+
}
84+
85+
private void checkForDeadReaders(MemoryStack stack) throws IOException {
86+
IntBuffer dead = stack.mallocInt(1);
87+
E(mdb_reader_check(env, dead));
88+
}
89+
90+
private void closeInactiveReaders() {
91+
closeInactiveReaders(null);
92+
}
93+
94+
private void closeInactiveReaders(Txn excludedTxn) {
95+
closePooledReaders();
96+
for (Txn txn : inactiveTransactions()) {
97+
if (txn != excludedTxn) {
98+
txn.closeInactive();
99+
}
100+
}
101+
}
102+
103+
private void closePooledReaders() {
104+
if (mode == Mode.RESET) {
105+
synchronized (pool) {
106+
while (poolIndex >= 0) {
107+
long txn = pool[poolIndex];
108+
pool[poolIndex--] = 0;
109+
mdb_txn_abort(txn);
110+
}
111+
}
112+
}
113+
}
114+
115+
private void waitForActiveReaderToClose() throws IOException {
116+
synchronized (active) {
117+
if (active.containsValue(Boolean.TRUE)) {
118+
try {
119+
active.wait(READERS_FULL_WAIT_MILLIS);
120+
} catch (InterruptedException e) {
121+
Thread.currentThread().interrupt();
122+
throw new IOException(e);
123+
}
124+
}
125+
}
126+
}
127+
60128
/**
61129
* Wraps an existing transaction into a txn reference object.
62130
*
@@ -97,7 +165,12 @@ long createReadTxnInternal() throws IOException {
97165
if (txn == 0) {
98166
txn = startReadTxn();
99167
} else {
100-
mdb_txn_renew(txn);
168+
try {
169+
renewReadTxn(txn, null);
170+
} catch (IOException e) {
171+
mdb_txn_abort(txn);
172+
throw e;
173+
}
101174
}
102175
} else {
103176
txn = startReadTxn();
@@ -131,25 +204,48 @@ StampedLongAdderLockManager lockManager() {
131204
}
132205

133206
void activate() throws IOException {
134-
synchronized (active) {
135-
for (Txn txn : active.keySet()) {
136-
txn.setActive(true);
137-
}
207+
for (Txn txn : activeTransactions()) {
208+
txn.setActive(true);
138209
}
139210
}
140211

141212
void deactivate() throws IOException {
213+
for (Txn txn : activeTransactions()) {
214+
txn.setActive(false);
215+
}
216+
}
217+
218+
void reset() throws IOException {
219+
for (Txn txn : activeTransactions()) {
220+
txn.reset();
221+
}
222+
}
223+
224+
private List<Txn> activeTransactions() {
225+
synchronized (active) {
226+
return new ArrayList<>(active.keySet());
227+
}
228+
}
229+
230+
private List<Txn> inactiveTransactions() {
231+
List<Txn> inactiveTransactions = new ArrayList<>();
142232
synchronized (active) {
143-
for (Txn txn : active.keySet()) {
144-
txn.setActive(false);
233+
for (var entry : active.entrySet()) {
234+
if (!entry.getValue()) {
235+
inactiveTransactions.add(entry.getKey());
236+
}
145237
}
146238
}
239+
return inactiveTransactions;
147240
}
148241

149-
void reset() throws IOException {
242+
private void updateActiveState(Txn txn, boolean isActive) {
150243
synchronized (active) {
151-
for (Txn txn : active.keySet()) {
152-
txn.reset();
244+
if (active.containsKey(txn)) {
245+
active.put(txn, isActive);
246+
}
247+
if (!isActive) {
248+
active.notifyAll();
153249
}
154250
}
155251
}
@@ -162,8 +258,10 @@ enum Mode {
162258

163259
class Txn implements Closeable, AutoCloseable {
164260

165-
private final long txn;
261+
private long txn;
166262
private long version;
263+
private boolean txnActive = true;
264+
private boolean closed;
167265

168266
Txn(long txn) {
169267
this.txn = txn;
@@ -177,12 +275,18 @@ StampedLongAdderLockManager lockManager() {
177275
return lockManager;
178276
}
179277

180-
private void free(long txn) {
278+
private void free(boolean resetTxn) {
279+
if (txn == 0) {
280+
return;
281+
}
282+
181283
switch (mode) {
182284
case RESET:
183285
synchronized (pool) {
184286
if (poolIndex < pool.length - 1) {
185-
mdb_txn_reset(txn);
287+
if (resetTxn) {
288+
mdb_txn_reset(txn);
289+
}
186290
pool[++poolIndex] = txn;
187291
} else {
188292
mdb_txn_abort(txn);
@@ -195,39 +299,112 @@ private void free(long txn) {
195299
case NONE:
196300
break;
197301
}
302+
txn = 0;
198303
}
199304

200305
@Override
201-
public void close() {
202-
synchronized (active) {
203-
active.remove(this);
306+
public synchronized void close() {
307+
if (closed) {
308+
return;
309+
}
310+
closed = true;
311+
synchronized (TxnManager.this.active) {
312+
TxnManager.this.active.remove(this);
313+
}
314+
try {
315+
free(txnActive);
316+
} finally {
317+
synchronized (TxnManager.this.active) {
318+
TxnManager.this.active.notifyAll();
319+
}
204320
}
205-
free(txn);
206321
}
207322

208323
/**
209324
* Resets current transaction as it points to "old" data.
210325
*/
211-
void reset() throws IOException {
212-
mdb_txn_reset(txn);
213-
E(mdb_txn_renew(txn));
326+
synchronized void reset() throws IOException {
327+
if (closed) {
328+
return;
329+
}
330+
if (txnActive) {
331+
mdb_txn_reset(txn);
332+
txnActive = false;
333+
updateActiveState(this, false);
334+
activate();
335+
} else {
336+
closeInactive();
337+
}
214338
version++;
215339
}
216340

217341
/**
218342
* Triggers active state of current transaction.
219343
*/
220-
void setActive(boolean active) throws IOException {
344+
synchronized void setActive(boolean active) throws IOException {
345+
if (closed) {
346+
return;
347+
}
221348
if (active) {
222-
E(mdb_txn_renew(txn));
349+
activate();
223350
version++;
224351
} else {
352+
deactivate();
353+
}
354+
}
355+
356+
private void activate() throws IOException {
357+
if (!txnActive) {
358+
if (txn == 0) {
359+
txn = startReadTxn();
360+
} else {
361+
renewReadTxn(txn, this);
362+
}
363+
txnActive = true;
364+
updateActiveState(this, true);
365+
}
366+
}
367+
368+
private void deactivate() {
369+
if (txnActive && txn != 0) {
225370
mdb_txn_reset(txn);
226371
}
372+
txnActive = false;
373+
updateActiveState(this, false);
374+
}
375+
376+
private synchronized void closeInactive() {
377+
if (!closed && !txnActive && txn != 0) {
378+
mdb_txn_abort(txn);
379+
txn = 0;
380+
}
227381
}
228382

229383
long version() {
230384
return version;
231385
}
232386
}
387+
388+
private void renewReadTxn(long txn, Txn excludedTxn) throws IOException {
389+
int rc = mdb_txn_renew(txn);
390+
if (rc == MDB_READERS_FULL) {
391+
rc = retryRenewReadTxn(txn, excludedTxn);
392+
}
393+
if (rc != MDB_SUCCESS) {
394+
E(rc);
395+
}
396+
}
397+
398+
private int retryRenewReadTxn(long txn, Txn excludedTxn) throws IOException {
399+
int rc = MDB_READERS_FULL;
400+
try (MemoryStack stack = stackPush()) {
401+
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
402+
closeInactiveReaders(excludedTxn);
403+
checkForDeadReaders(stack);
404+
waitForActiveReaderToClose();
405+
rc = mdb_txn_renew(txn);
406+
}
407+
}
408+
return rc;
409+
}
233410
}

0 commit comments

Comments
 (0)