Skip to content

Commit 77c6675

Browse files
committed
Merge remote-tracking branch 'refs/remotes/Azure/master'
2 parents d769048 + e92db07 commit 77c6675

9 files changed

Lines changed: 137 additions & 32 deletions
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// ----------------------------------------------------------------------------------
2+
// Copyright Microsoft Corporation
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ----------------------------------------------------------------------------------
13+
14+
namespace DurableTask
15+
{
16+
using System;
17+
using System.Diagnostics;
18+
using Microsoft.ServiceBus.Messaging;
19+
using Tracing;
20+
21+
/// <summary>
22+
/// Extension methods for BrokeredMessage
23+
/// </summary>
24+
public static class BrokeredMessageExtensions
25+
{
26+
/// <summary>
27+
/// Returns delivery latency of the message
28+
/// </summary>
29+
public static double DeliveryLatency(this BrokeredMessage message)
30+
{
31+
if (message == null)
32+
{
33+
return 0;
34+
}
35+
36+
DateTime actualEnqueueTimeUtc = (message.ScheduledEnqueueTimeUtc == DateTime.MinValue) ? message.EnqueuedTimeUtc : message.ScheduledEnqueueTimeUtc;
37+
return (DateTime.UtcNow - actualEnqueueTimeUtc).TotalMilliseconds;
38+
}
39+
}
40+
}

Framework/Framework.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
<Reference Include="System.Xml" />
101101
</ItemGroup>
102102
<ItemGroup>
103+
<Compile Include="BrokeredMessageExtensions.cs" />
103104
<Compile Include="SynchronousTaskScheduler.cs" />
104105
<Compile Include="History\GenericEvent.cs" />
105106
<Compile Include="CompressionSettings.cs" />

Framework/Properties/AssemblyInfo.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@
2727
[assembly: AssemblyCulture("")]
2828
[assembly: ComVisible(false)]
2929
[assembly: Guid("1d626d8b-330b-4c6a-b689-c0fefbeb99cd")]
30-
[assembly: AssemblyVersion("1.0.0.8")]
31-
[assembly: AssemblyFileVersion("1.0.0.8")]
32-
[assembly: InternalsVisibleTo("FrameworkUnitTests")]
30+
[assembly: AssemblyVersion("1.0.0.9")]
31+
[assembly: AssemblyFileVersion("1.0.0.9")]
32+
[assembly: InternalsVisibleTo("FrameworkUnitTests")]

Framework/TaskActivityDispatcher.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ protected override async Task<BrokeredMessage> OnFetchWorkItem(TimeSpan receiveT
6161
{
6262
TraceHelper.TraceSession(TraceEventType.Information,
6363
receivedMessage.SessionId,
64-
GetFormattedLog("New message to process: " + receivedMessage.MessageId + " [" +
65-
receivedMessage.SequenceNumber + "]"));
64+
GetFormattedLog($"New message to process: {receivedMessage.MessageId} [{receivedMessage.SequenceNumber}], latency: {receivedMessage.DeliveryLatency()}ms"));
6665
}
6766

6867
return receivedMessage;

Framework/TaskOrchestrationContext.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ namespace DurableTask
1515
{
1616
using System;
1717
using System.Collections.Generic;
18+
using System.Diagnostics;
19+
using System.Globalization;
1820
using System.Threading;
1921
using System.Threading.Tasks;
22+
using DurableTask.Tracing;
2023
using Command;
2124
using History;
2225

@@ -251,6 +254,10 @@ public void HandleTaskCompletedEvent(TaskCompletedEvent completedEvent)
251254

252255
openTasks.Remove(taskId);
253256
}
257+
else
258+
{
259+
LogDuplicateEvent("TaskCompleted", completedEvent, taskId);
260+
}
254261
}
255262

256263
public void HandleTaskFailedEvent(TaskFailedEvent failedEvent)
@@ -269,6 +276,10 @@ public void HandleTaskFailedEvent(TaskFailedEvent failedEvent)
269276

270277
openTasks.Remove(taskId);
271278
}
279+
else
280+
{
281+
LogDuplicateEvent("TaskFailed", failedEvent, taskId);
282+
}
272283
}
273284

