-
-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathLoadTest.cs
More file actions
72 lines (56 loc) · 2.54 KB
/
LoadTest.cs
File metadata and controls
72 lines (56 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
using System.Data;
using Eventuous.SqlServer;
using Eventuous.SqlServer.Extensions;
using Eventuous.Sut.App;
using Eventuous.Sut.Domain;
using Eventuous.Sut.Subs;
using Eventuous.Tests.SqlServer.Fixtures;
using Hypothesist;
namespace Eventuous.Tests.SqlServer.Subscriptions;
public class LoadTest : SubscriptionFixture<TestEventHandler> {
readonly IntegrationFixture _fixture;
readonly BookingService _service;
public LoadTest(IntegrationFixture fixture, ITestOutputHelper output)
: base(fixture, output, new TestEventHandler(), true, autoStart: false, logLevel: LogLevel.Debug) {
_fixture = fixture;
var eventStore = new SqlServerStore(fixture.GetConnection, new SqlServerStoreOptions(SchemaName));
var store = new AggregateStore(eventStore);
_service = new BookingService(store);
}
[Fact]
public async Task ProduceAndConsumeManyEvents() {
const int count = 55000;
Handler.AssertThat().Any(_ => true);
var generateTask = Task.Run(() => GenerateAndHandleCommands(count));
await Start();
await Task.Delay(TimeSpan.FromMinutes(7));
await Stop();
Handler.Count.Should().Be(count);
var checkpoint = await CheckpointStore.GetLastCheckpoint(SubscriptionId, default);
checkpoint.Position.Value.Should().Be(count - 1);
await using var connection = _fixture.GetConnection();
await connection.OpenAsync();
await using var cmd = connection.GetStoredProcCommand(Schema.ReadAllBackwards)
.Add("@from_position", SqlDbType.BigInt, long.MaxValue)
.Add("@count", SqlDbType.Int, 1);
await using var reader = await cmd.ExecuteReaderAsync(CancellationToken.None);
var result = reader.ReadEvents(CancellationToken.None);
var lastEvent = await result.LastAsync();
lastEvent.GlobalPosition.Should().Be(count - 1);
}
async Task<List<Commands.ImportBooking>> GenerateAndHandleCommands(int count) {
var commands = Enumerable
.Range(0, count)
.Select(_ => DomainFixture.CreateImportBooking())
.ToList();
foreach (var cmd in commands) {
var result = await _service.Handle(cmd, default);
if (result is ErrorResult<BookingState> error) {
throw error.Exception ?? new Exception(error.Message);
}
}
return commands;
}
static BookingEvents.BookingImported ToEvent(Commands.ImportBooking cmd)
=> new(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut);
}