Skip to content

Commit 0d845c1

Browse files
author
andrey.leskov
committed
Improved performance test & performance by unsubscribing from transport in command waiters
1 parent e7b202a commit 0d845c1

3 files changed

Lines changed: 61 additions & 32 deletions

File tree

GridDomain.Node/AkkaMessaging/Waiting/LocalMessagesWaiter.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public async Task<IWaitResults> Start(TimeSpan? timeout = null)
6161

6262
await WaitForMessages(inbox).TimeoutAfter(timeout ?? _defaultTimeout);
6363

64+
foreach (var type in _messageTypesToSubscribe)
65+
_subscriber.Unsubscribe(inbox.Receiver,type);
66+
6467
return new WaitResults(_allExpectedMessages);
6568
}
6669
}

GridGomain.Tests.Stress/InsertOptimazedBulkConfiguration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace GridGomain.Tests.Stress
55
{
66
public class InsertOptimazedBulkConfiguration : IPersistentChildsRecycleConfiguration
77
{
8-
public TimeSpan ChildClearPeriod => TimeSpan.FromSeconds(30);
9-
public TimeSpan ChildMaxInactiveTime => TimeSpan.FromSeconds(20);
8+
public TimeSpan ChildClearPeriod => TimeSpan.FromSeconds(1);
9+
public TimeSpan ChildMaxInactiveTime => TimeSpan.FromSeconds(1);
1010
}
1111
}

GridGomain.Tests.Stress/Program.cs

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Diagnostics;
34
using System.Linq;
5+
using System.Security.Policy;
46
using System.Threading;
57
using System.Threading.Tasks;
68
using Akka.Actor;
@@ -29,9 +31,11 @@ public static void Main(params string[] args)
2931
var cfg = new CustomContainerConfiguration(
3032
c => c.Register(new SampleDomainContainerConfiguration()),
3133
c => c.RegisterType<IPersistentChildsRecycleConfiguration, InsertOptimazedBulkConfiguration>(),
32-
c => c.RegisterType<IQuartzConfig, PersistedQuartzConfig>());
34+
// c => c.RegisterType<IQuartzConfig, PersistedQuartzConfig>());
35+
c => c.RegisterType<IQuartzConfig, InMemoryQuartzConfig>());
3336

34-
Func<ActorSystem[]> actorSystemFactory = () => new[] {new StressTestAkkaConfiguration().CreateSystem()};
37+
// Func<ActorSystem[]> actorSystemFactory = () => new[] {new StressTestAkkaConfiguration().CreateSystem()};
38+
Func<ActorSystem[]> actorSystemFactory = () => new[] {new StressTestAkkaConfiguration().CreateInMemorySystem()};
3539

3640
var node = new GridDomainNode(cfg, new SampleRouteMap(unityContainer), actorSystemFactory);
3741

@@ -40,47 +44,69 @@ public static void Main(params string[] args)
4044
var timer = new Stopwatch();
4145
timer.Start();
4246

43-
var count = 500000;
44-
var step = 100;
47+
var totalAggregatePacksCount = 50000;
48+
var aggregatePackSize = 100;
49+
int timeoutedCommads = 0;
50+
var random = new Random();
51+
int aggregateChangeAmount = 2;
52+
var commandsInPack = aggregatePackSize * (aggregateChangeAmount + 1);
4553

46-
47-
for (int i = 0; i < count; i += step)
54+
for (int i = 0; i < totalAggregatePacksCount; i += aggregatePackSize)
4855
{
4956
var packTimer = new Stopwatch();
5057
packTimer.Start();
51-
var tasks = Enumerable.Range(0, step).Select(t =>
58+
var tasks = Enumerable.Range(0, aggregatePackSize)
59+
.Select(t => WaitAggregateCommands(aggregateChangeAmount, random, node))
60+
.ToArray();
61+
try
62+
{
63+
Task.WhenAll(tasks).Wait();
64+
}
65+
catch
5266
{
53-
var data = new Fixture();
54-
var createAggregateCommand = data.Create<CreateSampleAggregateCommand>();
55-
var changeAggregateCommandA = new ChangeSampleAggregateCommand(data.Create<int>(),
56-
createAggregateCommand.AggregateId);
57-
var changeAggregateCommandB = new ChangeSampleAggregateCommand(data.Create<int>(),
58-
createAggregateCommand.AggregateId);
59-
var changeAggregateCommandC = new ChangeSampleAggregateCommand(data.Create<int>(),
60-
createAggregateCommand.AggregateId);
61-
62-
return node.NewCommandWaiter(TimeSpan.FromSeconds(100))
63-
.Expect<SampleAggregateCreatedEvent>(e => e.SourceId == createAggregateCommand.AggregateId)
64-
.And<SampleAggregateCreatedEvent> (e => e.SourceId == changeAggregateCommandA.AggregateId)
65-
.And<SampleAggregateCreatedEvent> (e => e.SourceId == changeAggregateCommandB.AggregateId)
66-
.And<SampleAggregateCreatedEvent> (e => e.SourceId == changeAggregateCommandC.AggregateId)
67-
.Create()
68-
.Execute(createAggregateCommand);
69-
70-
}).ToArray();
71-
72-
Task.WaitAll(tasks);
67+
timeoutedCommads += tasks.Count(t => t.IsCanceled || t.IsFaulted);
68+
}
69+
7370
packTimer.Stop();
74-
Console.WriteLine($"Executed {step} commands in {packTimer.Elapsed}, Total: {i} in {timer.Elapsed}");
71+
var speed = (decimal) (commandsInPack / packTimer.Elapsed.TotalSeconds);
72+
//Console.WriteLine($"Executed {aggregatePackSize} commands in {packTimer.Elapsed}, Total: {i} in {timer.Elapsed}, " +
73+
Console.WriteLine($"speed :{speed} cmd/sec," +
74+
$"total errors: {timeoutedCommads}, " +
75+
$"total commands executed: {i * commandsInPack}," +
76+
$"approx time remaining: {(totalAggregatePacksCount - i) / speed }");
7577
}
7678

7779
timer.Stop();
78-
Console.WriteLine($"Executed {count} batches in {timer.Elapsed}");
80+
Console.WriteLine($"Executed {totalAggregatePacksCount} batches in {timer.Elapsed}");
81+
node.Stop();
7982

8083
Console.WriteLine("Sleeping");
8184
Thread.Sleep(60);
8285

83-
node.Stop();
86+
}
87+
88+
private static Task<IWaitResults> WaitAggregateCommands(int changeNumber, Random random, GridDomainNode node)
89+
{
90+
var commands = new List<ICommand>(changeNumber + 1);
91+
var createCmd = new CreateSampleAggregateCommand(random.Next(), Guid.NewGuid());
92+
93+
var changeCmds = Enumerable.Range(0, changeNumber)
94+
.Select(n => new ChangeSampleAggregateCommand(random.Next(), createCmd.AggregateId))
95+
.ToArray();
96+
97+
commands.Add(createCmd);
98+
commands.AddRange(changeCmds);
99+
100+
101+
var expectBuilder = node.NewCommandWaiter()
102+
.Expect<SampleAggregateCreatedEvent>(e => e.SourceId == createCmd.AggregateId);
103+
104+
foreach (var cmd in changeCmds)
105+
expectBuilder.And<SampleAggregateChangedEvent>(e => e.SourceId == cmd.AggregateId && e.Value == cmd.Parameter.ToString());
106+
107+
108+
return expectBuilder.Create()
109+
.Execute(commands.ToArray());
84110
}
85111
}
86112
}

0 commit comments

Comments
 (0)