88 *
99 * SPDX-License-Identifier: BSD-3-Clause
1010 *******************************************************************************/
11+ // Some portions generated by Codex
1112package org .eclipse .rdf4j .sail .lmdb ;
1213
1314import static org .eclipse .rdf4j .sail .lmdb .LmdbUtil .E ;
1415import static org .lwjgl .system .MemoryStack .stackPush ;
1516import static org .lwjgl .system .MemoryUtil .NULL ;
1617import static org .lwjgl .util .lmdb .LMDB .MDB_RDONLY ;
18+ import static org .lwjgl .util .lmdb .LMDB .MDB_READERS_FULL ;
19+ import static org .lwjgl .util .lmdb .LMDB .MDB_SUCCESS ;
20+ import static org .lwjgl .util .lmdb .LMDB .mdb_reader_check ;
1721import static org .lwjgl .util .lmdb .LMDB .mdb_txn_abort ;
1822import static org .lwjgl .util .lmdb .LMDB .mdb_txn_begin ;
1923import static org .lwjgl .util .lmdb .LMDB .mdb_txn_renew ;
2024import static org .lwjgl .util .lmdb .LMDB .mdb_txn_reset ;
2125
2226import java .io .Closeable ;
2327import java .io .IOException ;
28+ import java .nio .IntBuffer ;
29+ import java .util .ArrayList ;
2430import java .util .IdentityHashMap ;
31+ import java .util .List ;
2532
2633import org .eclipse .rdf4j .common .concurrent .locks .StampedLongAdderLockManager ;
2734import org .eclipse .rdf4j .sail .SailException ;
3441 */
3542class TxnManager {
3643
44+ private static final int READERS_FULL_RETRIES = 500 ;
45+ private static final long READERS_FULL_WAIT_MILLIS = 10L ;
46+
3747 private final Mode mode ;
3848 private final IdentityHashMap <Txn , Boolean > active = new IdentityHashMap <>();
3949 private final long [] pool ;
@@ -51,12 +61,66 @@ private long startReadTxn() throws IOException {
5161 long readTxn ;
5262 try (MemoryStack stack = stackPush ()) {
5363 PointerBuffer pp = stack .mallocPointer (1 );
54- E (mdb_txn_begin (env , NULL , MDB_RDONLY , pp ));
64+ int rc = mdb_txn_begin (env , NULL , MDB_RDONLY , pp );
65+ if (rc == MDB_READERS_FULL ) {
66+ rc = retryStartReadTxn (stack , pp );
67+ }
68+ E (rc );
5569 readTxn = pp .get (0 );
5670 }
5771 return readTxn ;
5872 }
5973
74+ private int retryStartReadTxn (MemoryStack stack , PointerBuffer pp ) throws IOException {
75+ int rc = MDB_READERS_FULL ;
76+ for (int i = 0 ; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL ; i ++) {
77+ closePooledReaders ();
78+ checkForDeadReaders (stack );
79+ waitForTrackedReaderToClose (null );
80+ rc = mdb_txn_begin (env , NULL , MDB_RDONLY , pp );
81+ }
82+ return rc ;
83+ }
84+
85+ private void checkForDeadReaders (MemoryStack stack ) throws IOException {
86+ IntBuffer dead = stack .mallocInt (1 );
87+ E (mdb_reader_check (env , dead ));
88+ }
89+
90+ private void closePooledReaders () {
91+ if (mode == Mode .RESET ) {
92+ synchronized (pool ) {
93+ while (poolIndex >= 0 ) {
94+ long txn = pool [poolIndex ];
95+ pool [poolIndex --] = 0 ;
96+ mdb_txn_abort (txn );
97+ }
98+ }
99+ }
100+ }
101+
102+ private void waitForTrackedReaderToClose (Txn excludedTxn ) throws IOException {
103+ synchronized (active ) {
104+ if (hasTrackedReaders (excludedTxn )) {
105+ try {
106+ active .wait (READERS_FULL_WAIT_MILLIS );
107+ } catch (InterruptedException e ) {
108+ Thread .currentThread ().interrupt ();
109+ throw new IOException (e );
110+ }
111+ }
112+ }
113+ }
114+
115+ private boolean hasTrackedReaders (Txn excludedTxn ) {
116+ for (Txn txn : active .keySet ()) {
117+ if (txn != excludedTxn ) {
118+ return true ;
119+ }
120+ }
121+ return false ;
122+ }
123+
60124 /**
61125 * Wraps an existing transaction into a txn reference object.
62126 *
@@ -97,7 +161,12 @@ long createReadTxnInternal() throws IOException {
97161 if (txn == 0 ) {
98162 txn = startReadTxn ();
99163 } else {
100- mdb_txn_renew (txn );
164+ try {
165+ renewReadTxn (txn , null );
166+ } catch (IOException e ) {
167+ mdb_txn_abort (txn );
168+ throw e ;
169+ }
101170 }
102171 } else {
103172 txn = startReadTxn ();
@@ -131,25 +200,36 @@ StampedLongAdderLockManager lockManager() {
131200 }
132201
133202 void activate () throws IOException {
134- synchronized (active ) {
135- for (Txn txn : active .keySet ()) {
136- txn .setActive (true );
137- }
203+ for (Txn txn : activeTransactions ()) {
204+ txn .setActive (true );
138205 }
139206 }
140207
141208 void deactivate () throws IOException {
142- synchronized (active ) {
143- for (Txn txn : active .keySet ()) {
144- txn .setActive (false );
145- }
209+ for (Txn txn : activeTransactions ()) {
210+ txn .setActive (false );
146211 }
147212 }
148213
149214 void reset () throws IOException {
215+ for (Txn txn : activeTransactions ()) {
216+ txn .reset ();
217+ }
218+ }
219+
220+ private List <Txn > activeTransactions () {
150221 synchronized (active ) {
151- for (Txn txn : active .keySet ()) {
152- txn .reset ();
222+ return new ArrayList <>(active .keySet ());
223+ }
224+ }
225+
226+ private void updateActiveState (Txn txn , boolean isActive ) {
227+ synchronized (active ) {
228+ if (active .containsKey (txn )) {
229+ active .put (txn , isActive );
230+ }
231+ if (!isActive ) {
232+ active .notifyAll ();
153233 }
154234 }
155235 }
@@ -162,8 +242,10 @@ enum Mode {
162242
163243 class Txn implements Closeable , AutoCloseable {
164244
165- private final long txn ;
245+ private long txn ;
166246 private long version ;
247+ private boolean txnActive = true ;
248+ private boolean closed ;
167249
168250 Txn (long txn ) {
169251 this .txn = txn ;
@@ -177,12 +259,18 @@ StampedLongAdderLockManager lockManager() {
177259 return lockManager ;
178260 }
179261
180- private void free (long txn ) {
262+ private void free (boolean resetTxn ) {
263+ if (txn == 0 ) {
264+ return ;
265+ }
266+
181267 switch (mode ) {
182268 case RESET :
183269 synchronized (pool ) {
184270 if (poolIndex < pool .length - 1 ) {
185- mdb_txn_reset (txn );
271+ if (resetTxn ) {
272+ mdb_txn_reset (txn );
273+ }
186274 pool [++poolIndex ] = txn ;
187275 } else {
188276 mdb_txn_abort (txn );
@@ -195,39 +283,103 @@ private void free(long txn) {
195283 case NONE :
196284 break ;
197285 }
286+ txn = 0 ;
198287 }
199288
200289 @ Override
201- public void close () {
202- synchronized (active ) {
203- active .remove (this );
290+ public synchronized void close () {
291+ if (closed ) {
292+ return ;
293+ }
294+ closed = true ;
295+ synchronized (TxnManager .this .active ) {
296+ TxnManager .this .active .remove (this );
297+ }
298+ try {
299+ free (txnActive );
300+ } finally {
301+ synchronized (TxnManager .this .active ) {
302+ TxnManager .this .active .notifyAll ();
303+ }
204304 }
205- free (txn );
206305 }
207306
208307 /**
209308 * Resets current transaction as it points to "old" data.
210309 */
211- void reset () throws IOException {
212- mdb_txn_reset (txn );
213- E (mdb_txn_renew (txn ));
310+ synchronized void reset () throws IOException {
311+ if (closed ) {
312+ return ;
313+ }
314+ if (txnActive ) {
315+ mdb_txn_reset (txn );
316+ txnActive = false ;
317+ updateActiveState (this , false );
318+ activate ();
319+ }
214320 version ++;
215321 }
216322
217323 /**
218324 * Triggers active state of current transaction.
219325 */
220- void setActive (boolean active ) throws IOException {
326+ synchronized void setActive (boolean active ) throws IOException {
327+ if (closed ) {
328+ return ;
329+ }
221330 if (active ) {
222- E ( mdb_txn_renew ( txn ) );
331+ activate ( );
223332 version ++;
224333 } else {
334+ deactivate ();
335+ }
336+ }
337+
338+ private void activate () throws IOException {
339+ if (!txnActive ) {
340+ if (txn == 0 ) {
341+ txn = startReadTxn ();
342+ } else {
343+ renewReadTxn (txn , this );
344+ }
345+ txnActive = true ;
346+ updateActiveState (this , true );
347+ }
348+ }
349+
350+ private void deactivate () {
351+ if (txnActive && txn != 0 ) {
225352 mdb_txn_reset (txn );
226353 }
354+ txnActive = false ;
355+ updateActiveState (this , false );
227356 }
228357
229358 long version () {
230359 return version ;
231360 }
232361 }
362+
363+ private void renewReadTxn (long txn , Txn excludedTxn ) throws IOException {
364+ int rc = mdb_txn_renew (txn );
365+ if (rc == MDB_READERS_FULL ) {
366+ rc = retryRenewReadTxn (txn , excludedTxn );
367+ }
368+ if (rc != MDB_SUCCESS ) {
369+ E (rc );
370+ }
371+ }
372+
373+ private int retryRenewReadTxn (long txn , Txn excludedTxn ) throws IOException {
374+ int rc = MDB_READERS_FULL ;
375+ try (MemoryStack stack = stackPush ()) {
376+ for (int i = 0 ; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL ; i ++) {
377+ closePooledReaders ();
378+ checkForDeadReaders (stack );
379+ waitForTrackedReaderToClose (excludedTxn );
380+ rc = mdb_txn_renew (txn );
381+ }
382+ }
383+ return rc ;
384+ }
233385}
0 commit comments