Skip to content

Commit e2e54ca

Browse files
Add transaction configurability to EF saga repository (MassTransit#6130)
* Add transaction configurability to EF saga repository Introduce `_isTransactionEnabled` to control transaction usage in `EntityFrameworkSagaRepositoryConfigurator<TSaga>`. Add the `ConfigureTransaction` method to enable or disable transactions. Update `CreateOptimisticLockStrategy` and `CreatePessimisticLockStrategy` to propagate transaction settings. Modify `EntityFrameworkSagaRepository` methods to accept `isTransactionEnabled` as a parameter. Enhance `ISagaRepositoryLockStrategy<TSaga>` with `IsTransactionEnabled` property and update `OptimisticSagaRepositoryLockStrategy` and `PessimisticSagaRepositoryLockStrategy` to support transaction control. Refactor `WithinTransaction` in `EntityFrameworkSagaRepositoryContextFactory` to skip transaction wrapping when disabled. Perform minor refactoring and import adjustments for consistency. * ConfigureTransaction method moved to IEntityFrameworkSagaRepositoryConfigurator * Enforce transactions for pessimistic concurrency Refactored `CreatePessimisticLockStrategy` and removed the `isTransactionEnabled` parameter, enforcing transactions for pessimistic concurrency. Renamed `ConfigureTransaction` to `SetOptimisticConcurrency` to support optimistic concurrency with optional transaction control. Updated `PessimisticSagaRepositoryLockStrategy` to always enable transactions. Simplified `CreatePessimistic` method and adjusted related tests to reflect the new design. Removed tests and configurations for pessimistic concurrency without transactions, as this is no longer supported. Updated documentation and test cases to align with the new API. * Refactor saga transaction tests for clarity and coverage Refactored test classes to separate optimistic and pessimistic concurrency tests with transactions enabled/disabled. Updated test names, descriptions, and categories for better organization. Consolidated saga repository initialization logic, removing redundant fields and simplifying setup. Added new test for handling multiple saga instances without transactions. Introduced setup and teardown methods to ensure a clean database state for each test run. Updated lock strategy tests to reflect transaction-enabled behavior. Removed redundant code and improved overall maintainability of the test suite.
1 parent d7f77a6 commit e2e54ca

11 files changed

Lines changed: 550 additions & 13 deletions

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/Configuration/Configuration/EntityFrameworkSagaRepositoryConfigurator.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class EntityFrameworkSagaRepositoryConfigurator<TSaga> :
2525
IsolationLevel _isolationLevel;
2626
ILockStatementProvider _lockStatementProvider;
2727
Func<IQueryable<TSaga>, IQueryable<TSaga>> _queryCustomization;
28+
bool _isTransactionEnabled = true;
2829

2930
public EntityFrameworkSagaRepositoryConfigurator()
3031
{
@@ -152,7 +153,7 @@ ISagaRepositoryLockStrategy<TSaga> CreateOptimisticLockStrategy()
152153
{
153154
var queryExecutor = new OptimisticLoadQueryExecutor<TSaga>(_queryCustomization);
154155

155-
return new OptimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, _queryCustomization, _isolationLevel);
156+
return new OptimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, _queryCustomization, _isolationLevel, _isTransactionEnabled);
156157
}
157158

158159
ISagaRepositoryLockStrategy<TSaga> CreatePessimisticLockStrategy()
@@ -163,5 +164,11 @@ ISagaRepositoryLockStrategy<TSaga> CreatePessimisticLockStrategy()
163164

