Skip to content

Commit 60fde34

Browse files
[BED-5972] Make Adaptive Timeout time-spike safety valve thread-safe (#224)
* fix: Make _clearSamplesDecay thread-safe * fix: atomic decrement with floor * chore: Add unit test for AtomicDecrementWithFloor, add more clarifying comments * fix: Critical bug in AtomicDecrementWithFloor test (Assert was wonked)
1 parent 11994cd commit 60fde34

2 files changed

Lines changed: 74 additions & 14 deletions

File tree

src/CommonLib/AdaptiveTimeout.cs

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public AdaptiveTimeout(TimeSpan maxTimeout, ILogger log, int sampleCount = 100,
3838
}
3939

4040
/// <summary>
/// Resets the time-spike decay counter to zero and then discards the samples held by the sampler.
/// </summary>
public void ClearSamples() {
    // The counter is shared with the safety valve, so it is reset with an atomic exchange.
    Interlocked.Exchange(ref _clearSamplesDecay, 0);

    _sampler.ClearSamples();
}
4444

@@ -198,29 +198,70 @@ public TimeSpan GetAdaptiveTimeout() {
198198
// AdaptiveTimeout will not respond well to rapid spikes in execution time
199199
// imagine the wrapped function very regularly executes in 10ms
200200
// then suddenly starts taking a regular 100ms
201-
// this is fine (if it fits in our max timeout budget), and we shouldn't block
201+
// this is fine (if it fits in our max timeout budget), and we shouldn't timeout
202202
// so we should create a safety valve in case this happens to reset our data samples
203203
/// <summary>
/// Safety valve for sudden, sustained jumps in execution time: successes pay down a decay
/// counter, failures add to it, and once the counter reaches <c>ClearSamplesThreshold</c>
/// the collected samples are cleared (or, when the adaptive timeout is not in use, a warning
/// is logged and the counter is reset).
/// </summary>
/// <param name="isSuccess">
/// <c>true</c> to apply forgiveness to the counter; <c>false</c> to apply the penalty and
/// evaluate the threshold. NOTE(review): presumably "success" means the wrapped call finished
/// within its timeout - confirm against the callers.
/// </param>
private void TimeSpikeSafetyValve(bool isSuccess) {
    if (isSuccess) {
        // Forgive part of the accumulated penalty; the helper never lets the counter drop below zero.
        AtomicDecrementWithFloor(ref _clearSamplesDecay, TimeSpikeForgiveness);
        return;
    }

    // Use the value returned by the atomic add for the threshold decision. Re-reading the field
    // here would race with other threads that may have changed it after our add.
    int decayAfterPenalty = Interlocked.Add(ref _clearSamplesDecay, TimeSpikePenalty);
    if (decayAfterPenalty < ClearSamplesThreshold) {
        return;
    }

    if (UseAdaptiveTimeout()) {
        // ClearSamples also resets the decay counter.
        ClearSamples();
        _log.LogTrace("Time spike safety valve event at timeout {CurrentTimeout}.", GetAdaptiveTimeout());
    }
    else {
        _log.LogWarning("This call is frequently running over the maximum allowed timeout of {MaxTimeout}.", _maxTimeout);
        // Reset the counter so that it has to build up to the threshold again before the next warning.
        Interlocked.Exchange(ref _clearSamplesDecay, 0);
    }
}
222222

223223
/// <summary>
/// Reports whether the adaptive timeout should be applied: the feature flag must be set and
/// the sampler must hold at least <c>_minSamplesForAdaptiveTimeout</c> samples.
/// </summary>
/// <returns><c>true</c> when both conditions hold; otherwise <c>false</c>.</returns>
private bool UseAdaptiveTimeout() {
    // Checked first so the sampler is not consulted when the feature is switched off.
    if (!_useAdaptiveTimeout) {
        return false;
    }

    return _sampler.Count >= _minSamplesForAdaptiveTimeout;
}
226+
227+
// AI-generated code
228+
// Effectively accomplishes:
229+
// // Interlocked.Add(ref location, -decrement);
230+
// // Interlocked.Exchange(ref location, Math.Max(floor, location));
231+
// But since the above doesn't guarantee atomicity, we need to be more clever.
232+
// This method will continually check the very latest value in <location>,
233+
// compute the new expected value after the decrement,
234+
// and try to replace <location> with this new value.
235+
// If it fails for any reason (race condition), it does all this again
236+
// until it wins the race.
237+
// This is however supposedly still much faster than using lock objects.
238+
// // Example:
239+
/*
240+
// target == 0
241+
// 1: this thread
242+
// 2: interceding thread
243+
244+
1: do {
245+
1: var initialVal = target;
246+
2: target = 2;
247+
1: var computedVal = Math.Max(0, initialVal - 1); // computedVal == 0
248+
1: } while (target != initialVal);
249+
250+
// target changed midway thru the op (2 != 0) and so isn't changed by CompareExchange, retry loop:
251+
252+
1: var initialVal = target; // 2
253+
1: var computedVal = Math.Max(0, initialVal - 1); // computedVal == 1
254+
1: } while (target != initialVal);
255+
256+
// target (2) == initialVal (2), assign target to 1 and exit loop
257+
*/
258+
/// <summary>
/// Atomically replaces <paramref name="target"/> with
/// <c>max(floor, target - decrement)</c> using a compare-and-swap retry loop.
/// </summary>
/// <param name="target">The shared counter to update.</param>
/// <param name="decrement">The amount to subtract from the counter.</param>
/// <param name="floor">The smallest value the counter may hold after the update. Defaults to 0.</param>
/// <remarks>
/// The subtraction is carried out in 64-bit arithmetic and the result is clamped to the
/// <see cref="int"/> range, so extreme operands cannot wrap around and move the counter
/// to the opposite end of the range.
/// </remarks>
public static void AtomicDecrementWithFloor(ref int target, int decrement, int floor = 0) {
    int initialValue, computedValue;
    do {
        initialValue = Volatile.Read(ref target);

        // Widen before subtracting: int - int can overflow silently in an unchecked context.
        long lowered = (long)initialValue - decrement;
        computedValue = (int)Math.Min(int.MaxValue, Math.Max((long)floor, lowered));
    }
    // If target was modified by another thread since it was read, the swap is rejected; retry.
    while (Interlocked.CompareExchange(ref target, computedValue, initialValue) != initialValue);
}
226267
}

test/unit/AdaptiveTimeoutTest.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,23 @@ public async Task AdaptiveTimeout_GetAdaptiveTimeout_TimeSpikeSafetyValve() {
6666
var adaptiveTimeoutResult = adaptiveTimeout.GetAdaptiveTimeout();
6767
Assert.Equal(maxTimeout, adaptiveTimeoutResult);
6868
}
69+
70+
/// <summary>
/// Runs many concurrent decrements against one shared counter and checks that none are lost.
/// </summary>
[Fact]
public void AdaptiveTimeout_AtomicDecrementWithFloor_IsThreadSafe()
{
    const int startingValue = 1000;
    const int step = 1;
    const int workerCount = 10;
    const int stepsPerWorker = 100;

    int counter = startingValue;

    // Every worker decrements the same counter; a lost update would leave it above the expected total.
    Parallel.For(0, workerCount, _ =>
    {
        int remaining = stepsPerWorker;
        while (remaining > 0)
        {
            AdaptiveTimeout.AtomicDecrementWithFloor(ref counter, step, 0);
            remaining--;
        }
    });

    Assert.Equal(startingValue - workerCount * stepsPerWorker, counter);
}
6988
}

0 commit comments

Comments
 (0)