Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ public class ConcurrentAccessFixture : FixtureWithRelationalStore
const int NumberOfDocuments = 256;
const int DegreeOfParallelism = NumberOfDocuments;

[Test]
public void ConcurrentAccessDoesNotGoBoom()
[TestCase(ConcurrencyMode.LockOnly)]
[TestCase(ConcurrencyMode.LockWithLogging)]
public void ConcurrentAccessDoesNotGoBoom(ConcurrencyMode concurrencyMode)
{
NoMonkeyBusiness();
Configuration.ConcurrencyMode = concurrencyMode;

var namePrefix = $"{Guid.NewGuid()}-";

Expand Down Expand Up @@ -103,10 +105,12 @@ static void ThreadWaitAll(params Thread[] threads)
}
}

[Test]
public async Task AsyncConcurrentAccessDoesNotGoBoom()
[TestCase(ConcurrencyMode.LockOnly)]
[TestCase(ConcurrencyMode.LockWithLogging)]
public async Task AsyncConcurrentAccessDoesNotGoBoom(ConcurrencyMode concurrencyMode)
{
NoMonkeyBusiness();
Configuration.ConcurrencyMode = concurrencyMode;

var namePrefix = $"{Guid.NewGuid()}-";

Expand Down Expand Up @@ -162,5 +166,66 @@ await Task.WhenAll(
.WhenAll();
}
}

[TestCase(ConcurrencyMode.NoLock)]
[TestCase(ConcurrencyMode.LogOnly)]
public void ConcurrentAccessGoesBoomWhenLockingIsDisabled(ConcurrencyMode concurrencyMode)
{
NoMonkeyBusiness();
Configuration.ConcurrencyMode = concurrencyMode;

var namePrefix = $"{Guid.NewGuid()}-";

// Create a bunch of documents so that we can query for them.
using (var transaction = Store.BeginTransaction())
{
FluentActions.Invoking(
() =>
{
// Only using 10 here because using NumberOfDocuments (256) is too slow
Enumerable.Range(0, 10)
.Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" })
.AsParallel()
.WithDegreeOfParallelism(DegreeOfParallelism)
.Select(document =>
{
// ReSharper disable once AccessToDisposedClosure
transaction.Insert(document);
return 0;
})
.ToArray();
})
.Should()
.Throw<Exception>();
}
}

[TestCase(ConcurrencyMode.NoLock)]
[TestCase(ConcurrencyMode.LogOnly)]
public async Task AsyncConcurrentAccessGoesBoomWhenLockingIsDisabled(ConcurrencyMode concurrencyMode)
{
NoMonkeyBusiness();
Configuration.ConcurrencyMode = concurrencyMode;

var namePrefix = $"{Guid.NewGuid()}-";

// Create a bunch of documents so that we can query for them.
using (var transaction = await Store.BeginWriteTransactionAsync())
{
await FluentActions.Awaiting(
async () =>
{
await Enumerable.Range(0, NumberOfDocuments)
.Select(i => new DocumentWithIdentityId { Name = $"{namePrefix}{i}" })
// ReSharper disable once AccessToDisposedClosure
.Select(document => transaction.InsertAsync(document))
.WhenAll();
await transaction.CommitAsync();
})
.Should()
.ThrowExactlyAsync<InvalidOperationException>()
.WithMessage("The connection does not support MultipleActiveResultSets.");
}
}
}
}
164 changes: 0 additions & 164 deletions source/Nevermore.Tests/DeadlockAwareLockFixture.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Nevermore.Advanced.Concurrency;

namespace Nevermore.Advanced
{
internal class AsyncEnumerableWithConcurrencyHandling<T> : IAsyncEnumerable<T>
{
readonly Func<IAsyncEnumerable<T>> innerFunc;
readonly ITransactionConcurrencyHandler transactionConcurrencyHandler;

public AsyncEnumerableWithConcurrencyHandling(IAsyncEnumerable<T> inner, ITransactionConcurrencyHandler transactionConcurrencyHandler)
: this(() => inner, transactionConcurrencyHandler)
{
}

public AsyncEnumerableWithConcurrencyHandling(Func<IAsyncEnumerable<T>> innerFunc, ITransactionConcurrencyHandler transactionConcurrencyHandler)
{
this.innerFunc = innerFunc;
this.transactionConcurrencyHandler = transactionConcurrencyHandler;
}

public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new())
{
using var mutex = await transactionConcurrencyHandler.LockAsync(cancellationToken).ConfigureAwait(false);
var inner = innerFunc();
await foreach (var item in inner.WithCancellation(cancellationToken).ConfigureAwait(false)) yield return item;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Nevermore.Advanced.Concurrency
{
public interface ITransactionConcurrencyHandler : IDisposable
{
IDisposable Lock();

Task<IDisposable> LockAsync(CancellationToken cancellationToken);
}
Comment on lines +7 to +12
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided to replace the DeadlockAwareLock with this abstraction as a way for us to enable logging without locking. It's a bit much, but felt cleaner than introducing a bunch of additional checks into a single class.
Open to alternatives.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a spike of what this would look like if we had a single type: #289

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

namespace Nevermore.Advanced.Concurrency
{
class LockOnlyConcurrencyHandler : ITransactionConcurrencyHandler
{
readonly SemaphoreSlim semaphore = new(1, 1);

public IDisposable Lock()
{
return semaphore.Lock();
}

public async Task<IDisposable> LockAsync(CancellationToken cancellationToken)
{
return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false);
}

public void Dispose()
{
semaphore.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Nevermore.Diagnositcs;
using Nito.AsyncEx;

namespace Nevermore.Advanced.Concurrency
{
class LockWithLoggingConcurrencyHandler : ITransactionConcurrencyHandler
{
static readonly ILog Log = LogProvider.For<LockWithLoggingConcurrencyHandler>();

readonly SemaphoreSlim semaphore = new(1, 1);

public IDisposable Lock()
{
// `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening.
if (semaphore.CurrentCount == 0)
{
Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace);
}

return semaphore.Lock();
}

public async Task<IDisposable> LockAsync(CancellationToken cancellationToken)
{
// `SemaphoreSlim` counts down, so if it's 0 then there's a concurrent execution happening.
if (semaphore.CurrentCount == 0)
{
Log.WarnFormat("Concurrent query execution detected. Stacktrace: {0}", Environment.StackTrace);
}

return await semaphore.LockAsync(cancellationToken).ConfigureAwait(false);
}

public void Dispose()
{
semaphore.Dispose();
}
}
}
Loading