164165
return new PessimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, _isolationLevel);
165166
}
167+
168+
public void SetOptimisticConcurrency(bool useTransaction = true)
169+
{
170+
_concurrencyMode = ConcurrencyMode.Optimistic;
171+
_isTransactionEnabled = useTransaction;
172+
}
166173
}
167174
}

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/Configuration/IEntityFrameworkSagaRepositoryConfigurator.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
namespace MassTransit
22
{
3+
using EntityFrameworkCoreIntegration;
4+
using Microsoft.EntityFrameworkCore;
35
using System;
46
using System.Data;
57
using System.Linq;
6-
using EntityFrameworkCoreIntegration;
7-
using Microsoft.EntityFrameworkCore;
88

99

1010
public interface IEntityFrameworkSagaRepositoryConfigurator
@@ -42,6 +42,15 @@ void AddDbContext<TContext, TImplementation>(Action<IServiceProvider, DbContextO
4242
/// <typeparam name="TContext"></typeparam>
4343
void ExistingDbContext<TContext>()
4444
where TContext : DbContext;
45+
46+
/// <summary>
47+
/// Configures the saga to use optimistic concurrency, with optional transaction support.
48+
/// </summary>
49+
/// <param name="useTransaction">
50+
/// If <c>true</c>, operations on the saga will be executed within a transaction;
51+
/// if <c>false</c>, no transaction will be used.
52+
/// </param>
53+
void SetOptimisticConcurrency(bool useTransaction = true);
4554
}
4655

4756

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/EntityFrameworkSagaRepository.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ public static class EntityFrameworkSagaRepository<TSaga>
1212
where TSaga : class, ISaga
1313
{
1414
public static ISagaRepository<TSaga> CreateOptimistic(ISagaDbContextFactory<TSaga> dbContextFactory,
15-
Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization = null)
15+
Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization = null, bool isTransactionEnabled = true)
1616
{
1717
var queryExecutor = new OptimisticLoadQueryExecutor<TSaga>(queryCustomization);
18-
var lockStrategy = new OptimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, queryCustomization, IsolationLevel.ReadCommitted);
18+
var lockStrategy = new OptimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, queryCustomization, IsolationLevel.ReadCommitted, isTransactionEnabled);
1919

2020
return CreateRepository(dbContextFactory, lockStrategy);
2121
}
2222

2323
public static ISagaRepository<TSaga> CreateOptimistic(Func<DbContext> dbContextFactory,
24-
Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization = null)
24+
Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization = null, bool isTransactionEnabled = true)
2525
{
26-
return CreateOptimistic(new DelegateSagaDbContextFactory<TSaga>(dbContextFactory), queryCustomization);
26+
return CreateOptimistic(new DelegateSagaDbContextFactory<TSaga>(dbContextFactory), queryCustomization, isTransactionEnabled);
2727
}
2828

