diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java index 7b5ec043da..99a083492a 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java @@ -8,12 +8,16 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ +// Some portions generated by Codex package org.eclipse.rdf4j.sail.lmdb; import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E; import static org.lwjgl.system.MemoryStack.stackPush; import static org.lwjgl.system.MemoryUtil.NULL; import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY; +import static org.lwjgl.util.lmdb.LMDB.MDB_READERS_FULL; +import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS; +import static org.lwjgl.util.lmdb.LMDB.mdb_reader_check; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_renew; @@ -21,7 +25,10 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.IntBuffer; +import java.util.ArrayList; import java.util.IdentityHashMap; +import java.util.List; import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager; import org.eclipse.rdf4j.sail.SailException; @@ -34,6 +41,9 @@ */ class TxnManager { + private static final int READERS_FULL_RETRIES = 500; + private static final long READERS_FULL_WAIT_MILLIS = 10L; + private final Mode mode; private final IdentityHashMap active = new IdentityHashMap<>(); private final long[] pool; @@ -51,12 +61,66 @@ private long startReadTxn() throws IOException { long readTxn; try (MemoryStack stack = stackPush()) { PointerBuffer pp = stack.mallocPointer(1); - E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp)); + int rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp); + if (rc == MDB_READERS_FULL) { + rc = retryStartReadTxn(stack, pp); + } + E(rc); readTxn = pp.get(0); } return readTxn; } + private int retryStartReadTxn(MemoryStack stack, PointerBuffer pp) throws IOException { + int rc = MDB_READERS_FULL; + for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) { + closePooledReaders(); + checkForDeadReaders(stack); + waitForTrackedReaderToClose(null); + rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp); + } + return rc; + } + + private void checkForDeadReaders(MemoryStack stack) throws IOException { + IntBuffer dead = stack.mallocInt(1); + E(mdb_reader_check(env, dead)); + } + + private void closePooledReaders() { + if (mode == Mode.RESET) { + synchronized (pool) { + while (poolIndex >= 0) { + long txn = pool[poolIndex]; + pool[poolIndex--] = 0; + mdb_txn_abort(txn); + } + } + } + } + + private void waitForTrackedReaderToClose(Txn excludedTxn) throws IOException { + synchronized (active) { + if (hasTrackedReaders(excludedTxn)) { + try { + active.wait(READERS_FULL_WAIT_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + } + } + + private boolean hasTrackedReaders(Txn excludedTxn) { + for (Txn txn : active.keySet()) { + if (txn != excludedTxn) { + return true; + } + } + return false; + } + /** * Wraps an existing transaction into a txn reference object. * @@ -97,7 +161,12 @@ long createReadTxnInternal() throws IOException { if (txn == 0) { txn = startReadTxn(); } else { - mdb_txn_renew(txn); + try { + renewReadTxn(txn, null); + } catch (IOException e) { + mdb_txn_abort(txn); + throw e; + } } } else { txn = startReadTxn(); @@ -131,25 +200,36 @@ StampedLongAdderLockManager lockManager() { } void activate() throws IOException { - synchronized (active) { - for (Txn txn : active.keySet()) { - txn.setActive(true); - } + for (Txn txn : activeTransactions()) { + txn.setActive(true); } } void deactivate() throws IOException { - synchronized (active) { - for (Txn txn : active.keySet()) { - txn.setActive(false); - } + for (Txn txn : activeTransactions()) { + txn.setActive(false); } } void reset() throws IOException { + for (Txn txn : activeTransactions()) { + txn.reset(); + } + } + + private List activeTransactions() { synchronized (active) { - for (Txn txn : active.keySet()) { - txn.reset(); + return new ArrayList<>(active.keySet()); + } + } + + private void updateActiveState(Txn txn, boolean isActive) { + synchronized (active) { + if (active.containsKey(txn)) { + active.put(txn, isActive); + } + if (!isActive) { + active.notifyAll(); } } } @@ -162,8 +242,10 @@ enum Mode { class Txn implements Closeable, AutoCloseable { - private final long txn; + private long txn; private long version; + private boolean txnActive = true; + private boolean closed; Txn(long txn) { this.txn = txn; @@ -177,12 +259,18 @@ StampedLongAdderLockManager lockManager() { return lockManager; } - private void free(long txn) { + private void free(boolean resetTxn) { + if (txn == 0) { + return; + } + switch (mode) { case RESET: synchronized (pool) { if (poolIndex < pool.length - 1) { - mdb_txn_reset(txn); + if (resetTxn) { + mdb_txn_reset(txn); + } pool[++poolIndex] = txn; } else { mdb_txn_abort(txn); @@ -195,39 +283,103 @@ private void free(long txn) { case NONE: break; } + txn = 0; } @Override - public void close() { - synchronized (active) { - active.remove(this); + public synchronized void close() { + if (closed) { + return; + } + closed = true; + synchronized (TxnManager.this.active) { + TxnManager.this.active.remove(this); + } + try { + free(txnActive); + } finally { + synchronized (TxnManager.this.active) { + TxnManager.this.active.notifyAll(); + } } - free(txn); } /** * Resets current transaction as it points to "old" data. */ - void reset() throws IOException { - mdb_txn_reset(txn); - E(mdb_txn_renew(txn)); + synchronized void reset() throws IOException { + if (closed) { + return; + } + if (txnActive) { + mdb_txn_reset(txn); + txnActive = false; + updateActiveState(this, false); + activate(); + } version++; } /** * Triggers active state of current transaction. */ - void setActive(boolean active) throws IOException { + synchronized void setActive(boolean active) throws IOException { + if (closed) { + return; + } if (active) { - E(mdb_txn_renew(txn)); + activate(); version++; } else { + deactivate(); + } + } + + private void activate() throws IOException { + if (!txnActive) { + if (txn == 0) { + txn = startReadTxn(); + } else { + renewReadTxn(txn, this); + } + txnActive = true; + updateActiveState(this, true); + } + } + + private void deactivate() { + if (txnActive && txn != 0) { mdb_txn_reset(txn); } + txnActive = false; + updateActiveState(this, false); } long version() { return version; } } + + private void renewReadTxn(long txn, Txn excludedTxn) throws IOException { + int rc = mdb_txn_renew(txn); + if (rc == MDB_READERS_FULL) { + rc = retryRenewReadTxn(txn, excludedTxn); + } + if (rc != MDB_SUCCESS) { + E(rc); + } + } + + private int retryRenewReadTxn(long txn, Txn excludedTxn) throws IOException { + int rc = MDB_READERS_FULL; + try (MemoryStack stack = stackPush()) { + for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) { + closePooledReaders(); + checkForDeadReaders(stack); + waitForTrackedReaderToClose(excludedTxn); + rc = mdb_txn_renew(txn); + } + } + return rc; + } } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java index 894ab212ba..3888a45642 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java @@ -25,6 +25,7 @@ import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS; import static org.lwjgl.util.lmdb.LMDB.MDB_PREV; import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY; +import static org.lwjgl.util.lmdb.LMDB.MDB_READERS_FULL; import static org.lwjgl.util.lmdb.LMDB.MDB_RESERVE; import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE; import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS; @@ -914,14 +915,23 @@ private long findId(byte[] data, boolean create) throws IOException { } T readTransaction(long env, Transaction transaction) throws IOException { - txnLock.readLock().lock(); - try { - if (writeTxn != 0) { - return LmdbUtil.readTransaction(env, writeTxn, transaction); + for (int attempt = 0;; attempt++) { + try { + txnLock.readLock().lock(); + try { + if (writeTxn != 0) { + return LmdbUtil.readTransaction(env, writeTxn, transaction); + } + return threadLocalReadTxn.get().execute(transaction, env); + } finally { + txnLock.readLock().unlock(); + } + } catch (ReadersFullException e) { + if (attempt > 0) { + throw e; + } + closeInactiveReadTransactions(); } - return threadLocalReadTxn.get().execute(transaction, env); - } finally { - txnLock.readLock().unlock(); } } @@ -1394,6 +1404,18 @@ private void closeReadTransactions() { } } + private void closeInactiveReadTransactions() { + txnLock.writeLock().lock(); + try { + ReadTxn.State[] snapshot = activeReadTransactions.toArray(new ReadTxn.State[0]); + for (ReadTxn.State readTxn : snapshot) { + readTxn.closeInactiveTxn(); + } + } finally { + txnLock.writeLock().unlock(); + } + } + /** * Closes the ValueStore, releasing any file references, etc. Once closed, the ValueStore can no longer be used. * @@ -1530,6 +1552,12 @@ synchronized void closeTxn() { activeReadTransactions.remove(this); } } + + synchronized void closeInactiveTxn() { + if (initialized && depth == 0) { + closeTxn(); + } + } } public ReadTxn(Set activeReadTransactions, ConcurrentCleaner cleaner) { @@ -1547,6 +1575,8 @@ synchronized T execute(Transaction transaction, long env) throws IOExcept } finally { releaseTxn(); } + } catch (ReadersFullException e) { + throw e; } catch (Exception e) { // Retry once try { @@ -1589,7 +1619,11 @@ private void ensureTxn(long env) throws IOException { private void startTxn(long env) throws IOException { try (MemoryStack stack = MemoryStack.stackPush()) { PointerBuffer pp = stack.mallocPointer(1); - E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp)); + int rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp); + if (rc == MDB_READERS_FULL) { + throw new ReadersFullException(); + } + E(rc); state.initialize(pp.get(0)); } } @@ -1609,6 +1643,15 @@ void close() { } } + private static final class ReadersFullException extends IOException { + + private static final long serialVersionUID = 1L; + + private ReadersFullException() { + super("MDB_READERS_FULL: Environment maxreaders limit reached"); + } + } + /** * Checks if the supplied Value object is a LmdbValue object that has been created by this ValueStore. */ diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TxnManagerTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TxnManagerTest.java new file mode 100644 index 0000000000..b00e118b04 --- /dev/null +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TxnManagerTest.java @@ -0,0 +1,117 @@ +/******************************************************************************* + * Copyright (c) 2026 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +// Some portions generated by Codex +package org.eclipse.rdf4j.sail.lmdb; + +import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.lwjgl.system.MemoryStack.stackPush; +import static org.lwjgl.system.MemoryUtil.NULL; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS; +import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_close; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_create; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_open; +import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxreaders; +import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort; +import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin; + +import java.io.IOException; +import java.nio.file.Path; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.lwjgl.PointerBuffer; +import org.lwjgl.system.MemoryStack; + +public class TxnManagerTest { + + @Test + public void readersFullRetryDoesNotAbortTrackedInactiveTxn(@TempDir Path dataDir) throws Exception { + long env = openEnv(dataDir, 2); + long rawTxn1 = 0; + TxnManager.Txn trackedTxn = null; + TxnManager.Txn extraTxn = null; + + try { + TxnManager txnManager = new TxnManager(env, TxnManager.Mode.RESET); + trackedTxn = txnManager.createReadTxn(); + long trackedHandle = trackedTxn.get(); + + txnManager.deactivate(); + rawTxn1 = beginReadTxn(env); + + try { + extraTxn = txnManager.createReadTxn(); + } catch (IOException exception) { + assertTrue(exception.getMessage().contains("MDB_READERS_FULL"), exception.getMessage()); + } + assertEquals(trackedHandle, trackedTxn.get(), + "Reader recovery must not abort an inactive transaction that is still owned by a live caller"); + } finally { + if (extraTxn != null) { + extraTxn.close(); + } + if (trackedTxn != null) { + trackedTxn.close(); + } + if (rawTxn1 != 0) { + mdb_txn_abort(rawTxn1); + } + mdb_env_close(env); + } + } + + @Test + public void resetDoesNotAbortTrackedInactiveTxn(@TempDir Path dataDir) throws Exception { + long env = openEnv(dataDir, 2); + TxnManager.Txn trackedTxn = null; + + try { + TxnManager txnManager = new TxnManager(env, TxnManager.Mode.RESET); + trackedTxn = txnManager.createReadTxn(); + long trackedHandle = trackedTxn.get(); + + txnManager.deactivate(); + txnManager.reset(); + + assertEquals(trackedHandle, trackedTxn.get(), + "Reset must not abort an inactive transaction that is still owned by a live caller"); + } finally { + if (trackedTxn != null) { + trackedTxn.close(); + } + mdb_env_close(env); + } + } + + private static long openEnv(Path dataDir, int maxReaders) throws IOException { + try (MemoryStack stack = stackPush()) { + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_env_create(pp)); + long env = pp.get(0); + E(mdb_env_set_maxreaders(env, maxReaders)); + E(mdb_env_open(env, dataDir.toAbsolutePath().toString(), MDB_NOTLS | MDB_NOSYNC | MDB_NOMETASYNC, 0664)); + return env; + } + } + + private static long beginReadTxn(long env) throws IOException { + try (MemoryStack stack = stackPush()) { + PointerBuffer pp = stack.mallocPointer(1); + E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp)); + return pp.get(0); + } + } +} diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java index 68310a0a51..0f3c2d1a65 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/ValueStoreTest.java @@ -28,6 +28,10 @@ import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; import java.util.stream.Collectors; import org.eclipse.rdf4j.model.IRI; @@ -407,6 +411,58 @@ public void testStaleBNodeHashCodeIgnoresReusedIdAfterClear() throws Exception { assertEquals("stale values must keep their original hash after a revision change", firstHash, first.hashCode()); } + @Test + public void testReadersFullCleanupDoesNotCloseInUseReadTxn() throws Exception { + ReadWriteLock txnLock = getField(valueStore, "txnLock", ReadWriteLock.class); + Set activeReadTransactions = getField(valueStore, "activeReadTransactions", Set.class); + long env = getLongField(valueStore, "env"); + + CountDownLatch readLockReleased = new CountDownLatch(1); + CountDownLatch cleanupAttempted = new CountDownLatch(1); + AtomicReference workerFailure = new AtomicReference<>(); + + Thread worker = new Thread(() -> { + try { + valueStore.readTransaction(env, (stack, txn) -> { + txnLock.readLock().unlock(); + try { + readLockReleased.countDown(); + try { + assertTrue("cleanup should run", cleanupAttempted.await(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } finally { + txnLock.readLock().lock(); + } + return null; + }); + } catch (Throwable t) { + workerFailure.set(t); + } + }, "value-store-read-transaction-gap"); + worker.start(); + + assertTrue("read transaction should reach the resize-style read lock gap", + readLockReleased.await(10, TimeUnit.SECONDS)); + assertEquals("test setup should have one active read transaction", 1, activeReadTransactions.size()); + + try { + invokeCloseInactiveReadTransactions(valueStore); + assertEquals("reader cleanup must not close a read transaction that is still executing", 1, + activeReadTransactions.size()); + } finally { + cleanupAttempted.countDown(); + worker.join(TimeUnit.SECONDS.toMillis(10)); + } + + assertFalse("worker should finish", worker.isAlive()); + if (workerFailure.get() != null) { + throw new AssertionError(workerFailure.get()); + } + } + private long storeValueAndReopen(Value value) throws Exception { return storeValueAndReopen(value, new LmdbStoreConfig()); } @@ -438,6 +494,24 @@ private boolean isInitialized(Object value) throws Exception { return initializedField.getBoolean(value); } + private T getField(Object target, String fieldName, Class fieldType) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return fieldType.cast(field.get(target)); + } + + private long getLongField(Object target, String fieldName) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.getLong(target); + } + + private void invokeCloseInactiveReadTransactions(ValueStore store) throws Exception { + var method = store.getClass().getDeclaredMethod("closeInactiveReadTransactions"); + method.setAccessible(true); + method.invoke(store); + } + @AfterEach public void after() throws Exception { try { diff --git a/tools/server-boot/src/test/java/org/eclipse/rdf4j/tools/serverboot/LmdbTimedOutQueryReadHandleTest.java b/tools/server-boot/src/test/java/org/eclipse/rdf4j/tools/serverboot/LmdbTimedOutQueryReadHandleTest.java new file mode 100644 index 0000000000..176b97f2e5 --- /dev/null +++ b/tools/server-boot/src/test/java/org/eclipse/rdf4j/tools/serverboot/LmdbTimedOutQueryReadHandleTest.java @@ -0,0 +1,395 @@ +/******************************************************************************* + * Copyright (c) 2026 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +// Some portions generated by Codex +package org.eclipse.rdf4j.tools.serverboot; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.http.protocol.Protocol; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.repository.Repository; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.eclipse.rdf4j.repository.RepositoryException; +import org.eclipse.rdf4j.repository.config.RepositoryConfig; +import org.eclipse.rdf4j.repository.config.RepositoryConfigException; +import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager; +import org.eclipse.rdf4j.repository.sail.config.SailRepositoryConfig; +import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.http.HttpStatus; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; + +@SpringBootTest(classes = Rdf4jServerWorkbenchApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +class LmdbTimedOutQueryReadHandleTest { + + private static final int STATEMENT_COUNT = Integer.getInteger("rdf4j.lmdb.timeout.statementCount", 1024); + private static final int QUERY_COUNT = Integer.getInteger("rdf4j.lmdb.timeout.queryCount", 120); + private static final int QUERY_WORKERS = Integer.getInteger("rdf4j.lmdb.timeout.queryWorkers", 8); + private static final int MAX_CAPTURED_RESPONSE_BYTES = 8192; + private static final String PREDICATE = "urn:example:p"; + private static final String SLOW_QUERY = "PREFIX ex: \n" + + "SELECT distinct * WHERE {\n" + + " ?s1 ex:p ?o1 .\n" + + " ?s2 ex:p ?o2 .\n" + + " ?s3 ex:p ?o3 .\n" + + "}"; + private static final String HEALTH_QUERY = "ASK { ?s ?p ?o }"; + + @LocalServerPort + private int port; + + private final ValueFactory valueFactory = SimpleValueFactory.getInstance(); + private final List createdRepositories = new ArrayList<>(); + private final List lmdbLoggers = new ArrayList<>(); + private ListAppender lmdbLogAppender; + private RemoteRepositoryManager repositoryManager; + + @BeforeEach + void setUp() { + attachLmdbLogAppender(); + repositoryManager = RemoteRepositoryManager.getInstance(serverUrl()); + } + + @AfterEach + void tearDown() { + try { + if (repositoryManager == null) { + return; + } + for (String repositoryId : createdRepositories) { + try { + repositoryManager.removeRepository(repositoryId); + } catch (RepositoryException ignored) { + // best-effort cleanup + } + } + createdRepositories.clear(); + repositoryManager.shutDown(); + repositoryManager = null; + } finally { + detachLmdbLogAppender(); + } + } + + @Test + void lmdbRepositoryStillAcceptsQueriesAfterManyTimedOutServerQueries() throws Exception { + String repositoryId = registerLmdbRepository(); + loadData(repositoryId); + + QueryStats stats = runTimedQueries(repositoryId); + System.out.println("LMDB timeout reproducer stats: timeouts=" + stats.timeouts + + ", readerHandleFailures=" + stats.readerHandleFailures + + ", failureSamples=" + stats.failureSamples); + + assertThat(stats.timeouts) + .as("at least one query should hit the one-second server-side timeout; failures: %s", + stats.failureSamples) + .isPositive(); + assertThat(stats.readerHandleFailures) + .as("reader handle failures: %s", stats.failureSamples) + .isZero(); + assertThat(lmdbReaderHandleLogEvents()) + .as("LMDB reader handle log events") + .isEmpty(); + + assertEventuallyHealthy(repositoryId); + } + + private String registerLmdbRepository() throws RepositoryException, RepositoryConfigException { + String repositoryId = "lmdb-timeout-" + UUID.randomUUID(); + RepositoryConfig config = new RepositoryConfig(repositoryId, + new SailRepositoryConfig(new LmdbStoreConfig())); + repositoryManager.addRepositoryConfig(config); + createdRepositories.add(repositoryId); + return repositoryId; + } + + private void loadData(String repositoryId) { + Repository repository = repositoryManager.getRepository(repositoryId); + repository.init(); + try (RepositoryConnection connection = repository.getConnection()) { + for (int i = 0; i < STATEMENT_COUNT; i++) { + connection.add(valueFactory.createIRI("urn:example:s" + i), valueFactory.createIRI(PREDICATE), + valueFactory.createLiteral("value-" + i)); + } + } finally { + repository.shutDown(); + } + } + + private QueryStats runTimedQueries(String repositoryId) throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newFixedThreadPool(QUERY_WORKERS); + List> futures = new ArrayList<>(QUERY_COUNT); + try { + for (int i = 0; i < QUERY_COUNT; i++) { + futures.add(executor.submit(timedQuery(repositoryId))); + } + + QueryStats stats = new QueryStats(); + for (Future future : futures) { + stats.record(future.get()); + } + return stats; + } finally { + executor.shutdownNow(); + } + } + + private final Random random = new Random(); + + private Callable timedQuery(String repositoryId) { + return () -> { + try { + return executeQuery(repositoryId, SLOW_QUERY, random.nextInt(3) + 1); + } catch (Exception e) { + return new QueryResponse(-1, exceptionMessage(e)); + } + }; + } + + private QueryResponse executeQuery(String repositoryId, String query, int timeoutSeconds) + throws IOException, InterruptedException { + int readTimeoutMillis = timeoutSeconds > 0 ? 3000 : 10000; + return executeQuery(repositoryId, query, timeoutSeconds, readTimeoutMillis); + } + + private QueryResponse executeQuery(String repositoryId, String query, int timeoutSeconds, int readTimeoutMillis) + throws IOException, InterruptedException { + String form = formValue(Protocol.QUERY_PARAM_NAME, query); + if (timeoutSeconds > 0) { + form += "&" + formValue(Protocol.TIMEOUT_PARAM_NAME, Integer.toString(timeoutSeconds)); + } + + HttpURLConnection connection = (HttpURLConnection) repositoryUri(repositoryId).toURL() + .openConnection(); + connection.setConnectTimeout(50000); + connection.setReadTimeout(readTimeoutMillis); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", Protocol.FORM_MIME_TYPE); + connection.setRequestProperty("Accept", "application/sparql-results+json"); + connection.setDoOutput(true); + + byte[] body = form.getBytes(StandardCharsets.UTF_8); + connection.setFixedLengthStreamingMode(body.length); + try (OutputStream output = connection.getOutputStream()) { + output.write(body); + } + + try { + int status = connection.getResponseCode(); + return new QueryResponse(status, readResponseBody(connection, status)); + } catch (SocketTimeoutException e) { + return new QueryResponse(-1, exceptionMessage(e)); + } finally { + connection.disconnect(); + } + } + + private void assertEventuallyHealthy(String repositoryId) throws IOException, InterruptedException { + QueryResponse response = null; + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60); + while (System.nanoTime() < deadline) { + response = executeQuery(repositoryId, HEALTH_QUERY, 0, 1000); + if (response.status == HttpStatus.OK.value()) { + return; + } + Thread.sleep(250); + } + + assertThat(response) + .as("health query response") + .isNotNull(); + assertThat(response.status) + .as("health query response body: %s", response.body) + .isEqualTo(HttpStatus.OK.value()); + } + + private String readResponseBody(HttpURLConnection connection, int status) throws IOException { + InputStream stream = status >= 400 ? connection.getErrorStream() : connection.getInputStream(); + if (stream == null) { + return ""; + } + + StringBuilder response = new StringBuilder(); + byte[] buffer = new byte[1024]; + int captured = 0; + try (InputStream input = stream) { + int read; + while ((read = input.read(buffer)) != -1) { + if (captured < MAX_CAPTURED_RESPONSE_BYTES) { + int length = Math.min(read, MAX_CAPTURED_RESPONSE_BYTES - captured); + response.append(new String(buffer, 0, length, StandardCharsets.UTF_8)); + captured += length; + } + } + } catch (SocketTimeoutException e) { + if (response.length() > 0) { + response.append('\n'); + } + response.append(exceptionMessage(e)); + } + return response.toString(); + } + + private String formValue(String name, String value) { + return URLEncoder.encode(name, StandardCharsets.UTF_8) + "=" + + URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + private String exceptionMessage(Throwable throwable) { + StringBuilder message = new StringBuilder(); + Throwable cursor = throwable; + while (cursor != null) { + if (message.length() > 0) { + message.append(" -> "); + } + message.append(cursor.getClass().getSimpleName()) + .append(": ") + .append(cursor.getMessage()); + Throwable next = cursor.getCause(); + if (next == null || next == cursor) { + break; + } + cursor = next; + } + return message.toString(); + } + + private URI repositoryUri(String repositoryId) { + return URI.create(serverUrl() + "/repositories/" + repositoryId); + } + + private String serverUrl() { + return "http://localhost:" + port + "/rdf4j-server"; + } + + private void attachLmdbLogAppender() { + lmdbLogAppender = new ListAppender<>(); + lmdbLogAppender.start(); + attachLmdbLogger("org.eclipse.rdf4j.sail.lmdb.LmdbUtil"); + attachLmdbLogger("org.eclipse.rdf4j.sail.lmdb.LmdbEvaluationStatistics"); + } + + private void attachLmdbLogger(String loggerName) { + Logger logger = (Logger) LoggerFactory.getLogger(loggerName); + logger.addAppender(lmdbLogAppender); + lmdbLoggers.add(logger); + } + + private void detachLmdbLogAppender() { + for (Logger logger : lmdbLoggers) { + logger.detachAppender(lmdbLogAppender); + } + lmdbLoggers.clear(); + if (lmdbLogAppender != null) { + lmdbLogAppender.stop(); + lmdbLogAppender = null; + } + } + + private List lmdbReaderHandleLogEvents() { + List events = new ArrayList<>(); + for (ILoggingEvent event : lmdbLogAppender.list) { + String throwableMessage = throwableMessage(event); + if (containsReaderHandleFailure(event.getFormattedMessage()) + || containsReaderHandleFailure(throwableMessage)) { + events.add(event.getLoggerName() + ": " + event.getFormattedMessage() + " " + throwableMessage); + } + } + return events; + } + + private String throwableMessage(ILoggingEvent event) { + if (event.getThrowableProxy() == null) { + return ""; + } + return event.getThrowableProxy().getMessage(); + } + + private static boolean containsReaderHandleFailure(String value) { + String lowerValue = value.toLowerCase(); + return lowerValue.contains("mdb_readers_full") || lowerValue.contains("maxreaders"); + } + + private static final class QueryStats { + private int timeouts; + private int readerHandleFailures; + private final List failureSamples = new ArrayList<>(); + + private void record(QueryResponse response) { + if (isTimeout(response)) { + timeouts++; + return; + } + if (isReaderHandleFailure(response)) { + readerHandleFailures++; + } + if (!isSuccess(response) && failureSamples.size() < 5) { + failureSamples.add("HTTP " + response.status + ": " + response.body); + } + } + + private boolean isSuccess(QueryResponse response) { + return response.status >= 200 && response.status < 300; + } + + private boolean isTimeout(QueryResponse response) { + return (response.status == HttpStatus.SERVICE_UNAVAILABLE.value() + && response.body.contains("Query evaluation took too long")) + || response.body.contains("Query evaluation took too long") + || response.body.contains("SocketTimeoutException") + || response.body.contains("Unexpected end of file from server") + || response.body.contains("Connection reset"); + } + + private boolean isReaderHandleFailure(QueryResponse response) { + return containsReaderHandleFailure(response.body); + } + } + + private static final class QueryResponse { + private final int status; + private final String body; + + private QueryResponse(int status, String body) { + this.status = status; + this.body = body == null ? "" : body; + } + } +}