Skip to content

Commit 516ba51

Browse files
committed
fixes
1 parent bd8ebc4 commit 516ba51

15 files changed

Lines changed: 559 additions & 111 deletions

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SketchBasedJoinEstimator.java

Lines changed: 157 additions & 47 deletions
Large diffs are not rendered by default.

core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/SketchBasedJoinEstimatorBackgroundRefreshTest.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@
1212

1313
package org.eclipse.rdf4j.sail.base;
1414

15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertFalse;
1517
import static org.junit.jupiter.api.Assertions.assertTrue;
1618

19+
import java.nio.file.Path;
20+
import java.util.List;
21+
import java.util.concurrent.TimeUnit;
1722
import java.util.concurrent.atomic.AtomicInteger;
1823

1924
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
@@ -29,6 +34,7 @@
2934
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
3035
import org.eclipse.rdf4j.sail.SailException;
3136
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.api.io.TempDir;
3238

3339
class SketchBasedJoinEstimatorBackgroundRefreshTest {
3440

@@ -55,8 +61,43 @@ void backgroundRefreshDoesNotRebuildEmptyStoreEveryTick() throws Exception {
5561
}
5662
}
5763

64+
@Test
65+
void backgroundRefreshLoadsPersistedSnapshotWithoutCallerReadyLazyLoad(@TempDir Path dataDir) throws Exception {
66+
CountingEmptyStore store = new CountingEmptyStore(VF.createStatement(VF.createIRI("urn:s"),
67+
VF.createIRI("urn:p"), VF.createIRI("urn:o")));
68+
SketchBasedJoinEstimator estimator = new SketchBasedJoinEstimator(store,
69+
SketchBasedJoinEstimator.Config.defaults()
70+
.withNominalEntries(64)
71+
.withThrottleEveryN(1)
72+
.withThrottleMillis(0)
73+
.withRefreshSleepMillis(10));
74+
estimator.configurePersistence(dataDir.resolve("join-estimator.rjes"), false);
75+
try {
76+
estimator.rebuild();
77+
assertTrue(estimator.isReadyNonBlocking());
78+
assertTrue(estimator.persistIfDirty());
79+
estimator.unload();
80+
81+
assertFalse(estimator.isReadyNonBlocking(), "Nonblocking readiness should not load persisted sketches");
82+
assertEquals(1, store.rebuildCount());
83+
84+
estimator.startBackgroundRefresh(3);
85+
86+
assertTrue(estimator.awaitReady(5, TimeUnit.SECONDS),
87+
"Background refresh should load the persisted snapshot");
88+
assertEquals(1, store.rebuildCount(), "Background snapshot load should not rebuild the store");
89+
} finally {
90+
estimator.close();
91+
}
92+
}
93+
5894
private static final class CountingEmptyStore implements SailStore {
5995
private final AtomicInteger rebuildCount = new AtomicInteger();
96+
private final List<Statement> statements;
97+
98+
CountingEmptyStore(Statement... statements) {
99+
this.statements = List.of(statements);
100+
}
60101

61102
int rebuildCount() {
62103
return rebuildCount.get();
@@ -115,7 +156,7 @@ public CloseableIteration<? extends Resource> getContextIDs() {
115156
public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI pred, Value obj,
116157
Resource... contexts) {
117158
rebuildCount.incrementAndGet();
118-
return new CloseableIteratorIteration<>(java.util.Collections.<Statement>emptyIterator());
159+
return new CloseableIteratorIteration<>(statements.iterator());
119160
}
120161
};
121162
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbEvaluationStatistics.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ public boolean supportsJoinEstimation() {
105105
return joinSupportCacheValue;
106106
}
107107

108-
boolean ready = sketchBasedJoinEstimator.isReady();
109-
// boolean ready = ensureRobustJoinEstimationReady();
108+
boolean ready = sketchBasedJoinEstimator.isReadyNonBlocking();
110109
joinSupportCacheValue = ready;
111110
joinSupportCacheRevisionId = revisionId;
112111
joinSupportCacheExpiryMs = now + JOIN_SUPPORT_CACHE_TTL_MS;

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public LmdbSailStore(File dataDir, StoreProperties properties, LmdbStoreConfig c
328328
sketchBasedJoinEstimator.setPatternCardinalityProvider(statementPatternCardinalitySource::estimate);
329329
sketchBasedJoinEstimator.configurePersistence(estimatorPath, snapshotExists);
330330
if (!snapshotExists) {
331-
sketchBasedJoinEstimator.rebuild();
331+
sketchBasedJoinEstimator.discardAndMarkForRebuild();
332332
}
333333
sketchBasedJoinEstimator.startBackgroundRefresh(3);
334334
}
@@ -401,10 +401,12 @@ public void close() throws SailException {
401401
try {
402402
try {
403403
cancelAndDrainScheduledEstimatorPersist();
404-
persistEstimatorState();
405404
if (sketchBasedJoinEstimator != null) {
406405
sketchBasedJoinEstimator.close();
407406
}
407+
if (filterSelectivityStats != null) {
408+
filterSelectivityStats.persistIfDirty();
409+
}
408410
} finally {
409411
try {
410412
if (namespaceStore != null) {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java

Lines changed: 158 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.nio.file.Files;
1616
import java.nio.file.Path;
1717
import java.util.Comparator;
18+
import java.util.Optional;
19+
import java.util.concurrent.TimeUnit;
1820
import java.util.concurrent.locks.ReentrantLock;
1921
import java.util.function.Supplier;
2022
import java.util.stream.Stream;
@@ -27,17 +29,22 @@
2729
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
2830
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
2931
import org.eclipse.rdf4j.model.ValueFactory;
32+
import org.eclipse.rdf4j.query.Dataset;
3033
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
3134
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategyFactory;
35+
import org.eclipse.rdf4j.query.algebra.evaluation.QueryOptimizerPipeline;
36+
import org.eclipse.rdf4j.query.algebra.evaluation.TripleSource;
3237
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
3338
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolverClient;
3439
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategyFactory;
40+
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
3541
import org.eclipse.rdf4j.repository.sparql.federation.SPARQLServiceResolver;
3642
import org.eclipse.rdf4j.sail.InterruptedSailException;
3743
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
3844
import org.eclipse.rdf4j.sail.SailException;
3945
import org.eclipse.rdf4j.sail.base.SailSource;
4046
import org.eclipse.rdf4j.sail.base.SailStore;
47+
import org.eclipse.rdf4j.sail.base.SketchBasedJoinEstimator;
4148
import org.eclipse.rdf4j.sail.base.SnapshotSailStore;
4249
import org.eclipse.rdf4j.sail.helpers.AbstractNotifyingSail;
4350
import org.eclipse.rdf4j.sail.helpers.DirectoryLockManager;
@@ -84,7 +91,15 @@ public class LmdbStore extends AbstractNotifyingSail implements FederatedService
8491
*/
8592
private volatile Lock dirLock;
8693

87-
private EvaluationStrategyFactory evalStratFactory;
94+
private EvaluationStrategyFactory explicitEvalStratFactory;
95+
96+
private DefaultEvaluationStrategyFactory defaultEvalStratFactory;
97+
98+
private LmdbEvaluationStrategyFactory lmdbEvalStratFactory;
99+
100+
private EvaluationStrategyFactory connectionEvalStratFactory;
101+
102+
private QueryOptimizerPipeline automaticOptimizerPipeline;
88103

89104
/**
90105
* independent life cycle
@@ -167,13 +182,16 @@ public void setDataDir(File dataDir) {
167182
* @return Returns the {@link EvaluationStrategy}.
168183
*/
169184
public synchronized EvaluationStrategyFactory getEvaluationStrategyFactory() {
170-
if (evalStratFactory == null) {
171-
evalStratFactory = new LmdbEvaluationStrategyFactory(getFederatedServiceResolver());
185+
EvaluationStrategyFactory factory;
186+
if (explicitEvalStratFactory != null) {
187+
factory = explicitEvalStratFactory;
188+
} else if (isSketchEstimatorReadyNonBlocking()) {
189+
factory = getAutomaticLmdbEvaluationStrategyFactory();
190+
} else {
191+
factory = getAutomaticDefaultEvaluationStrategyFactory();
172192
}
173-
evalStratFactory.setQuerySolutionCacheThreshold(getIterationCacheSyncThreshold());
174-
evalStratFactory.setTrackResultSize(isTrackResultSize());
175-
evalStratFactory.setCollectionFactory(getCollectionFactory());
176-
return evalStratFactory;
193+
configureEvaluationStrategyFactory(factory);
194+
return factory;
177195
}
178196

179197
public boolean getPageCardinalityEstimator() {
@@ -184,7 +202,10 @@ public boolean getPageCardinalityEstimator() {
184202
* Sets the {@link EvaluationStrategy} to use.
185203
*/
186204
public synchronized void setEvaluationStrategyFactory(EvaluationStrategyFactory factory) {
187-
evalStratFactory = factory;
205+
explicitEvalStratFactory = factory;
206+
if (factory != null) {
207+
configureEvaluationStrategyFactory(factory);
208+
}
188209
}
189210

190211
/**
@@ -209,8 +230,14 @@ public synchronized FederatedServiceResolver getFederatedServiceResolver() {
209230
@Override
210231
public synchronized void setFederatedServiceResolver(FederatedServiceResolver resolver) {
211232
this.serviceResolver = resolver;
212-
if (resolver != null && evalStratFactory instanceof FederatedServiceResolverClient) {
213-
((FederatedServiceResolverClient) evalStratFactory).setFederatedServiceResolver(resolver);
233+
if (resolver != null && explicitEvalStratFactory instanceof FederatedServiceResolverClient) {
234+
((FederatedServiceResolverClient) explicitEvalStratFactory).setFederatedServiceResolver(resolver);
235+
}
236+
if (resolver != null && defaultEvalStratFactory != null) {
237+
defaultEvalStratFactory.setFederatedServiceResolver(resolver);
238+
}
239+
if (resolver != null && lmdbEvalStratFactory != null) {
240+
lmdbEvalStratFactory.setFederatedServiceResolver(resolver);
214241
}
215242
}
216243

@@ -265,7 +292,7 @@ protected void initializeInternal() throws SailException {
265292
properties.setVersion(String.valueOf(VERSION));
266293
}
267294

268-
boolean useSketchBasedJoinEstimator = usesLmdbEvaluationStrategyFactory();
295+
boolean useSketchBasedJoinEstimator = usesAutomaticEvaluationStrategyFactory();
269296
backingStore = new LmdbSailStore(dataDir, properties, config, useSketchBasedJoinEstimator);
270297

271298
// update version afer loading and potential internal migration within value and triple store
@@ -423,8 +450,126 @@ LmdbSailStore getBackingStore() {
423450
return backingStore;
424451
}
425452

426-
private boolean usesLmdbEvaluationStrategyFactory() {
427-
return getEvaluationStrategyFactory() instanceof LmdbEvaluationStrategyFactory;
453+
EvaluationStrategyFactory getConnectionEvaluationStrategyFactory() {
454+
EvaluationStrategyFactory factory = connectionEvalStratFactory;
455+
if (factory == null) {
456+
synchronized (this) {
457+
factory = connectionEvalStratFactory;
458+
if (factory == null) {
459+
factory = new AdaptiveEvaluationStrategyFactory();
460+
connectionEvalStratFactory = factory;
461+
}
462+
}
463+
}
464+
return factory;
465+
}
466+
467+
public boolean awaitSketchesReady(long timeout, TimeUnit unit) throws InterruptedException {
468+
SketchBasedJoinEstimator estimator = getSketchBasedJoinEstimator();
469+
return estimator != null && estimator.awaitReady(timeout, unit);
470+
}
471+
472+
private boolean usesAutomaticEvaluationStrategyFactory() {
473+
return explicitEvalStratFactory == null;
474+
}
475+
476+
private boolean isSketchEstimatorReadyNonBlocking() {
477+
SketchBasedJoinEstimator estimator = getSketchBasedJoinEstimator();
478+
return estimator != null && estimator.isReadyNonBlocking();
479+
}
480+
481+
private SketchBasedJoinEstimator getSketchBasedJoinEstimator() {
482+
LmdbSailStore backingStore = this.backingStore;
483+
return backingStore == null ? null : backingStore.getSketchBasedJoinEstimator();
484+
}
485+
486+
private DefaultEvaluationStrategyFactory getAutomaticDefaultEvaluationStrategyFactory() {
487+
if (defaultEvalStratFactory == null) {
488+
defaultEvalStratFactory = new DefaultEvaluationStrategyFactory(getFederatedServiceResolver());
489+
defaultEvalStratFactory.setOptimizerPipeline(automaticOptimizerPipeline);
490+
}
491+
return defaultEvalStratFactory;
492+
}
493+
494+
private LmdbEvaluationStrategyFactory getAutomaticLmdbEvaluationStrategyFactory() {
495+
if (lmdbEvalStratFactory == null) {
496+
lmdbEvalStratFactory = new LmdbEvaluationStrategyFactory(getFederatedServiceResolver());
497+
lmdbEvalStratFactory.setOptimizerPipeline(automaticOptimizerPipeline);
498+
}
499+
return lmdbEvalStratFactory;
500+
}
501+
502+
private void configureEvaluationStrategyFactory(EvaluationStrategyFactory factory) {
503+
factory.setQuerySolutionCacheThreshold(getIterationCacheSyncThreshold());
504+
factory.setTrackResultSize(isTrackResultSize());
505+
factory.setCollectionFactory(getCollectionFactory());
506+
if (factory instanceof FederatedServiceResolverClient) {
507+
((FederatedServiceResolverClient) factory).setFederatedServiceResolver(getFederatedServiceResolver());
508+
}
509+
}
510+
511+
private final class AdaptiveEvaluationStrategyFactory
512+
implements EvaluationStrategyFactory, FederatedServiceResolverClient {
513+
514+
@Override
515+
public void setQuerySolutionCacheThreshold(long threshold) {
516+
getEvaluationStrategyFactory().setQuerySolutionCacheThreshold(threshold);
517+
}
518+
519+
@Override
520+
public long getQuerySolutionCacheThreshold() {
521+
return getEvaluationStrategyFactory().getQuerySolutionCacheThreshold();
522+
}
523+
524+
@Override
525+
public void setOptimizerPipeline(QueryOptimizerPipeline pipeline) {
526+
automaticOptimizerPipeline = pipeline;
527+
if (explicitEvalStratFactory != null) {
528+
explicitEvalStratFactory.setOptimizerPipeline(pipeline);
529+
}
530+
if (defaultEvalStratFactory != null) {
531+
defaultEvalStratFactory.setOptimizerPipeline(pipeline);
532+
}
533+
if (lmdbEvalStratFactory != null) {
534+
lmdbEvalStratFactory.setOptimizerPipeline(pipeline);
535+
}
536+
}
537+
538+
@Override
539+
public Optional<QueryOptimizerPipeline> getOptimizerPipeline() {
540+
return getEvaluationStrategyFactory().getOptimizerPipeline();
541+
}
542+
543+
@Override
544+
public EvaluationStrategy createEvaluationStrategy(Dataset dataset, TripleSource tripleSource,
545+
EvaluationStatistics evaluationStatistics) {
546+
return getEvaluationStrategyFactory().createEvaluationStrategy(dataset, tripleSource, evaluationStatistics);
547+
}
548+
549+
@Override
550+
public boolean isTrackResultSize() {
551+
return getEvaluationStrategyFactory().isTrackResultSize();
552+
}
553+
554+
@Override
555+
public void setTrackResultSize(boolean trackResultSize) {
556+
getEvaluationStrategyFactory().setTrackResultSize(trackResultSize);
557+
}
558+
559+
@Override
560+
public void setCollectionFactory(Supplier<CollectionFactory> collectionFactory) {
561+
getEvaluationStrategyFactory().setCollectionFactory(collectionFactory);
562+
}
563+
564+
@Override
565+
public void setFederatedServiceResolver(FederatedServiceResolver resolver) {
566+
LmdbStore.this.setFederatedServiceResolver(resolver);
567+
}
568+
569+
@Override
570+
public FederatedServiceResolver getFederatedServiceResolver() {
571+
return LmdbStore.this.getFederatedServiceResolver();
572+
}
428573
}
429574

430575
private boolean upgradeStore(File dataDir, String version) throws SailException {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class LmdbStoreConnection extends SailSourceConnection {
5454
*--------------*/
5555

5656
protected LmdbStoreConnection(LmdbStore sail) {
57-
super(sail, sail.getSailStore(), sail.getEvaluationStrategyFactory());
57+
super(sail, sail.getSailStore(), sail.getConnectionEvaluationStrategyFactory());
5858
this.lmdbStore = sail;
5959
sailChangedEvent = new DefaultSailChangedEvent(sail);
6060
}

0 commit comments

Comments
 (0)