Skip to content

Commit 7241c68

Browse files
author
Jos Hickson
authored
Merge pull request #12 from jhickson/pausewhileawait
Add ability to pause actor whilst awaiting
2 parents 096df3b + 8c145ed commit 7241c68

15 files changed

Lines changed: 217 additions & 57 deletions

Winton.Extensions.Threading.Actor.Tests.Unit/ActorTests.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,76 @@ public void ShouldStopActorAndNotProcessAnyAlreadyEnqueuedWorkIfStartWorkCancell
875875
attempts.Should().Be(0);
876876
}
877877

878+
[Fact]
879+
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwait()
880+
{
881+
var actor = CreateActor();
882+
var numbers = new List<int>();
883+
884+
await actor.Start();
885+
886+
var tasks =
887+
new[]
888+
{
889+
actor.Enqueue(
890+
async () =>
891+
{
892+
numbers.Add(1);
893+
894+
await Task.Delay(TimeSpan.FromSeconds(1)).WhileActorPaused();
895+
896+
numbers.Add(2);
897+
}),
898+
actor.Enqueue(() => numbers.Add(3)),
899+
actor.Enqueue(() => numbers.Add(4)),
900+
actor.Enqueue(() => numbers.Add(5))
901+
};
902+
903+
await Task.WhenAll(tasks);
904+
905+
numbers.Should().Equal(1, 2, 3, 4, 5);
906+
}
907+
908+
[Fact]
909+
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwaitReturningData()
910+
{
911+
var actor = CreateActor();
912+
var numbers = new List<int>();
913+
var promise = new TaskCompletionSource<int>();
914+
915+
await actor.Start();
916+
917+
var tasks =
918+
new[]
919+
{
920+
actor.Enqueue(
921+
async () =>
922+
{
923+
numbers.Add(1);
924+
925+
var next = await promise.Task.WhileActorPaused();
926+
927+
numbers.Add(next);
928+
}),
929+
actor.Enqueue(() => numbers.Add(3)),
930+
actor.Enqueue(() => numbers.Add(4)),
931+
actor.Enqueue(() => numbers.Add(5))
932+
};
933+
934+
var promiseSetter =
935+
Task.Run(
936+
async () =>
937+
{
938+
await Task.Delay(TimeSpan.FromSeconds(1));
939+
promise.SetResult(2);
940+
});
941+
942+
await Task.WhenAll(tasks);
943+
await promiseSetter;
944+
945+
numbers.Should().Equal(1, 2, 3, 4, 5);
946+
}
947+
878948
[Flags]
879949
private enum ActorCreateOptions
880950
{

Winton.Extensions.Threading.Actor.Tests.Unit/Winton.Extensions.Threading.Actor.Tests.Unit.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<ItemGroup>
1212
<PackageReference Include="FluentAssertions" Version="4.19.2" />
1313
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
14-
<PackageReference Include="Moq" Version="4.7.1" />
14+
<PackageReference Include="Moq" Version="4.7.12" />
1515
<PackageReference Include="xunit" Version="2.2.0" />
1616
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
1717
</ItemGroup>

Winton.Extensions.Threading.Actor/Actor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ private Actor(ActorImpl impl)
4646
/// </summary>
4747
public static ActorId CurrentId
4848
{
49-
get { return _currentId; }
50-
internal set { _currentId = value; }
49+
get => _currentId;
50+
internal set => _currentId = value;
5151
}
5252

5353
/// <inheritdoc cref="IActor.Id"/>
@@ -56,13 +56,13 @@ public static ActorId CurrentId
5656
/// <inheritdoc cref="IActor.StartWork"/>
5757
public ActorStartWork StartWork
5858
{
59-
set { _impl.StartWork = value; }
59+
set => _impl.StartWork = value;
6060
}
6161

6262
/// <inheritdoc cref="IActor.StopWork"/>
6363
public ActorStopWork StopWork
6464
{
65-
set { _impl.StopWork = value; }
65+
set => _impl.StopWork = value;
6666
}
6767

6868
/// <inheritdoc cref="IActor.Start"/>

Winton.Extensions.Threading.Actor/Internal/ActorImpl.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public ActorImpl(IActorTaskFactory actorTaskFactory)
1616

1717
public ActorStartWork StartWork
1818
{
19-
set { Context.StartWork = value; }
19+
set => Context.StartWork = value;
2020
}
2121

2222
public ActorStopWork StopWork
2323
{
24-
set { Context.StopWork = value; }
24+
set => Context.StopWork = value;
2525
}
2626

2727
public Task Start()

Winton.Extensions.Threading.Actor/Internal/ActorSynchronizationContext.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,23 @@ internal sealed class ActorSynchronizationContext : SynchronizationContext
77
{
88
private readonly ActorTaskScheduler _scheduler;
99
private readonly IActorTaskFactory _actorTaskFactory;
10+
private readonly ActorTaskKind _actorTaskKind;
1011

11-
public ActorSynchronizationContext(ActorTaskScheduler scheduler, IActorTaskFactory actorTaskFactory)
12+
public ActorSynchronizationContext(ActorTaskScheduler scheduler, IActorTaskFactory actorTaskFactory, ActorTaskKind actorTaskKind)
1213
{
1314
_scheduler = scheduler;
1415
_actorTaskFactory = actorTaskFactory;
16+
_actorTaskKind = actorTaskKind;
17+
}
18+
19+
public ActorSynchronizationContext ChangeActorTaskKind(ActorTaskKind actorTaskKind)
20+
{
21+
return new ActorSynchronizationContext(_scheduler, _actorTaskFactory, actorTaskKind);
1522
}
1623

1724
public override void Post(SendOrPostCallback callback, object state)
1825
{
19-
_actorTaskFactory.Create(() => callback(state), CancellationToken.None, TaskCreationOptions.HideScheduler).Start(_scheduler);
26+
_actorTaskFactory.Create(() => callback(state), CancellationToken.None, TaskCreationOptions.HideScheduler, _actorTaskKind).Start(_scheduler);
2027
}
2128
}
2229
}

