diff --git a/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
new file mode 100644
index 0000000..126a79b
--- /dev/null
+++ b/src/MessageDispatch.KurrentDB.Tests/MessageDispatch.KurrentDB.Tests.csproj
@@ -0,0 +1,30 @@
+
+
+
+ enable
+ enable
+
+ false
+ true
+ net8.0;net481
+ latest
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
new file mode 100644
index 0000000..92d8bb3
--- /dev/null
+++ b/src/MessageDispatch.KurrentDB.Tests/SubscriberTests.cs
@@ -0,0 +1,526 @@
+// Copyright (c) Pharmaxo. All rights reserved.
+
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+using System.Text;
+using System.Text.Json;
+using CorshamScience.MessageDispatch.Core;
+using DotNet.Testcontainers.Builders;
+using KurrentDB.Client;
+using Microsoft.Extensions.Logging.Abstractions;
+using PharmaxoScientific.MessageDispatch.KurrentDB;
+using Testcontainers.EventStoreDb;
+
+namespace MessageDispatch.KurrentDB.Tests;
+
+public class SubscriberTests
+{
+ private const string StreamName = "stream1";
+ private string _connectionString;
+ private KurrentDBClient _kurrentDbClient;
+ private AwaitableDispatcherSpy _dispatcher;
+ private KurrentDbSubscriber? _subscriber;
+
+ [SetUp]
+ public async Task Setup()
+ {
+ const int eventStoreHostPort = 1234;
+ const string eventStoreVersion = "24.10.5";
+
+ var eventStoreImageName = RuntimeInformation.OSArchitecture == Architecture.Arm64
+ ? $"ghcr.io/eventstore/eventstore:{eventStoreVersion}-alpha-arm64v8"
+ : $"eventstore/eventstore:{eventStoreVersion}-bookworm-slim";
+
+ var eventStoreContainer = BuildEventStoreContainer(eventStoreImageName, eventStoreHostPort);
+ await eventStoreContainer.StartAsync();
+
+ var mappedHostPort = eventStoreContainer.GetMappedPublicPort(eventStoreHostPort);
+ _connectionString = $"esdb://admin:changeit@localhost:{mappedHostPort}?tls=true&tlsVerifyCert=false";
+
+ _kurrentDbClient = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
+ _dispatcher = new AwaitableDispatcherSpy();
+ }
+
+ [TearDown]
+ public async Task TearDown()
+ {
+ await _kurrentDbClient.DisposeAsync();
+ _subscriber?.ShutDown();
+ }
+
+ [Test]
+ public async Task CreateLiveSubscription_GivenNoEventsInStreamWhenNewEventsAdded_DispatchesEventsAndBecomesLive()
+ {
+ _subscriber = KurrentDbSubscriber.CreateLiveSubscription(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger());
+
+ _subscriber.Start();
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateLiveSubscription_GivenExistingEventsInStreamWhenNewEventsAdded_DispatchesNewEventsAndBecomesLive()
+ {
+ _subscriber = KurrentDbSubscriber.CreateLiveSubscription(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger());
+
+ var oldEvent1 = SimpleEvent.Create();
+ var oldEvent2 = SimpleEvent.Create();
+
+ await AppendEventsToStreamAsync(oldEvent1, oldEvent2);
+
+ _subscriber.Start();
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenEventsInStream_DispatchesEventsAndBecomesLive()
+ {
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger());
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAll_GivenNoEventsInStreamGivenNewEvents_DispatchesEventsAndBecomesLive()
+ {
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAll(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger());
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event1, event2, event3];
+
+ _subscriber.Start();
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger(),
+ 1);
+
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List events = [event3];
+
+ await AppendEventsToStreamAsync(event1, event2, event3);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(events));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAllFromPosition_GivenEventsInStreamAndStartPosition_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event3];
+
+ await AppendEventsToStreamAsync(event1);
+ var startingPosition = await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllFromPosition(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger(),
+ startingPosition.LogPosition.CommitPosition);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1);
+ await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint_GivenEventsInStreamAndExistingCheckpointFile_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event3];
+
+ await AppendEventsToStreamAsync(event1);
+ var startingPosition = await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+ var checkpoint = new WriteThroughFileCheckpoint(checkpointFileName);
+ checkpoint.Write((long)startingPosition.LogPosition.CommitPosition);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndNoExistingCheckpointFile_DispatchesAllEventsAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event1, event2, event3];
+
+ await AppendEventsToStreamAsync(event1);
+ await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event1, event2, event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task CreateCatchupSubscriptionUsingCheckpoint_GivenEventsInStreamAndExistingCheckpointFile_DispatchesEventsFromPositionAndBecomesLive()
+ {
+ var event1 = SimpleEvent.Create();
+ var event2 = SimpleEvent.Create();
+ var event3 = SimpleEvent.Create();
+
+ List eventsExpectedToBeDispatched = [event3];
+
+ await AppendEventsToStreamAsync(event1);
+ await AppendEventsToStreamAsync(event2);
+ await AppendEventsToStreamAsync(event3);
+
+ var checkpointFileName = Path.GetRandomFileName();
+
+ var checkpoint = new WriteThroughFileCheckpoint(checkpointFileName);
+ checkpoint.Write(1);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionUsingCheckpoint(
+ _kurrentDbClient,
+ _dispatcher,
+ StreamName,
+ new NullLogger(),
+ checkpointFileName);
+
+ _subscriber.Start();
+
+ await _dispatcher.WaitForEventsToBeDispatched(event3);
+
+ var deserializedDispatchedEvents =
+ _dispatcher.DispatchedEvents.Select(DeserializeEventData);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(deserializedDispatchedEvents, Is.EqualTo(eventsExpectedToBeDispatched));
+ Assert.That(_subscriber.IsLive);
+ });
+ }
+
+ [Test]
+ public async Task IsLive_WhenCatchingUpUsingLinkedEventsGivenMissingLinkedEvent_ReturnsTrueOnceCaughtUp()
+ {
+ await AppendEventsToStreamAsync(SimpleEvent.Create(), SimpleEvent.Create());
+
+ const string linkedStream = "non-system";
+ var event1LinkedData = new EventData(
+ Uuid.NewUuid(),
+ SystemEventTypes.LinkTo,
+ Encoding.UTF8.GetBytes($"0@{StreamName}")
+ );
+
+ var event2LinkedData = new EventData(
+ Uuid.NewUuid(),
+ SystemEventTypes.LinkTo,
+ Encoding.UTF8.GetBytes($"1@{StreamName}")
+ );
+
+ var deletedLinkData = new EventData(
+ Uuid.NewUuid(),
+ SystemEventTypes.LinkTo,
+ Encoding.UTF8.GetBytes($"2@{StreamName}")
+ );
+
+ await _kurrentDbClient.AppendToStreamAsync(
+ linkedStream,
+ StreamState.NoStream,
+ [event1LinkedData, event2LinkedData, deletedLinkData]);
+
+ _subscriber = KurrentDbSubscriber.CreateCatchupSubscriptionFromPosition(
+ _kurrentDbClient,
+ _dispatcher,
+ linkedStream,
+ new NullLogger(),
+ null);
+
+ _subscriber.Start();
+
+ var stopwatch = Stopwatch.StartNew();
+ while (stopwatch.Elapsed < TimeSpan.FromSeconds(5))
+ {
+ if (_subscriber.IsLive)
+ {
+ break;
+ }
+
+ Thread.Sleep(TimeSpan.FromMilliseconds(100));
+ }
+
+ Assert.That(_subscriber.IsLive, "Subscriber was not live");
+ }
+
+ private class SimpleEvent
+ {
+ // ReSharper disable once UnusedAutoPropertyAccessor.Local
+ // ReSharper disable once MemberCanBePrivate.Local
+ public Guid Id { get; }
+
+ // ReSharper disable once MemberCanBePrivate.Local
+ public SimpleEvent(Guid id) => Id = id;
+
+ public static SimpleEvent Create() => new(Guid.NewGuid());
+
+ public override bool Equals(object? obj) => obj is SimpleEvent other && Id.Equals(other.Id);
+
+ public override int GetHashCode() => Id.GetHashCode();
+
+ public override string ToString() => Id.ToString();
+ }
+
+ private class AwaitableDispatcherSpy : IDispatcher
+ {
+ public List DispatchedEvents { get; } = [];
+
+ public void Dispatch(ResolvedEvent message) => DispatchedEvents.Add(message);
+
+ public Task WaitForEventsToBeDispatched(params object[] events)
+ {
+ if (events.Length == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ var iterations = 0;
+ while (DispatchedEvents.Count != events.Length)
+ {
+ Thread.Sleep(100);
+ iterations++;
+
+ if (iterations > 10)
+ {
+ throw new TimeoutException("Expected events weren't dispatched within the allotted time.");
+ }
+ }
+
+ return Task.CompletedTask;
+ }
+ }
+
+ private static T? DeserializeEventData(ResolvedEvent message) =>
+ JsonSerializer.Deserialize(Encoding.UTF8.GetString(message.Event.Data.Span.ToArray()));
+
+ private static EventStoreDbContainer BuildEventStoreContainer(string imageName, int hostPort) =>
+ new EventStoreDbBuilder()
+ .WithImage(imageName)
+ .WithCleanUp(true)
+ .WithCreateParameterModifier(cmd => cmd.User = "root")
+ .WithPortBinding(hostPort, true)
+ .WithEnvironment(new Dictionary
+ {
+ { "EVENTSTORE_DEV", "true" },
+ { "EVENTSTORE_INSECURE", "false" },
+ { "EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true" },
+ { "EVENTSTORE_HTTP_PORT", hostPort.ToString() },
+ { "EVENTSTORE_RUN_PROJECTIONS", "All" },
+ })
+ .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(hostPort))
+ .Build();
+
+ private static EventData ToEventData(object data, JsonSerializerOptions? options = null)
+ {
+ var metaData = new { ClrType = data.GetType().AssemblyQualifiedName, };
+
+ var type = data.GetType().Name;
+
+ return new EventData(
+ Uuid.NewUuid(),
+ type,
+ Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data, options)),
+ Encoding.UTF8.GetBytes(JsonSerializer.Serialize(metaData, options)));
+ }
+
+ private async Task AppendEventsToStreamAsync(params object[] events)
+ {
+ var eventData = events.Select(e => ToEventData(e));
+ var client = new KurrentDBClient(KurrentDBClientSettings.Create(_connectionString));
+
+ return await client.AppendToStreamAsync(StreamName, StreamState.Any, eventData);
+ }
+}
diff --git a/src/MessageDispatch.KurrentDB.sln b/src/MessageDispatch.KurrentDB.sln
index 2d4c0c8..b502ac7 100644
--- a/src/MessageDispatch.KurrentDB.sln
+++ b/src/MessageDispatch.KurrentDB.sln
@@ -5,6 +5,8 @@ VisualStudioVersion = 17.12.35931.192
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessageDispatch.KurrentDB", "MessageDispatch.KurrentDB\MessageDispatch.KurrentDB.csproj", "{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessageDispatch.KurrentDB.Tests", "MessageDispatch.KurrentDB.Tests\MessageDispatch.KurrentDB.Tests.csproj", "{10045A11-D589-4A7F-BC17-C4CE508B04F4}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -15,6 +17,10 @@ Global
{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8B3CD5D9-28DB-4823-8ACE-C5B9770FFE5B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {10045A11-D589-4A7F-BC17-C4CE508B04F4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
index cca6cc4..801443d 100644
--- a/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
+++ b/src/MessageDispatch.KurrentDB/KurrentDbSubscriber.cs
@@ -99,7 +99,6 @@ public CatchupProgress CatchupProgress
/// Stream name to push events into.
/// Logger.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateLiveSubscription(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -116,7 +115,6 @@ public static KurrentDbSubscriber CreateLiveSubscription(
/// Logger.
/// Path of the checkpoint file.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionUsingCheckpoint(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -134,7 +132,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionUsingCheckpoint(
/// Logger.
/// Starting Position.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -150,7 +147,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionFromPosition(
/// Dispatcher.
/// Logger.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -159,7 +155,8 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
kurrentDbClient,
dispatcher,
AllStreamName,
- logger);
+ logger,
+ null);
///
/// Creates an KurrentDB catchup subscription that is subscribed to all from a position.
@@ -169,7 +166,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAll(
/// Logger.
/// Starting Position.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPosition(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -190,7 +186,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllFromPo
/// Logger.
/// Path of the checkpoint file.
/// A new KurrentDbSubscriber object.
- // ReSharper disable once UnusedMember.Global
public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingCheckpoint(
KurrentDBClient kurrentDbClient,
IDispatcher dispatcher,
@@ -206,7 +201,6 @@ public static KurrentDbSubscriber CreateCatchupSubscriptionSubscribedToAllUsingC
///
/// Start the subscriber.
///
- // ReSharper disable once MemberCanBePrivate.Global
public async void Start()
{
_cts = new CancellationTokenSource();
@@ -307,7 +301,6 @@ private StreamSubscriptionResult CreateSubscription()
///
/// Shut down the subscription.
///
- // ReSharper disable once UnusedMember.Global
public void ShutDown() => _cts.Cancel();
private void Init(
@@ -356,7 +349,8 @@ private void Init(
private void ProcessEvent(ResolvedEvent resolvedEvent)
{
- if (resolvedEvent.Event.EventType.StartsWith("$"))
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalse - the linked event could be null if the original event was deleted.
+ if (resolvedEvent.Event is null || resolvedEvent.Event.EventType.StartsWith("$"))
{
return;
}
diff --git a/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj b/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj
index a1d3180..09f98b3 100644
--- a/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj
+++ b/src/MessageDispatch.KurrentDB/MessageDispatch.KurrentDB.csproj
@@ -43,4 +43,8 @@
+
+
+
+