Skip to content

Commit bd98b25

Browse files
[BED-5972] Adaptive Timeouts: opt-in throws ExcessiveTimeoutsException; ordinal tracking to ignore time spike 'hiccups' (#226)
* feat: Adaptive timeouts: optional throws, time spike hiccup management * feat: opt-in throws ExcessiveTimeoutsException; ordinal call tracking to ignore time spike 'hiccups' * chore: hopefully make timeout spike hiccup test less flakey, clear up confusion on ordinal task tracking * chore: replace startTime assignment with default struct value for ease of reading * fix: Make TimeSpikeSafetyValve test more robust
1 parent 60fde34 commit bd98b25

4 files changed

Lines changed: 197 additions & 40 deletions

File tree

src/CommonLib/AdaptiveTimeout.cs

Lines changed: 108 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,31 @@
11
using System;
2+
using System.Collections.Concurrent;
3+
using System.Linq;
24
using System.Threading;
35
using System.Threading.Tasks;
46
using Microsoft.Extensions.Logging;
7+
using SharpHoundCommonLib.Exceptions;
58
using SharpHoundRPC.NetAPINative;
69

710
namespace SharpHoundCommonLib;
811

912
public sealed class AdaptiveTimeout : IDisposable {
1013
private readonly ExecutionTimeSampler _sampler;
14+
private readonly ConcurrentQueue<DateTime> _latestSuccessTimestamps;
1115
private readonly ILogger _log;
1216
private readonly TimeSpan _maxTimeout;
1317
private readonly bool _useAdaptiveTimeout;
1418
private readonly int _minSamplesForAdaptiveTimeout;
15-
private int _clearSamplesDecay;
19+
private readonly bool _throwIfExcessiveTimeouts;
20+
private int _timeSpikeDecay;
1621
private const int TimeSpikePenalty = 2;
1722
private const int TimeSpikeForgiveness = 1;
18-
private const int ClearSamplesThreshold = 5;
23+
private const int TimeSpikeThreshold = 5;
24+
private const int ExcessiveTimeoutsThreshold = 7;
1925
private const int StdDevMultiplier = 5;
26+
private const int CountOfLatestSuccessToKeep = 4;
2027

21-
public AdaptiveTimeout(TimeSpan maxTimeout, ILogger log, int sampleCount = 100, int logFrequency = 1000, int minSamplesForAdaptiveTimeout = 30, bool useAdaptiveTimeout = true) {
28+
public AdaptiveTimeout(TimeSpan maxTimeout, ILogger log, int sampleCount = 100, int logFrequency = 1000, int minSamplesForAdaptiveTimeout = 30, bool useAdaptiveTimeout = true, bool throwIfExcessiveTimeouts = false) {
2229
if (maxTimeout <= TimeSpan.Zero)
2330
throw new ArgumentException("maxTimeout must be positive", nameof(maxTimeout));
2431
if (sampleCount <= 0)
@@ -31,14 +38,16 @@ public AdaptiveTimeout(TimeSpan maxTimeout, ILogger log, int sampleCount = 100,
3138
throw new ArgumentNullException(nameof(log));
3239

3340
_sampler = new ExecutionTimeSampler(log, sampleCount, logFrequency);
41+
_latestSuccessTimestamps = new ConcurrentQueue<DateTime>();
3442
_log = log;
3543
_maxTimeout = maxTimeout;
36-
_useAdaptiveTimeout = useAdaptiveTimeout;
3744
_minSamplesForAdaptiveTimeout = minSamplesForAdaptiveTimeout;
45+
_useAdaptiveTimeout = useAdaptiveTimeout;
46+
_throwIfExcessiveTimeouts = throwIfExcessiveTimeouts;
3847
}
3948

4049
public void ClearSamples() {
41-
Interlocked.Exchange(ref _clearSamplesDecay, 0);
50+
Interlocked.Exchange(ref _timeSpikeDecay, 0);
4251
_sampler.ClearSamples();
4352
}
4453

@@ -55,8 +64,13 @@ public void ClearSamples() {
5564
/// <param name="parentToken"></param>
5665
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
5766
public async Task<Result<T>> ExecuteWithTimeout<T>(Func<CancellationToken, T> func, CancellationToken parentToken = default) {
58-
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
59-
TimeSpikeSafetyValve(result.IsSuccess);
67+
DateTime startTime = default;
68+
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
69+
_sampler.SampleExecutionTime(() => {
70+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
71+
return func(timeoutToken);
72+
}), parentToken);
73+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
6074
return result;
6175
}
6276

@@ -72,8 +86,13 @@ public async Task<Result<T>> ExecuteWithTimeout<T>(Func<CancellationToken, T> fu
7286
/// <param name="parentToken"></param>
7387
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
7488
public async Task<Result> ExecuteWithTimeout(Action<CancellationToken> func, CancellationToken parentToken = default) {
75-
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
76-
TimeSpikeSafetyValve(result.IsSuccess);
89+
DateTime startTime = default;
90+
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
91+
_sampler.SampleExecutionTime(() => {
92+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
93+
func(timeoutToken);
94+
}), parentToken);
95+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
7796
return result;
7897
}
7998

@@ -90,8 +109,13 @@ public async Task<Result> ExecuteWithTimeout(Action<CancellationToken> func, Can
90109
/// <param name="parentToken"></param>
91110
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
92111
public async Task<Result<T>> ExecuteWithTimeout<T>(Func<CancellationToken, Task<T>> func, CancellationToken parentToken = default) {
93-
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
94-
TimeSpikeSafetyValve(result.IsSuccess);
112+
DateTime startTime = default;
113+
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
114+
_sampler.SampleExecutionTime(() => {
115+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
116+
return func(timeoutToken);
117+
}), parentToken);
118+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
95119
return result;
96120
}
97121

@@ -107,8 +131,13 @@ public async Task<Result<T>> ExecuteWithTimeout<T>(Func<CancellationToken, Task<
107131
/// <param name="parentToken"></param>
108132
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
109133
public async Task<Result> ExecuteWithTimeout(Func<CancellationToken, Task> func, CancellationToken parentToken = default) {
110-
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
111-
TimeSpikeSafetyValve(result.IsSuccess);
134+
DateTime startTime = default;
135+
var result = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
136+
_sampler.SampleExecutionTime(() => {
137+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
138+
return func(timeoutToken);
139+
}), parentToken);
140+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
112141
return result;
113142
}
114143

@@ -125,8 +154,13 @@ public async Task<Result> ExecuteWithTimeout(Func<CancellationToken, Task> func,
125154
/// <param name="parentToken"></param>
126155
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
127156
public async Task<NetAPIResult<T>> ExecuteNetAPIWithTimeout<T>(Func<CancellationToken, NetAPIResult<T>> func, CancellationToken parentToken = default) {
128-
var result = await Timeout.ExecuteNetAPIWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
129-
TimeSpikeSafetyValve(result.IsSuccess);
157+
DateTime startTime = default;
158+
var result = await Timeout.ExecuteNetAPIWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
159+
_sampler.SampleExecutionTime(() => {
160+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
161+
return func(timeoutToken);
162+
}), parentToken);
163+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
130164
return result;
131165
}
132166

@@ -143,8 +177,13 @@ public async Task<NetAPIResult<T>> ExecuteNetAPIWithTimeout<T>(Func<Cancellation
143177
/// <param name="parentToken"></param>
144178
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
145179
public async Task<SharpHoundRPC.Result<T>> ExecuteRPCWithTimeout<T>(Func<CancellationToken, SharpHoundRPC.Result<T>> func, CancellationToken parentToken = default) {
146-
var result = await Timeout.ExecuteRPCWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
147-
TimeSpikeSafetyValve(result.IsSuccess);
180+
DateTime startTime = default;
181+
var result = await Timeout.ExecuteRPCWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
182+
_sampler.SampleExecutionTime(() => {
183+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
184+
return func(timeoutToken);
185+
}), parentToken);
186+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
148187
return result;
149188
}
150189

@@ -161,8 +200,13 @@ public async Task<NetAPIResult<T>> ExecuteNetAPIWithTimeout<T>(Func<Cancellation
161200
/// <param name="parentToken"></param>
162201
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
163202
public async Task<SharpHoundRPC.Result<T>> ExecuteRPCWithTimeout<T>(Func<CancellationToken, Task<SharpHoundRPC.Result<T>>> func, CancellationToken parentToken = default) {
164-
var result = await Timeout.ExecuteRPCWithTimeout(GetAdaptiveTimeout(), (timeoutToken) => _sampler.SampleExecutionTime(() => func(timeoutToken)), parentToken);
165-
TimeSpikeSafetyValve(result.IsSuccess);
203+
DateTime startTime = default;
204+
var result = await Timeout.ExecuteRPCWithTimeout(GetAdaptiveTimeout(), (timeoutToken) =>
205+
_sampler.SampleExecutionTime(() => {
206+
startTime = DateTime.Now; // for ordinal tracking; see use in TimeSpikeSafetyValve
207+
return func(timeoutToken);
208+
}), parentToken);
209+
TimeSpikeSafetyValve(result.IsSuccess, startTime);
166210
return result;
167211
}
168212

@@ -200,30 +244,66 @@ public TimeSpan GetAdaptiveTimeout() {
200244
// then suddenly starts taking a regular 100ms
201245
// this is fine (if it fits in our max timeout budget), and we shouldn't timeout
202246
// so we should create a safety valve in case this happens to reset our data samples
203-
private void TimeSpikeSafetyValve(bool isSuccess) {
247+
private void TimeSpikeSafetyValve(bool isSuccess, DateTime startTime) {
204248
if (isSuccess) {
205-
AtomicDecrementWithFloor(ref _clearSamplesDecay, TimeSpikeForgiveness);
249+
AtomicDecrementWithFloor(ref _timeSpikeDecay, TimeSpikeForgiveness);
250+
AddLatestSuccessTimestamp(startTime);
206251
}
207252
else {
208-
Interlocked.Add(ref _clearSamplesDecay, TimeSpikePenalty);
253+
Interlocked.Add(ref _timeSpikeDecay, TimeSpikePenalty);
209254

210-
if (_clearSamplesDecay >= ClearSamplesThreshold) {
211-
if (UseAdaptiveTimeout()) {
212-
ClearSamples();
213-
_log.LogTrace("Time spike safety valve event at timeout {CurrentTimeout}.", GetAdaptiveTimeout());
255+
if (Volatile.Read(ref _timeSpikeDecay) >= TimeSpikeThreshold) {
256+
if (EnoughSuccessesSince(startTime)) {
257+
// Time spike is in the past now, no action needed
258+
// This happens when earlier calls report back timeouts
259+
// but we've since seen sufficent successful calls completed in the time between
260+
_log.LogTrace("Time spike hiccup spotted but since recovered.");
261+
Interlocked.Exchange(ref _timeSpikeDecay, 0);
214262
}
215263
else {
216-
_log.LogWarning("This call is frequently running over the maximum allowed timeout of {MaxTimeout}.", _maxTimeout);
217-
Interlocked.Exchange(ref _clearSamplesDecay, 0);
264+
TriggerTimeSpikeEvent();
218265
}
219266
}
220267
}
221268
}
222269

270+
private void TriggerTimeSpikeEvent() {
271+
// Most recent calls made have been timing out
272+
// If adaptive timeout is in play when a spike in timeout events occurs,
273+
// flush our samples and back off to the max timeout until we have enough new ones
274+
// to rebuild our data confidence
275+
if (UseAdaptiveTimeout()) {
276+
_log.LogTrace("Time spike safety valve event at timeout {CurrentTimeout}.", GetAdaptiveTimeout());
277+
ClearSamples();
278+
}
279+
280+
// Otherwise, if we're using the max configured timeout already and this spike in timeout events is still occuring,
281+
// log it and maybe throw an error if so configuredx
282+
else if (Volatile.Read(ref _timeSpikeDecay) >= ExcessiveTimeoutsThreshold) {
283+
_log.LogWarning("This call is frequently running over the maximum allowed timeout of {MaxTimeout}.", _maxTimeout);
284+
Interlocked.Exchange(ref _timeSpikeDecay, 0);
285+
286+
if (_throwIfExcessiveTimeouts)
287+
throw new ExcessiveTimeoutsException($"This call is frequently running over the maximum allowed timeout of {_maxTimeout}.");
288+
}
289+
}
290+
223291
private bool UseAdaptiveTimeout() {
224292
return _useAdaptiveTimeout && _sampler.Count >= _minSamplesForAdaptiveTimeout;
225293
}
226294

295+
private void AddLatestSuccessTimestamp(DateTime startTime) {
296+
while (_latestSuccessTimestamps.Count >= CountOfLatestSuccessToKeep) {
297+
_latestSuccessTimestamps.TryDequeue(out var _);
298+
}
299+
300+
_latestSuccessTimestamps.Enqueue(startTime);
301+
}
302+
303+
private bool EnoughSuccessesSince(DateTime startTime) {
304+
return _latestSuccessTimestamps.All(t => t >= startTime);
305+
}
306+
227307
// AI-generated code
228308
// Effectively accomplishes:
229309
// // Interlocked.Add(ref location, -decrement);
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
3+
namespace SharpHoundCommonLib.Exceptions {
4+
internal class ExcessiveTimeoutsException : ApplicationException {
5+
public ExcessiveTimeoutsException() {
6+
}
7+
8+
public ExcessiveTimeoutsException(string message) : base(message) { }
9+
}
10+
}

src/CommonLib/ExecutionTimeSampler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Diagnostics;
44
using System.Linq;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Microsoft.Extensions.Logging;
78

@@ -84,7 +85,7 @@ private void AddTimeSample(TimeSpan timeSpan) {
8485
}
8586

8687
_samples.Enqueue(timeSpan.TotalMilliseconds);
87-
_samplesSinceLastLog++;
88+
Interlocked.Increment(ref _samplesSinceLastLog);
8889

8990
Log();
9091
}
@@ -98,7 +99,7 @@ private void Log(bool flush = false) {
9899
_log.LogWarning("Failed to calculate execution time statistics: {Error}", ex.Message);
99100
}
100101

101-
_samplesSinceLastLog = 0;
102+
Interlocked.Exchange(ref _samplesSinceLastLog, 0);
102103
}
103104
}
104105
}

test/unit/AdaptiveTimeoutTest.cs

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using SharpHoundCommonLib;
6+
using SharpHoundCommonLib.Exceptions;
57
using Xunit;
68
using Xunit.Abstractions;
79

@@ -53,32 +55,96 @@ public async Task AdaptiveTimeout_GetAdaptiveTimeout() {
5355

5456
[Fact]
5557
public async Task AdaptiveTimeout_GetAdaptiveTimeout_TimeSpikeSafetyValve() {
58+
var tasks = new List<Task>();
5659
var maxTimeout = TimeSpan.FromSeconds(1);
57-
var numSamples = 100;
60+
var numSamples = 30;
5861
var adaptiveTimeout = new AdaptiveTimeout(maxTimeout, new TestLogger(_testOutputHelper, Microsoft.Extensions.Logging.LogLevel.Trace), numSamples, 1000, 10);
5962

6063
for (int i = 0; i < numSamples; i++)
61-
await adaptiveTimeout.ExecuteWithTimeout((_) => Thread.Sleep(10));
64+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(10)));
6265

63-
for (int i = 0; i < 6; i++)
64-
await adaptiveTimeout.ExecuteWithTimeout((_) => Thread.Sleep(200));
66+
await Task.WhenAll(tasks);
67+
68+
for (int i = 0; i < 3; i++)
69+
await adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(200));
6570

6671
var adaptiveTimeoutResult = adaptiveTimeout.GetAdaptiveTimeout();
6772
Assert.Equal(maxTimeout, adaptiveTimeoutResult);
6873
}
6974

7075
[Fact]
71-
public void AdaptiveTimeout_AtomicDecrementWithFloor_IsThreadSafe()
72-
{
76+
public async Task AdaptiveTimeout_GetAdaptiveTimeout_TimeSpikeSafetyValve_IgnoreHiccup() {
77+
var tasks = new List<Task>();
78+
var maxTimeout = TimeSpan.FromMilliseconds(100);
79+
var numSamples = 10;
80+
var adaptiveTimeout = new AdaptiveTimeout(maxTimeout, new TestLogger(_testOutputHelper, Microsoft.Extensions.Logging.LogLevel.Trace), numSamples, 1000, 5);
81+
82+
// Prepare our successful samples
83+
for (int i = 0; i < numSamples; i++)
84+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(i)));
85+
86+
await Task.WhenAll(tasks);
87+
88+
// Add some timeout tasks that will resolve last
89+
for (int i = 0; i < 3; i++)
90+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(200)));
91+
92+
// These tasks are added later but will resolve first
93+
for (int i = 0; i < 4; i++)
94+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(i)));
95+
96+
await Task.WhenAll(tasks);
97+
var adaptiveTimeoutResult = adaptiveTimeout.GetAdaptiveTimeout();
98+
// So our time spike safety valve should ignore the hiccup, since later tasks have resolved
99+
// by the time the safety valve has triggered by the timeout tasks
100+
Assert.True(adaptiveTimeoutResult < maxTimeout);
101+
}
102+
103+
[Fact]
104+
public async Task AdaptiveTimeout_GetAdaptiveTimeout_ThrowWhenExcessiveTimeouts() {
105+
var tasks = new List<Task>();
106+
var maxTimeout = TimeSpan.FromMilliseconds(100);
107+
var numSamples = 10;
108+
var adaptiveTimeout = new AdaptiveTimeout(maxTimeout, new TestLogger(_testOutputHelper, Microsoft.Extensions.Logging.LogLevel.Trace), numSamples, 1000, 5, throwIfExcessiveTimeouts: true);
109+
110+
for (int i = 0; i < numSamples; i++)
111+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(10)));
112+
113+
await Task.WhenAll(tasks);
114+
115+
for (int i = 0; i < 20; i++)
116+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(200)));
117+
118+
await Assert.ThrowsAsync<ExcessiveTimeoutsException>(async () => await Task.WhenAll(tasks));
119+
}
120+
121+
[Fact]
122+
public async Task AdaptiveTimeout_GetAdaptiveTimeout_DoNotThrowWhenExcessiveTimeouts() {
123+
var tasks = new List<Task>();
124+
var maxTimeout = TimeSpan.FromMilliseconds(100);
125+
var numSamples = 10;
126+
var adaptiveTimeout = new AdaptiveTimeout(maxTimeout, new TestLogger(_testOutputHelper, Microsoft.Extensions.Logging.LogLevel.Trace), numSamples, 1000, 5, throwIfExcessiveTimeouts: false);
127+
128+
for (int i = 0; i < numSamples; i++)
129+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(10)));
130+
131+
await Task.WhenAll(tasks);
132+
133+
for (int i = 0; i < 20; i++)
134+
tasks.Add(adaptiveTimeout.ExecuteWithTimeout(async (_) => await Task.Delay(200)));
135+
136+
await Task.WhenAll(tasks);
137+
}
138+
139+
[Fact]
140+
public void AdaptiveTimeout_AtomicDecrementWithFloor_IsThreadSafe() {
73141
int value = 1000;
74142
int decrement = 1;
75143
int threads = 10;
76144
int decrementsPerThread = 100;
77145

78-
Parallel.For(0, threads, i =>
79-
{
80-
for (int j = 0; j < decrementsPerThread; j++)
81-
{
146+
Parallel.For(0, threads, i => {
147+
for (int j = 0; j < decrementsPerThread; j++) {
82148
AdaptiveTimeout.AtomicDecrementWithFloor(ref value, decrement, 0);
83149
}
84150
});

0 commit comments

Comments
 (0)