274285
public void HandleSubOrchestrationInstanceCompletedEvent(SubOrchestrationInstanceCompletedEvent completedEvent)
@@ -281,6 +292,10 @@ public void HandleSubOrchestrationInstanceCompletedEvent(SubOrchestrationInstanc
281292

282293
openTasks.Remove(taskId);
283294
}
295+
else
296+
{
297+
LogDuplicateEvent("SubOrchestrationInstanceCompleted", completedEvent, taskId);
298+
}
284299
}
285300

286301
public void HandleSubOrchestrationInstanceFailedEvent(SubOrchestrationInstanceFailedEvent failedEvent)
@@ -298,6 +313,10 @@ public void HandleSubOrchestrationInstanceFailedEvent(SubOrchestrationInstanceFa
298313

299314
openTasks.Remove(taskId);
300315
}
316+
else
317+
{
318+
LogDuplicateEvent("SubOrchestrationInstanceFailed", failedEvent, taskId);
319+
}
301320
}
302321

303322
public void HandleTimerFiredEvent(TimerFiredEvent timerFiredEvent)
@@ -309,6 +328,22 @@ public void HandleTimerFiredEvent(TimerFiredEvent timerFiredEvent)
309328
info.Result.SetResult(timerFiredEvent.TimerId.ToString());
310329
openTasks.Remove(taskId);
311330
}
331+
else
332+
{
333+
LogDuplicateEvent("TimerFired", timerFiredEvent, taskId);
334+
}
335+
}
336+
337+
private void LogDuplicateEvent(string source, HistoryEvent historyEvent, int taskId)
338+
{
339+
TraceHelper.TraceSession(
340+
TraceEventType.Warning,
341+
this.OrchestrationInstance.InstanceId,
342+
"Duplicate {0} Event: {1}, type: {2}, ts: {3}",
343+
source,
344+
taskId.ToString(),
345+
historyEvent.EventType,
346+
historyEvent.Timestamp.ToString(CultureInfo.InvariantCulture));
312347
}
313348

314349
public void HandleExecutionTerminatedEvent(ExecutionTerminatedEvent terminatedEvent)

Framework/TaskOrchestrationDispatcher.cs

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,20 @@ await Utils.ExecuteWithRetries(() => session.ReceiveBatchAsync(PrefetchCount),
124124

125125
TraceHelper.TraceSession(TraceEventType.Information, session.SessionId,
126126
GetFormattedLog(
127-
string.Format("{0} new messages to process: {1}",
128-
newMessages.Count(),
129-
string.Join(",", newMessages.Select(m => m.MessageId)))));
127+
$@"{newMessages.Count()} new messages to process: {
128+
string.Join(",", newMessages.Select(m => m.MessageId))
129+
}, max latency: {
130+
newMessages.Max(message => message.DeliveryLatency())}ms"
131+
));
130132

131133
return new SessionWorkItem {Session = session, Messages = newMessages};
132134
}
133135

