Skip to content

Commit e7aa496

Browse files
committed
fixes
1 parent 516ba51 commit e7aa496

3 files changed

Lines changed: 240 additions & 42 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbFilterSelectivityStats.java

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.eclipse.rdf4j.query.algebra.FunctionCall;
4646
import org.eclipse.rdf4j.query.algebra.ListMemberOperator;
4747
import org.eclipse.rdf4j.query.algebra.StatementPattern;
48+
import org.eclipse.rdf4j.query.algebra.ValueExpr;
4849
import org.eclipse.rdf4j.query.algebra.Var;
4950
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
5051
import org.eclipse.rdf4j.query.algebra.evaluation.QueryValueEvaluationStep;
@@ -271,7 +272,7 @@ public SketchBasedJoinEstimator.PatternFilterSampleEstimate estimateFilterPass(F
271272
return new SketchBasedJoinEstimator.PatternFilterSampleEstimate(-1.0d, -1L);
272273
}
273274

274-
SampledPassRatio sampled = sampleFilterPassRatio(filter, pattern, candidate);
275+
SampledPassRatio sampled = sampleFilterPassRatio(candidate, true, 0L);
275276
if (!isUsableSampledPassRatio(sampled)) {
276277
return new SketchBasedJoinEstimator.PatternFilterSampleEstimate(-1.0d, -1L);
277278
}
@@ -300,6 +301,31 @@ synchronized List<BackgroundSamplingRequest> drainBackgroundSamplingRequests(int
300301
return drained;
301302
}
302303

304+
/**
 * Runs one time-boxed background sampling pass over queued sampling requests and records every usable result in
 * {@code sampledByFilter}.
 *
 * @param maxMillis time budget for the whole pass in milliseconds; a non-positive value makes this a no-op
 * @return the number of requests whose sampled pass ratio was stored during this pass (0 when background raw
 *         sampling is disabled)
 */
