From fdde173e914425b24d7963223b622c1d74423828 Mon Sep 17 00:00:00 2001 From: Jonathan Norris Date: Mon, 4 May 2026 15:31:02 -0400 Subject: [PATCH 1/2] fix: ensure FlushMutex is released when WASM flush throws When localBucketing.FlushEventQueue throws (e.g. AssemblyScript abort inside the WASM event queue path), the previous implementation let the exception escape between StartFlush() and EndFlush(). That permanently leaked the FlushMutex semaphore, so every subsequent flush deadlocked on StartFlush() and events accumulated in the WASM heap indefinitely - compounding any pre-existing heap corruption. Wrap the flush body in try/finally so EndFlush() always runs. Catch exceptions from GetPayloads() and the per-payload mark-success/failure calls so a failing flush surfaces as a failed FlushedEvents subscriber notification instead of an uncaught exception. --- .../EventQueueTest.cs | 147 ++++++++++++++++++ DevCycle.SDK.Server.Local/Api/EventQueue.cs | 57 ++++++- 2 files changed, 198 insertions(+), 6 deletions(-) diff --git a/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs b/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs index c9757a9..a3d12d0 100644 --- a/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs +++ b/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs @@ -190,5 +190,152 @@ private void AssertFailedEvent(object sender, DevCycleEventArgs e) Assert.AreNotEqual(0, e.Errors.Count); Assert.IsFalse(e.Success); } + + // --------------------------------------------------------------- + // Tests for the FlushMutex leak fix. + // + // Background: when the WASM bucketing engine traps inside + // flushEventQueue (e.g. its event queue state is corrupted from + // accumulated AssemblyScript throws), the previous implementation + // of FlushEvents() let the exception escape AFTER calling + // localBucketing.StartFlush() but BEFORE calling EndFlush(). That + // permanently leaked the FlushMutex and every subsequent flush + // would deadlock, causing events to accumulate in the WASM heap + // indefinitely. The fix wraps StartFlush/EndFlush in try/finally + // and swallows WASM exceptions so the mutex is always released. + // --------------------------------------------------------------- + + [TestMethod] + public async Task FlushEvents_WhenFlushEventQueueThrows_DoesNotPropagate() + { + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + // Should NOT throw - the fix swallows the exception so the + // mutex outer try/finally can release the FlushMutex. + await eventQueue.FlushEvents(); + } + + [TestMethod] + public async Task FlushEvents_WhenFlushEventQueueThrows_StillCallsEndFlush() + { + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + await eventQueue.FlushEvents(); + + Assert.AreEqual(1, bucketing.StartFlushCount, + "StartFlush should have been called exactly once."); + Assert.AreEqual(1, bucketing.EndFlushCount, + "EndFlush MUST be called even when FlushEventQueue throws, " + + "otherwise the FlushMutex semaphore is permanently leaked."); + } + + [TestMethod] + public async Task FlushEvents_WhenFlushEventQueueThrows_RaisesFailureToSubscribers() + { + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + DevCycleEventArgs received = null; + var completion = new ManualResetEvent(false); + eventQueue.AddFlushedEventsSubscriber((sender, e) => + { + received = e; + completion.Set(); + }); + + await eventQueue.FlushEvents(); + completion.WaitOne(TimeSpan.FromSeconds(2)); + + Assert.IsNotNull(received, "FlushedEvents subscriber should have been invoked."); + Assert.IsFalse(received.Success); + Assert.IsTrue(received.Errors.Count > 0, + "Errors collection should contain the underlying exception."); + } + + [TestMethod] + public async Task FlushEvents_AfterFailedFlush_NextFlushDoesNotDeadlock() + { + // This test would hang forever before the fix: the first flush + // throws inside FlushEventQueue, EndFlush never runs, FlushMutex + // is leaked, and the second flush blocks on StartFlush forever. + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + await eventQueue.FlushEvents(); + + // Now turn off the fault and try again. With the fix, this + // completes promptly. Without the fix, this deadlocks. + bucketing.ThrowOnFlushEventQueue = false; + var second = eventQueue.FlushEvents(); + var completed = await Task.WhenAny(second, Task.Delay(TimeSpan.FromSeconds(5))); + + Assert.AreSame(second, completed, + "Second flush should complete instead of deadlocking on a leaked FlushMutex."); + Assert.AreEqual(2, bucketing.StartFlushCount); + Assert.AreEqual(2, bucketing.EndFlushCount); + } + + private EventQueue BuildEventQueueWithFakeBucketing(TrappingLocalBucketing bucketing) + { + var sdkKey = $"server-{Guid.NewGuid()}"; + var loggerFactory = LoggerFactory.Create(b => b.SetMinimumLevel(LogLevel.None)); + var options = new DevCycleLocalOptions(10, 10); + + var mockHttp = new MockHttpMessageHandler(); + mockHttp.When("https://*").Respond(HttpStatusCode.Created, "application/json", "{}"); + + return new EventQueue(sdkKey, options, loggerFactory, bucketing, + new DevCycleRestClientOptions { ConfigureMessageHandler = _ => mockHttp }); + } + + /// + /// Test fake of that simulates a WASM + /// trap inside FlushEventQueue. Tracks Start/EndFlush call + /// counts so tests can assert the FlushMutex was released. + /// + private class TrappingLocalBucketing : ILocalBucketing + { + public bool ThrowOnFlushEventQueue { get; set; } + public int StartFlushCount { get; private set; } + public int EndFlushCount { get; private set; } + + public TrappingLocalBucketing(bool throwOnFlushEventQueue) + { + ThrowOnFlushEventQueue = throwOnFlushEventQueue; + } + + public string ClientUUID => "test-client-uuid"; + + public List FlushEventQueue(string sdkKey) + { + if (ThrowOnFlushEventQueue) + { + // Mimics a wasmtime trap surfaced as LocalBucketingException. + throw new LocalBucketingException("Simulated WASM trap during flushEventQueue"); + } + return new List(); + } + + public void StartFlush() { StartFlushCount++; } + public void EndFlush() { EndFlushCount++; } + + // Unused on the flush path - return safe defaults / no-ops. + public void InitEventQueue(string sdkKey, string options) { } + public BucketedUserConfig GenerateBucketedConfig(string sdkKey, string user) => + new BucketedUserConfig(); + public int EventQueueSize(string sdkKey) => 0; + public void QueueEvent(string sdkKey, string user, string eventString) { } + public void QueueAggregateEvent(string sdkKey, string eventString, string variableVariationMapStr) { } + public void OnPayloadSuccess(string sdkKey, string payloadId) { } + public void OnPayloadFailure(string sdkKey, string payloadId, bool retryable) { } + public void StoreConfig(string sdkKey, string config) { } + public void SetPlatformData(string platformData) { } + public string GetVariable(string sdkKey, string userJSON, string key, TypeEnum variableType, bool shouldTrackEvent) => null; + public string GetConfigMetadata(string sdkKey) => null; + public byte[] GetVariableForUserProtobuf(byte[] serializedParams) => null; + public void SetClientCustomData(string sdkKey, string customData) { } + } } } \ No newline at end of file diff --git a/DevCycle.SDK.Server.Local/Api/EventQueue.cs b/DevCycle.SDK.Server.Local/Api/EventQueue.cs index 573e1d5..b05e533 100644 --- a/DevCycle.SDK.Server.Local/Api/EventQueue.cs +++ b/DevCycle.SDK.Server.Local/Api/EventQueue.cs @@ -64,16 +64,51 @@ private async Task> GetPayloadResult(FlushPayl public virtual async Task FlushEvents() { localBucketing.StartFlush(); - var flushPayloads = GetPayloads(); + try + { + await FlushEventsInternal(); + } + finally + { + // Ensure the FlushMutex is always released, even if the WASM + // bucketing engine throws inside flushEventQueue or any of + // the downstream calls. Without this, a single WASM trap + // during a flush would permanently leak the mutex and all + // subsequent flushes would deadlock - causing events to + // accumulate indefinitely in the WASM heap. + localBucketing.EndFlush(); + } + } + + private async Task FlushEventsInternal() + { var flushResultEvent = new DevCycleEventArgs { Success = true }; + List flushPayloads; + try + { + flushPayloads = GetPayloads(); + } + catch (Exception ex) + { + // GetPayloads can throw if the WASM bucketing engine traps + // (e.g. its event queue state is corrupted, or a flush-path + // throw fires inside the AssemblyScript). Surface the + // failure to flush subscribers and return cleanly so the + // mutex gets released by the outer finally. + flushResultEvent.Success = false; + flushResultEvent.Errors.Add(ex as DevCycleException + ?? new DevCycleException(new ErrorResponse(ex.Message))); + OnFlushedEvents(flushResultEvent); + return; + } + if (flushPayloads.Count == 0) { OnFlushedEvents(flushResultEvent); - localBucketing.EndFlush(); return; } @@ -105,17 +140,27 @@ public virtual async Task FlushEvents() localBucketing.OnPayloadSuccess(sdkKey, flushPayload.PayloadID); } } - catch (DevCycleException ex) + catch (Exception ex) { logger.LogError($"DevCycle Error Flushing Events response message: ${ex.Message}"); - localBucketing.OnPayloadFailure(sdkKey, flushPayload.PayloadID, true); + // Best-effort mark as failure; if this itself throws + // (because the WASM is corrupted), swallow so we still + // get to the outer finally and release the FlushMutex. + try + { + localBucketing.OnPayloadFailure(sdkKey, flushPayload.PayloadID, true); + } + catch (Exception markEx) + { + logger.LogError($"DevCycle Error marking payload as failure: ${markEx.Message}"); + } flushResultEvent.Success = false; - flushResultEvent.Errors.Add(ex); + flushResultEvent.Errors.Add(ex as DevCycleException + ?? new DevCycleException(new ErrorResponse(ex.Message))); } } OnFlushedEvents(flushResultEvent); - localBucketing.EndFlush(); } private List GetPayloads() From ea96fca52844b0a5db62d406cd42b9eba6623407 Mon Sep 17 00:00:00 2001 From: Jonathan Norris Date: Mon, 4 May 2026 15:50:36 -0400 Subject: [PATCH 2/2] review: address Copilot feedback on PR #204 - EventQueue.cs: revert per-payload catch from 'Exception' back to 'DevCycleException'. The broadened catch was over-aggressive and would have re-queued payloads as retryable failures if OnPayloadSuccess/OnPayloadFailure themselves threw, including re-sending events that DevCycle had already accepted with a 201. The outer try/finally on FlushEvents() ensures EndFlush is called regardless, so the inner catch doesn't need to swallow non-HTTP exceptions. - EventQueueTest.cs: TrappingLocalBucketing fake now uses a real SemaphoreSlim with a 5s timeout for StartFlush/EndFlush. A regression that skips EndFlush will now actually deadlock the next StartFlush, surfaced as InvalidOperationException, so the deadlock test exercises the real failure mode instead of just counting calls. - Wrapped the first FlushEvents call in the deadlock test in try/catch so the test specifically exercises 'second flush after failed first flush' regardless of whether the first call propagates the underlying exception. --- .../EventQueueTest.cs | 57 ++++++++++++++----- DevCycle.SDK.Server.Local/Api/EventQueue.cs | 17 +----- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs b/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs index a3d12d0..82c4438 100644 --- a/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs +++ b/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs @@ -257,22 +257,32 @@ public async Task FlushEvents_WhenFlushEventQueueThrows_RaisesFailureToSubscribe [TestMethod] public async Task FlushEvents_AfterFailedFlush_NextFlushDoesNotDeadlock() { - // This test would hang forever before the fix: the first flush - // throws inside FlushEventQueue, EndFlush never runs, FlushMutex - // is leaked, and the second flush blocks on StartFlush forever. + // The fake's StartFlush uses a real SemaphoreSlim with a 5s + // timeout, so a regression that skips EndFlush will surface as + // an InvalidOperationException ("StartFlush deadlocked") on + // the second StartFlush call. The first flush is wrapped in + // try/catch so the test specifically exercises the second + // flush behaviour. var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); - await eventQueue.FlushEvents(); + try + { + await eventQueue.FlushEvents(); + } + catch (Exception) + { + // A regression where FlushEvents propagates the WASM + // exception is OK for THIS test - we still want to verify + // the second flush isn't blocked by a leaked mutex. + } // Now turn off the fault and try again. With the fix, this - // completes promptly. Without the fix, this deadlocks. + // completes promptly. Without the fix, the fake's StartFlush + // detects the leaked semaphore and throws. bucketing.ThrowOnFlushEventQueue = false; - var second = eventQueue.FlushEvents(); - var completed = await Task.WhenAny(second, Task.Delay(TimeSpan.FromSeconds(5))); + await eventQueue.FlushEvents(); - Assert.AreSame(second, completed, - "Second flush should complete instead of deadlocking on a leaked FlushMutex."); Assert.AreEqual(2, bucketing.StartFlushCount); Assert.AreEqual(2, bucketing.EndFlushCount); } @@ -292,11 +302,15 @@ private EventQueue BuildEventQueueWithFakeBucketing(TrappingLocalBucketing bucke /// /// Test fake of that simulates a WASM - /// trap inside FlushEventQueue. Tracks Start/EndFlush call - /// counts so tests can assert the FlushMutex was released. + /// trap inside FlushEventQueue. Uses a real + /// for Start/EndFlush so that a missed + /// EndFlush actually deadlocks subsequent StartFlush calls (matching + /// the behaviour of the real WASMLocalBucketing.FlushMutex). /// private class TrappingLocalBucketing : ILocalBucketing { + private readonly SemaphoreSlim flushMutex = new SemaphoreSlim(1, 1); + public bool ThrowOnFlushEventQueue { get; set; } public int StartFlushCount { get; private set; } public int EndFlushCount { get; private set; } @@ -318,8 +332,25 @@ public List FlushEventQueue(string sdkKey) return new List(); } - public void StartFlush() { StartFlushCount++; } - public void EndFlush() { EndFlushCount++; } + public void StartFlush() + { + // Block (with a 5s timeout so a regression doesn't hang the + // test runner forever) - mirrors WasmLocalBucketing.FlushMutex.Wait(). + if (!flushMutex.Wait(TimeSpan.FromSeconds(5))) + { + throw new InvalidOperationException( + "TrappingLocalBucketing.StartFlush deadlocked - " + + "EndFlush was not called for a previous flush. " + + "This indicates the FlushMutex leak regression."); + } + StartFlushCount++; + } + + public void EndFlush() + { + EndFlushCount++; + flushMutex.Release(); + } // Unused on the flush path - return safe defaults / no-ops. public void InitEventQueue(string sdkKey, string options) { } diff --git a/DevCycle.SDK.Server.Local/Api/EventQueue.cs b/DevCycle.SDK.Server.Local/Api/EventQueue.cs index b05e533..eedbe25 100644 --- a/DevCycle.SDK.Server.Local/Api/EventQueue.cs +++ b/DevCycle.SDK.Server.Local/Api/EventQueue.cs @@ -140,23 +140,12 @@ private async Task FlushEventsInternal() localBucketing.OnPayloadSuccess(sdkKey, flushPayload.PayloadID); } } - catch (Exception ex) + catch (DevCycleException ex) { logger.LogError($"DevCycle Error Flushing Events response message: ${ex.Message}"); - // Best-effort mark as failure; if this itself throws - // (because the WASM is corrupted), swallow so we still - // get to the outer finally and release the FlushMutex. - try - { - localBucketing.OnPayloadFailure(sdkKey, flushPayload.PayloadID, true); - } - catch (Exception markEx) - { - logger.LogError($"DevCycle Error marking payload as failure: ${markEx.Message}"); - } + localBucketing.OnPayloadFailure(sdkKey, flushPayload.PayloadID, true); flushResultEvent.Success = false; - flushResultEvent.Errors.Add(ex as DevCycleException - ?? new DevCycleException(new ErrorResponse(ex.Message))); + flushResultEvent.Errors.Add(ex); } }