From dba4ab4aa8d249376768fd1e05a46642c9c2fd68 Mon Sep 17 00:00:00 2001 From: Nick Roscarel <169005170+nick-roscarel-octo@users.noreply.github.com> Date: Tue, 2 Jul 2024 11:15:06 +1000 Subject: [PATCH 01/11] Remove broken deadlock awareness --- .../Nevermore/Advanced/DeadlockAwareLock.cs | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/source/Nevermore/Advanced/DeadlockAwareLock.cs b/source/Nevermore/Advanced/DeadlockAwareLock.cs index c81f7a9..095fcf8 100644 --- a/source/Nevermore/Advanced/DeadlockAwareLock.cs +++ b/source/Nevermore/Advanced/DeadlockAwareLock.cs @@ -32,8 +32,6 @@ public DeadlockAwareLock(bool logConcurrentExecution) public void Wait() { - AssertNoDeadlock(); - // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. if (logConcurrentExecution && semaphore.CurrentCount == 0) { @@ -46,8 +44,6 @@ public void Wait() public async Task WaitAsync(CancellationToken cancellationToken) { - AssertNoDeadlock(); - // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. if (logConcurrentExecution && semaphore.CurrentCount == 0) { @@ -65,29 +61,6 @@ public void Release() semaphore.Release(); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void AssertNoDeadlock() - { - if (taskWhichHasAcquiredLock is not null) - { - // If we have a task then we can rely on the task ID. It's not guaranteed (one task can still spawn another) but it's better than nothing. - // If it's a different task which has acquired the lock then there's at least _some_ hope that that task might complete without requiring - // this task to complete. If this task has the lock and is attempting to acquire it again then there's no way out - deadlock. - if (taskWhichHasAcquiredLock == Task.CurrentId) - throw new DeadlockException("This task context has already acquired this lock and has attempted to do so again."); - } - else - { - // If we have no task then our best guess is that we're using sync-only code, which means that the thread ID _should_ be - // a good indicator of the call context. - if (threadWhichHasAcquiredLock is not null) - // If this thread has already acquired a lock and it's trying to do so again then - // it's very unlikely that the first lock will be released, hence deadlock. - if (threadWhichHasAcquiredLock == Thread.CurrentThread.ManagedThreadId) - throw new DeadlockException("This thread has already acquired this lock and has attempted to do so again."); - } - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] void RecordLockAcquisition() { From 2e8befac0bc9485389faf7b91a407ae921465348 Mon Sep 17 00:00:00 2001 From: Nick Roscarel <169005170+nick-roscarel-octo@users.noreply.github.com> Date: Tue, 2 Jul 2024 11:22:51 +1000 Subject: [PATCH 02/11] Remove unnecessary task/thread monitoring --- .../Nevermore/Advanced/DeadlockAwareLock.cs | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/source/Nevermore/Advanced/DeadlockAwareLock.cs b/source/Nevermore/Advanced/DeadlockAwareLock.cs index 095fcf8..a164ce3 100644 --- a/source/Nevermore/Advanced/DeadlockAwareLock.cs +++ b/source/Nevermore/Advanced/DeadlockAwareLock.cs @@ -19,9 +19,6 @@ public class DeadlockAwareLock : IDisposable static readonly ILog Log = LogProvider.For(); readonly SemaphoreSlim semaphore = new(1, 1); - - int? taskWhichHasAcquiredLock; - int? threadWhichHasAcquiredLock; readonly bool logConcurrentExecution; @@ -39,7 +36,6 @@ public void Wait() } semaphore.Wait(); - RecordLockAcquisition(); } public async Task WaitAsync(CancellationToken cancellationToken) @@ -51,31 +47,13 @@ public async Task WaitAsync(CancellationToken cancellationToken) } await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - await RecordLockAcquisitionAsync().ConfigureAwait(false); } public void Release() { - threadWhichHasAcquiredLock = null; - taskWhichHasAcquiredLock = null; semaphore.Release(); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void RecordLockAcquisition() - { - threadWhichHasAcquiredLock = Thread.CurrentThread.ManagedThreadId; - taskWhichHasAcquiredLock = Task.CurrentId; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - async Task RecordLockAcquisitionAsync() - { - await Task.Yield(); - threadWhichHasAcquiredLock = Thread.CurrentThread.ManagedThreadId; - taskWhichHasAcquiredLock = Task.CurrentId; - } - public void Dispose() { semaphore.Dispose(); From 77999504abcac1958e3f1ab24868d5179f9a4edd Mon Sep 17 00:00:00 2001 From: Nick Roscarel <169005170+nick-roscarel-octo@users.noreply.github.com> Date: Tue, 2 Jul 2024 13:33:55 +1000 Subject: [PATCH 03/11] Part-way implement concurrency handlers --- .../DeadlockAwareLockFixture.cs | 18 +-- .../Nevermore/Advanced/DeadlockAwareLock.cs | 61 -------- .../Advanced/DeadlockAwareLockExtensions.cs | 38 +++++ .../LockWithLoggingConcurrencyHandler.cs | 134 ++++++++++++++++++ source/Nevermore/Advanced/ReadTransaction.cs | 22 +-- .../Advanced/ThreadSafeAsyncEnumerable.cs | 10 +- .../Advanced/ThreadSafeEnumerable.cs | 12 +- source/Nevermore/ConcurrencyMode.cs | 10 ++ .../IRelationalStoreConfiguration.cs | 11 +- .../Nevermore/RelationalStoreConfiguration.cs | 6 +- 10 files changed, 219 insertions(+), 103 deletions(-) delete mode 100644 source/Nevermore/Advanced/DeadlockAwareLock.cs create mode 100644 source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs create mode 100644 source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs create mode 100644 source/Nevermore/ConcurrencyMode.cs diff --git a/source/Nevermore.Tests/DeadlockAwareLockFixture.cs b/source/Nevermore.Tests/DeadlockAwareLockFixture.cs index 555bc0a..bf98abc 100644 --- a/source/Nevermore.Tests/DeadlockAwareLockFixture.cs +++ b/source/Nevermore.Tests/DeadlockAwareLockFixture.cs @@ -28,7 +28,7 @@ public void TearDown() [Test] public void MultipleCallsToWait_ShouldThrow() { - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); deadlockAwareLock.Wait(); @@ -42,7 +42,7 @@ public void MultipleCallsToWaitWithinATask_ShouldThrow() Task.Run(() => { // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(false); deadlockAwareLock.Wait(); @@ -57,7 +57,7 @@ public void MultipleCallsToWaitWithinATask_ShouldThrow() [Test] public void AcquiringThenReleasingThenAcquiring_ShouldNotThrow() { - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); deadlockAwareLock.Wait(); deadlockAwareLock.Release(); @@ -70,7 +70,7 @@ public void AcquiringThenReleasingThenAcquiringInATask_ShouldNotThrow() Task.Run(() => { // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); deadlockAwareLock.Wait(); deadlockAwareLock.Release(); @@ -84,7 +84,7 @@ public void AcquiringThenReleasingThenAcquiringInATask_ShouldNotThrow() [Test] public async Task MultipleCallsToWaitAsync_ShouldThrow() { - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); await deadlockAwareLock.WaitAsync(cancellationToken); @@ -95,7 +95,7 @@ public async Task MultipleCallsToWaitAsync_ShouldThrow() [Test] public async Task AcquiringAsyncThenReleasingThenAcquiringAsync_ShouldNotThrow() { - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); await deadlockAwareLock.WaitAsync(cancellationToken); deadlockAwareLock.Release(); @@ -106,7 +106,7 @@ public async Task AcquiringAsyncThenReleasingThenAcquiringAsync_ShouldNotThrow() public async Task MultipleTasksContending_ShouldNotThrow() { // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); // Loop so that we increase the probability that two different tasks are scheduled onto // the same worker thread. This helps us guarantee that we're not accidentally relying @@ -136,7 +136,7 @@ public async Task MultipleTasksContending_ShouldNotThrow() [Test] public void UsingSyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow() { - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(false); using (var _ = deadlockAwareLock.Lock()) { @@ -150,7 +150,7 @@ public void UsingSyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow( [Test] public async Task UsingAsyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow() { - using var deadlockAwareLock = new DeadlockAwareLock(); + using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(false); using (var _ = await deadlockAwareLock.LockAsync(cancellationToken)) { diff --git a/source/Nevermore/Advanced/DeadlockAwareLock.cs b/source/Nevermore/Advanced/DeadlockAwareLock.cs deleted file mode 100644 index 0762510..0000000 --- a/source/Nevermore/Advanced/DeadlockAwareLock.cs +++ /dev/null @@ -1,61 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Nevermore.Diagnositcs; - -namespace Nevermore.Advanced -{ - /// - /// This class provides a best-effort deadlock detection mechanism. It will identify re-entrant calls from the same - /// task (if there is a task) or the same thread (if there is no task). While it does not _guarantee_ deadlock - /// detection, - /// it does provide a pretty good guarantee that _if_ a DeadlockException is thrown then there was almost certainly - /// going to be a deadlock. In other words: very few false positives; probably some false negatives; better than - /// nothing. - /// - public class DeadlockAwareLock : IDisposable - { - static readonly ILog Log = LogProvider.For(); - - readonly SemaphoreSlim semaphore = new(1, 1); - - readonly bool logConcurrentExecution; - - public DeadlockAwareLock(bool logConcurrentExecution) - { - this.logConcurrentExecution = logConcurrentExecution; - } - - public void Wait() - { - // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. - if (logConcurrentExecution && semaphore.CurrentCount == 0) - { - Log.Warn("Concurrent query execution detected while waiting for lock"); - } - - semaphore.Wait(); - } - - public async Task WaitAsync(CancellationToken cancellationToken) - { - // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. - if (logConcurrentExecution && semaphore.CurrentCount == 0) - { - Log.Warn("Concurrent query execution detected while waiting for lock"); - } - - await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - } - - public void Release() - { - semaphore.Release(); - } - - public void Dispose() - { - semaphore.Dispose(); - } - } -} \ No newline at end of file diff --git a/source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs b/source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs new file mode 100644 index 0000000..5c62837 --- /dev/null +++ b/source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs @@ -0,0 +1,38 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Nevermore.Advanced +{ + public static class DeadlockAwareLockExtensions + { + public static IDisposable Lock(this LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) + { + lockWithLoggingConcurrencyHandler.Wait(); + return new Disposable(lockWithLoggingConcurrencyHandler.Release); + } + + public static async Task LockAsync( + this LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler, + CancellationToken cancellationToken) + { + await lockWithLoggingConcurrencyHandler.WaitAsync(cancellationToken).ConfigureAwait(false); + return new Disposable(lockWithLoggingConcurrencyHandler.Release); + } + + class Disposable : IDisposable + { + readonly Action callback; + + public Disposable(Action callback) + { + this.callback = callback; + } + + public void Dispose() + { + callback(); + } + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs new file mode 100644 index 0000000..85d7aea --- /dev/null +++ b/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs @@ -0,0 +1,134 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nevermore.Diagnositcs; +using Nito.AsyncEx; +using Nito.Disposables; + +namespace Nevermore.Advanced +{ + public interface ITransactionConcurrencyHandler : IDisposable + { + IDisposable Wait(); + + Task WaitAsync(CancellationToken cancellationToken); + } + + /// + /// This class provides a best-effort deadlock detection mechanism. It will identify re-entrant calls from the same + /// task (if there is a task) or the same thread (if there is no task). While it does not _guarantee_ deadlock + /// detection, + /// it does provide a pretty good guarantee that _if_ a DeadlockException is thrown then there was almost certainly + /// going to be a deadlock. In other words: very few false positives; probably some false negatives; better than + /// nothing. + /// + public class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler + { + static readonly ILog Log = LogProvider.For(); + + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Wait() + { + // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. + if (semaphore.CurrentCount == 0) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + } + + return semaphore.Lock(); + } + + public async Task WaitAsync(CancellationToken cancellationToken) + { + // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. + if (semaphore.CurrentCount == 0) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + } + + return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); + } + + public void Release() + { + semaphore.Release(); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } + + public class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler + { + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Wait() + { + return semaphore.Lock(); + } + + public async Task WaitAsync(CancellationToken cancellationToken) + { + return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } + + public class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler + { + static readonly ILog Log = LogProvider.For(); + + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Wait() + { + if (!semaphore.Wait(TimeSpan.Zero)) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + + return NoopDisposable.Instance; + } + + return new ConcurrencyDisposable(() => semaphore.Release()); + } + + public async Task WaitAsync(CancellationToken cancellationToken) + { + if (!await semaphore.WaitAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false)) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + + return NoopDisposable.Instance; + } + + return new ConcurrencyDisposable(() => semaphore.Release()); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } + + public class ConcurrencyDisposable : IDisposable + { + readonly Action disposeAction; + + public ConcurrencyDisposable(Action disposeAction) + { + this.disposeAction = disposeAction; + } + + public void Dispose() + { + disposeAction(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index 4183182..4ad482e 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -44,7 +44,7 @@ public class ReadTransaction : IReadTransaction, ITransactionDiagnostic DbConnection? connection; protected IUniqueParameterNameGenerator ParameterNameGenerator { get; } = new UniqueParameterNameGenerator(); - protected DeadlockAwareLock DeadlockAwareLock { get; } = new(); + protected ITransactionConcurrencyHandler TransactionConcurrencyHandler { get; } // To help track deadlocks readonly List commandTrace; @@ -522,7 +522,7 @@ IEnumerable Execute() } return configuration.SupportConcurrentExecution - ? new ThreadSafeEnumerable(Execute, DeadlockAwareLock) + ? new ThreadSafeEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); } @@ -539,7 +539,7 @@ async IAsyncEnumerable Execute() } return configuration.SupportConcurrentExecution - ? new ThreadSafeAsyncEnumerable(Execute, DeadlockAwareLock) + ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); } @@ -621,7 +621,7 @@ async IAsyncEnumerable Execute() } return configuration.SupportConcurrentExecution - ? new ThreadSafeAsyncEnumerable(Execute, DeadlockAwareLock) + ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); } @@ -652,7 +652,7 @@ public Task ExecuteNonQueryAsync(string query, CommandParameterValues? args public int ExecuteNonQuery(PreparedCommand preparedCommand) { using var mutex = configuration.SupportConcurrentExecution - ? DeadlockAwareLock.Lock() + ? TransactionConcurrencyHandler.Lock() : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); return command.ExecuteNonQuery(); @@ -661,7 +661,7 @@ public int ExecuteNonQuery(PreparedCommand preparedCommand) public async Task ExecuteNonQueryAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { using var mutex = configuration.SupportConcurrentExecution - ? await DeadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false) + ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); @@ -680,7 +680,7 @@ public Task ExecuteScalarAsync(string query, CommandParameterV public TResult ExecuteScalar(PreparedCommand preparedCommand) { using var mutex = configuration.SupportConcurrentExecution - ? DeadlockAwareLock.Lock() + ? TransactionConcurrencyHandler.Lock() : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); var result = command.ExecuteScalar(); @@ -692,7 +692,7 @@ public TResult ExecuteScalar(PreparedCommand preparedCommand) public async Task ExecuteScalarAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { using var mutex = configuration.SupportConcurrentExecution - ? await DeadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false) + ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); @@ -726,7 +726,7 @@ public async Task ExecuteReaderAsync(PreparedCommand preparedComma protected TResult[] ReadResults(PreparedCommand preparedCommand, Func mapper) { using var mutex = configuration.SupportConcurrentExecution - ? DeadlockAwareLock.Lock() + ? TransactionConcurrencyHandler.Lock() : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); @@ -736,7 +736,7 @@ protected TResult[] ReadResults(PreparedCommand preparedCommand, Func ReadResultsAsync(PreparedCommand preparedCommand, Func> mapper, CancellationToken cancellationToken) { using var mutex = configuration.SupportConcurrentExecution - ? await DeadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false) + ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); @@ -914,7 +914,7 @@ public void Dispose() } TryDispose(TransactionTimer); - TryDispose(DeadlockAwareLock); + TryDispose(TransactionConcurrencyHandler); if (OwnsSqlTransaction) { diff --git a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs index 9bb73a3..a9f9b78 100644 --- a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs @@ -9,21 +9,21 @@ namespace Nevermore.Advanced public class ThreadSafeAsyncEnumerable : IAsyncEnumerable { readonly Func> innerFunc; - readonly DeadlockAwareLock deadlockAwareLock; + readonly LockWithLoggingConcurrencyHandler _lockWithLoggingConcurrencyHandler; - public ThreadSafeAsyncEnumerable(IAsyncEnumerable inner, DeadlockAwareLock deadlockAwareLock) : this(() => inner, deadlockAwareLock) + public ThreadSafeAsyncEnumerable(IAsyncEnumerable inner, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) : this(() => inner, lockWithLoggingConcurrencyHandler) { } - public ThreadSafeAsyncEnumerable(Func> innerFunc, DeadlockAwareLock deadlockAwareLock) + public ThreadSafeAsyncEnumerable(Func> innerFunc, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) { this.innerFunc = innerFunc; - this.deadlockAwareLock = deadlockAwareLock; + this._lockWithLoggingConcurrencyHandler = lockWithLoggingConcurrencyHandler; } public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) { - using var mutex = await deadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false); + using var mutex = await _lockWithLoggingConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); var inner = innerFunc(); await foreach (var item in inner.WithCancellation(cancellationToken).ConfigureAwait(false)) yield return item; } diff --git a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs index dfe9dc3..7a14535 100644 --- a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs @@ -8,23 +8,23 @@ namespace Nevermore.Advanced internal class ThreadSafeEnumerable : IEnumerable { readonly Func> innerFunc; - readonly DeadlockAwareLock deadlockAwareLock; + readonly LockWithLoggingConcurrencyHandler _lockWithLoggingConcurrencyHandler; - public ThreadSafeEnumerable(IEnumerable inner, DeadlockAwareLock deadlockAwareLock) : this(() => inner, deadlockAwareLock) + public ThreadSafeEnumerable(IEnumerable inner, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) : this(() => inner, lockWithLoggingConcurrencyHandler) { } - public ThreadSafeEnumerable(Func> innerFunc, DeadlockAwareLock deadlockAwareLock) + public ThreadSafeEnumerable(Func> innerFunc, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) { this.innerFunc = innerFunc; - this.deadlockAwareLock = deadlockAwareLock; + this._lockWithLoggingConcurrencyHandler = lockWithLoggingConcurrencyHandler; } public IEnumerator GetEnumerator() { - deadlockAwareLock.Wait(); + _lockWithLoggingConcurrencyHandler.Wait(); var inner = innerFunc(); - return new ThreadSafeEnumerator(inner.GetEnumerator(), () => deadlockAwareLock.Release()); + return new ThreadSafeEnumerator(inner.GetEnumerator(), () => _lockWithLoggingConcurrencyHandler.Release()); } IEnumerator IEnumerable.GetEnumerator() diff --git a/source/Nevermore/ConcurrencyMode.cs b/source/Nevermore/ConcurrencyMode.cs new file mode 100644 index 0000000..6b10fc7 --- /dev/null +++ b/source/Nevermore/ConcurrencyMode.cs @@ -0,0 +1,10 @@ +namespace Nevermore +{ + public enum ConcurrencyMode + { + NoLock, + LockOnly, + LogOnly, + LockWithLogging + } +} \ No newline at end of file diff --git a/source/Nevermore/IRelationalStoreConfiguration.cs b/source/Nevermore/IRelationalStoreConfiguration.cs index 9832c84..46a1b49 100644 --- a/source/Nevermore/IRelationalStoreConfiguration.cs +++ b/source/Nevermore/IRelationalStoreConfiguration.cs @@ -88,13 +88,8 @@ public interface IRelationalStoreConfiguration /// Used to get the table name for a document type. By default, the table name is retrieved from the document map. /// ITableNameResolver TableNameResolver { get; set; } - - /// - /// Gets or sets whether concurrent execution of queries is handled by Nevermore. When true, Nevermore will attempt - /// to sequence queries issued concurrently. - /// - /// The default is true. - /// - bool SupportConcurrentExecution { get; set; } + + // TODO: Doc + public ConcurrencyMode ConcurrencyMode { get; set; } } } \ No newline at end of file diff --git a/source/Nevermore/RelationalStoreConfiguration.cs b/source/Nevermore/RelationalStoreConfiguration.cs index 2b7c3b5..b2772fa 100644 --- a/source/Nevermore/RelationalStoreConfiguration.cs +++ b/source/Nevermore/RelationalStoreConfiguration.cs @@ -57,7 +57,7 @@ public RelationalStoreConfiguration(Func connectionStringFunc, IPrimaryK TableColumnNameResolver = _ => new SelectAllColumnsTableResolver(); AllowSynchronousOperations = true; - SupportConcurrentExecution = true; + ConcurrencyMode = ConcurrencyMode.LockOnly; QueryLogger = new DefaultQueryLogger(); TransactionLogger = new DefaultTransactionLogger(); @@ -119,8 +119,8 @@ public RelationalStoreConfiguration(Func connectionStringFunc, IPrimaryK public Func KeyAllocatorFactory { get; set; } public ITableNameResolver TableNameResolver { get; set; } - - public bool SupportConcurrentExecution { get; set; } + + public ConcurrencyMode ConcurrencyMode { get; set; } string InitializeConnectionString(string sqlConnectionString) { From eaccf7f106af843294e4458a287a0fc01a1c122a Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:26:58 +1000 Subject: [PATCH 04/11] Clean up and tests --- .../Advanced/ConcurrentAccessFixture.cs | 73 +++++++- .../DeadlockAwareLockFixture.cs | 164 ------------------ .../Advanced/DeadlockAwareLockExtensions.cs | 38 ---- .../LockWithLoggingConcurrencyHandler.cs | 36 ++-- source/Nevermore/Advanced/ReadTransaction.cs | 29 +++- .../Advanced/ThreadSafeAsyncEnumerable.cs | 12 +- .../Advanced/ThreadSafeEnumerable.cs | 14 +- 7 files changed, 128 insertions(+), 238 deletions(-) delete mode 100644 source/Nevermore.Tests/DeadlockAwareLockFixture.cs delete mode 100644 source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs diff --git a/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs b/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs index 65ecf87..35e7258 100644 --- a/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs +++ b/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs @@ -16,10 +16,12 @@ public class ConcurrentAccessFixture : FixtureWithRelationalStore const int NumberOfDocuments = 256; const int DegreeOfParallelism = NumberOfDocuments; - [Test] - public void ConcurrentAccessDoesNotGoBoom() + [TestCase(ConcurrencyMode.LockOnly)] + [TestCase(ConcurrencyMode.LockWithLogging)] + public void ConcurrentAccessDoesNotGoBoom(ConcurrencyMode concurrencyMode) { NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; var namePrefix = $"{Guid.NewGuid()}-"; @@ -103,10 +105,12 @@ static void ThreadWaitAll(params Thread[] threads) } } - [Test] - public async Task AsyncConcurrentAccessDoesNotGoBoom() + [TestCase(ConcurrencyMode.LockOnly)] + [TestCase(ConcurrencyMode.LockWithLogging)] + public async Task AsyncConcurrentAccessDoesNotGoBoom(ConcurrencyMode concurrencyMode) { NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; var namePrefix = $"{Guid.NewGuid()}-"; @@ -162,5 +166,66 @@ await Task.WhenAll( .WhenAll(); } } + + [TestCase(ConcurrencyMode.NoLock)] + [TestCase(ConcurrencyMode.LogOnly)] + public void ConcurrentAccessGoesBoomWhenLockingIsDisabled(ConcurrencyMode concurrencyMode) + { + NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; + + var namePrefix = $"{Guid.NewGuid()}-"; + + // Create a bunch of documents so that we can query for them. + using (var transaction = Store.BeginTransaction()) + { + FluentActions.Invoking( + () => + { + // Only using 10 here because using NumberOfDocuments (256) is too slow + Enumerable.Range(0, 10) + .Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" }) + .AsParallel() + .WithDegreeOfParallelism(DegreeOfParallelism) + .Select(document => + { + // ReSharper disable once AccessToDisposedClosure + transaction.Insert(document); + return 0; + }) + .ToArray(); + }) + .Should() + .Throw(); + } + } + + [TestCase(ConcurrencyMode.NoLock)] + [TestCase(ConcurrencyMode.LogOnly)] + public async Task AsyncConcurrentAccessGoesBoomWhenLockingIsDisabled(ConcurrencyMode concurrencyMode) + { + NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; + + var namePrefix = $"{Guid.NewGuid()}-"; + + // Create a bunch of documents so that we can query for them. + using (var transaction = await Store.BeginWriteTransactionAsync()) + { + await FluentActions.Awaiting( + async () => + { + await Enumerable.Range(0, NumberOfDocuments) + .Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" }) + // ReSharper disable once AccessToDisposedClosure + .Select(document => transaction.InsertAsync(document)) + .WhenAll(); + await transaction.CommitAsync(); + }) + .Should() + .ThrowExactlyAsync() + .WithMessage("The connection does not support MultipleActiveResultSets."); + } + } } } \ No newline at end of file diff --git a/source/Nevermore.Tests/DeadlockAwareLockFixture.cs b/source/Nevermore.Tests/DeadlockAwareLockFixture.cs deleted file mode 100644 index bf98abc..0000000 --- a/source/Nevermore.Tests/DeadlockAwareLockFixture.cs +++ /dev/null @@ -1,164 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Nevermore.Advanced; -using Nito.AsyncEx; -using NUnit.Framework; - -namespace Nevermore.Tests -{ - public class DeadlockAwareLockFixture - { - CancellationToken cancellationToken; - CancellationTokenSource cts; - - [SetUp] - public void SetUp() - { - cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - cancellationToken = cts.Token; - } - - [TearDown] - public void TearDown() - { - cts?.Dispose(); - } - - [Test] - public void MultipleCallsToWait_ShouldThrow() - { - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); - - deadlockAwareLock.Wait(); - - // The second call should immediately throw rather than waiting forever. - Assert.Throws(() => deadlockAwareLock.Wait()); - } - - [Test] - public void MultipleCallsToWaitWithinATask_ShouldThrow() - { - Task.Run(() => - { - // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(false); - - deadlockAwareLock.Wait(); - - // The second call should immediately throw rather than waiting forever. - Assert.Throws(() => deadlockAwareLock.Wait()); - - // ReSharper restore AccessToDisposedClosure - }, cancellationToken) - .Wait(cancellationToken); - } - - [Test] - public void AcquiringThenReleasingThenAcquiring_ShouldNotThrow() - { - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); - - deadlockAwareLock.Wait(); - deadlockAwareLock.Release(); - deadlockAwareLock.Wait(); - } - - [Test] - public void AcquiringThenReleasingThenAcquiringInATask_ShouldNotThrow() - { - Task.Run(() => - { - // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); - - deadlockAwareLock.Wait(); - deadlockAwareLock.Release(); - deadlockAwareLock.Wait(); - - // ReSharper restore AccessToDisposedClosure - }, cancellationToken) - .Wait(cancellationToken); - } - - [Test] - public async Task MultipleCallsToWaitAsync_ShouldThrow() - { - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); - - await deadlockAwareLock.WaitAsync(cancellationToken); - - // The second call should immediately throw rather than waiting forever. - Assert.ThrowsAsync(async () => await deadlockAwareLock.WaitAsync(cancellationToken)); - } - - [Test] - public async Task AcquiringAsyncThenReleasingThenAcquiringAsync_ShouldNotThrow() - { - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); - - await deadlockAwareLock.WaitAsync(cancellationToken); - deadlockAwareLock.Release(); - await deadlockAwareLock.WaitAsync(cancellationToken); - } - - [Test] - public async Task MultipleTasksContending_ShouldNotThrow() - { - // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(); - - // Loop so that we increase the probability that two different tasks are scheduled onto - // the same worker thread. This helps us guarantee that we're not accidentally relying - // on thread IDs or thread locals anywhere. - for (var i = 0; i < 1000; i++) - { - var task0 = Task.Run(async () => - { - await deadlockAwareLock.WaitAsync(cancellationToken); - await Task.Yield(); - deadlockAwareLock.Release(); - }, cancellationToken); - - var task1 = Task.Run(async () => - { - await deadlockAwareLock.WaitAsync(cancellationToken); - await Task.Yield(); - deadlockAwareLock.Release(); - }, cancellationToken); - - await Task.WhenAll(task0, task1); - } - - // ReSharper restore AccessToDisposedClosure - } - - [Test] - public void UsingSyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow() - { - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(false); - - using (var _ = deadlockAwareLock.Lock()) - { - } - - using (var _ = deadlockAwareLock.Lock()) - { - } - } - - [Test] - public async Task UsingAsyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow() - { - using var deadlockAwareLock = new LockWithLoggingConcurrencyHandler(false); - - using (var _ = await deadlockAwareLock.LockAsync(cancellationToken)) - { - } - - using (var _ = await deadlockAwareLock.LockAsync(cancellationToken)) - { - } - } - } -} \ No newline at end of file diff --git a/source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs b/source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs deleted file mode 100644 index 5c62837..0000000 --- a/source/Nevermore/Advanced/DeadlockAwareLockExtensions.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Nevermore.Advanced -{ - public static class DeadlockAwareLockExtensions - { - public static IDisposable Lock(this LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) - { - lockWithLoggingConcurrencyHandler.Wait(); - return new Disposable(lockWithLoggingConcurrencyHandler.Release); - } - - public static async Task LockAsync( - this LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler, - CancellationToken cancellationToken) - { - await lockWithLoggingConcurrencyHandler.WaitAsync(cancellationToken).ConfigureAwait(false); - return new Disposable(lockWithLoggingConcurrencyHandler.Release); - } - - class Disposable : IDisposable - { - readonly Action callback; - - public Disposable(Action callback) - { - this.callback = callback; - } - - public void Dispose() - { - callback(); - } - } - } -} \ No newline at end of file diff --git a/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs index 85d7aea..4f0eb53 100644 --- a/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs @@ -9,9 +9,9 @@ namespace Nevermore.Advanced { public interface ITransactionConcurrencyHandler : IDisposable { - IDisposable Wait(); + IDisposable Lock(); - Task WaitAsync(CancellationToken cancellationToken); + Task LockAsync(CancellationToken cancellationToken); } /// @@ -28,7 +28,7 @@ public class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler readonly SemaphoreSlim semaphore = new(1, 1); - public IDisposable Wait() + public IDisposable Lock() { // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. if (semaphore.CurrentCount == 0) @@ -39,7 +39,7 @@ public IDisposable Wait() return semaphore.Lock(); } - public async Task WaitAsync(CancellationToken cancellationToken) + public async Task LockAsync(CancellationToken cancellationToken) { // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. if (semaphore.CurrentCount == 0) @@ -65,12 +65,12 @@ public class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler { readonly SemaphoreSlim semaphore = new(1, 1); - public IDisposable Wait() + public IDisposable Lock() { return semaphore.Lock(); } - public async Task WaitAsync(CancellationToken cancellationToken) + public async Task LockAsync(CancellationToken cancellationToken) { return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); } @@ -87,24 +87,22 @@ public class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler readonly SemaphoreSlim semaphore = new(1, 1); - public IDisposable Wait() + public IDisposable Lock() { if (!semaphore.Wait(TimeSpan.Zero)) { Log.Warn("Concurrent query execution detected while waiting for lock"); - return NoopDisposable.Instance; } return new ConcurrencyDisposable(() => semaphore.Release()); } - public async Task WaitAsync(CancellationToken cancellationToken) + public async Task LockAsync(CancellationToken cancellationToken) { if (!await semaphore.WaitAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false)) { Log.Warn("Concurrent query execution detected while waiting for lock"); - return NoopDisposable.Instance; } @@ -117,6 +115,24 @@ public void Dispose() } } + public class NoOpConcurrencyHandler : ITransactionConcurrencyHandler + { + public IDisposable Lock() + { + return NoopDisposable.Instance; + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + await Task.Yield(); + return NoopDisposable.Instance; + } + + public void Dispose() + { + } + } + public class ConcurrencyDisposable : IDisposable { readonly Action disposeAction; diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index 4ad482e..366420e 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -21,6 +21,7 @@ using Nevermore.Transient; using Nito.AsyncEx; using Nito.Disposables; +using NotSupportedException = System.NotSupportedException; namespace Nevermore.Advanced { @@ -114,6 +115,14 @@ internal ReadTransaction( registry.Add(this); columnNameResolver = configuration.TableColumnNameResolver(store); + TransactionConcurrencyHandler = configuration.ConcurrencyMode switch + { + ConcurrencyMode.NoLock => new NoOpConcurrencyHandler(), + ConcurrencyMode.LockOnly => new LockOnlyConcurrencyHandler(), + ConcurrencyMode.LogOnly => new LogOnlyConcurrencyHandler(), + ConcurrencyMode.LockWithLogging => new LockWithLoggingConcurrencyHandler(), + _ => throw new NotSupportedException($"Concurrency mode {this.configuration.ConcurrencyMode} not supported") + }; } protected DbTransaction? Transaction { get; private set; } @@ -521,7 +530,8 @@ IEnumerable Execute() yield return item; } - return configuration.SupportConcurrentExecution + // Use the thread safe variant if we know it's going to be used + return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? new ThreadSafeEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); } @@ -538,7 +548,8 @@ async IAsyncEnumerable Execute() } } - return configuration.SupportConcurrentExecution + // Use the thread safe variant if we know it's going to be used + return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); } @@ -620,7 +631,7 @@ async IAsyncEnumerable Execute() } } - return configuration.SupportConcurrentExecution + return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); } @@ -651,7 +662,7 @@ public Task ExecuteNonQueryAsync(string query, CommandParameterValues? args public int ExecuteNonQuery(PreparedCommand preparedCommand) { - using var mutex = configuration.SupportConcurrentExecution + using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? TransactionConcurrencyHandler.Lock() : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); @@ -660,7 +671,7 @@ public int ExecuteNonQuery(PreparedCommand preparedCommand) public async Task ExecuteNonQueryAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = configuration.SupportConcurrentExecution + using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); @@ -679,7 +690,7 @@ public Task ExecuteScalarAsync(string query, CommandParameterV public TResult ExecuteScalar(PreparedCommand preparedCommand) { - using var mutex = configuration.SupportConcurrentExecution + using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? TransactionConcurrencyHandler.Lock() : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); @@ -691,7 +702,7 @@ public TResult ExecuteScalar(PreparedCommand preparedCommand) public async Task ExecuteScalarAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = configuration.SupportConcurrentExecution + using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) : NoopDisposable.Instance; using var command = CreateCommand(preparedCommand); @@ -725,7 +736,7 @@ public async Task ExecuteReaderAsync(PreparedCommand preparedComma protected TResult[] ReadResults(PreparedCommand preparedCommand, Func mapper) { - using var mutex = configuration.SupportConcurrentExecution + using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? TransactionConcurrencyHandler.Lock() : NoopDisposable.Instance; @@ -735,7 +746,7 @@ protected TResult[] ReadResults(PreparedCommand preparedCommand, Func ReadResultsAsync(PreparedCommand preparedCommand, Func> mapper, CancellationToken cancellationToken) { - using var mutex = configuration.SupportConcurrentExecution + using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) : NoopDisposable.Instance; diff --git a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs index a9f9b78..484f7ed 100644 --- a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs @@ -2,28 +2,28 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using Nito.AsyncEx; namespace Nevermore.Advanced { public class ThreadSafeAsyncEnumerable : IAsyncEnumerable { readonly Func> innerFunc; - readonly LockWithLoggingConcurrencyHandler _lockWithLoggingConcurrencyHandler; + readonly ITransactionConcurrencyHandler transactionConcurrencyHandler; - public ThreadSafeAsyncEnumerable(IAsyncEnumerable inner, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) : this(() => inner, lockWithLoggingConcurrencyHandler) + public ThreadSafeAsyncEnumerable(IAsyncEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) + : this(() => inner, transactionConcurrencyHandler) { } - public ThreadSafeAsyncEnumerable(Func> innerFunc, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) + public ThreadSafeAsyncEnumerable(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) { this.innerFunc = innerFunc; - this._lockWithLoggingConcurrencyHandler = lockWithLoggingConcurrencyHandler; + this.transactionConcurrencyHandler = transactionConcurrencyHandler; } public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) { - using var mutex = await _lockWithLoggingConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); + using var mutex = await transactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); var inner = innerFunc(); await foreach (var item in inner.WithCancellation(cancellationToken).ConfigureAwait(false)) yield return item; } diff --git a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs index 7a14535..36e5968 100644 --- a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs @@ -1,30 +1,30 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Threading; namespace Nevermore.Advanced { internal class ThreadSafeEnumerable : IEnumerable { readonly Func> innerFunc; - readonly LockWithLoggingConcurrencyHandler _lockWithLoggingConcurrencyHandler; + readonly ITransactionConcurrencyHandler transactionConcurrencyHandler; - public ThreadSafeEnumerable(IEnumerable inner, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) : this(() => inner, lockWithLoggingConcurrencyHandler) + public ThreadSafeEnumerable(IEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) + : this(() => inner, transactionConcurrencyHandler) { } - public ThreadSafeEnumerable(Func> innerFunc, LockWithLoggingConcurrencyHandler lockWithLoggingConcurrencyHandler) + public ThreadSafeEnumerable(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) { this.innerFunc = innerFunc; - this._lockWithLoggingConcurrencyHandler = lockWithLoggingConcurrencyHandler; + this.transactionConcurrencyHandler = transactionConcurrencyHandler; } public IEnumerator GetEnumerator() { - _lockWithLoggingConcurrencyHandler.Wait(); + var disposable = transactionConcurrencyHandler.Lock(); var inner = innerFunc(); - return new ThreadSafeEnumerator(inner.GetEnumerator(), () => _lockWithLoggingConcurrencyHandler.Release()); + return new ThreadSafeEnumerator(inner.GetEnumerator(), () => disposable.Dispose()); } IEnumerator IEnumerable.GetEnumerator() From 598df6fb13a3328a236e656ae89a9aa436d2244a Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:31:52 +1000 Subject: [PATCH 05/11] Moving things around --- .../ITransactionConcurrencyHandler.cs | 13 ++ .../Concurrency/LockOnlyConcurrencyHandler.cs | 27 ++++ .../LockWithLoggingConcurrencyHandler.cs | 55 +++++++ .../Concurrency/LogOnlyConcurrencyHandler.cs | 42 +++++ .../Concurrency/NoOpConcurrencyHandler.cs | 25 +++ .../LockWithLoggingConcurrencyHandler.cs | 150 ------------------ source/Nevermore/Advanced/ReadTransaction.cs | 1 + .../Advanced/ThreadSafeAsyncEnumerable.cs | 1 + .../Advanced/ThreadSafeEnumerable.cs | 1 + 9 files changed, 165 insertions(+), 150 deletions(-) create mode 100644 source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs create mode 100644 source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs create mode 100644 source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs create mode 100644 source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs create mode 100644 source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs delete mode 100644 source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs diff --git a/source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs new file mode 100644 index 0000000..747c105 --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Nevermore.Advanced.Concurrency +{ + public interface ITransactionConcurrencyHandler : IDisposable + { + IDisposable Lock(); + + Task LockAsync(CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs new file mode 100644 index 0000000..b22950d --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nito.AsyncEx; + +namespace Nevermore.Advanced.Concurrency +{ + public class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler + { + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Lock() + { + return semaphore.Lock(); + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs new file mode 100644 index 0000000..5c6537d --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs @@ -0,0 +1,55 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nevermore.Diagnositcs; +using Nito.AsyncEx; + +namespace Nevermore.Advanced.Concurrency +{ + /// + /// This class provides a best-effort deadlock detection mechanism. It will identify re-entrant calls from the same + /// task (if there is a task) or the same thread (if there is no task). While it does not _guarantee_ deadlock + /// detection, + /// it does provide a pretty good guarantee that _if_ a DeadlockException is thrown then there was almost certainly + /// going to be a deadlock. In other words: very few false positives; probably some false negatives; better than + /// nothing. + /// + public class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler + { + static readonly ILog Log = LogProvider.For(); + + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Lock() + { + // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. + if (semaphore.CurrentCount == 0) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + } + + return semaphore.Lock(); + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. + if (semaphore.CurrentCount == 0) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + } + + return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); + } + + public void Release() + { + semaphore.Release(); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs new file mode 100644 index 0000000..c7242fc --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs @@ -0,0 +1,42 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nevermore.Diagnositcs; +using Nito.Disposables; + +namespace Nevermore.Advanced.Concurrency +{ + public class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler + { + static readonly ILog Log = LogProvider.For(); + + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Lock() + { + if (!semaphore.Wait(TimeSpan.Zero)) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + return NoopDisposable.Instance; + } + + return new Disposable(() => semaphore.Release()); + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + if (!await semaphore.WaitAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false)) + { + Log.Warn("Concurrent query execution detected while waiting for lock"); + return NoopDisposable.Instance; + } + + return new Disposable(() => semaphore.Release()); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs new file mode 100644 index 0000000..912c66f --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nito.Disposables; + +namespace Nevermore.Advanced.Concurrency +{ + public class NoOpConcurrencyHandler : ITransactionConcurrencyHandler + { + public IDisposable Lock() + { + return NoopDisposable.Instance; + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + await Task.Yield(); + return NoopDisposable.Instance; + } + + public void Dispose() + { + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs deleted file mode 100644 index 4f0eb53..0000000 --- a/source/Nevermore/Advanced/LockWithLoggingConcurrencyHandler.cs +++ /dev/null @@ -1,150 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Nevermore.Diagnositcs; -using Nito.AsyncEx; -using Nito.Disposables; - -namespace Nevermore.Advanced -{ - public interface ITransactionConcurrencyHandler : IDisposable - { - IDisposable Lock(); - - Task LockAsync(CancellationToken cancellationToken); - } - - /// - /// This class provides a best-effort deadlock detection mechanism. It will identify re-entrant calls from the same - /// task (if there is a task) or the same thread (if there is no task). While it does not _guarantee_ deadlock - /// detection, - /// it does provide a pretty good guarantee that _if_ a DeadlockException is thrown then there was almost certainly - /// going to be a deadlock. In other words: very few false positives; probably some false negatives; better than - /// nothing. - /// - public class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler - { - static readonly ILog Log = LogProvider.For(); - - readonly SemaphoreSlim semaphore = new(1, 1); - - public IDisposable Lock() - { - // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. - if (semaphore.CurrentCount == 0) - { - Log.Warn("Concurrent query execution detected while waiting for lock"); - } - - return semaphore.Lock(); - } - - public async Task LockAsync(CancellationToken cancellationToken) - { - // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. - if (semaphore.CurrentCount == 0) - { - Log.Warn("Concurrent query execution detected while waiting for lock"); - } - - return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); - } - - public void Release() - { - semaphore.Release(); - } - - public void Dispose() - { - semaphore.Dispose(); - } - } - - public class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler - { - readonly SemaphoreSlim semaphore = new(1, 1); - - public IDisposable Lock() - { - return semaphore.Lock(); - } - - public async Task LockAsync(CancellationToken cancellationToken) - { - return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); - } - - public void Dispose() - { - semaphore.Dispose(); - } - } - - public class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler - { - static readonly ILog Log = LogProvider.For(); - - readonly SemaphoreSlim semaphore = new(1, 1); - - public IDisposable Lock() - { - if (!semaphore.Wait(TimeSpan.Zero)) - { - Log.Warn("Concurrent query execution detected while waiting for lock"); - return NoopDisposable.Instance; - } - - return new ConcurrencyDisposable(() => semaphore.Release()); - } - - public async Task LockAsync(CancellationToken cancellationToken) - { - if (!await semaphore.WaitAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false)) - { - Log.Warn("Concurrent query execution detected while waiting for lock"); - return NoopDisposable.Instance; - } - - return new ConcurrencyDisposable(() => semaphore.Release()); - } - - public void Dispose() - { - semaphore.Dispose(); - } - } - - public class NoOpConcurrencyHandler : ITransactionConcurrencyHandler - { - public IDisposable Lock() - { - return NoopDisposable.Instance; - } - - public async Task LockAsync(CancellationToken cancellationToken) - { - await Task.Yield(); - return NoopDisposable.Instance; - } - - public void Dispose() - { - } - } - - public class ConcurrencyDisposable : IDisposable - { - readonly Action disposeAction; - - public ConcurrencyDisposable(Action disposeAction) - { - this.disposeAction = disposeAction; - } - - public void Dispose() - { - disposeAction(); - } - } -} \ No newline at end of file diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index 366420e..928e98c 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -12,6 +12,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Data.SqlClient; +using Nevermore.Advanced.Concurrency; using Nevermore.Advanced.Queryable; using Nevermore.Advanced.QueryBuilders; using Nevermore.Diagnositcs; diff --git a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs index 484f7ed..c3c679c 100644 --- a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Nevermore.Advanced.Concurrency; namespace Nevermore.Advanced { diff --git a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs index 36e5968..ccf1fef 100644 --- a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs @@ -1,6 +1,7 @@ using System; using System.Collections; using System.Collections.Generic; +using Nevermore.Advanced.Concurrency; namespace Nevermore.Advanced { From 2e330d9389dddb0b7116d854946bc318a76df9dc Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:38:21 +1000 Subject: [PATCH 06/11] Docs --- source/Nevermore/ConcurrencyMode.cs | 15 +++++++++++++++ source/Nevermore/IRelationalStoreConfiguration.cs | 5 ++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/source/Nevermore/ConcurrencyMode.cs b/source/Nevermore/ConcurrencyMode.cs index 6b10fc7..44555db 100644 --- a/source/Nevermore/ConcurrencyMode.cs +++ b/source/Nevermore/ConcurrencyMode.cs @@ -2,9 +2,24 @@ { public enum ConcurrencyMode { + /// + /// When queries are executed in parallel, no locking will be performed. + /// NoLock, + + /// + /// When queries are executed in parallel, locking will be performed preventing queries from running in parallel against the same transaction. + /// LockOnly, + + /// + /// When queries are executed in parallel, a log message will be written. + /// LogOnly, + + /// + /// When queries are executed in parallel, locking will be performed and a log message will be written. + /// LockWithLogging } } \ No newline at end of file diff --git a/source/Nevermore/IRelationalStoreConfiguration.cs b/source/Nevermore/IRelationalStoreConfiguration.cs index 46a1b49..794f695 100644 --- a/source/Nevermore/IRelationalStoreConfiguration.cs +++ b/source/Nevermore/IRelationalStoreConfiguration.cs @@ -89,7 +89,10 @@ public interface IRelationalStoreConfiguration /// ITableNameResolver TableNameResolver { get; set; } - // TODO: Doc + /// + /// Gets or sets how concurrent execution of queries is handled by Nevermore. When true. + /// The default is . + /// public ConcurrencyMode ConcurrencyMode { get; set; } } } \ No newline at end of file From 43c4b4c30d619e97be723cbb2d61e4191920deed Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:09:03 +1000 Subject: [PATCH 07/11] Clean up --- .../LockWithLoggingConcurrencyHandler.cs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs index 5c6537d..0b327cb 100644 --- a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs @@ -6,14 +6,6 @@ namespace Nevermore.Advanced.Concurrency { - /// - /// This class provides a best-effort deadlock detection mechanism. It will identify re-entrant calls from the same - /// task (if there is a task) or the same thread (if there is no task). While it does not _guarantee_ deadlock - /// detection, - /// it does provide a pretty good guarantee that _if_ a DeadlockException is thrown then there was almost certainly - /// going to be a deadlock. In other words: very few false positives; probably some false negatives; better than - /// nothing. - /// public class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler { static readonly ILog Log = LogProvider.For(); @@ -42,11 +34,6 @@ public async Task LockAsync(CancellationToken cancellationToken) return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); } - public void Release() - { - semaphore.Release(); - } - public void Dispose() { semaphore.Dispose(); From 336e820e565c5d6b57b3e98f2cc6e0a77fef6fa0 Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Wed, 3 Jul 2024 08:51:37 +1000 Subject: [PATCH 08/11] Feedback clean up --- .../Concurrency/NoOpConcurrencyHandler.cs | 5 ++- source/Nevermore/Advanced/ReadTransaction.cs | 31 ++++++------------- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs index 912c66f..bb1a10b 100644 --- a/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs @@ -12,10 +12,9 @@ public IDisposable Lock() return NoopDisposable.Instance; } - public async Task LockAsync(CancellationToken cancellationToken) + public Task LockAsync(CancellationToken cancellationToken) { - await Task.Yield(); - return NoopDisposable.Instance; + return Task.FromResult(NoopDisposable.Instance); } public void Dispose() diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index 928e98c..e52acaa 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -531,7 +531,7 @@ IEnumerable Execute() yield return item; } - // Use the thread safe variant if we know it's going to be used + // Use the thread safe enumerable if we know the concurrency handler enables thread safety return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? new ThreadSafeEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); @@ -549,7 +549,7 @@ async IAsyncEnumerable Execute() } } - // Use the thread safe variant if we know it's going to be used + // Use the thread safe enumerable if we know the concurrency handler enables thread safety return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); @@ -632,6 +632,7 @@ async IAsyncEnumerable Execute() } } + // Use the thread safe enumerable if we know the concurrency handler enables thread safety return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) : Execute(); @@ -663,18 +664,14 @@ public Task ExecuteNonQueryAsync(string query, CommandParameterValues? args public int ExecuteNonQuery(PreparedCommand preparedCommand) { - using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? TransactionConcurrencyHandler.Lock() - : NoopDisposable.Instance; + using var mutex = TransactionConcurrencyHandler.Lock(); using var command = CreateCommand(preparedCommand); return command.ExecuteNonQuery(); } public async Task ExecuteNonQueryAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) - : NoopDisposable.Instance; + using var mutex = await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); using var command = CreateCommand(preparedCommand); return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } @@ -691,9 +688,7 @@ public Task ExecuteScalarAsync(string query, CommandParameterV public TResult ExecuteScalar(PreparedCommand preparedCommand) { - using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? TransactionConcurrencyHandler.Lock() - : NoopDisposable.Instance; + using var mutex = TransactionConcurrencyHandler.Lock(); using var command = CreateCommand(preparedCommand); var result = command.ExecuteScalar(); if (result == DBNull.Value) @@ -703,9 +698,7 @@ public TResult ExecuteScalar(PreparedCommand preparedCommand) public async Task ExecuteScalarAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) - : NoopDisposable.Instance; + using var mutex = await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); using var command = CreateCommand(preparedCommand); var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); if (result == DBNull.Value) @@ -737,20 +730,14 @@ public async Task ExecuteReaderAsync(PreparedCommand preparedComma protected TResult[] ReadResults(PreparedCommand preparedCommand, Func mapper) { - using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? TransactionConcurrencyHandler.Lock() - : NoopDisposable.Instance; - + using var mutex = TransactionConcurrencyHandler.Lock(); using var command = CreateCommand(preparedCommand); return command.ReadResults(mapper); } protected async Task ReadResultsAsync(PreparedCommand preparedCommand, Func> mapper, CancellationToken cancellationToken) { - using var mutex = configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false) - : NoopDisposable.Instance; - + using var mutex = await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); using var command = CreateCommand(preparedCommand); return await command.ReadResultsAsync(mapper, cancellationToken).ConfigureAwait(false); } From 7e6d058f6b403f2eeed594b586b05f599ead03bc Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Wed, 3 Jul 2024 09:04:59 +1000 Subject: [PATCH 09/11] Make implementations private --- .../Advanced/Concurrency/LockOnlyConcurrencyHandler.cs | 2 +- .../Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs | 2 +- .../Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs | 2 +- source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs index b22950d..4ca32bd 100644 --- a/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs @@ -5,7 +5,7 @@ namespace Nevermore.Advanced.Concurrency { - public class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler + class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler { readonly SemaphoreSlim semaphore = new(1, 1); diff --git a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs index 0b327cb..071aa52 100644 --- a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs @@ -6,7 +6,7 @@ namespace Nevermore.Advanced.Concurrency { - public class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler + class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler { static readonly ILog Log = LogProvider.For(); diff --git a/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs index c7242fc..7939b9f 100644 --- a/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs @@ -6,7 +6,7 @@ namespace Nevermore.Advanced.Concurrency { - public class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler + class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler { static readonly ILog Log = LogProvider.For(); diff --git a/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs index bb1a10b..75cf1d8 100644 --- a/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs @@ -5,7 +5,7 @@ namespace Nevermore.Advanced.Concurrency { - public class NoOpConcurrencyHandler : ITransactionConcurrencyHandler + class NoOpConcurrencyHandler : ITransactionConcurrencyHandler { public IDisposable Lock() { From 9d7b661639bee944c960a81ebb0680c83fbdfe84 Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Wed, 3 Jul 2024 10:13:02 +1000 Subject: [PATCH 10/11] Refactor thread-safe enumerables --- ... => AsyncEnumerableWithConcurrencyHandling.cs} | 6 +++--- ...le.cs => EnumerableWithConcurrencyHandling.cs} | 12 ++++++------ source/Nevermore/Advanced/ReadTransaction.cs | 15 +++------------ 3 files changed, 12 insertions(+), 21 deletions(-) rename source/Nevermore/Advanced/{ThreadSafeAsyncEnumerable.cs => AsyncEnumerableWithConcurrencyHandling.cs} (71%) rename source/Nevermore/Advanced/{ThreadSafeEnumerable.cs => EnumerableWithConcurrencyHandling.cs} (70%) diff --git a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs b/source/Nevermore/Advanced/AsyncEnumerableWithConcurrencyHandling.cs similarity index 71% rename from source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs rename to source/Nevermore/Advanced/AsyncEnumerableWithConcurrencyHandling.cs index c3c679c..115e490 100644 --- a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs +++ b/source/Nevermore/Advanced/AsyncEnumerableWithConcurrencyHandling.cs @@ -6,17 +6,17 @@ namespace Nevermore.Advanced { - public class ThreadSafeAsyncEnumerable : IAsyncEnumerable + internal class AsyncEnumerableWithConcurrencyHandling : IAsyncEnumerable { readonly Func> innerFunc; readonly ITransactionConcurrencyHandler transactionConcurrencyHandler; - public ThreadSafeAsyncEnumerable(IAsyncEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) + public AsyncEnumerableWithConcurrencyHandling(IAsyncEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) : this(() => inner, transactionConcurrencyHandler) { } - public ThreadSafeAsyncEnumerable(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) + public AsyncEnumerableWithConcurrencyHandling(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) { this.innerFunc = innerFunc; this.transactionConcurrencyHandler = transactionConcurrencyHandler; diff --git a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs b/source/Nevermore/Advanced/EnumerableWithConcurrencyHandling.cs similarity index 70% rename from source/Nevermore/Advanced/ThreadSafeEnumerable.cs rename to source/Nevermore/Advanced/EnumerableWithConcurrencyHandling.cs index ccf1fef..5930c41 100644 --- a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs +++ b/source/Nevermore/Advanced/EnumerableWithConcurrencyHandling.cs @@ -5,17 +5,17 @@ namespace Nevermore.Advanced { - internal class ThreadSafeEnumerable : IEnumerable + internal class EnumerableWithConcurrencyHandling : IEnumerable { readonly Func> innerFunc; readonly ITransactionConcurrencyHandler transactionConcurrencyHandler; - public ThreadSafeEnumerable(IEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) + public EnumerableWithConcurrencyHandling(IEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) : this(() => inner, transactionConcurrencyHandler) { } - public ThreadSafeEnumerable(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) + public EnumerableWithConcurrencyHandling(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) { this.innerFunc = innerFunc; this.transactionConcurrencyHandler = transactionConcurrencyHandler; @@ -25,7 +25,7 @@ public IEnumerator GetEnumerator() { var disposable = transactionConcurrencyHandler.Lock(); var inner = innerFunc(); - return new ThreadSafeEnumerator(inner.GetEnumerator(), () => disposable.Dispose()); + return new EnumeratorWithConcurrencyHandling(inner.GetEnumerator(), () => disposable.Dispose()); } IEnumerator IEnumerable.GetEnumerator() @@ -33,12 +33,12 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } - internal class ThreadSafeEnumerator : IEnumerator + internal class EnumeratorWithConcurrencyHandling : IEnumerator { readonly IEnumerator inner; readonly Action onDisposed; - public ThreadSafeEnumerator(IEnumerator inner, Action onDisposed) + public EnumeratorWithConcurrencyHandling(IEnumerator inner, Action onDisposed) { this.inner = inner; this.onDisposed = onDisposed; diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index e52acaa..d0abfa8 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -531,10 +531,7 @@ IEnumerable Execute() yield return item; } - // Use the thread safe enumerable if we know the concurrency handler enables thread safety - return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? new ThreadSafeEnumerable(Execute, TransactionConcurrencyHandler) - : Execute(); + return new EnumerableWithConcurrencyHandling(Execute, TransactionConcurrencyHandler); } public IAsyncEnumerable StreamAsync(PreparedCommand command, CancellationToken cancellationToken = default) @@ -549,10 +546,7 @@ async IAsyncEnumerable Execute() } } - // Use the thread safe enumerable if we know the concurrency handler enables thread safety - return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) - : Execute(); + return new AsyncEnumerableWithConcurrencyHandling(Execute, TransactionConcurrencyHandler); } IEnumerable ProcessReader(DbDataReader reader, PreparedCommand command) @@ -632,10 +626,7 @@ async IAsyncEnumerable Execute() } } - // Use the thread safe enumerable if we know the concurrency handler enables thread safety - return configuration.ConcurrencyMode is not ConcurrencyMode.NoLock - ? new ThreadSafeAsyncEnumerable(Execute, TransactionConcurrencyHandler) - : Execute(); + return new AsyncEnumerableWithConcurrencyHandling(Execute, TransactionConcurrencyHandler); } void AddCommandTrace(string commandText) From 596a0b0ccb1f7f85017192bcd2a61dc4462840b5 Mon Sep 17 00:00:00 2001 From: Eoin Motherway <25342760+YuKitsune@users.noreply.github.com> Date: Thu, 4 Jul 2024 11:25:53 +1000 Subject: [PATCH 11/11] Spike: Include the stacktrace in the log --- .../Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs | 4 ++-- .../Advanced/Concurrency/LogOnlyConcurrencyHandler.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs index 071aa52..3f49be0 100644 --- a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs @@ -17,7 +17,7 @@ public IDisposable Lock() // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. if (semaphore.CurrentCount == 0) { - Log.Warn("Concurrent query execution detected while waiting for lock"); + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); } return semaphore.Lock(); @@ -28,7 +28,7 @@ public async Task LockAsync(CancellationToken cancellationToken) // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. if (semaphore.CurrentCount == 0) { - Log.Warn("Concurrent query execution detected while waiting for lock"); + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); } return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); diff --git a/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs index 7939b9f..50ddb3d 100644 --- a/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs +++ b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs @@ -16,7 +16,7 @@ public IDisposable Lock() { if (!semaphore.Wait(TimeSpan.Zero)) { - Log.Warn("Concurrent query execution detected while waiting for lock"); + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); return NoopDisposable.Instance; } @@ -27,7 +27,7 @@ public async Task LockAsync(CancellationToken cancellationToken) { if (!await semaphore.WaitAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false)) { - Log.Warn("Concurrent query execution detected while waiting for lock"); + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); return NoopDisposable.Instance; }