134136
protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
135137
{
136-
var messagesToSend = new List<BrokeredMessage>();
137-
var timerMessages = new List<BrokeredMessage>();
138-
var subOrchestrationMessages = new List<BrokeredMessage>();
138+
var messagesToSend = new List<MessageContainer>();
139+
var timerMessages = new List<MessageContainer>();
140+
var subOrchestrationMessages = new List<MessageContainer>();
139141
bool isCompleted = false;
140142
bool continuedAsNew = false;
141143

@@ -195,7 +197,7 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
195197
foreach (OrchestratorAction decision in decisions)
196198
{
197199
TraceHelper.TraceInstance(TraceEventType.Information, runtimeState.OrchestrationInstance,
198-
"Processing orchestrator action of type {0}", decision.OrchestratorActionType);
200+
"Processing orchestrator action of type {0}, id {1}", decision.OrchestratorActionType, decision.Id.ToString());
199201
switch (decision.OrchestratorActionType)
200202
{
201203
case OrchestratorActionType.ScheduleOrchestrator:
@@ -206,7 +208,7 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
206208
taskMessage, settings.MessageCompressionSettings, runtimeState.OrchestrationInstance,
207209
"ScheduleTask");
208210
brokeredMessage.SessionId = session.SessionId;
209-
messagesToSend.Add(brokeredMessage);
211+
messagesToSend.Add(new MessageContainer(brokeredMessage, decision));
210212
break;
211213
case OrchestratorActionType.CreateTimer:
212214
var timerOrchestratorAction = (CreateTimerOrchestratorAction) decision;
@@ -216,7 +218,7 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
216218
"Timer");
217219
brokeredTimerMessage.ScheduledEnqueueTimeUtc = timerOrchestratorAction.FireAt;
218220
brokeredTimerMessage.SessionId = session.SessionId;
219-
timerMessages.Add(brokeredTimerMessage);
221+
timerMessages.Add(new MessageContainer(brokeredTimerMessage, decision));
220222
break;
221223
case OrchestratorActionType.CreateSubOrchestration:
222224
var createSubOrchestrationAction = (CreateSubOrchestrationAction) decision;
@@ -228,7 +230,7 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
228230
runtimeState.OrchestrationInstance, "Schedule Suborchestration");
229231
createSubOrchestrationMessage.SessionId =
230232
createSubOrchestrationInstanceMessage.OrchestrationInstance.InstanceId;
231-
subOrchestrationMessages.Add(createSubOrchestrationMessage);
233+
subOrchestrationMessages.Add(new MessageContainer(createSubOrchestrationMessage, decision));
232234
break;
233235
case OrchestratorActionType.OrchestrationComplete:
234236
TaskMessage workflowInstanceCompletedMessage =
@@ -252,7 +254,7 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
252254
}
253255
else
254256
{
255-
subOrchestrationMessages.Add(workflowCompletedBrokeredMessage);
257+
subOrchestrationMessages.Add(new MessageContainer(workflowCompletedBrokeredMessage, decision));
256258
}
257259
}
258260
isCompleted = !continuedAsNew;
@@ -294,7 +296,7 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
294296
"MaxMessageCount Timer");
295297
brokeredTimerMessage.ScheduledEnqueueTimeUtc = dummyTimer.FireAt;
296298
brokeredTimerMessage.SessionId = session.SessionId;
297-
timerMessages.Add(brokeredTimerMessage);
299+
timerMessages.Add(new MessageContainer(brokeredTimerMessage, null));
298300
break;
299301
}
300302
}
@@ -423,35 +425,40 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
423425
{
424426
if (messagesToSend.Count > 0)
425427
{
426-
messagesToSend.ForEach(m => workerSender.Send(m));
428+
messagesToSend.ForEach(m => workerSender.Send(m.Message));
429+
this.LogSentMessages(session, "Worker outbound", messagesToSend);
427430
}
428431

429432
if (timerMessages.Count > 0)
430433
{
431-
timerMessages.ForEach(m => orchestratorQueueClient.Send(m));
434+
timerMessages.ForEach(m => orchestratorQueueClient.Send(m.Message));
435+
this.LogSentMessages(session, "Timer Message", timerMessages);
432436
}
433437
}
434438

435439
if (subOrchestrationMessages.Count > 0)
436440
{
437-
subOrchestrationMessages.ForEach(m => orchestratorQueueClient.Send(m));
441+
subOrchestrationMessages.ForEach(m => orchestratorQueueClient.Send(m.Message));
442+
this.LogSentMessages(session, "Sub Orchestration", subOrchestrationMessages);
438443
}
439444

440445
if (continuedAsNewMessage != null)
441446
{
442447
orchestratorQueueClient.Send(continuedAsNewMessage);
448+
this.LogSentMessages(session, "Continue as new", new List<MessageContainer> () { new MessageContainer(continuedAsNewMessage, null) });
443449
}
444450

445451
if (isTrackingEnabled)
446452
{
447-
List<BrokeredMessage> trackingMessages = CreateTrackingMessages(runtimeState);
453+
List<MessageContainer> trackingMessages = CreateTrackingMessages(runtimeState);
448454

449455
TraceHelper.TraceInstance(TraceEventType.Information, runtimeState.OrchestrationInstance,
450456
"Created {0} tracking messages", trackingMessages.Count);
451457

452458
if (trackingMessages.Count > 0)
453459
{
454-
trackingMessages.ForEach(m => trackingSender.Send(m));
460+
trackingMessages.ForEach(m => trackingSender.Send(m.Message));
461+
this.LogSentMessages(session, "Tracking messages", trackingMessages);
455462
}
456463
}
457464
}
@@ -472,6 +479,15 @@ protected override async Task OnProcessWorkItem(SessionWorkItem sessionWorkItem)
472479
}
473480
}
474481

