Skip to content

Commit 18caa43

Browse files
multi-threaded writing
1 parent 9919904 commit 18caa43

7 files changed

Lines changed: 99 additions & 30 deletions

File tree

src/Config.SqlStreamStore.Tests/ConfigRepositoryTests.cs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ namespace Config.SqlStreamStore.Tests
1414

1515
public class ConfigRepositoryTests
1616
{
17-
private ConfigRepository _configRepository;
17+
private StreamStoreConfigRepository _streamStoreConfigRepository;
1818

1919
public ConfigRepositoryTests()
2020
{
21-
_configRepository = new ConfigRepository(new InMemoryStreamStore());
21+
_streamStoreConfigRepository = new StreamStoreConfigRepository(new InMemoryStreamStore());
2222
}
2323

2424
[Fact]
@@ -28,10 +28,10 @@ public async Task Can_save_new_settings()
2828
("setting1", "value1"),
2929
("setting2", "setting2"));
3030

31-
var result = await _configRepository.WriteChanges(settings, CancellationToken.None);
31+
var result = await _streamStoreConfigRepository.WriteChanges(settings, CancellationToken.None);
3232
Assert.Equal(0, result.Version);
3333

34-
var saved = await _configRepository.GetLatest(CancellationToken.None);
34+
var saved = await _streamStoreConfigRepository.GetLatest(CancellationToken.None);
3535

3636
Assert.Equal(settings, saved);
3737
}
@@ -66,11 +66,11 @@ public async Task Can_delete_existing_settings()
6666
Assert.False(saved.ContainsKey("setting1"));
6767
}
6868

69-
private async Task<ConfigurationSettings> SaveSettings(IConfigurationSettings settings)
69+
private async Task<ConfigurationSettings> SaveSettings(ModifiedConfigurationSettings settings)
7070
{
71-
await _configRepository.WriteChanges(settings, CancellationToken.None);
71+
await _streamStoreConfigRepository.WriteChanges(settings, CancellationToken.None);
7272

73-
return await _configRepository.GetLatest(CancellationToken.None);
73+
return await _streamStoreConfigRepository.GetLatest(CancellationToken.None);
7474
}
7575

7676
[Fact]
@@ -86,19 +86,63 @@ Task OnSettingsChanged(ConfigurationSettings configurationSettings, Cancellation
8686
return Task.CompletedTask;
8787
}
8888

89-
var subscription = _configRepository.SubscribeToChanges(settings.Version, OnSettingsChanged,
89+
var subscription = _streamStoreConfigRepository.SubscribeToChanges(settings.Version, OnSettingsChanged,
9090
ct: CancellationToken.None);
9191

92-
var modified = await _configRepository.WriteChanges(settings.WithModifiedSettings(("setting1", "newValue")), CancellationToken.None);
92+
var modified = await _streamStoreConfigRepository.WriteChanges(settings.WithModifiedSettings(("setting1", "newValue")), CancellationToken.None);
9393

9494
var noftifiedSettings = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(1)));
9595

9696
Assert.True(tcs.Task.IsCompleted);
9797
Assert.Equal(modified, tcs.Task.Result);
9898
}
9999

100+
[Fact]
101+
public async Task Can_modify_concurrently()
102+
{
103+
var delayWriting = new TaskCompletionSource<bool>();
104+
int count = 0;
105+
var waitUntilBothStarted = new TaskCompletionSource<bool>();
106+
bool errorHandlerInvoked = false;
107+
Task<IConfigurationSettings> StartModification(string value)
108+
{
109+
return _streamStoreConfigRepository.Modify(
110+
changeSettings: async (currentSettings, ct) =>
111+
{
112+
if (++count == 2)
113+
{
114+
waitUntilBothStarted.SetResult(true);
115+
}
116+
await delayWriting.Task;
117+
return currentSettings.WithModifiedSettings(("setting1", value));
118+
},
119+
errorHandler: (e, i) =>
120+
{
121+
errorHandlerInvoked = true;
122+
return Task.FromResult(true);
123+
},
124+
ct: CancellationToken.None);
125+
}
126+
127+
// save initial set of data
128+
var saved = await SaveSettings(BuildNewSettings());
129+
Assert.Equal(0, saved.Version);
130+
131+
var t1 = Task.Run(() => StartModification("modified by t1"));
132+
var t2 = Task.Run(() => StartModification("modified by t2"));
133+
134+
await waitUntilBothStarted.Task;
135+
delayWriting.SetResult(true);
136+
137+
await Task.WhenAll(t1, t2);
138+
139+
saved = await _streamStoreConfigRepository.GetLatest(CancellationToken.None);
140+
141+
Assert.Equal(2, saved.Version);
142+
Assert.True(errorHandlerInvoked);
143+
}
100144

