From 142a6c13182c107fa0fd5ccd9c2237a80363abbc Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Fri, 30 May 2025 15:40:11 +0100
Subject: [PATCH 01/15] Initial test setup
---
.../MessageDispatch.KurrentDB.Tests.csproj | 29 ++++
.../SubscriberTests.cs | 142 ++++++++++++++++++
src/MessageDispatch.KurrentDB.sln | 6 +
3 files changed, 177 insertions(+)
create mode 100644 src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
create mode 100644 src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
diff --git a/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
new file mode 100644
index 0000000..0589167
--- /dev/null
+++ b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
@@ -0,0 +1,29 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+ false
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
new file mode 100644
index 0000000..6a8ac83
--- /dev/null
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -0,0 +1,142 @@
+using System.Runtime.InteropServices;
+using System.Text;
+using System.Text.Json;
+using CorshamScience.MessageDispatch.Core;
+using DotNet.Testcontainers.Builders;
+using KurrentDB.Client;
+using Microsoft.Extensions.Logging.Abstractions;
+using PharmaxoScientific.MessageDispatch.KurrentDB;
+using Testcontainers.EventStoreDb;
+
+namespace MessageDispatch.KurrentDB.Tests;
+
+public class SubscriberTests
+{
+ private const string StreamName = "stream1";
+ private string _connectionString;
+
+ [SetUp]
+ public async Task Setup()
+ {
+ const int eventStoreHostPort = 1234;
+ const string eventStoreVersion = "23.10.0";
+
+ var eventStoreImageName = RuntimeInformation.OSArchitecture == Architecture.Arm64
+ ? $"ghcr.io/eventstore/eventstore:{eventStoreVersion}-alpha-arm64v8"
+ : $"eventstore/eventstore:{eventStoreVersion}-bookworm-slim";
+
+ var eventStoreContainer = BuildEventStoreContainer(eventStoreImageName, eventStoreHostPort);
+ await eventStoreContainer.StartAsync();
+
+ var mappedHostPort = eventStoreContainer.GetMappedPublicPort(eventStoreHostPort);
+ _connectionString = $"esdb://admin:changeit@localhost:{mappedHostPort}?tls=false";
+ }
+
+ [Test]
+ public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded_DispatchesEventsAndBecomesLive()
+ {
+ var kurrentDbClient = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
+
+ var dispatcher = new AwaitableDispatcherSpy();
+ var subscriber = KurrentDbSubscriber.CreateLiveSubscription(
+ kurrentDbClient,
+ dispatcher,
+ StreamName,
+ new NullLogger());
+
+ subscriber.Start();
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+ await dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(subscriber.IsLive);
+ });
+ }
+
+ // ReSharper disable once NotAccessedPositionalProperty.Local
+ private record SimpleEvent(Guid Id)
+ {
+ public static SimpleEvent Create() => new(Guid.NewGuid());
+ }
+
+ private class AwaitableDispatcherSpy : IDispatcher
+ {
+ public List DispatchedEvents { get; } = [];
+
+ public void Dispatch(ResolvedEvent message) => DispatchedEvents.Add(message);
+
+ public Task WaitForEventsToBeDispatched(params object[] events)
+ {
+ if (events.Length == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ var iterations = 0;
+ while (DispatchedEvents.Count != events.Length)
+ {
+ Thread.Sleep(100);
+ iterations++;
+
+ if (iterations > 10)
+ {
+ throw new TimeoutException("Expected events weren't dispatched within the allotted time.");
+ }
+ }
+
+ return Task.CompletedTask;
+ }
+ }
+
+ private static T? DeserializeEventData(ResolvedEvent message) =>
+ JsonSerializer.Deserialize(Encoding.UTF8.GetString(message.Event.Data.Span));
+
+ private static EventStoreDbContainer BuildEventStoreContainer(string imageName, int hostPort) =>
+ new EventStoreDbBuilder()
+ .WithImage(imageName)
+ .WithCleanUp(true)
+ .WithPortBinding(hostPort, true)
+ .WithEnvironment(new Dictionary
+ {
+ { "EVENTSTORE_INSECURE", "true" },
+ { "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true" },
+ { "EVENTSTORE_ENABLE_EXTERNAL_TCP", "true" },
+ { "EVENTSTORE_HTTP_PORT", hostPort.ToString() },
+ { "EVENTSTORE_RUN_PROJECTIONS", "All" },
+ })
+ .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(hostPort))
+ .Build();
+
+ private static EventData ToEventData(object data, JsonSerializerOptions? options = null)
+ {
+ var metaData = new { ClrType = data.GetType().AssemblyQualifiedName, };
+
+ var type = data.GetType().Name;
+
+ return new EventData(
+ Uuid.NewUuid(),
+ type,
+ Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data, options)),
+ Encoding.UTF8.GetBytes(JsonSerializer.Serialize(metaData, options)));
+ }
+
+ private async Task AppendEventsToStreamAsync(params object[] events)
+ {
+ var eventData = events.Select(e => ToEventData(e));
+ var client = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
+
+ await client.AppendToStreamAsync(StreamName, StreamState.Any, eventData);
+ }
+}
diff --git a/src/MessageDispatch.KurrentDB.sln b/src/MessageDispatch.KurrentDB.sln
index 2d4c0c8..b502ac7 100644
--- a/src/MessageDispatch.KurrentDB.sln
+++ b/src/MessageDispatch.KurrentDB.sln
@@ -5,6 +5,8 @@ VisualStudioVersion = 17.12.35931.192
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessageDispatch.KurrentDB", "MessageDispatch.KurrentDB\MessageDispatch.KurrentDB.csproj", "{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessageDispatch.KurrentDB.Tests", "MessageDispatch.KurrentDB.Tests\MessageDispatch.KurrentDB.Tests.csproj", "{10045A11-D589-4A7F-BC17-C4CE508B04F4}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -15,6 +17,10 @@ Global
{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
From 096d593a896b4f2b4e0d98f383e8398ca5600c01 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Fri, 30 May 2025 16:09:23 +0100
Subject: [PATCH 02/15] Add (failing) test for catch up sub to all
---
.../SubscriberTests.cs | 82 +++++++++++++++++--
1 file changed, 76 insertions(+), 6 deletions(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 6a8ac83..c3bb578 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -14,6 +14,8 @@ public class SubscriberTests
{
private const string StreamName = "stream1";
private string _connectionString;
+ private KurrentDBClient _kurrentDbClient;
+ private AwaitableDispatcherSpy _dispatcher;
[SetUp]
public async Task Setup()
@@ -30,20 +32,58 @@ public async Task Setup()
var mappedHostPort = eventStoreContainer.GetMappedPublicPort(eventStoreHostPort);
_connectionString = $"esdb://admin:changeit@localhost:{mappedHostPort}?tls=false";
+
+ _kurrentDbClient = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
+ _dispatcher = new AwaitableDispatcherSpy();
}
+ [TearDown]
+ public async Task TearDown() => await _kurrentDbClient.DisposeAsync();
+
[Test]
public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded_DispatchesEventsAndBecomesLive()
{
- var kurrentDbClient = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
+ var subscriber = KurrentDbSubscriber.CreateLiveSubscription(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger());
+
+ subscriber.Start();
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
- var dispatcher = new AwaitableDispatcherSpy();
+ await AppendEventsToStreamAsync(event1, event2, event3);
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateLiveSubscription_GivenExistingEventsInStreamWhenNewEventsAdded_DispatchesNewEventsAndBecomesLive()
+ {
var subscriber = KurrentDbSubscriber.CreateLiveSubscription(
- kurrentDbClient,
- dispatcher,
+ _kurrentDbClient,
+ _dispatcher,
StreamName,
new NullLogger());
+ var oldEvent1 = SimpleEvent.Create();
+ var oldEvent2 = SimpleEvent.Create();
+
+ await AppendEventsToStreamAsync(oldEvent1, oldEvent2);
+
subscriber.Start();
var event1 = SimpleEvent.Create();
@@ -53,10 +93,40 @@ public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded
List events = [event1, event2, event3];
await AppendEventsToStreamAsync(event1, event2, event3);
- await dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_DispatchesEventsAndBecomesLive()
+ {
+ var subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger());
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+
+ subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
var deserializedDispatchedEvents =
- dispatcher.DispatchedEvents.Select(DeserializeEventData);
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
Assert.Multiple(() =>
{
From 8373278a1c6e4acb617f80ee6e1983eabd599d81 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Fri, 30 May 2025 16:18:36 +0100
Subject: [PATCH 03/15] Add test for CreateCatchupSubscriptionFromPosition
---
.../SubscriberTests.cs | 61 +++++++++++++++++++
1 file changed, 61 insertions(+)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index c3bb578..45c1424 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -135,6 +135,67 @@ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_D
});
}
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenNoEventsInStreamGivenNewEvents_DispatchesEventsAndBecomesLive()
+ {
+ var subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger());
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ subscriber.Start();
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger(),
+ 1);
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+
+ subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(subscriber.IsLive);
+ });
+ }
+
// ReSharper disable once NotAccessedPositionalProperty.Local
private record SimpleEvent(Guid Id)
{
From 68da149f4993dd82e0a4453e3e06a2bf07085b61 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Fri, 30 May 2025 16:20:14 +0100
Subject: [PATCH 04/15] Fix formatting
---
src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 45c1424..8519846 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -1,3 +1,5 @@
+// Copyright (c) Pharmaxo. All rights reserved.
+
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
From b4b7d82209f8d610c6dbb672d65070bb340adc78 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Fri, 30 May 2025 16:26:14 +0100
Subject: [PATCH 05/15] Cancel subscription after each test
---
.../SubscriberTests.cs | 37 +++++++++++--------
.../KurrentDbSubscriber.cs | 5 ---
2 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 8519846..6895b20 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -18,6 +18,7 @@ public class SubscriberTests
private string _connectionString;
private KurrentDBClient _kurrentDbClient;
private AwaitableDispatcherSpy _dispatcher;
+ private KurrentDbSubscriber? _subscriber;
[SetUp]
public async Task Setup()
@@ -40,18 +41,22 @@ public async Task Setup()
}
[TearDown]
- public async Task TearDown() => await _kurrentDbClient.DisposeAsync();
+ public async Task TearDown()
+ {
+ await _kurrentDbClient.DisposeAsync();
+ _subscriber?.ShutDown();
+ }
[Test]
public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded_DispatchesEventsAndBecomesLive()
{
- var subscriber = KurrentDbSubscriber.CreateLiveSubscription(
+ _subscriber = KurrentDbSubscriber.CreateLiveSubscription(
_kurrentDbClient,
_dispatcher,
StreamName,
new NullLogger());
- subscriber.Start();
+ _subscriber.Start();
var event1 = SimpleEvent.Create();
var event2 = SimpleEvent.Create();
@@ -68,14 +73,14 @@ public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded
Assert.Multiple(() =>
{
Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
- Assert.That(subscriber.IsLive);
+ Assert.That(_subscriber.IsLive);
});
}
[Test]
public async Task CreateLiveSubscription_GivenExistingEventsInStreamWhenNewEventsAdded_DispatchesNewEventsAndBecomesLive()
{
- var subscriber = KurrentDbSubscriber.CreateLiveSubscription(
+ _subscriber = KurrentDbSubscriber.CreateLiveSubscription(
_kurrentDbClient,
_dispatcher,
StreamName,
@@ -86,7 +91,7 @@ public async Task CreateLiveSubscription_GivenExistingEventsInStreamWhenNewEvent
await AppendEventsToStreamAsync(oldEvent1, oldEvent2);
- subscriber.Start();
+ _subscriber.Start();
var event1 = SimpleEvent.Create();
var event2 = SimpleEvent.Create();
@@ -103,14 +108,14 @@ public async Task CreateLiveSubscription_GivenExistingEventsInStreamWhenNewEvent
Assert.Multiple(() =>
{
Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
- Assert.That(subscriber.IsLive);
+ Assert.That(_subscriber.IsLive);
});
}
[Test]
public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_DispatchesEventsAndBecomesLive()
{
- var subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
_kurrentDbClient,
_dispatcher,
new NullLogger());
@@ -123,7 +128,7 @@ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_D
await AppendEventsToStreamAsync(event1, event2, event3);
- subscriber.Start();
+ _subscriber.Start();
await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
@@ -133,14 +138,14 @@ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_D
Assert.Multiple(() =>
{
Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
- Assert.That(subscriber.IsLive);
+ Assert.That(_subscriber.IsLive);
});
}
[Test]
public async Task CreateCatchupSubscriptionSubscribedToAll_GivenNoEventsInStreamGivenNewEvents_DispatchesEventsAndBecomesLive()
{
- var subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
_kurrentDbClient,
_dispatcher,
new NullLogger());
@@ -151,7 +156,7 @@ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenNoEventsInStream
List events = [event1, event2, event3];
- subscriber.Start();
+ _subscriber.Start();
await AppendEventsToStreamAsync(event1, event2, event3);
await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
@@ -162,14 +167,14 @@ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenNoEventsInStream
Assert.Multiple(() =>
{
Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
- Assert.That(subscriber.IsLive);
+ Assert.That(_subscriber.IsLive);
});
}
[Test]
public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive()
{
- var subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition(
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition(
_kurrentDbClient,
_dispatcher,
StreamName,
@@ -184,7 +189,7 @@ public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndSt
await AppendEventsToStreamAsync(event1, event2, event3);
- subscriber.Start();
+ _subscriber.Start();
await _dispatcher.WaitForEventsToBeDispatched(event3);
@@ -194,7 +199,7 @@ public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndSt
Assert.Multiple(() =>
{
Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
- Assert.That(subscriber.IsLive);
+ Assert.That(_subscriber.IsLive);
});
}
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index cca6cc4..df79ffd 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -99,7 +99,6 @@ public CatchupProgress CatchupProgress
/// Stream name to push events into.
/// Logger.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateLiveSubscription(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -134,7 +133,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionUsingCheckpoint(
/// Logger.
/// Starting Position.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -150,7 +148,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition(
/// Dispatcher.
/// Logger.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -206,7 +203,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingC
///
/// Start the subscriber.
///
- // ReSharper disable once MemberCanBePrivate.Global
public async void Start()
{
_cts = new CancellationTokenSource();
@@ -307,7 +303,6 @@ private StreamSubscriptionResult CreateSubscription()
///
/// Shut down the subscription.
///
- // ReSharper disable once UnusedMember.Global
public void ShutDown() => _cts.Cancel();
private void Init(
From 4f20845ba9cf064dd065abff1dbed3b5ae7cf910 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Mon, 2 Jun 2025 09:12:21 +0100
Subject: [PATCH 06/15] Add test for
CreateCatchupSubscriptionSubscribedToAllFromPosition
---
.../SubscriberTests.cs | 37 ++++++++++++++++++-
.../KurrentDbSubscriber.cs | 1 -
2 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 6895b20..600348b 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -203,6 +203,39 @@ public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndSt
});
}
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAllFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event3];
+
+ await AppendEventsToStreamAsync(event1);
+ var startingPosition = await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllFromPosition(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger(),
+ startingPosition.LogPosition.CommitPosition);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
// ReSharper disable once NotAccessedPositionalProperty.Local
private record SimpleEvent(Guid Id)
{
@@ -270,11 +303,11 @@ private static EventData ToEventData(object data, JsonSerializerOptions? options
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(metaData, options)));
}
- private async Task AppendEventsToStreamAsync(params object[] events)
+ private async Task AppendEventsToStreamAsync(params object[] events)
{
var eventData = events.Select(e => ToEventData(e));
var client = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
- await client.AppendToStreamAsync(StreamName, StreamState.Any, eventData);
+ return await client.AppendToStreamAsync(StreamName, StreamState.Any, eventData);
}
}
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index df79ffd..db5b914 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -166,7 +166,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
/// Logger.
/// Starting Position.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
From 24b9503a05ebf6110c0662e07fddbf6bfa2db6e3 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Mon, 2 Jun 2025 09:14:24 +0100
Subject: [PATCH 07/15] Fix CreateCatchupSubscriptionSubscribedToAll (actually
start from beginning of All)
---
src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index db5b914..2ddb444 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -156,7 +156,8 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
kurrentDbClient,
dispatcher,
AllStreamName,
- logger);
+ logger,
+ 0);
///
/// Creates an KurrentDB catchup subscription that is subscribed to all from a position.
From 7800674b5c194606546082ecf23e56fc19ea275a Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Mon, 2 Jun 2025 10:36:17 +0100
Subject: [PATCH 08/15] Add tests for checkpoint subscriptions
---
.../SubscriberTests.cs | 148 ++++++++++++++++++
.../KurrentDbSubscriber.cs | 2 -
.../MessageDispatch.KurrentDB.csproj | 4 +
3 files changed, 152 insertions(+), 2 deletions(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 600348b..ec977bd 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -236,6 +236,154 @@ public async Task CreateCatchupSubscriptionSubscribedToAllFromPosition_GivenEven
});
}
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1);
+ await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenEventsInStreamAndExistingCheckpointFile_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event3];
+
+ await AppendEventsToStreamAsync(event1);
+ var startingPosition = await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+ var checkpoint = new WriteThroughFileCheckpoint(checkpointFileName);
+ checkpoint.Write((long)startingPosition.LogPosition.CommitPosition);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+
+ [Test]
+ public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1);
+ await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndExistingCheckpointFile_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event3];
+
+ await AppendEventsToStreamAsync(event1);
+ await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+
+ var checkpoint = new WriteThroughFileCheckpoint(checkpointFileName);
+ checkpoint.Write(1);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
// ReSharper disable once NotAccessedPositionalProperty.Local
private record SimpleEvent(Guid Id)
{
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index 2ddb444..f0d7328 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -115,7 +115,6 @@ public static KurrentDbSubscriber CreateLiveSubscription(
/// Logger.
/// Path of the checkpoint file.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionUsingCheckpoint(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -187,7 +186,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPo
/// Logger.
/// Path of the checkpoint file.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
diff --git a/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj b/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj
index a1d3180..09f98b3 100644
--- a/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj
+++ b/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj
@@ -43,4 +43,8 @@
+
+
+
+
From abbdc9983a98c0a76d549cfc3d6af671f0766d59 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Mon, 2 Jun 2025 10:39:29 +0100
Subject: [PATCH 09/15] dotnet format
---
src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs | 1 -
1 file changed, 1 deletion(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index ec977bd..384367d 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -308,7 +308,6 @@ public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenE
});
}
-
[Test]
public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive()
{
From 6c53ae8c1e90bcdca7eebf924561a39201c066ec Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Tue, 3 Jun 2025 11:53:52 +0100
Subject: [PATCH 10/15] Update to use ES v24.10.5
---
src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 384367d..143673e 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -24,7 +24,7 @@ public class SubscriberTests
public async Task Setup()
{
const int eventStoreHostPort = 1234;
- const string eventStoreVersion = "23.10.0";
+ const string eventStoreVersion = "24.10.5";
var eventStoreImageName = RuntimeInformation.OSArchitecture == Architecture.Arm64
? $"ghcr.io/eventstore/eventstore:{eventStoreVersion}-alpha-arm64v8"
@@ -430,7 +430,6 @@ private static EventStoreDbContainer BuildEventStoreContainer(string imageName,
{
{ "EVENTSTORE_INSECURE", "true" },
{ "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true" },
- { "EVENTSTORE_ENABLE_EXTERNAL_TCP", "true" },
{ "EVENTSTORE_HTTP_PORT", hostPort.ToString() },
{ "EVENTSTORE_RUN_PROJECTIONS", "All" },
})
From 980793fb8cf30b4dae671c05b99feed61de6e731 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Wed, 4 Jun 2025 11:34:22 +0100
Subject: [PATCH 11/15] Pass null as starting position
---
src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index f0d7328..a9c6875 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -156,7 +156,7 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
dispatcher,
AllStreamName,
logger,
- 0);
+ null);
///
/// Creates an KurrentDB catchup subscription that is subscribed to all from a position.
From 29e2a6978e230335e6f22fd6c1d3d4fe93183ba6 Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Wed, 4 Jun 2025 12:44:17 +0100
Subject: [PATCH 12/15] Build tests for 481 (currently failing)
---
.../MessageDispatch.KurrentDB.Tests.csproj | 3 ++-
.../SubscriberTests.cs | 18 +++++++++++++++---
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
index 0589167..126a79b 100644
--- a/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
+++ b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
@@ -1,12 +1,13 @@
- net8.0
enable
enable
false
true
+ net8.0;net481
+ latest
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 143673e..58aa2cc 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -383,10 +383,22 @@ public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAn
});
}
- // ReSharper disable once NotAccessedPositionalProperty.Local
- private record SimpleEvent(Guid Id)
+ private class SimpleEvent
{
+ // ReSharper disable once UnusedAutoPropertyAccessor.Local
+ // ReSharper disable once MemberCanBePrivate.Local
+ public Guid Id { get; }
+
+ // ReSharper disable once MemberCanBePrivate.Local
+ public SimpleEvent(Guid id) => Id = id;
+
public static SimpleEvent Create() => new(Guid.NewGuid());
+
+ public override bool Equals(object? obj) => obj is SimpleEvent other && Id.Equals(other.Id);
+
+ public override int GetHashCode() => Id.GetHashCode();
+
+ public override string ToString() => Id.ToString();
}
private class AwaitableDispatcherSpy : IDispatcher
@@ -419,7 +431,7 @@ public Task WaitForEventsToBeDispatched(params object[] events)
}
private static T? DeserializeEventData(ResolvedEvent message) =>
- JsonSerializer.Deserialize(Encoding.UTF8.GetString(message.Event.Data.Span));
+ JsonSerializer.Deserialize(Encoding.UTF8.GetString(message.Event.Data.Span.ToArray()));
private static EventStoreDbContainer BuildEventStoreContainer(string imageName, int hostPort) =>
new EventStoreDbBuilder()
From ebd686dec86fd4424ed56b62b93710f256362a5d Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Wed, 4 Jun 2025 16:16:47 +0100
Subject: [PATCH 13/15] Add test for and fix null ref on missing linked event
---
.../SubscriberTests.cs | 53 +++++++++++++++++++
.../KurrentDbSubscriber.cs | 3 +-
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 58aa2cc..204a7d7 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -1,5 +1,6 @@
// Copyright (c) Pharmaxo. All rights reserved.
+using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
@@ -383,6 +384,58 @@ public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAn
});
}
+ [Test]
+ public async Task IsLive_WhenCatchingUpUsingLinkedEventsGivenMissingLinkedEvent_ReturnsTrueOnceCaughtUp()
+ {
+ await AppendEventsToStreamAsync(SimpleEvent.Create(), SimpleEvent.Create());
+
+ const string linkedStream = "non-system";
+ var event1LinkedData = new EventData(
+ Uuid.NewUuid(),
+ SystemEventTypes.LinkTo,
+ Encoding.UTF8.GetBytes($"0@{StreamName}")
+ );
+
+ var event2LinkedData = new EventData(
+ Uuid.NewUuid(),
+ SystemEventTypes.LinkTo,
+ Encoding.UTF8.GetBytes($"1@{StreamName}")
+ );
+
+ var deletedLinkData = new EventData(
+ Uuid.NewUuid(),
+ SystemEventTypes.LinkTo,
+ Encoding.UTF8.GetBytes($"2@{StreamName}")
+ );
+
+ await _kurrentDbClient.AppendToStreamAsync(
+ linkedStream,
+ StreamState.NoStream,
+ [event1LinkedData, event2LinkedData, deletedLinkData]);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition(
+ _kurrentDbClient,
+ _dispatcher,
+ linkedStream,
+ new NullLogger(),
+ null);
+
+ _subscriber.Start();
+
+ var stopwatch = Stopwatch.StartNew();
+ while (stopwatch.Elapsed < TimeSpan.FromSeconds(5))
+ {
+ if (_subscriber.IsLive)
+ {
+ break;
+ }
+
+ Thread.Sleep(TimeSpan.FromMilliseconds(100));
+ }
+
+ Assert.That(_subscriber.IsLive, "Subscriber was not live");
+ }
+
private class SimpleEvent
{
// ReSharper disable once UnusedAutoPropertyAccessor.Local
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index a9c6875..801443d 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -349,7 +349,8 @@ private void Init(
private void ProcessEvent(ResolvedEvent resolvedEvent)
{
- if (resolvedEvent.Event.EventType.StartsWith("$"))
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalse - the linked event could be null if the original event was deleted.
+ if (resolvedEvent.Event is null || resolvedEvent.Event.EventType.StartsWith("$"))
{
return;
}
From b5735c6964e96ae94bd96b049d4779924aa5c41f Mon Sep 17 00:00:00 2001
From: Sam Matthews <36134497+SamBucaMatthews@users.noreply.github.com>
Date: Thu, 5 Jun 2025 11:15:11 +0100
Subject: [PATCH 14/15] Run ES container as root & don't verify cert
---
src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index 204a7d7..c7d8a8a 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -35,7 +35,7 @@ public async Task Setup()
await eventStoreContainer.StartAsync();
var mappedHostPort = eventStoreContainer.GetMappedPublicPort(eventStoreHostPort);
- _connectionString = $"esdb://admin:changeit@localhost:{mappedHostPort}?tls=false";
+ _connectionString = $"esdb://admin:changeit@localhost:{mappedHostPort}?tls=true&tlsVerifyCert=false";
_kurrentDbClient = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
_dispatcher = new AwaitableDispatcherSpy();
@@ -490,9 +490,11 @@ private static EventStoreDbContainer BuildEventStoreContainer(string imageName,
new EventStoreDbBuilder()
.WithImage(imageName)
.WithCleanUp(true)
+ .WithCreateParameterModifier(cmd => cmd.User = "root")
.WithPortBinding(hostPort, true)
.WithEnvironment(new Dictionary
{
+ { "EVENTSTORE_DEV", "true" },
{ "EVENTSTORE_INSECURE", "true" },
{ "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true" },
{ "EVENTSTORE_HTTP_PORT", hostPort.ToString() },
From 6c99b40d643bcb3ff99731620a3a69dbde3174fc Mon Sep 17 00:00:00 2001
From: Josh Pattie
Date: Thu, 5 Jun 2025 11:23:24 +0100
Subject: [PATCH 15/15] Do not run insecure
---
src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
index c7d8a8a..92d8bb3 100644
--- a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -495,7 +495,7 @@ private static EventStoreDbContainer BuildEventStoreContainer(string imageName,
.WithEnvironment(new Dictionary
{
{ "EVENTSTORE_DEV", "true" },
- { "EVENTSTORE_INSECURE", "true" },
+ { "EVENTSTORE_INSECURE", "false" },
{ "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true" },
{ "EVENTSTORE_HTTP_PORT", hostPort.ToString() },
{ "EVENTSTORE_RUN_PROJECTIONS", "All" },