482+
void LogSentMessages(MessageSession session, string messageType, IList<MessageContainer> messages)
483+
{
484+
TraceHelper.TraceSession(
485+
TraceEventType.Information,
486+
session.SessionId,
487+
this.GetFormattedLog($@"{messages.Count().ToString()} messages queued for {messageType}: {
488+
string.Join(",", messages.Select(m => $"{m.Message.MessageId} <{m.Action?.Id.ToString()}>"))}"));
489+
}
490+
475491
BrokeredMessage CreateForcedTerminateMessage(string instanceId, string reason)
476492
{
477493
var taskMessage = new TaskMessage
@@ -487,9 +503,9 @@ BrokeredMessage CreateForcedTerminateMessage(string instanceId, string reason)
487503
return message;
488504
}
489505

490-
List<BrokeredMessage> CreateTrackingMessages(OrchestrationRuntimeState runtimeState)
506+
List<MessageContainer> CreateTrackingMessages(OrchestrationRuntimeState runtimeState)
491507
{
492-
var trackingMessages = new List<BrokeredMessage>();
508+
var trackingMessages = new List<MessageContainer> ();
493509

494510
// We cannot create tracking messages if runtime state does not have Orchestration InstanceId
495511
// This situation can happen due to corruption of service bus session state or if somehow first message of orchestration is not execution started
@@ -516,7 +532,7 @@ List<BrokeredMessage> CreateTrackingMessages(OrchestrationRuntimeState runtimeSt
516532
trackingMessage.ContentType = FrameworkConstants.TaskMessageContentType;
517533
trackingMessage.SessionId = runtimeState.OrchestrationInstance.InstanceId;
518534
trackingMessage.Properties[FrameworkConstants.HistoryEventIndexPropertyName] = historyEventIndex++;
519-
trackingMessages.Add(trackingMessage);
535+
trackingMessages.Add(new MessageContainer(trackingMessage, null));
520536
}
521537

522538
var stateMessage = new StateMessage {State = BuildOrchestrationState(runtimeState)};
@@ -526,7 +542,7 @@ List<BrokeredMessage> CreateTrackingMessages(OrchestrationRuntimeState runtimeSt
526542
"State Tracking Message");
527543
brokeredStateMessage.SessionId = runtimeState.OrchestrationInstance.InstanceId;
528544
brokeredStateMessage.ContentType = FrameworkConstants.StateMessageContentType;
529-
trackingMessages.Add(brokeredStateMessage);
545+
trackingMessages.Add(new MessageContainer(brokeredStateMessage, null));
530546
return trackingMessages;
531547
}
532548

@@ -881,5 +897,17 @@ protected override int GetDelayInSecondsAfterOnFetchException(Exception exceptio
881897
}
882898
return delay;
883899
}
900+
901+
internal class MessageContainer
902+
{
903+
internal BrokeredMessage Message { get; set; }
904+
internal OrchestratorAction Action { get; set; }
905+
906+
internal MessageContainer(BrokeredMessage message, OrchestratorAction action)
907+
{
908+
this.Message = message;
909+
this.Action = action;
910+
}
911+
}
884912
}
885913
}

Framework/Tracing/OrchestrationConsoleTraceListener.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ public override void TraceEvent(TraceEventCache eventCache, string source, Trace
4444
}
4545
catch (Exception exception)
4646
{
47-
string toWrite = string.Format("Exception while parsing trace: {0}\n\t", exception.Message,
48-
exception.StackTrace);
47+
string toWrite = $"Exception while parsing trace: {message} : {exception.Message}\n\t{exception.StackTrace}";
4948
Console.WriteLine(toWrite);
5049
Debug.WriteLine(toWrite);
5150
}

Framework/Tracing/TraceHelper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ static void ExceptionHandlingWrapper(Action innerFunc)
208208
try
209209
{
210210
source.TraceEvent(TraceEventType.Critical, 0,
211-
"Failed to log actual trace because one or more trace listeners threw an exception.");
211+
$"Failed to log actual trace because one or more trace listeners threw an exception: {exception}");
212212
}
213213
catch (Exception anotherException)
214214
{

0 commit comments

Comments
 (0)