101-
private static IConfigurationSettings BuildNewSettings()
145+
private static ModifiedConfigurationSettings BuildNewSettings()
102146
{
103147
var settings = new ModifiedConfigurationSettings(
104148
("setting1", "value1"),

src/Config.SqlStreamStore.Tests/StreamStoreConfigurationProviderTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public async Task Can_reload_when_data_chagnes()
103103
})
104104
.Build();
105105

106-
await new ConfigRepository(instance).Modify(CancellationToken.None,
106+
await new StreamStoreConfigRepository(instance).Modify(CancellationToken.None,
107107
("setting1", "modified"));
108108

109109
await WaitUntil(() => config["setting1"] == "modified");
@@ -128,7 +128,7 @@ public async Task Triggers_reload_token_on_change()
128128
var tcs = new TaskCompletionSource<bool>();
129129
ChangeToken.OnChange(config.GetReloadToken, () => tcs.SetResult(true));
130130

131-
await new ConfigRepository(instance).Modify(CancellationToken.None,
131+
await new StreamStoreConfigRepository(instance).Modify(CancellationToken.None,
132132
("setting1", "modified"));
133133

134134
var noftifiedSettings = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(1)));
@@ -142,7 +142,7 @@ private static async Task<InMemoryStreamStore> BuildSteamStoreWithSettings(param
142142
{
143143
var instance = new InMemoryStreamStore();
144144

145-
var repo = new ConfigRepository(instance);
145+
var repo = new StreamStoreConfigRepository(instance);
146146
await repo.WriteChanges(new ModifiedConfigurationSettings(
147147
settings), CancellationToken.None);
148148

src/Config.SqlStreamStore/ConfigRepository.cs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,24 @@
99

1010
namespace Config.SqlStreamStore
1111
{
12-
public interface IConfigRepository
12+
public interface IStreamStoreConfigRepository
1313
{
1414
Task<ConfigurationSettings> GetLatest(CancellationToken ct);
15-
Task<IConfigurationSettings> WriteChanges(IConfigurationSettings settings, CancellationToken ct);
15+
Task<IConfigurationSettings> WriteChanges(ModifiedConfigurationSettings settings, CancellationToken ct);
1616

1717
IDisposable SubscribeToChanges(int version,
18-
ConfigRepository.OnSettingsChanged onSettingsChanged,
18+
StreamStoreConfigRepository.OnSettingsChanged onSettingsChanged,
1919
CancellationToken ct);
2020
}
2121

22-
public class ConfigRepository : IConfigRepository
22+
public class StreamStoreConfigRepository : IStreamStoreConfigRepository
2323
{
2424
public delegate Task OnSettingsChanged(ConfigurationSettings settings, CancellationToken ct);
2525

2626
private readonly IStreamStore _streamStore;
2727
private readonly StreamId _streamId;
2828

29-
public ConfigRepository(IStreamStore streamStore, string streamId = Constants.DefaultStreamName)
29+
public StreamStoreConfigRepository(IStreamStore streamStore, string streamId = Constants.DefaultStreamName)
3030
{
3131
_streamStore = streamStore;
3232
_streamId = streamId;
@@ -67,10 +67,35 @@ public async Task<IConfigurationSettings> Modify(CancellationToken ct,
6767
return await WriteChanges(modified, ct);
6868
}
6969

70+
public async Task<IConfigurationSettings> Modify(
71+
Func<IConfigurationSettings, CancellationToken, Task<ModifiedConfigurationSettings>> changeSettings,
72+
ErrorHandler errorHandler,
73+
CancellationToken ct)
74+
{
75+
int retryCount = 0;
76+
while (!ct.IsCancellationRequested)
77+
{
78+
var latest = await GetLatest(ct);
79+
var changed = await changeSettings(latest, ct);
80+
try
81+
{
82+
var saved = await WriteChanges(changed, ct);
83+
return saved;
84+
}
85+
catch (Exception e)
86+
{
87+
if (!await errorHandler(e, retryCount++))
88+
throw;
89+
}
90+
}
91+
92+
throw new InvalidOperationException("Failed to write configuration settings");
93+
}
94+
7095

71-
public async Task<IConfigurationSettings> WriteChanges(IConfigurationSettings settings, CancellationToken ct)
96+
public async Task<IConfigurationSettings> WriteChanges(ModifiedConfigurationSettings settings, CancellationToken ct)
7297
{
73-
var changes = (settings as ModifiedConfigurationSettings)?.GetChanges();
98+
var changes = settings.GetChanges();
7499

75100
if (changes == null)
76101
// Nothing to save

src/Config.SqlStreamStore/StreamStoreConfigurationProvider.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ namespace Config.SqlStreamStore
1212
public class StreamStoreConfigurationProvider : ConfigurationProvider
1313
{
1414
private readonly StreamStoreConfigurationSource _source;
15-
private readonly IConfigRepository _configRepository;
15+
private readonly IStreamStoreConfigRepository _streamStoreConfigRepository;
1616
private ConfigurationSettings _configurationSettings;
1717

1818
public StreamStoreConfigurationProvider(StreamStoreConfigurationSource source,
19-
IConfigRepository configRepository)
19+
IStreamStoreConfigRepository streamStoreConfigRepository)
2020
{
2121
_source = source;
22-
_configRepository = configRepository;
22+
_streamStoreConfigRepository = streamStoreConfigRepository;
2323

2424
}
2525
public override void Load()
@@ -31,7 +31,7 @@ public override void Load()
3131

3232
try
3333
{
34-
_configurationSettings = _configRepository
34+
_configurationSettings = _streamStoreConfigRepository
3535
.GetLatest(CancellationToken.None).GetAwaiter().GetResult();
3636

3737
break;
@@ -59,7 +59,7 @@ public override void Load()
5959

6060
public IDisposable SubscribeToChanges(CancellationToken ct)
6161
{
62-
return _configRepository.SubscribeToChanges(
62+
return _streamStoreConfigRepository.SubscribeToChanges(
6363
version: _configurationSettings?.Version ?? 0,
6464
onSettingsChanged: OnChanged,
6565
ct: CancellationToken.None);

src/Config.SqlStreamStore/StreamStoreConfigurationSource.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace Config.SqlStreamStore
77
{
88
public delegate IStreamStore BuildStreamStoreFromConnectionString(string connectionString);
99
public delegate IStreamStore BuildStreamStoreFromConfig(IConfigurationRoot configurationRoot);
10-
public delegate IConfigRepository BuildConfigRepository();
10+
public delegate IStreamStoreConfigRepository BuildConfigRepository();
1111

1212
public delegate Task<bool> ErrorHandler(Exception ex, int retryCount);
1313

@@ -45,7 +45,7 @@ public StreamStoreConfigurationSource(string connectionStringKey, BuildStreamSto
4545
}
4646

4747
public StreamStoreConfigurationSource(Func<IStreamStore> getStreamStore) :
48-
this(() => new ConfigRepository(getStreamStore()))
48+
this(() => new StreamStoreConfigRepository(getStreamStore()))
4949
{
5050

5151
}
@@ -71,7 +71,7 @@ public IConfigurationProvider Build(IConfigurationBuilder builder)
7171
}
7272
}
7373

74-
var repo = new ConfigRepository(_buildStreamStoreFromConfig(innerBuilder.Build()));
74+
var repo = new StreamStoreConfigRepository(_buildStreamStoreFromConfig(innerBuilder.Build()));
7575

7676
return new StreamStoreConfigurationProvider(this, repo);
7777

src/Config.SqlstreamStore.Example/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private static async Task CreateDatabase(string connectionString, CancellationTo
108108
await store.CreateSchema(ct);
109109
}
110110

111-
var repo = new ConfigRepository(store);
111+
var repo = new StreamStoreConfigRepository(store);
112112

113113
while (!ct.IsCancellationRequested)
114114
{
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ConnectionString = "server=.;database=config;integrated security=true"
1+
ConnectionString = "server=.;database=config;integrated security=true"

0 commit comments

Comments
 (0)