Winton.Extensions.Threading.Actor/Internal/ActorTask.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Threading.Tasks;
23

34
namespace Winton.Extensions.Threading.Actor.Internal
@@ -9,6 +10,11 @@ public static bool IsActorTask(this Task self)
910
return self.AsyncState is ActorTaskContext;
1011
}
1112

13+
public static ActorTaskContext GetActorTaskContext(this Task self)
14+
{
15+
return self.AsyncState as ActorTaskContext ?? throw new InvalidOperationException("Task is not an actor task.");
16+
}
17+
1218
public static void Cancel(this Task self)
1319
{
1420
var context = self.AsyncState as ActorTaskContext;

Winton.Extensions.Threading.Actor/Internal/ActorTaskContext.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ namespace Winton.Extensions.Threading.Actor.Internal
44
{
55
internal sealed class ActorTaskContext
66
{
7-
public ActorTaskContext(CancellationToken cancellationToken)
7+
public ActorTaskContext(CancellationToken cancellationToken, ActorTaskKind kind)
88
{
9+
Kind = kind;
910
Canceller = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
1011
}
1112

1213
public CancellationTokenSource Canceller { get; }
14+
15+
public ActorTaskKind Kind { get; }
1316
}
1417
}

Winton.Extensions.Threading.Actor/Internal/ActorTaskFactoryExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ internal static class ActorTaskFactoryExtensions
88
{
99
private const TaskCreationOptions BaseCreationOptions = TaskCreationOptions.HideScheduler;
1010

11-
public static Task Create(this IActorTaskFactory self, Action work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions)
11+
public static Task Create(this IActorTaskFactory self, Action work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions, ActorTaskKind kind = ActorTaskKind.Standard)
1212
{
13-
var actorTaskState = new ActorTaskContext(cancellationToken);
13+
var actorTaskState = new ActorTaskContext(cancellationToken, kind);
1414
return self.Create(state =>
1515
{
1616
try
@@ -25,9 +25,9 @@ public static Task Create(this IActorTaskFactory self, Action work, Cancellation
2525
actorTaskState.Canceller.Token, taskCreationOptions | BaseCreationOptions, actorTaskState);
2626
}
2727

28-
public static Task<T> Create<T>(this IActorTaskFactory self, Func<T> work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions)
28+
public static Task<T> Create<T>(this IActorTaskFactory self, Func<T> work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions, ActorTaskKind kind = ActorTaskKind.Standard)
2929
{
30-
var actorTaskState = new ActorTaskContext(cancellationToken);
30+
var actorTaskState = new ActorTaskContext(cancellationToken, kind);
3131
return self.Create(state =>
3232
{
3333
try
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Winton.Extensions.Threading.Actor.Internal
2+
{
3+
internal enum ActorTaskKind
4+
{
5+
Standard,
6+
Resumer
7+
}
8+
}

0 commit comments

Comments
 (0)