int runBackgroundSamplingCycle(long maxMillis) {
305+
if (!backgroundRawSamplingEnabled || maxMillis <= 0L) {
306+
return 0;
307+
}
308+
// A single deadline is shared by all requests handled in this pass and is also handed to the sampler.
long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxMillis);
309+
int sampledCount = 0;
310+
while (!samplingDeadlineExceeded(deadlineNanos)) {
311+
// Requests are taken one at a time so the deadline is re-checked between requests.
List<BackgroundSamplingRequest> requests = drainBackgroundSamplingRequests(1);
312+
if (requests.isEmpty()) {
313+
break;
314+
}
315+
// foregroundSampling=false: the sampler keeps the deadline passed in instead of computing its own.
SampledPassRatio sampled = sampleFilterPassRatio(requests.getFirst().toSamplingCandidate(), false,
316+
deadlineNanos);
317+
if (!isUsableSampledPassRatio(sampled)) {
318+
// NOTE(review): the request is not re-queued here; presumably the drain call above already removed it, so
// an unusable sample (e.g. one cut short by the deadline) drops the request — confirm this is intended.
continue;
319+
}
320+
// Only the map update and dirty flag are guarded by the monitor; the sampling above runs outside it.
synchronized (this) {
321+
sampledByFilter.put(requests.getFirst().key, sampled);
322+
dirty = true;
323+
}
324+
sampledCount++;
325+
}
326+
return sampledCount;
327+
}
328+
303329
synchronized void persistIfDirty() {
304330
if (!dirty) {
305331
return;
@@ -449,32 +475,35 @@ private SamplingCandidate samplingCandidate(Filter filter, StatementPattern patt
449475

450476
double expectedRuntimeRows = expectedRuntimeRows(subjId, predId, objId, contextId);
451477
double expectedBenefitRows = expectedBenefitRows(filter, expectedRuntimeRows);
452-
return new SamplingCandidate(key, subjId, predId, objId, contextId, expectedRuntimeRows, expectedBenefitRows);
478+
return new SamplingCandidate(key, pattern.clone(), filter.getCondition().clone(), subjId, predId, objId,
479+
contextId, expectedRuntimeRows, expectedBenefitRows);
453480
}
454481

455-
private SampledPassRatio sampleFilterPassRatio(Filter filter, StatementPattern pattern,
456-
SamplingCandidate candidate) {
482+
private SampledPassRatio sampleFilterPassRatio(SamplingCandidate candidate, boolean foregroundSampling,
483+
long deadlineNanos) {
457484
if (candidate == null) {
458485
return null;
459486
}
460487

461-
int scanBudget = optimizerSamplingRowBudget(candidate.expectedRuntimeRows, candidate.expectedBenefitRows,
462-
1.0d);
488+
int scanBudget = samplingRowBudget(candidate.expectedRuntimeRows, candidate.expectedBenefitRows, 1.0d,
489+
foregroundSampling);
463490
if (scanBudget <= 0) {
464491
return null;
465492
}
466493

467494
DefaultEvaluationStrategy strategy = samplingStrategy();
468-
QueryValueEvaluationStep condition = prepareCondition(strategy, filter);
495+
QueryValueEvaluationStep condition = prepareCondition(strategy, candidate.condition);
469496
if (condition == null) {
470497
return null;
471498
}
499+
if (foregroundSampling) {
500+
deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(optimizerSamplingMaxMillis);
501+
}
472502

473503
int reservoirSize = Math.min(SAMPLE_RESERVOIR_SIZE, scanBudget);
474504
List<BindingSet> samples = new ArrayList<>(reservoirSize);
475505
int eligibleRows = 0;
476506
int scannedRows = 0;
477-
long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(optimizerSamplingMaxMillis);
478507

479508
try (Txn txn = tripleStore.getTxnManager().createReadTxn()) {
480509
for (boolean explicit : new boolean[] { true, false }) {
@@ -484,10 +513,10 @@ private SampledPassRatio sampleFilterPassRatio(Filter filter, StatementPattern p
484513
while (scannedRows < scanBudget && !samplingDeadlineExceeded(deadlineNanos)
485514
&& (row = triples.next()) != null) {
486515
scannedRows++;
487-
if (!matchesRepeatedVarEquality(pattern, row)) {
516+
if (!matchesRepeatedVarEquality(candidate.pattern, row)) {
488517
continue;
489518
}
490-
BindingSet bindingSet = toBindingSet(pattern, row);
519+
BindingSet bindingSet = toBindingSet(candidate.pattern, row);
491520
eligibleRows++;
492521
if (samples.size() < reservoirSize) {
493522
samples.add(bindingSet);
@@ -540,11 +569,10 @@ private synchronized void voteBackgroundSamplingRequest(SamplingCandidate candid
540569

541570
BackgroundSamplingRequest request = backgroundSamplingRequests.get(candidate.key);
542571
if (request == null) {
543-
request = new BackgroundSamplingRequest(candidate.key);
572+
request = new BackgroundSamplingRequest(candidate);
544573
backgroundSamplingRequests.put(candidate.key, request);
545574
}
546-
request.vote(candidate.expectedRuntimeRows, candidate.expectedBenefitRows, foregroundNeeded,
547-
++backgroundSamplingSequence);
575+
request.vote(candidate, foregroundNeeded, ++backgroundSamplingSequence);
548576
trimBackgroundSamplingRequests();
549577
}
550578

@@ -565,7 +593,13 @@ private void trimBackgroundSamplingRequests() {
565593

566594
int optimizerSamplingRowBudget(double expectedRuntimeRows, double expectedBenefitRows,
567595
double decisionUncertainty) {
568-
if (!optimizerSamplingEnabled || optimizerSamplingMaxMillis <= 0L || optimizerSamplingMaxRows <= 0
596+
return samplingRowBudget(expectedRuntimeRows, expectedBenefitRows, decisionUncertainty, true);
597+
}
598+
599+
private int samplingRowBudget(double expectedRuntimeRows, double expectedBenefitRows, double decisionUncertainty,
600+
boolean foregroundSampling) {
601+
if ((foregroundSampling && (!optimizerSamplingEnabled || optimizerSamplingMaxMillis <= 0L))
602+
|| optimizerSamplingMaxRows <= 0
569603
|| !Double.isFinite(expectedRuntimeRows) || !Double.isFinite(expectedBenefitRows)
570604
|| expectedRuntimeRows <= 0.0d || expectedBenefitRows <= 0.0d) {
571605
return 0;
@@ -678,10 +712,9 @@ private static double clamp(double value, double min, double max) {
678712
return Math.max(min, Math.min(max, value));
679713
}
680714

681-
private QueryValueEvaluationStep prepareCondition(DefaultEvaluationStrategy strategy, Filter filter) {
715+
private QueryValueEvaluationStep prepareCondition(DefaultEvaluationStrategy strategy, ValueExpr condition) {
682716
try {
683-
return strategy.precompile(filter.getCondition(),
684-
new QueryEvaluationContext.Minimal(null, valueStore, null));
717+
return strategy.precompile(condition, new QueryEvaluationContext.Minimal(null, valueStore, null));
685718
} catch (RuntimeException e) {
686719
return null;
687720
}
@@ -837,44 +870,76 @@ private static String readString(DataInputStream in) throws IOException {
837870
return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
838871
}
839872

840-
private record SamplingCandidate(PatternFilterKey key, long subjId, long predId, long objId, long contextId,
841-
double expectedRuntimeRows, double expectedBenefitRows) {
873+
private record SamplingCandidate(PatternFilterKey key, StatementPattern pattern, ValueExpr condition, long subjId,
874+
long predId, long objId, long contextId, double expectedRuntimeRows, double expectedBenefitRows) {
842875
}
843876

844877
static final class BackgroundSamplingRequest {
845878
private final PatternFilterKey key;
879+
private StatementPattern pattern;
880+
private ValueExpr condition;
881+
private long subjId;
882+
private long predId;
883+
private long objId;
884+
private long contextId;
846885
private long voteCount;
847886
private double expectedRuntimeRows;
848887
private double expectedBenefitRows;
849888
private boolean foregroundNeeded;
850889
private long lastRequestedSequence;
851890

852-
private BackgroundSamplingRequest(PatternFilterKey key) {
853-
this.key = Objects.requireNonNull(key, "key");
891+
/**
 * Creates a request keyed by the candidate's key and copies the candidate's pattern, condition and term ids via
 * {@link #refreshSamplingPayload(SamplingCandidate)}. Vote count and row estimates are not assigned here.
 *
 * @param candidate source of the key and sampling payload; it is dereferenced before the null check on its key,
 *                  so a {@code null} candidate fails with a plain {@link NullPointerException}
 */
private BackgroundSamplingRequest(SamplingCandidate candidate) {
892+
this.key = Objects.requireNonNull(candidate.key, "key");
893+
refreshSamplingPayload(candidate);
854894
}
855895

856-
private BackgroundSamplingRequest(PatternFilterKey key, long voteCount, double expectedRuntimeRows,
896+
private BackgroundSamplingRequest(PatternFilterKey key, StatementPattern pattern, ValueExpr condition,
897+
long subjId, long predId, long objId, long contextId, long voteCount, double expectedRuntimeRows,
857898
double expectedBenefitRows, boolean foregroundNeeded, long lastRequestedSequence) {
858899
this.key = Objects.requireNonNull(key, "key");
900+
this.pattern = Objects.requireNonNull(pattern, "pattern");
901+
this.condition = Objects.requireNonNull(condition, "condition");
902+
this.subjId = subjId;
903+
this.predId = predId;
904+
this.objId = objId;
905+
this.contextId = contextId;
859906
this.voteCount = voteCount;
860907
this.expectedRuntimeRows = expectedRuntimeRows;
861908
this.expectedBenefitRows = expectedBenefitRows;
862909
this.foregroundNeeded = foregroundNeeded;
863910
this.lastRequestedSequence = lastRequestedSequence;
864911
}
865912

866-
private void vote(double expectedRuntimeRows, double expectedBenefitRows, boolean foregroundNeeded,
867-
long sequence) {
913+
private void vote(SamplingCandidate candidate, boolean foregroundNeeded, long sequence) {
914+
boolean refreshPayload = candidate.expectedBenefitRows > expectedBenefitRows;
868915
voteCount++;
869-
this.expectedRuntimeRows = Math.max(this.expectedRuntimeRows, expectedRuntimeRows);
870-
this.expectedBenefitRows = Math.max(this.expectedBenefitRows, expectedBenefitRows);
916+
this.expectedRuntimeRows = Math.max(this.expectedRuntimeRows, candidate.expectedRuntimeRows);
917+
this.expectedBenefitRows = Math.max(this.expectedBenefitRows, candidate.expectedBenefitRows);
918+
if (refreshPayload) {
919+
refreshSamplingPayload(candidate);
920+
}
871921
this.foregroundNeeded |= foregroundNeeded;
872922
lastRequestedSequence = sequence;
873923
}
874924

875925
private BackgroundSamplingRequest snapshot() {
876-
return new BackgroundSamplingRequest(key, voteCount, expectedRuntimeRows, expectedBenefitRows,
877-
foregroundNeeded, lastRequestedSequence);
926+
return new BackgroundSamplingRequest(key, pattern.clone(), condition.clone(), subjId, predId, objId,
927+
contextId, voteCount, expectedRuntimeRows, expectedBenefitRows, foregroundNeeded,
928+
lastRequestedSequence);
929+
}
930+
931+
/**
 * Builds a {@code SamplingCandidate} from this request's current state.
 *
 * @return a new candidate carrying this request's key, ids and row estimates, with cloned copies of the pattern and
 *         condition so the caller does not share those two objects with this request
 */
private SamplingCandidate toSamplingCandidate() {
932+
return new SamplingCandidate(key, pattern.clone(), condition.clone(), subjId, predId, objId, contextId,
933+
expectedRuntimeRows, expectedBenefitRows);
934+
}
935+
936+
/**
 * Replaces the data needed to run a sample (pattern, condition and the four term ids) with the candidate's values.
 * The key, vote count, row estimates, foreground flag and sequence are left untouched.
 *
 * @param candidate candidate whose pattern and condition are cloned and whose ids are copied
 */
private void refreshSamplingPayload(SamplingCandidate candidate) {
937+
// Clone so later changes to the candidate's algebra nodes do not affect this request.
this.pattern = candidate.pattern.clone();
938+
this.condition = candidate.condition.clone();
939+
this.subjId = candidate.subjId;
940+
this.predId = candidate.predId;
941+
this.objId = candidate.objId;
942+
this.contextId = candidate.contextId;
878943
}
879944

880945
private static int compareForQueue(BackgroundSamplingRequest left, BackgroundSamplingRequest right) {

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ static ExecutorService createTripleStoreExecutor() {
104104
});
105105
private final AtomicBoolean persistScheduled = new AtomicBoolean(false);
106106
private volatile ScheduledFuture<?> persistFuture;
107+
private volatile ScheduledFuture<?> backgroundSamplingFuture;
107108
private final long estimatorPersistDelayMillis = 1000L;
109+
private final long backgroundRawSamplingMaxMillisPerCycle;
108110

109111
/**
110112
* A fast non-blocking circular buffer backed by an array.
@@ -296,6 +298,7 @@ public LmdbSailStore(File dataDir, StoreProperties properties, LmdbStoreConfig c
296298
throws IOException, SailException {
297299
this.setFactory = new PersistentSetFactory<>(dataDir);
298300
this.bulkOperationSize = config.getBulkOperationSize();
301+
this.backgroundRawSamplingMaxMillisPerCycle = config.getBackgroundRawSamplingMaxMillisPerCycle();
299302
this.sketchBasedJoinEstimator = sketchBasedJoinEstimatorEnabled
300303
? new SketchBasedJoinEstimator(this, sketchEstimatorConfig(config))
301304
: null;
@@ -331,6 +334,7 @@ public LmdbSailStore(File dataDir, StoreProperties properties, LmdbStoreConfig c
331334
sketchBasedJoinEstimator.discardAndMarkForRebuild();
332335
}
333336
sketchBasedJoinEstimator.startBackgroundRefresh(3);
337+
startBackgroundFilterSampling();
334338
}
335339
} finally {
336340
if (!initialized) {
@@ -400,6 +404,7 @@ void rollback() throws SailException {
400404
public void close() throws SailException {
401405
try {
402406
try {
407+
cancelAndDrainScheduledBackgroundSampling();
403408
cancelAndDrainScheduledEstimatorPersist();
404409
if (sketchBasedJoinEstimator != null) {
405410
sketchBasedJoinEstimator.close();
@@ -459,6 +464,26 @@ public void close() throws SailException {
459464
}
460465
}
461466

467+
/**
 * Cancels the scheduled background filter sampling task, if one was started, without interrupting a running cycle.
 * When the cancel request is rejected, waits on the future and logs any execution failure.
 *
 * @throws InterruptedSailException if the calling thread is interrupted while waiting; the interrupt flag is
 *                                  restored first
 */
private void cancelAndDrainScheduledBackgroundSampling() {
468+
// Read the volatile field once so the null check and the calls below use the same future.
ScheduledFuture<?> future = backgroundSamplingFuture;
469+
if (future == null) {
470+
return;
471+
}
472+
// NOTE(review): this future comes from scheduleWithFixedDelay. Future.cancel(false) returns true whenever the
// cancellation is accepted, which can also be the case while a cycle is still executing; this branch then
// returns without waiting for that cycle. Confirm the remaining close() steps tolerate an in-flight cycle.
if (future.cancel(false)) {
473+
return;
474+
}
475+
try {
476+
// NOTE(review): get() on a repeating task is documented to never return normally, so this is expected to
// end in one of the catch blocks below — verify.
future.get();
477+
} catch (CancellationException e) {
478+
// Already cancelled by the scheduler or another close path.
479+
} catch (InterruptedException e) {
480+
Thread.currentThread().interrupt();
481+
throw new InterruptedSailException(e);
482+
} catch (ExecutionException e) {
483+
logger.warn("Scheduled background filter sampling failed during close", e.getCause());
484+
}
485+
}
486+
462487
private void cancelAndDrainScheduledEstimatorPersist() {
463488
ScheduledFuture<?> future = persistFuture;
464489
if (future == null) {
@@ -479,6 +504,40 @@ private void cancelAndDrainScheduledEstimatorPersist() {
479504
}
480505
}
481506

507+
/**
 * Schedules the repeating background filter sampling task on {@code estimatorPersistExec} and stores its future in
 * {@code backgroundSamplingFuture}. Does nothing when there are no filter selectivity stats or the per-cycle time
 * budget is not positive.
 */
private void startBackgroundFilterSampling() {
508+
if (filterSelectivityStats == null || backgroundRawSamplingMaxMillisPerCycle <= 0L) {
509+
return;
510+
}
511+
backgroundSamplingFuture = estimatorPersistExec.scheduleWithFixedDelay(() -> {
512+
try {
513+
runBackgroundFilterSamplingCycle(backgroundRawSamplingMaxMillisPerCycle);
514+
} catch (RuntimeException e) {
515+
// Log instead of rethrowing: an exception escaping a fixed-delay task suppresses all later executions.
logger.warn("Background filter sampling cycle failed", e);
516+
}
517+
// The persist delay is reused as both the initial delay and the pause between cycles.
}, estimatorPersistDelayMillis, estimatorPersistDelayMillis, TimeUnit.MILLISECONDS);
518+
}
519+
520+
/**
 * Runs one background filter sampling cycle if the store is idle, and schedules an estimator persist when the cycle
 * stored at least one sample.
 *
 * @param maxMillis time budget handed to the sampling cycle in milliseconds; a non-positive value is a no-op
 * @return the number of samples stored, or 0 when there are no filter selectivity stats, a store transaction has
 *         started, or {@code sinkStoreAccessLock} could not be acquired without blocking
 */
int runBackgroundFilterSamplingCycle(long maxMillis) {
521+
if (filterSelectivityStats == null || maxMillis <= 0L || storeTxnStarted.get()) {
522+
return 0;
523+
}
524+
// Never block on the lock: if it is held elsewhere, skip this cycle.
if (!sinkStoreAccessLock.tryLock()) {
525+
return 0;
526+
}
527+
try {
528+
// Re-check under the lock; a transaction may have started between the first check and tryLock().
if (storeTxnStarted.get()) {
529+
return 0;
530+
}
531+
int sampled = filterSelectivityStats.runBackgroundSamplingCycle(maxMillis);
532+
if (sampled > 0) {
533+
scheduleEstimatorPersist();
534+
}
535+
return sampled;
536+
} finally {
537+
sinkStoreAccessLock.unlock();
538+
}
539+
}
540+
482541
private void persistEstimatorState() {
483542
if (sketchBasedJoinEstimator != null) {
484543
sketchBasedJoinEstimator.persistIfDirty();
@@ -488,6 +547,18 @@ private void persistEstimatorState() {
488547
}
489548
}
490549

550+
/**
 * Schedules a single delayed call to {@code persistEstimatorState()} on {@code estimatorPersistExec}, unless one is
 * already pending, and stores the resulting future in {@code persistFuture}.
 */
private void scheduleEstimatorPersist() {
551+
// The compare-and-set lets only one caller schedule while the flag is set; other callers return immediately.
if (persistScheduled.compareAndSet(false, true)) {
552+
persistFuture = estimatorPersistExec.schedule(() -> {
553+
try {
554+
LmdbSailStore.this.persistEstimatorState();
555+
} finally {
556+
// Clear the flag even if persisting throws, so a later call can schedule again.
persistScheduled.set(false);
557+
}
558+
}, estimatorPersistDelayMillis, TimeUnit.MILLISECONDS);
559+
}
560+
}
561+
491562
SailException wrapTripleStoreException() {
492563
return tripleStoreException instanceof SailException ? (SailException) tripleStoreException
493564
: new SailException(tripleStoreException);
@@ -706,18 +777,6 @@ private void recoverEstimatorAfterFailure(String action, RuntimeException e) {
706777
e);
707778
}
708779

709-
private void scheduleEstimatorPersist() {
710-
if (persistScheduled.compareAndSet(false, true)) {
711-
persistFuture = estimatorPersistExec.schedule(() -> {
712-
try {
713-
LmdbSailStore.this.persistEstimatorState();
714-
} finally {
715-
persistScheduled.set(false);
716-
}
717-
}, estimatorPersistDelayMillis, TimeUnit.MILLISECONDS);
718-
}
719-
}
720-
721780
@Override
722781
public void close() {
723782
// do nothing

0 commit comments

Comments
 (0)