-
Notifications
You must be signed in to change notification settings - Fork 325
Expand file tree
/
Copy pathOrchestrationSession.cs
More file actions
243 lines (209 loc) · 10.2 KB
/
OrchestrationSession.cs
File metadata and controls
243 lines (209 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
namespace DurableTask.AzureStorage.Messaging
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using Newtonsoft.Json;
/// <summary>
/// Session object for a single orchestration instance. It holds the message batch currently being
/// processed, buffers messages that arrive for the next batch, and exposes the runtime state that
/// <see cref="IsOutOfOrderMessage"/> consults when deciding whether a message can be handled yet.
/// </summary>
sealed class OrchestrationSession : SessionBase, IOrchestrationSession
{
// Maximum time FetchNewOrchestrationMessagesAsync waits for the event below before returning null.
readonly TimeSpan idleTimeout;
// Set (asynchronously) by AddOrReplaceMessages; awaited by FetchNewOrchestrationMessagesAsync.
readonly AsyncAutoResetEvent messagesAvailableEvent;
// Buffer of messages for the next batch. The same object is also used as the lock that guards it.
readonly MessageCollection nextMessageBatch;
/// <summary>
/// Creates a session seeded with its first message batch and the orchestration's current runtime state.
/// </summary>
/// <param name="settings">Forwarded to the base class.</param>
/// <param name="storageAccountName">Forwarded to the base class.</param>
/// <param name="orchestrationInstance">Forwarded to the base class.</param>
/// <param name="controlQueue">Queue associated with this session; must not be null.</param>
/// <param name="initialMessageBatch">The first batch exposed through <see cref="CurrentMessageBatch"/>; must not be null.</param>
/// <param name="runtimeState">Initial runtime state; must not be null.</param>
/// <param name="eTags">Initial value of <see cref="ETags"/>; not validated here.</param>
/// <param name="lastCheckpointTime">Initial value of <see cref="LastCheckpointTime"/>.</param>
/// <param name="trackingStoreContext">Opaque value stored in <see cref="TrackingStoreContext"/>.</param>
/// <param name="idleTimeout">How long a fetch waits for new messages before timing out.</param>
/// <param name="traceActivityId">Forwarded to the base class.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="controlQueue"/>, <paramref name="initialMessageBatch"/> or
/// <paramref name="runtimeState"/> is null (checked in that order).
/// </exception>
public OrchestrationSession(
    AzureStorageOrchestrationServiceSettings settings,
    string storageAccountName,
    OrchestrationInstance orchestrationInstance,
    ControlQueue controlQueue,
    List<MessageData> initialMessageBatch,
    OrchestrationRuntimeState runtimeState,
    OrchestrationETags eTags,
    DateTime lastCheckpointTime,
    object trackingStoreContext,
    TimeSpan idleTimeout,
    Guid traceActivityId)
    : base(settings, storageAccountName, orchestrationInstance, traceActivityId)
{
    // Reject missing required collaborators up front, in declaration order.
    if (controlQueue == null)
    {
        throw new ArgumentNullException(nameof(controlQueue));
    }

    if (initialMessageBatch == null)
    {
        throw new ArgumentNullException(nameof(initialMessageBatch));
    }

    if (runtimeState == null)
    {
        throw new ArgumentNullException(nameof(runtimeState));
    }

    // Private state.
    this.idleTimeout = idleTimeout;
    this.messagesAvailableEvent = new AsyncAutoResetEvent(signaled: false);
    this.nextMessageBatch = new MessageCollection();

    // Publicly visible state.
    this.ControlQueue = controlQueue;
    this.CurrentMessageBatch = initialMessageBatch;
    this.RuntimeState = runtimeState;
    this.ETags = eTags;
    this.LastCheckpointTime = lastCheckpointTime;
    this.TrackingStoreContext = trackingStoreContext;
}
/// <summary>The control queue supplied at construction; never null.</summary>
public ControlQueue ControlQueue { get; }
/// <summary>
/// The batch currently being processed: the initial batch from the constructor, then whatever
/// <see cref="FetchNewOrchestrationMessagesAsync"/> most recently drained from the pending buffer.
/// </summary>
public List<MessageData> CurrentMessageBatch { get; private set; }
/// <summary>Messages recorded through <see cref="DeferMessage"/>.</summary>
public MessageCollection DeferredMessages { get; } = new MessageCollection();
/// <summary>Messages recorded through <see cref="DiscardMessage"/>.</summary>
public MessageCollection DiscardedMessages { get; } = new MessageCollection();
/// <summary>Current runtime state; replaced by <see cref="UpdateRuntimeState"/>.</summary>
public OrchestrationRuntimeState RuntimeState { get; private set; }
/// <summary>ETags supplied at construction; publicly settable.</summary>
public OrchestrationETags ETags { get; set; }
/// <summary>Checkpoint time supplied at construction; compared against message timestamps in <see cref="IsOutOfOrderMessage"/>.</summary>
public DateTime LastCheckpointTime { get; }
/// <summary>Opaque context object supplied at construction; this class only stores it.</summary>
public object TrackingStoreContext { get; }
/// <summary>
/// Read-only view over the live pending-message buffer (not a snapshot).
/// NOTE(review): reads through this property do not take the lock used by the writers — confirm callers tolerate that.
/// </summary>
public IReadOnlyList<MessageData> PendingMessages => this.nextMessageBatch;
/// <summary>
/// Returns the episode number derived from the current <see cref="RuntimeState"/>.
/// </summary>
/// <remarks>
/// The value is recomputed on every call: the runtime state can change over the session's
/// lifetime, so a cached episode number could go stale.
/// </remarks>
public override int GetCurrentEpisode() => Utils.GetEpisodeNumber(this.RuntimeState);
/// <summary>
/// Adds (or replaces) the given messages in the pending buffer and then signals that messages are available.
/// </summary>
/// <param name="messages">Messages to merge into the pending buffer.</param>
/// <remarks>The signal is raised after the loop regardless of how many messages were supplied.</remarks>
public void AddOrReplaceMessages(IEnumerable<MessageData> messages)
{
    MessageCollection pending = this.nextMessageBatch;
    AsyncAutoResetEvent availableSignal = this.messagesAvailableEvent;

    // The buffer doubles as its own lock; the fetch path takes the same lock when draining it.
    lock (pending)
    {
        foreach (MessageData incoming in messages)
        {
            pending.AddOrReplace(incoming);
        }

        // Raise the signal from a thread-pool thread so the caller (the main dispatch thread)
        // is never blocked by whatever runs when the event is set.
        Task.Run(() => availableSignal.Set());
    }
}
// Called by the DTFx dispatcher thread
/// <summary>
/// Waits up to the idle timeout for new messages, then promotes the pending buffer to
/// <see cref="CurrentMessageBatch"/> and returns the task messages it contains.
/// </summary>
/// <param name="workItem">Not used by this implementation.</param>
/// <returns>
/// The task messages of the new batch, in buffer order, or <c>null</c> when the wait timed out.
/// </returns>
public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
    TaskOrchestrationWorkItem workItem)
{
    bool signaled = await this.messagesAvailableEvent.WaitAsync(this.idleTimeout);
    if (!signaled)
    {
        return null; // timed-out
    }

    this.StartNewLogicalTraceScope();

    // Snapshot and clear the pending buffer atomically with respect to AddOrReplaceMessages.
    List<MessageData> batch;
    lock (this.nextMessageBatch)
    {
        batch = this.nextMessageBatch.ToList();
        this.CurrentMessageBatch = batch;
        this.nextMessageBatch.Clear();
    }

    var taskMessages = new List<TaskMessage>(batch.Count);
    for (int i = 0; i < batch.Count; i++)
    {
        MessageData current = batch[i];
        this.TraceProcessingMessage(current, isExtendedSession: true, partitionId: this.ControlQueue.Name);
        taskMessages.Add(current.TaskMessage);
    }

    return taskMessages;
}
/// <summary>
/// Replaces the session's runtime state and refreshes <c>Instance</c> from the new state's
/// orchestration instance.
/// </summary>
/// <param name="runtimeState">The new runtime state; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="runtimeState"/> is null.</exception>
public void UpdateRuntimeState(OrchestrationRuntimeState runtimeState)
{
    // Validate before mutating anything: the constructor guarantees RuntimeState is non-null, and a
    // null here would otherwise overwrite it and only then fail on the next line, leaving the
    // session half-updated.
    this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
    this.Instance = runtimeState.OrchestrationInstance;
}
/// <summary>
/// Records <paramref name="message"/> in <see cref="DeferredMessages"/>, replacing any entry the
/// collection considers to be the same message.
/// </summary>
/// <param name="message">The message to defer.</param>
public void DeferMessage(MessageData message) => this.DeferredMessages.AddOrReplace(message);
/// <summary>
/// Records <paramref name="data"/> in <see cref="DiscardedMessages"/>, replacing any entry the
/// collection considers to be the same message.
/// </summary>
/// <param name="data">The message to discard.</param>
public void DiscardMessage(MessageData data) => this.DiscardedMessages.AddOrReplace(data);
/// <summary>
/// Determines whether <paramref name="message"/> is a response that arrived before this session's
/// history shows the request it answers, and therefore cannot be handled by the current session.
/// </summary>
/// <param name="message">The received message to classify.</param>
/// <returns>
/// <c>true</c> when the message is considered out of order; <c>false</c> when it can be processed
/// (or is of a kind that is never treated as out of order).
/// </returns>
public bool IsOutOfOrderMessage(MessageData message)
{
    HistoryEvent incomingEvent = message.TaskMessage.Event;

    // Only response-style events can be out of order: task and sub-orchestration results,
    // fired timers, and events raised by an entity towards a non-entity instance.
    bool canBeOutOfOrder;
    switch (incomingEvent.EventType)
    {
        case EventType.TaskCompleted:
        case EventType.TaskFailed:
        case EventType.SubOrchestrationInstanceCompleted:
        case EventType.SubOrchestrationInstanceFailed:
        case EventType.TimerFired:
            canBeOutOfOrder = true;
            break;

        case EventType.EventRaised:
            canBeOutOfOrder =
                Core.Common.Entities.IsEntityInstance(message.Sender.InstanceId) &&
                !Core.Common.Entities.IsEntityInstance(this.Instance.InstanceId);
            break;

        default:
            canBeOutOfOrder = false;
            break;
    }

    if (!canBeOutOfOrder)
    {
        return false;
    }

    // A message for a nonexistent instance gets the benefit of the doubt for its first five
    // dequeues, on the assumption that the history table simply has not been populated yet.
    // Beyond that (roughly 30 seconds, per the original author's note) it is most likely a
    // zombie event: the history was removed by an explicit PurgeHistory request or by a
    // ContinueAsNew that cleaned up the old execution. Such messages are not held back.
    if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5)
    {
        return false;
    }

    // LastCheckpointTime is the time the most recent history checkpoint completed, and the
    // checkpoint is written only after all queue messages have been sent. If the checkpoint
    // completed after the message's timestamp there is no ordering concern. This only applies
    // to messages an orchestration sends to itself; the checks below cover activities,
    // sub-orchestrations, etc. Checkpoint time information was added only after v1.6.4.
    if (this.LastCheckpointTime > incomingEvent.Timestamp)
    {
        return false;
    }

    // A response to a scheduled task is in order once the history contains an event with the
    // matching scheduled id.
    if (Utils.TryGetTaskScheduledId(incomingEvent, out int scheduledId) &&
        this.RuntimeState.Events.Any(e => e.EventId == scheduledId))
    {
        return false;
    }

    if (incomingEvent.EventType == EventType.EventRaised)
    {
        // This EventRaised message is a response to an EventSent message; its name carries the
        // request id, so look for a recorded EventSent whose payload has the same id.
        string requestId = ((EventRaisedEvent)incomingEvent).Name;
        if (requestId != null &&
            this.RuntimeState.Events.Any(e =>
                e.EventType == EventType.EventSent &&
                FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId))
        {
            return false;
        }
    }

    // Nothing in the history accounts for this message yet: it is out of order and cannot be
    // handled by the current session.
    return true;
}
/// <summary>
/// Best-effort extraction of the request id from a JSON object payload, reading only as far as
/// the top-level <c>"id"</c> property.
/// </summary>
/// <param name="input">The serialized payload; may be null, empty, or not JSON at all.</param>
/// <returns>
/// The parsed id when an <c>"id"</c> property with a GUID value is found after zero or more of
/// the known leading properties (<c>$type</c>, <c>op</c>, <c>signal</c>, <c>input</c>);
/// otherwise <c>null</c>. This method does not throw for malformed input.
/// </returns>
static Guid? FindRequestId(string input)
{
    if (string.IsNullOrEmpty(input))
    {
        return null;
    }

    try
    {
        // JsonTextReader is IDisposable; disposing it also closes the wrapped StringReader.
        using (var reader = new JsonTextReader(new StringReader(input)))
        {
            reader.Read(); // JsonToken.StartObject
            while (reader.Read() && reader.TokenType == JsonToken.PropertyName)
            {
                switch (reader.Value)
                {
                    case "$type":
                    case "op":
                    case "signal":
                    case "input":
                        reader.Read(); // skip these, they may appear before the id
                        continue;
                    case "id":
                        // TryParse instead of Parse: a missing or non-GUID id is an expected
                        // outcome for arbitrary payloads, not an exceptional one.
                        if (Guid.TryParse(reader.ReadAsString(), out Guid requestId))
                        {
                            return requestId;
                        }

                        return null;
                    default:
                        // Any other property means this payload does not have the expected
                        // shape; stop searching.
                        return null;
                }
            }
        }
    }
    catch (JsonException)
    {
        // The payload is arbitrary user data and frequently is not the JSON shape we are looking
        // for; malformed input simply means "no request id".
    }

    return null;
}
/// <summary>
/// Reports whether the session's runtime state describes an instance with no recorded history:
/// either there are no events at all, or no execution-started event is present.
/// </summary>
bool IsNonexistantInstance()
{
    if (this.RuntimeState.Events.Count == 0)
    {
        return true;
    }

    return this.RuntimeState.ExecutionStartedEvent == null;
}
/// <summary>
/// Ends the session. This implementation has nothing to release, so it completes immediately.
/// </summary>
/// <returns>An already-completed task.</returns>
public Task EndSessionAsync() => Task.CompletedTask; // No-op
}
}