diff --git a/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs b/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs index 65ecf87f..35e72587 100644 --- a/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs +++ b/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs @@ -16,10 +16,12 @@ public class ConcurrentAccessFixture : FixtureWithRelationalStore const int NumberOfDocuments = 256; const int DegreeOfParallelism = NumberOfDocuments; - [Test] - public void ConcurrentAccessDoesNotGoBoom() + [TestCase(ConcurrencyMode.LockOnly)] + [TestCase(ConcurrencyMode.LockWithLogging)] + public void ConcurrentAccessDoesNotGoBoom(ConcurrencyMode concurrencyMode) { NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; var namePrefix = $"{Guid.NewGuid()}-"; @@ -103,10 +105,12 @@ static void ThreadWaitAll(params Thread[] threads) } } - [Test] - public async Task AsyncConcurrentAccessDoesNotGoBoom() + [TestCase(ConcurrencyMode.LockOnly)] + [TestCase(ConcurrencyMode.LockWithLogging)] + public async Task AsyncConcurrentAccessDoesNotGoBoom(ConcurrencyMode concurrencyMode) { NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; var namePrefix = $"{Guid.NewGuid()}-"; @@ -162,5 +166,66 @@ await Task.WhenAll( .WhenAll(); } } + + [TestCase(ConcurrencyMode.NoLock)] + [TestCase(ConcurrencyMode.LogOnly)] + public void ConcurrentAccessGoesBoomWhenLockingIsDisabled(ConcurrencyMode concurrencyMode) + { + NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; + + var namePrefix = $"{Guid.NewGuid()}-"; + + // Create a bunch of documents so that we can query for them. + using (var transaction = Store.BeginTransaction()) + { + FluentActions.Invoking( + () => + { + // Only using 10 here because using NumberOfDocuments (256) is too slow + Enumerable.Range(0, 10) + .Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" }) + .AsParallel() + .WithDegreeOfParallelism(DegreeOfParallelism) + .Select(document => + { + // ReSharper disable once AccessToDisposedClosure + transaction.Insert(document); + return 0; + }) + .ToArray(); + }) + .Should() + .Throw(); + } + } + + [TestCase(ConcurrencyMode.NoLock)] + [TestCase(ConcurrencyMode.LogOnly)] + public async Task AsyncConcurrentAccessGoesBoomWhenLockingIsDisabled(ConcurrencyMode concurrencyMode) + { + NoMonkeyBusiness(); + Configuration.ConcurrencyMode = concurrencyMode; + + var namePrefix = $"{Guid.NewGuid()}-"; + + // Create a bunch of documents so that we can query for them. + using (var transaction = await Store.BeginWriteTransactionAsync()) + { + await FluentActions.Awaiting( + async () => + { + await Enumerable.Range(0, NumberOfDocuments) + .Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" }) + // ReSharper disable once AccessToDisposedClosure + .Select(document => transaction.InsertAsync(document)) + .WhenAll(); + await transaction.CommitAsync(); + }) + .Should() + .ThrowExactlyAsync() + .WithMessage("The connection does not support MultipleActiveResultSets."); + } + } } } \ No newline at end of file diff --git a/source/Nevermore.Tests/DeadlockAwareLockFixture.cs b/source/Nevermore.Tests/DeadlockAwareLockFixture.cs deleted file mode 100644 index 555bc0a4..00000000 --- a/source/Nevermore.Tests/DeadlockAwareLockFixture.cs +++ /dev/null @@ -1,164 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Nevermore.Advanced; -using Nito.AsyncEx; -using NUnit.Framework; - -namespace Nevermore.Tests -{ - public class DeadlockAwareLockFixture - { - CancellationToken cancellationToken; - CancellationTokenSource cts; - - [SetUp] - public void SetUp() - { - cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - cancellationToken = cts.Token; - } - - [TearDown] - public void TearDown() - { - cts?.Dispose(); - } - - [Test] - public void MultipleCallsToWait_ShouldThrow() - { - using var deadlockAwareLock = new DeadlockAwareLock(); - - deadlockAwareLock.Wait(); - - // The second call should immediately throw rather than waiting forever. - Assert.Throws(() => deadlockAwareLock.Wait()); - } - - [Test] - public void MultipleCallsToWaitWithinATask_ShouldThrow() - { - Task.Run(() => - { - // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new DeadlockAwareLock(); - - deadlockAwareLock.Wait(); - - // The second call should immediately throw rather than waiting forever. - Assert.Throws(() => deadlockAwareLock.Wait()); - - // ReSharper restore AccessToDisposedClosure - }, cancellationToken) - .Wait(cancellationToken); - } - - [Test] - public void AcquiringThenReleasingThenAcquiring_ShouldNotThrow() - { - using var deadlockAwareLock = new DeadlockAwareLock(); - - deadlockAwareLock.Wait(); - deadlockAwareLock.Release(); - deadlockAwareLock.Wait(); - } - - [Test] - public void AcquiringThenReleasingThenAcquiringInATask_ShouldNotThrow() - { - Task.Run(() => - { - // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new DeadlockAwareLock(); - - deadlockAwareLock.Wait(); - deadlockAwareLock.Release(); - deadlockAwareLock.Wait(); - - // ReSharper restore AccessToDisposedClosure - }, cancellationToken) - .Wait(cancellationToken); - } - - [Test] - public async Task MultipleCallsToWaitAsync_ShouldThrow() - { - using var deadlockAwareLock = new DeadlockAwareLock(); - - await deadlockAwareLock.WaitAsync(cancellationToken); - - // The second call should immediately throw rather than waiting forever. - Assert.ThrowsAsync(async () => await deadlockAwareLock.WaitAsync(cancellationToken)); - } - - [Test] - public async Task AcquiringAsyncThenReleasingThenAcquiringAsync_ShouldNotThrow() - { - using var deadlockAwareLock = new DeadlockAwareLock(); - - await deadlockAwareLock.WaitAsync(cancellationToken); - deadlockAwareLock.Release(); - await deadlockAwareLock.WaitAsync(cancellationToken); - } - - [Test] - public async Task MultipleTasksContending_ShouldNotThrow() - { - // ReSharper disable AccessToDisposedClosure - using var deadlockAwareLock = new DeadlockAwareLock(); - - // Loop so that we increase the probability that two different tasks are scheduled onto - // the same worker thread. This helps us guarantee that we're not accidentally relying - // on thread IDs or thread locals anywhere. - for (var i = 0; i < 1000; i++) - { - var task0 = Task.Run(async () => - { - await deadlockAwareLock.WaitAsync(cancellationToken); - await Task.Yield(); - deadlockAwareLock.Release(); - }, cancellationToken); - - var task1 = Task.Run(async () => - { - await deadlockAwareLock.WaitAsync(cancellationToken); - await Task.Yield(); - deadlockAwareLock.Release(); - }, cancellationToken); - - await Task.WhenAll(task0, task1); - } - - // ReSharper restore AccessToDisposedClosure - } - - [Test] - public void UsingSyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow() - { - using var deadlockAwareLock = new DeadlockAwareLock(); - - using (var _ = deadlockAwareLock.Lock()) - { - } - - using (var _ = deadlockAwareLock.Lock()) - { - } - } - - [Test] - public async Task UsingAsyncExtensionMethods_AndReleasingLocksCorrectly_ShouldNotThrow() - { - using var deadlockAwareLock = new DeadlockAwareLock(); - - using (var _ = await deadlockAwareLock.LockAsync(cancellationToken)) - { - } - - using (var _ = await deadlockAwareLock.LockAsync(cancellationToken)) - { - } - } - } -} \ No newline at end of file diff --git a/source/Nevermore/Advanced/AsyncEnumerableWithConcurrencyHandling.cs b/source/Nevermore/Advanced/AsyncEnumerableWithConcurrencyHandling.cs new file mode 100644 index 00000000..115e490a --- /dev/null +++ b/source/Nevermore/Advanced/AsyncEnumerableWithConcurrencyHandling.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Nevermore.Advanced.Concurrency; + +namespace Nevermore.Advanced +{ + internal class AsyncEnumerableWithConcurrencyHandling : IAsyncEnumerable + { + readonly Func> innerFunc; + readonly ITransactionConcurrencyHandler transactionConcurrencyHandler; + + public AsyncEnumerableWithConcurrencyHandling(IAsyncEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) + : this(() => inner, transactionConcurrencyHandler) + { + } + + public AsyncEnumerableWithConcurrencyHandling(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) + { + this.innerFunc = innerFunc; + this.transactionConcurrencyHandler = transactionConcurrencyHandler; + } + + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) + { + using var mutex = await transactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); + var inner = innerFunc(); + await foreach (var item in inner.WithCancellation(cancellationToken).ConfigureAwait(false)) yield return item; + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs new file mode 100644 index 00000000..747c1056 --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/ITransactionConcurrencyHandler.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Nevermore.Advanced.Concurrency +{ + public interface ITransactionConcurrencyHandler : IDisposable + { + IDisposable Lock(); + + Task LockAsync(CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs new file mode 100644 index 00000000..4ca32bd5 --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/LockOnlyConcurrencyHandler.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nito.AsyncEx; + +namespace Nevermore.Advanced.Concurrency +{ + class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler + { + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Lock() + { + return semaphore.Lock(); + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs new file mode 100644 index 00000000..3f49be00 --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/LockWithLoggingConcurrencyHandler.cs @@ -0,0 +1,42 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nevermore.Diagnositcs; +using Nito.AsyncEx; + +namespace Nevermore.Advanced.Concurrency +{ + class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler + { + static readonly ILog Log = LogProvider.For(); + + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Lock() + { + // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. + if (semaphore.CurrentCount == 0) + { + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); + } + + return semaphore.Lock(); + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + // `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening. + if (semaphore.CurrentCount == 0) + { + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); + } + + return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs new file mode 100644 index 00000000..50ddb3da --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/LogOnlyConcurrencyHandler.cs @@ -0,0 +1,42 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nevermore.Diagnositcs; +using Nito.Disposables; + +namespace Nevermore.Advanced.Concurrency +{ + class LogOnlyConcurrencyHandler : ITransactionConcurrencyHandler + { + static readonly ILog Log = LogProvider.For(); + + readonly SemaphoreSlim semaphore = new(1, 1); + + public IDisposable Lock() + { + if (!semaphore.Wait(TimeSpan.Zero)) + { + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); + return NoopDisposable.Instance; + } + + return new Disposable(() => semaphore.Release()); + } + + public async Task LockAsync(CancellationToken cancellationToken) + { + if (!await semaphore.WaitAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false)) + { + Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace); + return NoopDisposable.Instance; + } + + return new Disposable(() => semaphore.Release()); + } + + public void Dispose() + { + semaphore.Dispose(); + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs new file mode 100644 index 00000000..75cf1d8d --- /dev/null +++ b/source/Nevermore/Advanced/Concurrency/NoOpConcurrencyHandler.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nito.Disposables; + +namespace Nevermore.Advanced.Concurrency +{ + class NoOpConcurrencyHandler : ITransactionConcurrencyHandler + { + public IDisposable Lock() + { + return NoopDisposable.Instance; + } + + public Task LockAsync(CancellationToken cancellationToken) + { + return Task.FromResult(NoopDisposable.Instance); + } + + public void Dispose() + { + } + } +} \ No newline at end of file diff --git a/source/Nevermore/Advanced/DeadlockAwareLock.cs b/source/Nevermore/Advanced/DeadlockAwareLock.cs deleted file mode 100644 index 2d863738..00000000 --- a/source/Nevermore/Advanced/DeadlockAwareLock.cs +++ /dev/null @@ -1,75 +0,0 @@ -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; - -namespace Nevermore.Advanced -{ - /// - /// This class provides a best-effort deadlock detection mechanism. It will identify re-entrant calls from the same - /// task (if there is a task) or the same thread (if there is no task). While it does not _guarantee_ deadlock - /// detection, - /// it does provide a pretty good guarantee that _if_ a DeadlockException is thrown then there was almost certainly - /// going to be a deadlock. In other words: very few false positives; probably some false negatives; better than - /// nothing. - /// - public class DeadlockAwareLock : SemaphoreSlim - { - int? taskWhichHasAcquiredLock; - int? threadWhichHasAcquiredLock; - - public DeadlockAwareLock() : base(1, 1) - { - } - - public new void Wait() - { - AssertNoDeadlock(); - base.Wait(); - RecordLockAcquisition(); - } - - public new async Task WaitAsync(CancellationToken cancellationToken) - { - AssertNoDeadlock(); - await base.WaitAsync(cancellationToken).ConfigureAwait(false); - RecordLockAcquisition(); - } - - public new void Release() - { - threadWhichHasAcquiredLock = null; - taskWhichHasAcquiredLock = null; - base.Release(); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void AssertNoDeadlock() - { - if (taskWhichHasAcquiredLock is not null) - { - // If we have a task then we can rely on the task ID. It's not guaranteed (one task can still spawn another) but it's better than nothing. - // If it's a different task which has acquired the lock then there's at least _some_ hope that that task might complete without requiring - // this task to complete. If this task has the lock and is attempting to acquire it again then there's no way out - deadlock. - if (taskWhichHasAcquiredLock == Task.CurrentId) - throw new DeadlockException("This task context has already acquired this lock and has attempted to do so again."); - } - else - { - // If we have no task then our best guess is that we're using sync-only code, which means that the thread ID _should_ be - // a good indicator of the call context. - if (threadWhichHasAcquiredLock is not null) - // If this thread has already acquired a lock and it's trying to do so again then - // it's very unlikely that the first lock will be released, hence deadlock. - if (threadWhichHasAcquiredLock == Thread.CurrentThread.ManagedThreadId) - throw new DeadlockException("This thread has already acquired this lock and has attempted to do so again."); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void RecordLockAcquisition() - { - threadWhichHasAcquiredLock = Thread.CurrentThread.ManagedThreadId; - taskWhichHasAcquiredLock = Task.CurrentId; - } - } -} \ No newline at end of file diff --git a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs b/source/Nevermore/Advanced/EnumerableWithConcurrencyHandling.cs similarity index 54% rename from source/Nevermore/Advanced/ThreadSafeEnumerable.cs rename to source/Nevermore/Advanced/EnumerableWithConcurrencyHandling.cs index dfe9dc3c..5930c416 100644 --- a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs +++ b/source/Nevermore/Advanced/EnumerableWithConcurrencyHandling.cs @@ -1,30 +1,31 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Threading; +using Nevermore.Advanced.Concurrency; namespace Nevermore.Advanced { - internal class ThreadSafeEnumerable : IEnumerable + internal class EnumerableWithConcurrencyHandling : IEnumerable { readonly Func> innerFunc; - readonly DeadlockAwareLock deadlockAwareLock; + readonly ITransactionConcurrencyHandler transactionConcurrencyHandler; - public ThreadSafeEnumerable(IEnumerable inner, DeadlockAwareLock deadlockAwareLock) : this(() => inner, deadlockAwareLock) + public EnumerableWithConcurrencyHandling(IEnumerable inner, ITransactionConcurrencyHandler transactionConcurrencyHandler) + : this(() => inner, transactionConcurrencyHandler) { } - public ThreadSafeEnumerable(Func> innerFunc, DeadlockAwareLock deadlockAwareLock) + public EnumerableWithConcurrencyHandling(Func> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler) { this.innerFunc = innerFunc; - this.deadlockAwareLock = deadlockAwareLock; + this.transactionConcurrencyHandler = transactionConcurrencyHandler; } public IEnumerator GetEnumerator() { - deadlockAwareLock.Wait(); + var disposable = transactionConcurrencyHandler.Lock(); var inner = innerFunc(); - return new ThreadSafeEnumerator(inner.GetEnumerator(), () => deadlockAwareLock.Release()); + return new EnumeratorWithConcurrencyHandling(inner.GetEnumerator(), () => disposable.Dispose()); } IEnumerator IEnumerable.GetEnumerator() @@ -32,12 +33,12 @@ IEnumerator IEnumerable.GetEnumerator() return GetEnumerator(); } - internal class ThreadSafeEnumerator : IEnumerator + internal class EnumeratorWithConcurrencyHandling : IEnumerator { readonly IEnumerator inner; readonly Action onDisposed; - public ThreadSafeEnumerator(IEnumerator inner, Action onDisposed) + public EnumeratorWithConcurrencyHandling(IEnumerator inner, Action onDisposed) { this.inner = inner; this.onDisposed = onDisposed; diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index 41831828..d0abfa86 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -12,6 +12,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Data.SqlClient; +using Nevermore.Advanced.Concurrency; using Nevermore.Advanced.Queryable; using Nevermore.Advanced.QueryBuilders; using Nevermore.Diagnositcs; @@ -21,6 +22,7 @@ using Nevermore.Transient; using Nito.AsyncEx; using Nito.Disposables; +using NotSupportedException = System.NotSupportedException; namespace Nevermore.Advanced { @@ -44,7 +46,7 @@ public class ReadTransaction : IReadTransaction, ITransactionDiagnostic DbConnection? connection; protected IUniqueParameterNameGenerator ParameterNameGenerator { get; } = new UniqueParameterNameGenerator(); - protected DeadlockAwareLock DeadlockAwareLock { get; } = new(); + protected ITransactionConcurrencyHandler TransactionConcurrencyHandler { get; } // To help track deadlocks readonly List commandTrace; @@ -114,6 +116,14 @@ internal ReadTransaction( registry.Add(this); columnNameResolver = configuration.TableColumnNameResolver(store); + TransactionConcurrencyHandler = configuration.ConcurrencyMode switch + { + ConcurrencyMode.NoLock => new NoOpConcurrencyHandler(), + ConcurrencyMode.LockOnly => new LockOnlyConcurrencyHandler(), + ConcurrencyMode.LogOnly => new LogOnlyConcurrencyHandler(), + ConcurrencyMode.LockWithLogging => new LockWithLoggingConcurrencyHandler(), + _ => throw new NotSupportedException($"Concurrency mode {this.configuration.ConcurrencyMode} not supported") + }; } protected DbTransaction? Transaction { get; private set; } @@ -521,9 +531,7 @@ IEnumerable Execute() yield return item; } - return configuration.SupportConcurrentExecution - ? new ThreadSafeEnumerable(Execute, DeadlockAwareLock) - : Execute(); + return new EnumerableWithConcurrencyHandling(Execute, TransactionConcurrencyHandler); } public IAsyncEnumerable StreamAsync(PreparedCommand command, CancellationToken cancellationToken = default) @@ -538,9 +546,7 @@ async IAsyncEnumerable Execute() } } - return configuration.SupportConcurrentExecution - ? new ThreadSafeAsyncEnumerable(Execute, DeadlockAwareLock) - : Execute(); + return new AsyncEnumerableWithConcurrencyHandling(Execute, TransactionConcurrencyHandler); } IEnumerable ProcessReader(DbDataReader reader, PreparedCommand command) @@ -620,9 +626,7 @@ async IAsyncEnumerable Execute() } } - return configuration.SupportConcurrentExecution - ? new ThreadSafeAsyncEnumerable(Execute, DeadlockAwareLock) - : Execute(); + return new AsyncEnumerableWithConcurrencyHandling(Execute, TransactionConcurrencyHandler); } void AddCommandTrace(string commandText) @@ -651,18 +655,14 @@ public Task ExecuteNonQueryAsync(string query, CommandParameterValues? args public int ExecuteNonQuery(PreparedCommand preparedCommand) { - using var mutex = configuration.SupportConcurrentExecution - ? DeadlockAwareLock.Lock() - : NoopDisposable.Instance; + using var mutex = TransactionConcurrencyHandler.Lock(); using var command = CreateCommand(preparedCommand); return command.ExecuteNonQuery(); } public async Task ExecuteNonQueryAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = configuration.SupportConcurrentExecution - ? await DeadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false) - : NoopDisposable.Instance; + using var mutex = await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); using var command = CreateCommand(preparedCommand); return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } @@ -679,9 +679,7 @@ public Task ExecuteScalarAsync(string query, CommandParameterV public TResult ExecuteScalar(PreparedCommand preparedCommand) { - using var mutex = configuration.SupportConcurrentExecution - ? DeadlockAwareLock.Lock() - : NoopDisposable.Instance; + using var mutex = TransactionConcurrencyHandler.Lock(); using var command = CreateCommand(preparedCommand); var result = command.ExecuteScalar(); if (result == DBNull.Value) @@ -691,9 +689,7 @@ public TResult ExecuteScalar(PreparedCommand preparedCommand) public async Task ExecuteScalarAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = configuration.SupportConcurrentExecution - ? await DeadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false) - : NoopDisposable.Instance; + using var mutex = await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); using var command = CreateCommand(preparedCommand); var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); if (result == DBNull.Value) @@ -725,20 +721,14 @@ public async Task ExecuteReaderAsync(PreparedCommand preparedComma protected TResult[] ReadResults(PreparedCommand preparedCommand, Func mapper) { - using var mutex = configuration.SupportConcurrentExecution - ? DeadlockAwareLock.Lock() - : NoopDisposable.Instance; - + using var mutex = TransactionConcurrencyHandler.Lock(); using var command = CreateCommand(preparedCommand); return command.ReadResults(mapper); } protected async Task ReadResultsAsync(PreparedCommand preparedCommand, Func> mapper, CancellationToken cancellationToken) { - using var mutex = configuration.SupportConcurrentExecution - ? await DeadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false) - : NoopDisposable.Instance; - + using var mutex = await TransactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false); using var command = CreateCommand(preparedCommand); return await command.ReadResultsAsync(mapper, cancellationToken).ConfigureAwait(false); } @@ -914,7 +904,7 @@ public void Dispose() } TryDispose(TransactionTimer); - TryDispose(DeadlockAwareLock); + TryDispose(TransactionConcurrencyHandler); if (OwnsSqlTransaction) { diff --git a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs deleted file mode 100644 index 9bb73a34..00000000 --- a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Nito.AsyncEx; - -namespace Nevermore.Advanced -{ - public class ThreadSafeAsyncEnumerable : IAsyncEnumerable - { - readonly Func> innerFunc; - readonly DeadlockAwareLock deadlockAwareLock; - - public ThreadSafeAsyncEnumerable(IAsyncEnumerable inner, DeadlockAwareLock deadlockAwareLock) : this(() => inner, deadlockAwareLock) - { - } - - public ThreadSafeAsyncEnumerable(Func> innerFunc, DeadlockAwareLock deadlockAwareLock) - { - this.innerFunc = innerFunc; - this.deadlockAwareLock = deadlockAwareLock; - } - - public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) - { - using var mutex = await deadlockAwareLock.LockAsync(cancellationToken).ConfigureAwait(false); - var inner = innerFunc(); - await foreach (var item in inner.WithCancellation(cancellationToken).ConfigureAwait(false)) yield return item; - } - } -} \ No newline at end of file diff --git a/source/Nevermore/ConcurrencyMode.cs b/source/Nevermore/ConcurrencyMode.cs new file mode 100644 index 00000000..44555db6 --- /dev/null +++ b/source/Nevermore/ConcurrencyMode.cs @@ -0,0 +1,25 @@ +namespace Nevermore +{ + public enum ConcurrencyMode + { + /// + /// When queries are executed in parallel, no locking will be performed. + /// + NoLock, + + /// + /// When queries are executed in parallel, locking will be performed preventing queries from running in parallel against the same transaction. + /// + LockOnly, + + /// + /// When queries are executed in parallel, a log message will be written. + /// + LogOnly, + + /// + /// When queries are executed in parallel, locking will be performed and a log message will be written. + /// + LockWithLogging + } +} \ No newline at end of file diff --git a/source/Nevermore/IRelationalStoreConfiguration.cs b/source/Nevermore/IRelationalStoreConfiguration.cs index 9832c84b..794f6958 100644 --- a/source/Nevermore/IRelationalStoreConfiguration.cs +++ b/source/Nevermore/IRelationalStoreConfiguration.cs @@ -88,13 +88,11 @@ public interface IRelationalStoreConfiguration /// Used to get the table name for a document type. By default, the table name is retrieved from the document map. /// ITableNameResolver TableNameResolver { get; set; } - + /// - /// Gets or sets whether concurrent execution of queries is handled by Nevermore. When true, Nevermore will attempt - /// to sequence queries issued concurrently. - /// - /// The default is true. + /// Gets or sets how concurrent execution of queries is handled by Nevermore. When true. + /// The default is . /// - bool SupportConcurrentExecution { get; set; } + public ConcurrencyMode ConcurrencyMode { get; set; } } } \ No newline at end of file diff --git a/source/Nevermore/RelationalStoreConfiguration.cs b/source/Nevermore/RelationalStoreConfiguration.cs index 2b7c3b5e..b2772fa9 100644 --- a/source/Nevermore/RelationalStoreConfiguration.cs +++ b/source/Nevermore/RelationalStoreConfiguration.cs @@ -57,7 +57,7 @@ public RelationalStoreConfiguration(Func connectionStringFunc, IPrimaryK TableColumnNameResolver = _ => new SelectAllColumnsTableResolver(); AllowSynchronousOperations = true; - SupportConcurrentExecution = true; + ConcurrencyMode = ConcurrencyMode.LockOnly; QueryLogger = new DefaultQueryLogger(); TransactionLogger = new DefaultTransactionLogger(); @@ -119,8 +119,8 @@ public RelationalStoreConfiguration(Func connectionStringFunc, IPrimaryK public Func KeyAllocatorFactory { get; set; } public ITableNameResolver TableNameResolver { get; set; } - - public bool SupportConcurrentExecution { get; set; } + + public ConcurrencyMode ConcurrencyMode { get; set; } string InitializeConnectionString(string sqlConnectionString) {