Skip to content

Commit 8fed8fc

Browse files
author
Jos Hickson
committed
Merge remote-tracking branch 'refs/remotes/wintoncode/master'
2 parents 917a5c2 + 56cb084 commit 8fed8fc

16 files changed

Lines changed: 243 additions & 57 deletions

USAGE.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,35 @@ the actor is not blocked.
134134
to `GetEmployee` for different employee ids or _the same_ employee id.
135135
In the latter case a second call to the database will occur.
136136
The code could be engineered to prevent this if desired but let's assume we don't mind the odd extra call to the database.
137+
* If we did want to prevent this one way would be to pause the actor whilst we wait for the database to return. In this particular case that would probably be a little heavy-handed but a means of doing this is described below.
137138
1. Finally, if the call to the database causes an exception because, say, no employee exists for the given `payrollId` then that exception
138139
will be passed up to the code that called into the `EmployeeCache` object.
139140

141+
### Pausing the actor during an `await`
142+
143+
Let's say you wanted to ensure only one call to the database to get an employee.
144+
A rather blunt way of doing that would be to essentially pause the actor until the database call returns.
145+
You'd achieve that using the `WhileActorPaused` extension:
146+
147+
```csharp
148+
public Task<Employee> GetEmployee(int payrollId)
149+
{
150+
return _actor.Enqueue(
151+
async () =>
152+
{
153+
if (!_cache.ContainsKey(payrollId))
154+
{
155+
_cache[payrollId] = await _database.GetEmployee(payrollId).WhileActorPaused();
156+
}
157+
158+
return _cache[payrollId];
159+
});
160+
}
161+
```
162+
163+
Essentially this will prevent the actor from doing anything at all until the task returned by `_database.GetEmployee(payrollId)` completes and the first thing processed by the actor when it resumes will be the continuation after the call (i.e. `_cache[payrollId] = ...`).
164+
This device should be used with caution - and would most likely be inappropriate in the case of this example - but could be useful in some circumstances.
165+
140166
## Specifying work to do when the actor starts
141167

142168
Sometimes it's useful to to specify work that the actor must do when it starts prior to processing any more work.

Winton.Extensions.Threading.Actor.Tests.Unit/ActorTests.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,76 @@ public void ShouldStopActorAndNotProcessAnyAlreadyEnqueuedWorkIfStartWorkCancell
875875
attempts.Should().Be(0);
876876
}
877877

878+
[Fact]
879+
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwait()
880+
{
881+
var actor = CreateActor();
882+
var numbers = new List<int>();
883+
884+
await actor.Start();
885+
886+
var tasks =
887+
new[]
888+
{
889+
actor.Enqueue(
890+
async () =>
891+
{
892+
numbers.Add(1);
893+
894+
await Task.Delay(TimeSpan.FromSeconds(1)).WhileActorPaused();
895+
896+
numbers.Add(2);
897+
}),
898+
actor.Enqueue(() => numbers.Add(3)),
899+
actor.Enqueue(() => numbers.Add(4)),
900+
actor.Enqueue(() => numbers.Add(5))
901+
};
902+
903+
await Task.WhenAll(tasks);
904+
905+
numbers.Should().Equal(1, 2, 3, 4, 5);
906+
}
907+
908+
[Fact]
909+
public async Task ShouldBeAbleToPauseActorUntilResumeFromAwaitReturningData()
910+
{
911+
var actor = CreateActor();
912+
var numbers = new List<int>();
913+
var promise = new TaskCompletionSource<int>();
914+
915+
await actor.Start();
916+
917+
var tasks =
918+
new[]
919+
{
920+
actor.Enqueue(
921+
async () =>
922+
{
923+
numbers.Add(1);
924+
925+
var next = await promise.Task.WhileActorPaused();
926+
927+
numbers.Add(next);
928+
}),
929+
actor.Enqueue(() => numbers.Add(3)),
930+
actor.Enqueue(() => numbers.Add(4)),
931+
actor.Enqueue(() => numbers.Add(5))
932+
};
933+
934+
var promiseSetter =
935+
Task.Run(
936+
async () =>
937+
{
938+
await Task.Delay(TimeSpan.FromSeconds(1));
939+
promise.SetResult(2);
940+
});
941+
942+
await Task.WhenAll(tasks);
943+
await promiseSetter;
944+
945+
numbers.Should().Equal(1, 2, 3, 4, 5);
946+
}
947+
878948
[Flags]
879949
private enum ActorCreateOptions
880950
{

Winton.Extensions.Threading.Actor.Tests.Unit/Winton.Extensions.Threading.Actor.Tests.Unit.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<ItemGroup>
1212
<PackageReference Include="FluentAssertions" Version="4.19.2" />
1313
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
14-
<PackageReference Include="Moq" Version="4.7.1" />
14+
<PackageReference Include="Moq" Version="4.7.12" />
1515
<PackageReference Include="xunit" Version="2.2.0" />
1616
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
1717
</ItemGroup>

Winton.Extensions.Threading.Actor/Actor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ private Actor(ActorImpl impl)
4646
/// </summary>
4747
public static ActorId CurrentId
4848
{
49-
get { return _currentId; }
50-
internal set { _currentId = value; }
49+
get => _currentId;
50+
internal set => _currentId = value;
5151
}
5252

5353
/// <inheritdoc cref="IActor.Id"/>
@@ -56,13 +56,13 @@ public static ActorId CurrentId
5656
/// <inheritdoc cref="IActor.StartWork"/>
5757
public ActorStartWork StartWork
5858
{
59-
set { _impl.StartWork = value; }
59+
set => _impl.StartWork = value;
6060
}
6161

6262
/// <inheritdoc cref="IActor.StopWork"/>
6363
public ActorStopWork StopWork
6464
{
65-
set { _impl.StopWork = value; }
65+
set => _impl.StopWork = value;
6666
}
6767

6868
/// <inheritdoc cref="IActor.Start"/>

Winton.Extensions.Threading.Actor/Internal/ActorImpl.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public ActorImpl(IActorTaskFactory actorTaskFactory)
1616

1717
public ActorStartWork StartWork
1818
{
19-
set { Context.StartWork = value; }
19+
set => Context.StartWork = value;
2020
}
2121

2222
public ActorStopWork StopWork
2323
{
24-
set { Context.StopWork = value; }
24+
set => Context.StopWork = value;
2525
}
2626

2727
public Task Start()

Winton.Extensions.Threading.Actor/Internal/ActorSynchronizationContext.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,23 @@ internal sealed class ActorSynchronizationContext : SynchronizationContext
77
{
88
private readonly ActorTaskScheduler _scheduler;
99
private readonly IActorTaskFactory _actorTaskFactory;
10+
private readonly ActorTaskKind _actorTaskKind;
1011

11-
public ActorSynchronizationContext(ActorTaskScheduler scheduler, IActorTaskFactory actorTaskFactory)
12+
public ActorSynchronizationContext(ActorTaskScheduler scheduler, IActorTaskFactory actorTaskFactory, ActorTaskKind actorTaskKind)
1213
{
1314
_scheduler = scheduler;
1415
_actorTaskFactory = actorTaskFactory;
16+
_actorTaskKind = actorTaskKind;
17+
}
18+
19+
public ActorSynchronizationContext ChangeActorTaskKind(ActorTaskKind actorTaskKind)
20+
{
21+
return new ActorSynchronizationContext(_scheduler, _actorTaskFactory, actorTaskKind);
1522
}
1623

1724
public override void Post(SendOrPostCallback callback, object state)
1825
{
19-
_actorTaskFactory.Create(() => callback(state), CancellationToken.None, TaskCreationOptions.HideScheduler).Start(_scheduler);
26+
_actorTaskFactory.Create(() => callback(state), CancellationToken.None, TaskCreationOptions.HideScheduler, _actorTaskKind).Start(_scheduler);
2027
}
2128
}
2229
}

