diff --git a/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs b/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs index 65ecf87f..af392f1c 100644 --- a/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs +++ b/source/Nevermore.IntegrationTests/Advanced/ConcurrentAccessFixture.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; using FluentAssertions; +using Nevermore.Advanced; using Nevermore.Advanced.Queryable; using Nevermore.IntegrationTests.Model; using Nevermore.IntegrationTests.SetUp; @@ -16,6 +17,22 @@ public class ConcurrentAccessFixture : FixtureWithRelationalStore const int NumberOfDocuments = 256; const int DegreeOfParallelism = NumberOfDocuments; + [Test] + public async Task DeadlocksDoGoBoom() + { + NoMonkeyBusiness(); + + await FluentActions.Awaiting(async () => + { + using var transaction = await Store.BeginWriteTransactionAsync(); + + var t1 = transaction.ExecuteNonQueryAsync("WAITFOR DELAY '00:00:02'"); + var t2 = transaction.ExecuteNonQueryAsync("WAITFOR DELAY '00:00:02'"); + + await Task.WhenAll(t1, t2); + }).Should().ThrowAsync(); + } + [Test] public void ConcurrentAccessDoesNotGoBoom() { @@ -45,7 +62,7 @@ public void ConcurrentAccessDoesNotGoBoom() { ThreadWaitAll( Enumerable.Range(0, DegreeOfParallelism) - .Select(i => + .Select(_ => { // ReSharper disable AccessToDisposedClosure // ReSharper disable ReturnValueOfPureMethodIsNotUsed @@ -113,11 +130,11 @@ public async Task AsyncConcurrentAccessDoesNotGoBoom() // Create a bunch of documents so that we can query for them. using (var transaction = await Store.BeginWriteTransactionAsync()) { - await Enumerable.Range(0, NumberOfDocuments) - .Select(i => new DocumentWithIdentityId {Name = $"{namePrefix}{i}"}) - // ReSharper disable once AccessToDisposedClosure - .Select(document => transaction.InsertAsync(document)) - .WhenAll(); + var documents = Enumerable.Range(0, NumberOfDocuments) + .Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" }) + .ToArray(); + + await transaction.InsertManyAsync(documents); await transaction.CommitAsync(); } @@ -125,10 +142,9 @@ await Enumerable.Range(0, NumberOfDocuments) using (var transaction = await Store.BeginWriteTransactionAsync()) { await Enumerable.Range(0, DegreeOfParallelism) - .Select(async i => + .Select(_ => Task.Run(async () => { // ReSharper disable AccessToDisposedClosure - // ReSharper disable ReturnValueOfPureMethodIsNotUsed var documents = await transaction.Query() .Where(x => x.Name.StartsWith(namePrefix)) .ToListAsync(); @@ -158,7 +174,7 @@ await Task.WhenAll( // ReSharper restore ReturnValueOfPureMethodIsNotUsed // ReSharper restore AccessToDisposedClosure - }) + })) .WhenAll(); } } diff --git a/source/Nevermore/Advanced/ReadTransaction.cs b/source/Nevermore/Advanced/ReadTransaction.cs index 7508fbcb..83172b3f 100644 --- a/source/Nevermore/Advanced/ReadTransaction.cs +++ b/source/Nevermore/Advanced/ReadTransaction.cs @@ -18,7 +18,6 @@ using Nevermore.Querying.AST; using Nevermore.TableColumnNameResolvers; using Nevermore.Transient; -using Nito.AsyncEx; namespace Nevermore.Advanced { @@ -572,16 +571,30 @@ public Task ExecuteNonQueryAsync(string query, CommandParameterValues? args public int ExecuteNonQuery(PreparedCommand preparedCommand) { - using var mutex = DeadlockAwareLock.Lock(); - using var command = CreateCommand(preparedCommand); - return command.ExecuteNonQuery(); + DeadlockAwareLock.Wait(); + try + { + using var command = CreateCommand(preparedCommand); + return command.ExecuteNonQuery(); + } + finally + { + DeadlockAwareLock.Release(); + } } public async Task ExecuteNonQueryAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = await DeadlockAwareLock.LockAsync(cancellationToken); - using var command = CreateCommand(preparedCommand); - return await command.ExecuteNonQueryAsync(cancellationToken); + await DeadlockAwareLock.WaitAsync(cancellationToken); + try + { + using var command = CreateCommand(preparedCommand); + return await command.ExecuteNonQueryAsync(cancellationToken); + } + finally + { + DeadlockAwareLock.Release(); + } } public TResult ExecuteScalar(string query, CommandParameterValues? args = null, RetriableOperation retriableOperation = RetriableOperation.Select, TimeSpan? commandTimeout = null) @@ -596,22 +609,36 @@ public Task ExecuteScalarAsync(string query, CommandParameterV public TResult ExecuteScalar(PreparedCommand preparedCommand) { - using var mutex = DeadlockAwareLock.Lock(); - using var command = CreateCommand(preparedCommand); - var result = command.ExecuteScalar(); - if (result == DBNull.Value) - return default!; - return (TResult)result; + DeadlockAwareLock.Wait(); + try + { + using var command = CreateCommand(preparedCommand); + var result = command.ExecuteScalar(); + if (result == DBNull.Value) + return default!; + return (TResult)result; + } + finally + { + DeadlockAwareLock.Release(); + } } public async Task ExecuteScalarAsync(PreparedCommand preparedCommand, CancellationToken cancellationToken = default) { - using var mutex = await DeadlockAwareLock.LockAsync(cancellationToken); - using var command = CreateCommand(preparedCommand); - var result = await command.ExecuteScalarAsync(cancellationToken); - if (result == DBNull.Value) - return default!; - return (TResult)result; + await DeadlockAwareLock.WaitAsync(cancellationToken); + try + { + using var command = CreateCommand(preparedCommand); + var result = await command.ExecuteScalarAsync(cancellationToken); + if (result == DBNull.Value) + return default!; + return (TResult)result; + } + finally + { + DeadlockAwareLock.Release(); + } } public DbDataReader ExecuteReader(string query, CommandParameterValues? args = null, TimeSpan? commandTimeout = null) @@ -638,18 +665,30 @@ public async Task ExecuteReaderAsync(PreparedCommand preparedComma protected TResult[] ReadResults(PreparedCommand preparedCommand, Func mapper) { - using var mutex = DeadlockAwareLock.Lock(); - - using var command = CreateCommand(preparedCommand); - return command.ReadResults(mapper); + DeadlockAwareLock.Wait(); + try + { + using var command = CreateCommand(preparedCommand); + return command.ReadResults(mapper); + } + finally + { + DeadlockAwareLock.Release(); + } } protected async Task ReadResultsAsync(PreparedCommand preparedCommand, Func> mapper, CancellationToken cancellationToken) { - using var mutex = await DeadlockAwareLock.LockAsync(cancellationToken); - - using var command = CreateCommand(preparedCommand); - return await command.ReadResultsAsync(mapper, cancellationToken); + await DeadlockAwareLock.WaitAsync(cancellationToken); + try + { + using var command = CreateCommand(preparedCommand); + return await command.ReadResultsAsync(mapper, cancellationToken); + } + finally + { + DeadlockAwareLock.Release(); + } } PreparedCommand PrepareLoad(TKey id) diff --git a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs index 04840e21..9feaeca7 100644 --- a/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeAsyncEnumerable.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using Nito.AsyncEx; namespace Nevermore.Advanced { @@ -23,9 +22,16 @@ public ThreadSafeAsyncEnumerable(Func> innerFunc, DeadlockAw public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) { - using var mutex = await deadlockAwareLock.LockAsync(cancellationToken); - var inner = innerFunc(); - await foreach (var item in inner.WithCancellation(cancellationToken)) yield return item; + await deadlockAwareLock.WaitAsync(cancellationToken); + try + { + var inner = innerFunc(); + await foreach (var item in inner.WithCancellation(cancellationToken)) yield return item; + } + finally + { + deadlockAwareLock.Release(); + } } } } \ No newline at end of file diff --git a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs index dfe9dc3c..8ff4d217 100644 --- a/source/Nevermore/Advanced/ThreadSafeEnumerable.cs +++ b/source/Nevermore/Advanced/ThreadSafeEnumerable.cs @@ -1,7 +1,6 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Threading; namespace Nevermore.Advanced {