Skip to content

Commit 103225d

Browse files
implemented config watching
1 parent ee416ee commit 103225d

3 files changed

Lines changed: 91 additions & 6 deletions

File tree

src/Config.SqlStreamStore.Tests/ConfigRepositoryTests.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Threading;
23
using System.Threading.Tasks;
34
using SqlStreamStore;
@@ -73,6 +74,31 @@ private async Task<ConfigurationSettings> SaveSettings(IConfigurationSettings se
7374
return await _configRepository.GetLatest(CancellationToken.None);
7475
}
7576

77+
[Fact]
78+
public async Task Can_subscribe_to_changes()
79+
{
80+
var settings = await SaveSettings(BuildNewSettings());
81+
82+
var tcs = new TaskCompletionSource<ConfigurationSettings>();
83+
84+
Task OnSettingsChanged(ConfigurationSettings configurationSettings, CancellationToken ct)
85+
{
86+
tcs.SetResult(configurationSettings);
87+
return Task.CompletedTask;
88+
}
89+
90+
var subscription = _configRepository.SubscribeToChanges(settings.Version, OnSettingsChanged,
91+
ct: CancellationToken.None);
92+
93+
var modified = await _configRepository.WriteChanges(settings.Modify(("setting1", "newValue")), CancellationToken.None);
94+
95+
var noftifiedSettings = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(1)));
96+
97+
Assert.True(tcs.Task.IsCompleted);
98+
Assert.Equal(modified, tcs.Task.Result);
99+
}
100+
101+
76102
private static IConfigurationSettings BuildNewSettings()
77103
{
78104
var settings = new ModifiedConfigurationSettings(

src/Config.SqlStreamStore/ConfigRepository.cs

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Newtonsoft.Json;
66
using SqlStreamStore;
77
using SqlStreamStore.Streams;
8+
using SqlStreamStore.Subscriptions;
89

910
namespace Config.SqlStreamStore
1011
{
@@ -15,6 +16,8 @@ public interface IConfigRepository
1516

1617
public class ConfigRepository : IConfigRepository
1718
{
19+
public delegate Task OnSettingsChanged(ConfigurationSettings settings, CancellationToken ct);
20+
1821
private readonly IStreamStore _streamStore;
1922
private readonly StreamId _streamId;
2023

@@ -38,11 +41,17 @@ public async Task<ConfigurationSettings> GetLatest(CancellationToken ct)
3841
}
3942

4043
var lastMessage = lastPage.Messages.First();
41-
var data = await lastMessage.GetJsonData(ct);
44+
return await BuildConfigurationSettingsFromMessage(lastMessage, ct);
45+
}
46+
47+
private static async Task<ConfigurationSettings> BuildConfigurationSettingsFromMessage(
48+
StreamMessage message, CancellationToken ct)
49+
{
50+
var data = await message.GetJsonData(ct);
4251

4352
var configChanged = JsonConvert.DeserializeObject<ConfigChanged>(data);
4453

45-
return new ConfigurationSettings(lastMessage.StreamVersion, lastMessage.CreatedUtc, configChanged.AllSettings);
54+
return new ConfigurationSettings(message.StreamVersion, message.CreatedUtc, configChanged.AllSettings);
4655
}
4756

4857
public async Task<IConfigurationSettings> WriteChanges(IConfigurationSettings settings, CancellationToken ct)
@@ -61,6 +70,56 @@ public async Task<IConfigurationSettings> WriteChanges(IConfigurationSettings se
6170

6271
return new ConfigurationSettings(result.CurrentVersion, null, changes.AllSettings);
6372
}
64-
73+
74+
public IDisposable SubscribeToChanges(int version,
75+
OnSettingsChanged onSettingsChanged,
76+
CancellationToken ct)
77+
{
78+
IStreamSubscription subscription = null;
79+
80+
81+
async Task StreamMessageReceived(IStreamSubscription _, StreamMessage streamMessage,
82+
CancellationToken cancellationToken)
83+
{
84+
var settings = await BuildConfigurationSettingsFromMessage(streamMessage, ct);
85+
await onSettingsChanged(settings, ct);
86+
};
87+
88+
void SubscriptionDropped(IStreamSubscription _, SubscriptionDroppedReason reason,
89+
Exception exception = null)
90+
{
91+
if (reason != SubscriptionDroppedReason.Disposed)
92+
{
93+
SetupSubscription();
94+
}
95+
};
96+
97+
void SetupSubscription()
98+
{
99+
subscription = _streamStore.SubscribeToStream(
100+
streamId: _streamId,
101+
continueAfterVersion: version,
102+
streamMessageReceived: StreamMessageReceived,
103+
subscriptionDropped: SubscriptionDropped);
104+
}
105+
106+
SetupSubscription();
107+
108+
return subscription;
109+
}
110+
111+
private class DelegateDisposable : IDisposable
112+
{
113+
public readonly Action Action;
114+
public DelegateDisposable(Action action)
115+
{
116+
Action = action;
117+
}
118+
119+
public void Dispose()
120+
{
121+
Action();
122+
}
123+
}
65124
}
66125
}

src/Config.SqlStreamStore/StreamStoreConfigurationSource.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ namespace Config.SqlStreamStore
66
{
77
public class StreamStoreConfigurationSource : IConfigurationSource
88
{
9-
private readonly BuildStreamStoreFromConfig _buildStreamStoreFromConfig;
109

1110
public delegate IStreamStore BuildStreamStoreFromConnectionString(string connectionString);
1211
public delegate IStreamStore BuildStreamStoreFromConfig(IConfigurationRoot configurationRoot);
13-
1412
public delegate IConfigRepository BuildConfigRepository();
13+
private readonly BuildStreamStoreFromConfig _buildStreamStoreFromConfig;
14+
15+
1516

1617
private readonly BuildConfigRepository _getConfigRepository;
1718

@@ -69,6 +70,5 @@ public IConfigurationProvider Build(IConfigurationBuilder builder)
6970

7071
}
7172

72-
7373
}
7474
}

0 commit comments

Comments
 (0)