Winton.Extensions.Threading.Actor/Internal/ActorTask.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Threading.Tasks;
23

34
namespace Winton.Extensions.Threading.Actor.Internal
@@ -9,6 +10,11 @@ public static bool IsActorTask(this Task self)
910
return self.AsyncState is ActorTaskContext;
1011
}
1112

13+
public static ActorTaskContext GetActorTaskContext(this Task self)
14+
{
15+
return self.AsyncState as ActorTaskContext ?? throw new InvalidOperationException("Task is not an actor task.");
16+
}
17+
1218
public static void Cancel(this Task self)
1319
{
1420
var context = self.AsyncState as ActorTaskContext;

Winton.Extensions.Threading.Actor/Internal/ActorTaskContext.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ namespace Winton.Extensions.Threading.Actor.Internal
44
{
55
internal sealed class ActorTaskContext
66
{
7-
public ActorTaskContext(CancellationToken cancellationToken)
7+
public ActorTaskContext(CancellationToken cancellationToken, ActorTaskKind kind)
88
{
9+
Kind = kind;
910
Canceller = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
1011
}
1112

1213
public CancellationTokenSource Canceller { get; }
14+
15+
public ActorTaskKind Kind { get; }
1316
}
1417
}

Winton.Extensions.Threading.Actor/Internal/ActorTaskFactoryExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ internal static class ActorTaskFactoryExtensions
88
{
99
private const TaskCreationOptions BaseCreationOptions = TaskCreationOptions.HideScheduler;
1010

11-
public static Task Create(this IActorTaskFactory self, Action work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions)
11+
public static Task Create(this IActorTaskFactory self, Action work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions, ActorTaskKind kind = ActorTaskKind.Standard)
1212
{
13-
var actorTaskState = new ActorTaskContext(cancellationToken);
13+
var actorTaskState = new ActorTaskContext(cancellationToken, kind);
1414
return self.Create(state =>
1515
{
1616
try
@@ -25,9 +25,9 @@ public static Task Create(this IActorTaskFactory self, Action work, Cancellation
2525
actorTaskState.Canceller.Token, taskCreationOptions | BaseCreationOptions, actorTaskState);
2626
}
2727

28-
public static Task<T> Create<T>(this IActorTaskFactory self, Func<T> work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions)
28+
public static Task<T> Create<T>(this IActorTaskFactory self, Func<T> work, CancellationToken cancellationToken, TaskCreationOptions taskCreationOptions, ActorTaskKind kind = ActorTaskKind.Standard)
2929
{
30-
var actorTaskState = new ActorTaskContext(cancellationToken);
30+
var actorTaskState = new ActorTaskContext(cancellationToken, kind);
3131
return self.Create(state =>
3232
{
3333
try
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Winton.Extensions.Threading.Actor.Internal
2+
{
3+
internal enum ActorTaskKind
4+
{
5+
Standard,
6+
Resumer
7+
}
8+
}

0 commit comments

Comments
 (0)