Skip to content

Commit 4e26cff

Browse files
committed
GH-0000 test: reproduce LMDB reader exhaustion after timeouts
1 parent 75404cb commit 4e26cff

1 file changed

Lines changed: 395 additions & 0 deletions

File tree

Lines changed: 395 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,395 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
package org.eclipse.rdf4j.tools.serverboot;
13+
14+
import static org.assertj.core.api.Assertions.assertThat;
15+
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.io.OutputStream;
19+
import java.net.HttpURLConnection;
20+
import java.net.SocketTimeoutException;
21+
import java.net.URI;
22+
import java.net.URLEncoder;
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Random;
27+
import java.util.UUID;
28+
import java.util.concurrent.Callable;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import org.eclipse.rdf4j.http.protocol.Protocol;
36+
import org.eclipse.rdf4j.model.ValueFactory;
37+
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
38+
import org.eclipse.rdf4j.repository.Repository;
39+
import org.eclipse.rdf4j.repository.RepositoryConnection;
40+
import org.eclipse.rdf4j.repository.RepositoryException;
41+
import org.eclipse.rdf4j.repository.config.RepositoryConfig;
42+
import org.eclipse.rdf4j.repository.config.RepositoryConfigException;
43+
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
44+
import org.eclipse.rdf4j.repository.sail.config.SailRepositoryConfig;
45+
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
46+
import org.junit.jupiter.api.AfterEach;
47+
import org.junit.jupiter.api.BeforeEach;
48+
import org.junit.jupiter.api.Test;
49+
import org.slf4j.LoggerFactory;
50+
import org.springframework.boot.test.context.SpringBootTest;
51+
import org.springframework.boot.web.server.LocalServerPort;
52+
import org.springframework.http.HttpStatus;
53+
54+
import ch.qos.logback.classic.Logger;
55+
import ch.qos.logback.classic.spi.ILoggingEvent;
56+
import ch.qos.logback.core.read.ListAppender;
57+
58+
@SpringBootTest(classes = Rdf4jServerWorkbenchApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
59+
class LmdbTimedOutQueryReadHandleTest {
60+
61+
private static final int STATEMENT_COUNT = Integer.getInteger("rdf4j.lmdb.timeout.statementCount", 1024);
62+
private static final int QUERY_COUNT = Integer.getInteger("rdf4j.lmdb.timeout.queryCount", 120);
63+
private static final int QUERY_WORKERS = Integer.getInteger("rdf4j.lmdb.timeout.queryWorkers", 8);
64+
private static final int MAX_CAPTURED_RESPONSE_BYTES = 8192;
65+
private static final String PREDICATE = "urn:example:p";
66+
private static final String SLOW_QUERY = "PREFIX ex: <urn:example:>\n"
67+
+ "SELECT distinct * WHERE {\n"
68+
+ " ?s1 ex:p ?o1 .\n"
69+
+ " ?s2 ex:p ?o2 .\n"
70+
+ " ?s3 ex:p ?o3 .\n"
71+
+ "}";
72+
private static final String HEALTH_QUERY = "ASK { ?s ?p ?o }";
73+
74+
@LocalServerPort
75+
private int port;
76+
77+
private final ValueFactory valueFactory = SimpleValueFactory.getInstance();
78+
private final List<String> createdRepositories = new ArrayList<>();
79+
private final List<Logger> lmdbLoggers = new ArrayList<>();
80+
private ListAppender<ILoggingEvent> lmdbLogAppender;
81+
private RemoteRepositoryManager repositoryManager;
82+
83+
@BeforeEach
84+
void setUp() {
85+
attachLmdbLogAppender();
86+
repositoryManager = RemoteRepositoryManager.getInstance(serverUrl());
87+
}
88+
89+
@AfterEach
90+
void tearDown() {
91+
try {
92+
if (repositoryManager == null) {
93+
return;
94+
}
95+
for (String repositoryId : createdRepositories) {
96+
try {
97+
repositoryManager.removeRepository(repositoryId);
98+
} catch (RepositoryException ignored) {
99+
// best-effort cleanup
100+
}
101+
}
102+
createdRepositories.clear();
103+
repositoryManager.shutDown();
104+
repositoryManager = null;
105+
} finally {
106+
detachLmdbLogAppender();
107+
}
108+
}
109+
110+
@Test
111+
void lmdbRepositoryStillAcceptsQueriesAfterManyTimedOutServerQueries() throws Exception {
112+
String repositoryId = registerLmdbRepository();
113+
loadData(repositoryId);
114+
115+
QueryStats stats = runTimedQueries(repositoryId);
116+
System.out.println("LMDB timeout reproducer stats: timeouts=" + stats.timeouts
117+
+ ", readerHandleFailures=" + stats.readerHandleFailures
118+
+ ", failureSamples=" + stats.failureSamples);
119+
120+
assertThat(stats.timeouts)
121+
.as("at least one query should hit the one-second server-side timeout; failures: %s",
122+
stats.failureSamples)
123+
.isPositive();
124+
assertThat(stats.readerHandleFailures)
125+
.as("reader handle failures: %s", stats.failureSamples)
126+
.isZero();
127+
assertThat(lmdbReaderHandleLogEvents())
128+
.as("LMDB reader handle log events")
129+
.isEmpty();
130+
131+
assertEventuallyHealthy(repositoryId);
132+
}
133+
134+
private String registerLmdbRepository() throws RepositoryException, RepositoryConfigException {
135+
String repositoryId = "lmdb-timeout-" + UUID.randomUUID();
136+
RepositoryConfig config = new RepositoryConfig(repositoryId,
137+
new SailRepositoryConfig(new LmdbStoreConfig()));
138+
repositoryManager.addRepositoryConfig(config);
139+
createdRepositories.add(repositoryId);
140+
return repositoryId;
141+
}
142+
143+
private void loadData(String repositoryId) {
144+
Repository repository = repositoryManager.getRepository(repositoryId);
145+
repository.init();
146+
try (RepositoryConnection connection = repository.getConnection()) {
147+
for (int i = 0; i < STATEMENT_COUNT; i++) {
148+
connection.add(valueFactory.createIRI("urn:example:s" + i), valueFactory.createIRI(PREDICATE),
149+
valueFactory.createLiteral("value-" + i));
150+
}
151+
} finally {
152+
repository.shutDown();
153+
}
154+
}
155+
156+
private QueryStats runTimedQueries(String repositoryId) throws InterruptedException, ExecutionException {
157+
ExecutorService executor = Executors.newFixedThreadPool(QUERY_WORKERS);
158+
List<Future<QueryResponse>> futures = new ArrayList<>(QUERY_COUNT);
159+
try {
160+
for (int i = 0; i < QUERY_COUNT; i++) {
161+
futures.add(executor.submit(timedQuery(repositoryId)));
162+
}
163+
164+
QueryStats stats = new QueryStats();
165+
for (Future<QueryResponse> future : futures) {
166+
stats.record(future.get());
167+
}
168+
return stats;
169+
} finally {
170+
executor.shutdownNow();
171+
}
172+
}
173+
174+
private final Random random = new Random();
175+
176+
private Callable<QueryResponse> timedQuery(String repositoryId) {
177+
return () -> {
178+
try {
179+
return executeQuery(repositoryId, SLOW_QUERY, random.nextInt(3) + 1);
180+
} catch (Exception e) {
181+
return new QueryResponse(-1, exceptionMessage(e));
182+
}
183+
};
184+
}
185+
186+
private QueryResponse executeQuery(String repositoryId, String query, int timeoutSeconds)
187+
throws IOException, InterruptedException {
188+
int readTimeoutMillis = timeoutSeconds > 0 ? 3000 : 10000;
189+
return executeQuery(repositoryId, query, timeoutSeconds, readTimeoutMillis);
190+
}
191+
192+
private QueryResponse executeQuery(String repositoryId, String query, int timeoutSeconds, int readTimeoutMillis)
193+
throws IOException, InterruptedException {
194+
String form = formValue(Protocol.QUERY_PARAM_NAME, query);
195+
if (timeoutSeconds > 0) {
196+
form += "&" + formValue(Protocol.TIMEOUT_PARAM_NAME, Integer.toString(timeoutSeconds));
197+
}
198+
199+
HttpURLConnection connection = (HttpURLConnection) repositoryUri(repositoryId).toURL()
200+
.openConnection();
201+
connection.setConnectTimeout(50000);
202+
connection.setReadTimeout(readTimeoutMillis);
203+
connection.setRequestMethod("POST");
204+
connection.setRequestProperty("Content-Type", Protocol.FORM_MIME_TYPE);
205+
connection.setRequestProperty("Accept", "application/sparql-results+json");
206+
connection.setDoOutput(true);
207+
208+
byte[] body = form.getBytes(StandardCharsets.UTF_8);
209+
connection.setFixedLengthStreamingMode(body.length);
210+
try (OutputStream output = connection.getOutputStream()) {
211+
output.write(body);
212+
}
213+
214+
try {
215+
int status = connection.getResponseCode();
216+
return new QueryResponse(status, readResponseBody(connection, status));
217+
} catch (SocketTimeoutException e) {
218+
return new QueryResponse(-1, exceptionMessage(e));
219+
} finally {
220+
connection.disconnect();
221+
}
222+
}
223+
224+
private void assertEventuallyHealthy(String repositoryId) throws IOException, InterruptedException {
225+
QueryResponse response = null;
226+
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60);
227+
while (System.nanoTime() < deadline) {
228+
response = executeQuery(repositoryId, HEALTH_QUERY, 0, 1000);
229+
if (response.status == HttpStatus.OK.value()) {
230+
return;
231+
}
232+
Thread.sleep(250);
233+
}
234+
235+
assertThat(response)
236+
.as("health query response")
237+
.isNotNull();
238+
assertThat(response.status)
239+
.as("health query response body: %s", response.body)
240+
.isEqualTo(HttpStatus.OK.value());
241+
}
242+
243+
private String readResponseBody(HttpURLConnection connection, int status) throws IOException {
244+
InputStream stream = status >= 400 ? connection.getErrorStream() : connection.getInputStream();
245+
if (stream == null) {
246+
return "";
247+
}
248+
249+
StringBuilder response = new StringBuilder();
250+
byte[] buffer = new byte[1024];
251+
int captured = 0;
252+
try (InputStream input = stream) {
253+
int read;
254+
while ((read = input.read(buffer)) != -1) {
255+
if (captured < MAX_CAPTURED_RESPONSE_BYTES) {
256+
int length = Math.min(read, MAX_CAPTURED_RESPONSE_BYTES - captured);
257+
response.append(new String(buffer, 0, length, StandardCharsets.UTF_8));
258+
captured += length;
259+
}
260+
}
261+
} catch (SocketTimeoutException e) {
262+
if (response.length() > 0) {
263+
response.append('\n');
264+
}
265+
response.append(exceptionMessage(e));
266+
}
267+
return response.toString();
268+
}
269+
270+
private String formValue(String name, String value) {
271+
return URLEncoder.encode(name, StandardCharsets.UTF_8) + "="
272+
+ URLEncoder.encode(value, StandardCharsets.UTF_8);
273+
}
274+
275+
private String exceptionMessage(Throwable throwable) {
276+
StringBuilder message = new StringBuilder();
277+
Throwable cursor = throwable;
278+
while (cursor != null) {
279+
if (message.length() > 0) {
280+
message.append(" -> ");
281+
}
282+
message.append(cursor.getClass().getSimpleName())
283+
.append(": ")
284+
.append(cursor.getMessage());
285+
Throwable next = cursor.getCause();
286+
if (next == null || next == cursor) {
287+
break;
288+
}
289+
cursor = next;
290+
}
291+
return message.toString();
292+
}
293+
294+
private URI repositoryUri(String repositoryId) {
295+
return URI.create(serverUrl() + "/repositories/" + repositoryId);
296+
}
297+
298+
private String serverUrl() {
299+
return "http://localhost:" + port + "/rdf4j-server";
300+
}
301+
302+
private void attachLmdbLogAppender() {
303+
lmdbLogAppender = new ListAppender<>();
304+
lmdbLogAppender.start();
305+
attachLmdbLogger("org.eclipse.rdf4j.sail.lmdb.LmdbUtil");
306+
attachLmdbLogger("org.eclipse.rdf4j.sail.lmdb.LmdbEvaluationStatistics");
307+
}
308+
309+
private void attachLmdbLogger(String loggerName) {
310+
Logger logger = (Logger) LoggerFactory.getLogger(loggerName);
311+
logger.addAppender(lmdbLogAppender);
312+
lmdbLoggers.add(logger);
313+
}
314+
315+
private void detachLmdbLogAppender() {
316+
for (Logger logger : lmdbLoggers) {
317+
logger.detachAppender(lmdbLogAppender);
318+
}
319+
lmdbLoggers.clear();
320+
if (lmdbLogAppender != null) {
321+
lmdbLogAppender.stop();
322+
lmdbLogAppender = null;
323+
}
324+
}
325+
326+
private List<String> lmdbReaderHandleLogEvents() {
327+
List<String> events = new ArrayList<>();
328+
for (ILoggingEvent event : lmdbLogAppender.list) {
329+
String throwableMessage = throwableMessage(event);
330+
if (containsReaderHandleFailure(event.getFormattedMessage())
331+
|| containsReaderHandleFailure(throwableMessage)) {
332+
events.add(event.getLoggerName() + ": " + event.getFormattedMessage() + " " + throwableMessage);
333+
}
334+
}
335+
return events;
336+
}
337+
338+
private String throwableMessage(ILoggingEvent event) {
339+
if (event.getThrowableProxy() == null) {
340+
return "";
341+
}
342+
return event.getThrowableProxy().getMessage();
343+
}
344+
345+
private static boolean containsReaderHandleFailure(String value) {
346+
String lowerValue = value.toLowerCase();
347+
return lowerValue.contains("mdb_readers_full") || lowerValue.contains("maxreaders");
348+
}
349+
350+
private static final class QueryStats {
351+
private int timeouts;
352+
private int readerHandleFailures;
353+
private final List<String> failureSamples = new ArrayList<>();
354+
355+
private void record(QueryResponse response) {
356+
if (isTimeout(response)) {
357+
timeouts++;
358+
return;
359+
}
360+
if (isReaderHandleFailure(response)) {
361+
readerHandleFailures++;
362+
}
363+
if (!isSuccess(response) && failureSamples.size() < 5) {
364+
failureSamples.add("HTTP " + response.status + ": " + response.body);
365+
}
366+
}
367+
368+
private boolean isSuccess(QueryResponse response) {
369+
return response.status >= 200 && response.status < 300;
370+
}
371+
372+
private boolean isTimeout(QueryResponse response) {
373+
return (response.status == HttpStatus.SERVICE_UNAVAILABLE.value()
374+
&& response.body.contains("Query evaluation took too long"))
375+
|| response.body.contains("Query evaluation took too long")
376+
|| response.body.contains("SocketTimeoutException")
377+
|| response.body.contains("Unexpected end of file from server")
378+
|| response.body.contains("Connection reset");
379+
}
380+
381+
private boolean isReaderHandleFailure(QueryResponse response) {
382+
return containsReaderHandleFailure(response.body);
383+
}
384+
}
385+
386+
private static final class QueryResponse {
387+
private final int status;
388+
private final String body;
389+
390+
private QueryResponse(int status, String body) {
391+
this.status = status;
392+
this.body = body == null ? "" : body;
393+
}
394+
}
395+
}

0 commit comments

Comments
 (0)