2929
public static ISagaRepository<TSaga> CreatePessimistic(ISagaDbContextFactory<TSaga> dbContextFactory,

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/ISagaRepositoryLockStrategy.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
namespace MassTransit.EntityFrameworkCoreIntegration
22
{
3+
using Microsoft.EntityFrameworkCore;
4+
using Saga;
35
using System;
46
using System.Data;
57
using System.Threading;
68
using System.Threading.Tasks;
7-
using Microsoft.EntityFrameworkCore;
8-
using Saga;
99

1010

1111
public interface ISagaRepositoryLockStrategy<TSaga>
@@ -16,5 +16,7 @@ public interface ISagaRepositoryLockStrategy<TSaga>
1616
Task<TSaga> Load(DbContext context, Guid correlationId, CancellationToken cancellationToken);
1717

1818
Task<SagaLockContext<TSaga>> CreateLockContext(DbContext context, ISagaQuery<TSaga> query, CancellationToken cancellationToken);
19+
20+
bool IsTransactionEnabled { get; }
1921
}
2022
}

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/Saga/EntityFrameworkSagaRepositoryContextFactory.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
namespace MassTransit.EntityFrameworkCoreIntegration.Saga
22
{
3+
using MassTransit.Saga;
4+
using Microsoft.EntityFrameworkCore;
5+
using Microsoft.EntityFrameworkCore.Storage;
36
using System;
47
using System.Collections.Generic;
58
using System.Linq;
69
using System.Threading;
710
using System.Threading.Tasks;
8-
using MassTransit.Saga;
9-
using Microsoft.EntityFrameworkCore;
10-
using Microsoft.EntityFrameworkCore.Storage;
1111

1212

1313
public class EntityFrameworkSagaRepositoryContextFactory<TSaga> :
@@ -161,6 +161,9 @@ Task<T> ExecuteAsync()
161161

162162
Task WithinTransaction(DbContext context, CancellationToken cancellationToken, Func<Task> callback)
163163
{
164+
if (!_lockStrategy.IsTransactionEnabled)
165+
return callback();
166+
164167
async Task<bool> Create()
165168
{
166169
await callback().ConfigureAwait(false);

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/Saga/OptimisticSagaRepositoryLockStrategy.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@ public class OptimisticSagaRepositoryLockStrategy<TSaga> :
1616
readonly Func<IQueryable<TSaga>, IQueryable<TSaga>> _queryCustomization;
1717

1818
public OptimisticSagaRepositoryLockStrategy(ILoadQueryExecutor<TSaga> executor, Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization,
19-
IsolationLevel isolationLevel)
19+
IsolationLevel isolationLevel, bool isTransactionEnabled)
2020
{
2121
_executor = executor;
2222
_queryCustomization = queryCustomization;
2323

2424
IsolationLevel = isolationLevel;
25+
IsTransactionEnabled = isTransactionEnabled;
2526
}
2627

2728
public IsolationLevel IsolationLevel { get; }
2829

30+
public bool IsTransactionEnabled { get; }
31+
2932
public Task<TSaga> Load(DbContext context, Guid correlationId, CancellationToken cancellationToken)
3033
{
3134
return _executor.Load(context, correlationId, cancellationToken);

src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/Saga/PessimisticSagaRepositoryLockStrategy.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ public PessimisticSagaRepositoryLockStrategy(ILoadQueryExecutor<TSaga> executor,
2424

2525
public IsolationLevel IsolationLevel { get; }
2626

27+
/// <summary>
28+
/// Pessimistic concurrency always uses transactions as locks require transaction scope.
29+
/// </summary>
30+
public bool IsTransactionEnabled => true;
31+
2732
public Task<TSaga> Load(DbContext context, Guid correlationId, CancellationToken cancellationToken)
2833
{
2934
return _executor.Load(context, correlationId, cancellationToken);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
namespace MassTransit.EntityFrameworkCoreIntegration.Tests.TransactionConfiguration
2+
{
3+
using System;
4+
using System.Data;
5+
using MassTransit.Configuration;
6+
using MassTransit.Tests.Saga;
7+
using NUnit.Framework;
8+
9+
10+
/// <summary>
11+
/// Tests for EntityFrameworkSagaRepositoryConfigurator transaction configuration functionality
12+
/// </summary>
13+
[TestFixture]
14+
public class Configurator_TransactionConfiguration_Specs
15+
{
16+
[Test]
17+
public void Should_accept_configure_transaction_false()
18+
{
19+
var configurator = new EntityFrameworkSagaRepositoryConfigurator<SimpleSaga>();
20+
21+
Assert.DoesNotThrow(() => configurator.SetOptimisticConcurrency(false),
22+
"ConfigureTransaction(false) should not throw");
23+
}
24+
25+
[Test]
26+
public void Should_accept_configure_transaction_true()
27+
{
28+
var configurator = new EntityFrameworkSagaRepositoryConfigurator<SimpleSaga>();
29+
30+
Assert.DoesNotThrow(() => configurator.SetOptimisticConcurrency(true),
31+
"ConfigureTransaction(true) should not throw");
32+
}
33+
34+
[Test]
35+
public void Should_allow_multiple_configuration_calls()
36+
{
37+
var configurator = new EntityFrameworkSagaRepositoryConfigurator<SimpleSaga>();
38+
39+
Assert.DoesNotThrow(() => {
40+
configurator.SetOptimisticConcurrency(false);
41+
configurator.SetOptimisticConcurrency(true);
42+
configurator.SetOptimisticConcurrency(false);
43+
}, "Multiple ConfigureTransaction calls should not throw");
44+
}
45+
46+
[Test]
47+
public void Should_support_fluent_configuration()
48+
{
49+
var configurator = new EntityFrameworkSagaRepositoryConfigurator<SimpleSaga>();
50+
51+
Assert.DoesNotThrow(() => {
52+
configurator.ConcurrencyMode = ConcurrencyMode.Optimistic;
53+
configurator.SetOptimisticConcurrency(false);
54+
configurator.IsolationLevel = IsolationLevel.ReadCommitted;
55+
}, "Fluent configuration with transaction setting should work");
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)