Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 176 additions & 24 deletions core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,27 @@
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.sail.lmdb;

import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
import static org.lwjgl.util.lmdb.LMDB.MDB_READERS_FULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
import static org.lwjgl.util.lmdb.LMDB.mdb_reader_check;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_renew;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_reset;

import java.io.Closeable;
import java.io.IOException;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;

import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager;
import org.eclipse.rdf4j.sail.SailException;
Expand All @@ -34,6 +41,9 @@
*/
class TxnManager {

private static final int READERS_FULL_RETRIES = 500;
private static final long READERS_FULL_WAIT_MILLIS = 10L;

private final Mode mode;
private final IdentityHashMap<Txn, Boolean> active = new IdentityHashMap<>();
private final long[] pool;
Expand All @@ -51,12 +61,66 @@ private long startReadTxn() throws IOException {
long readTxn;
try (MemoryStack stack = stackPush()) {
PointerBuffer pp = stack.mallocPointer(1);
E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
int rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
if (rc == MDB_READERS_FULL) {
rc = retryStartReadTxn(stack, pp);
}
E(rc);
readTxn = pp.get(0);
}
return readTxn;
}

private int retryStartReadTxn(MemoryStack stack, PointerBuffer pp) throws IOException {
int rc = MDB_READERS_FULL;
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
closePooledReaders();
checkForDeadReaders(stack);
waitForTrackedReaderToClose(null);
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
}
return rc;
}

private void checkForDeadReaders(MemoryStack stack) throws IOException {
IntBuffer dead = stack.mallocInt(1);
E(mdb_reader_check(env, dead));
}

private void closePooledReaders() {
if (mode == Mode.RESET) {
synchronized (pool) {
while (poolIndex >= 0) {
long txn = pool[poolIndex];
pool[poolIndex--] = 0;
mdb_txn_abort(txn);
}
}
}
}

private void waitForTrackedReaderToClose(Txn excludedTxn) throws IOException {
synchronized (active) {
if (hasTrackedReaders(excludedTxn)) {
try {
active.wait(READERS_FULL_WAIT_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
}
}

private boolean hasTrackedReaders(Txn excludedTxn) {
for (Txn txn : active.keySet()) {
if (txn != excludedTxn) {
return true;
}
}
return false;
}

/**
* Wraps an existing transaction into a txn reference object.
*
Expand Down Expand Up @@ -97,7 +161,12 @@ long createReadTxnInternal() throws IOException {
if (txn == 0) {
txn = startReadTxn();
} else {
mdb_txn_renew(txn);
try {
renewReadTxn(txn, null);
} catch (IOException e) {
mdb_txn_abort(txn);
throw e;
}
}
} else {
txn = startReadTxn();
Expand Down Expand Up @@ -131,25 +200,36 @@ StampedLongAdderLockManager lockManager() {
}

void activate() throws IOException {
synchronized (active) {
for (Txn txn : active.keySet()) {
txn.setActive(true);
}
for (Txn txn : activeTransactions()) {
txn.setActive(true);
}
}

void deactivate() throws IOException {
synchronized (active) {
for (Txn txn : active.keySet()) {
txn.setActive(false);
}
for (Txn txn : activeTransactions()) {
txn.setActive(false);
}
}

void reset() throws IOException {
for (Txn txn : activeTransactions()) {
txn.reset();
}
}

private List<Txn> activeTransactions() {
synchronized (active) {
for (Txn txn : active.keySet()) {
txn.reset();
return new ArrayList<>(active.keySet());
}
}

private void updateActiveState(Txn txn, boolean isActive) {
synchronized (active) {
if (active.containsKey(txn)) {
active.put(txn, isActive);
}
if (!isActive) {
active.notifyAll();
}
}
}
Expand All @@ -162,8 +242,10 @@ enum Mode {

class Txn implements Closeable, AutoCloseable {

private final long txn;
private long txn;
private long version;
private boolean txnActive = true;
private boolean closed;

Txn(long txn) {
this.txn = txn;
Expand All @@ -177,12 +259,18 @@ StampedLongAdderLockManager lockManager() {
return lockManager;
}

private void free(long txn) {
private void free(boolean resetTxn) {
if (txn == 0) {
return;
}

switch (mode) {
case RESET:
synchronized (pool) {
if (poolIndex < pool.length - 1) {
mdb_txn_reset(txn);
if (resetTxn) {
mdb_txn_reset(txn);
}
pool[++poolIndex] = txn;
} else {
mdb_txn_abort(txn);
Expand All @@ -195,39 +283,103 @@ private void free(long txn) {
case NONE:
break;
}
txn = 0;
}

@Override
public void close() {
synchronized (active) {
active.remove(this);
public synchronized void close() {
if (closed) {
return;
}
closed = true;
synchronized (TxnManager.this.active) {
TxnManager.this.active.remove(this);
}
try {
free(txnActive);
} finally {
synchronized (TxnManager.this.active) {
TxnManager.this.active.notifyAll();
}
}
free(txn);
}

/**
* Resets current transaction as it points to "old" data.
*/
void reset() throws IOException {
mdb_txn_reset(txn);
E(mdb_txn_renew(txn));
synchronized void reset() throws IOException {
if (closed) {
return;
}
if (txnActive) {
mdb_txn_reset(txn);
txnActive = false;
updateActiveState(this, false);
activate();
}
version++;
}

/**
* Triggers active state of current transaction.
*/
void setActive(boolean active) throws IOException {
synchronized void setActive(boolean active) throws IOException {
if (closed) {
return;
}
if (active) {
E(mdb_txn_renew(txn));
activate();
version++;
} else {
deactivate();
}
}

private void activate() throws IOException {
if (!txnActive) {
if (txn == 0) {
txn = startReadTxn();
} else {
renewReadTxn(txn, this);
}
txnActive = true;
updateActiveState(this, true);
}
}

private void deactivate() {
if (txnActive && txn != 0) {
mdb_txn_reset(txn);
}
txnActive = false;
updateActiveState(this, false);
}

long version() {
return version;
}
}

private void renewReadTxn(long txn, Txn excludedTxn) throws IOException {
int rc = mdb_txn_renew(txn);
if (rc == MDB_READERS_FULL) {
rc = retryRenewReadTxn(txn, excludedTxn);
}
if (rc != MDB_SUCCESS) {
E(rc);
}
}

private int retryRenewReadTxn(long txn, Txn excludedTxn) throws IOException {
int rc = MDB_READERS_FULL;
try (MemoryStack stack = stackPush()) {
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
closePooledReaders();
checkForDeadReaders(stack);
waitForTrackedReaderToClose(excludedTxn);
rc = mdb_txn_renew(txn);
}
}
return rc;
}
}
Loading
Loading