diff --git a/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs b/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs index c9757a9..82c4438 100644 --- a/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs +++ b/DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs @@ -190,5 +190,183 @@ private void AssertFailedEvent(object sender, DevCycleEventArgs e) Assert.AreNotEqual(0, e.Errors.Count); Assert.IsFalse(e.Success); } + + // --------------------------------------------------------------- + // Tests for the FlushMutex leak fix. + // + // Background: when the WASM bucketing engine traps inside + // flushEventQueue (e.g. its event queue state is corrupted from + // accumulated AssemblyScript throws), the previous implementation + // of FlushEvents() let the exception escape AFTER calling + // localBucketing.StartFlush() but BEFORE calling EndFlush(). That + // permanently leaked the FlushMutex and every subsequent flush + // would deadlock, causing events to accumulate in the WASM heap + // indefinitely. The fix wraps StartFlush/EndFlush in try/finally + // and swallows WASM exceptions so the mutex is always released. + // --------------------------------------------------------------- + + [TestMethod] + public async Task FlushEvents_WhenFlushEventQueueThrows_DoesNotPropagate() + { + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + // Should NOT throw - the fix swallows the exception so the + // mutex outer try/finally can release the FlushMutex. + await eventQueue.FlushEvents(); + } + + [TestMethod] + public async Task FlushEvents_WhenFlushEventQueueThrows_StillCallsEndFlush() + { + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + await eventQueue.FlushEvents(); + + Assert.AreEqual(1, bucketing.StartFlushCount, + "StartFlush should have been called exactly once."); + Assert.AreEqual(1, bucketing.EndFlushCount, + "EndFlush MUST be called even when FlushEventQueue throws, " + + "otherwise the FlushMutex semaphore is permanently leaked."); + } + + [TestMethod] + public async Task FlushEvents_WhenFlushEventQueueThrows_RaisesFailureToSubscribers() + { + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + DevCycleEventArgs received = null; + var completion = new ManualResetEvent(false); + eventQueue.AddFlushedEventsSubscriber((sender, e) => + { + received = e; + completion.Set(); + }); + + await eventQueue.FlushEvents(); + completion.WaitOne(TimeSpan.FromSeconds(2)); + + Assert.IsNotNull(received, "FlushedEvents subscriber should have been invoked."); + Assert.IsFalse(received.Success); + Assert.IsTrue(received.Errors.Count > 0, + "Errors collection should contain the underlying exception."); + } + + [TestMethod] + public async Task FlushEvents_AfterFailedFlush_NextFlushDoesNotDeadlock() + { + // The fake's StartFlush uses a real SemaphoreSlim with a 5s + // timeout, so a regression that skips EndFlush will surface as + // an InvalidOperationException ("StartFlush deadlocked") on + // the second StartFlush call. The first flush is wrapped in + // try/catch so the test specifically exercises the second + // flush behaviour. + var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true); + var eventQueue = BuildEventQueueWithFakeBucketing(bucketing); + + try + { + await eventQueue.FlushEvents(); + } + catch (Exception) + { + // A regression where FlushEvents propagates the WASM + // exception is OK for THIS test - we still want to verify + // the second flush isn't blocked by a leaked mutex. + } + + // Now turn off the fault and try again. With the fix, this + // completes promptly. Without the fix, the fake's StartFlush + // detects the leaked semaphore and throws. + bucketing.ThrowOnFlushEventQueue = false; + await eventQueue.FlushEvents(); + + Assert.AreEqual(2, bucketing.StartFlushCount); + Assert.AreEqual(2, bucketing.EndFlushCount); + } + + private EventQueue BuildEventQueueWithFakeBucketing(TrappingLocalBucketing bucketing) + { + var sdkKey = $"server-{Guid.NewGuid()}"; + var loggerFactory = LoggerFactory.Create(b => b.SetMinimumLevel(LogLevel.None)); + var options = new DevCycleLocalOptions(10, 10); + + var mockHttp = new MockHttpMessageHandler(); + mockHttp.When("https://*").Respond(HttpStatusCode.Created, "application/json", "{}"); + + return new EventQueue(sdkKey, options, loggerFactory, bucketing, + new DevCycleRestClientOptions { ConfigureMessageHandler = _ => mockHttp }); + } + + /// + /// Test fake of that simulates a WASM + /// trap inside FlushEventQueue. Uses a real + /// for Start/EndFlush so that a missed + /// EndFlush actually deadlocks subsequent StartFlush calls (matching + /// the behaviour of the real WASMLocalBucketing.FlushMutex). + /// + private class TrappingLocalBucketing : ILocalBucketing + { + private readonly SemaphoreSlim flushMutex = new SemaphoreSlim(1, 1); + + public bool ThrowOnFlushEventQueue { get; set; } + public int StartFlushCount { get; private set; } + public int EndFlushCount { get; private set; } + + public TrappingLocalBucketing(bool throwOnFlushEventQueue) + { + ThrowOnFlushEventQueue = throwOnFlushEventQueue; + } + + public string ClientUUID => "test-client-uuid"; + + public List FlushEventQueue(string sdkKey) + { + if (ThrowOnFlushEventQueue) + { + // Mimics a wasmtime trap surfaced as LocalBucketingException. + throw new LocalBucketingException("Simulated WASM trap during flushEventQueue"); + } + return new List(); + } + + public void StartFlush() + { + // Block (with a 5s timeout so a regression doesn't hang the + // test runner forever) - mirrors WasmLocalBucketing.FlushMutex.Wait(). + if (!flushMutex.Wait(TimeSpan.FromSeconds(5))) + { + throw new InvalidOperationException( + "TrappingLocalBucketing.StartFlush deadlocked - " + + "EndFlush was not called for a previous flush. " + + "This indicates the FlushMutex leak regression."); + } + StartFlushCount++; + } + + public void EndFlush() + { + EndFlushCount++; + flushMutex.Release(); + } + + // Unused on the flush path - return safe defaults / no-ops. + public void InitEventQueue(string sdkKey, string options) { } + public BucketedUserConfig GenerateBucketedConfig(string sdkKey, string user) => + new BucketedUserConfig(); + public int EventQueueSize(string sdkKey) => 0; + public void QueueEvent(string sdkKey, string user, string eventString) { } + public void QueueAggregateEvent(string sdkKey, string eventString, string variableVariationMapStr) { } + public void OnPayloadSuccess(string sdkKey, string payloadId) { } + public void OnPayloadFailure(string sdkKey, string payloadId, bool retryable) { } + public void StoreConfig(string sdkKey, string config) { } + public void SetPlatformData(string platformData) { } + public string GetVariable(string sdkKey, string userJSON, string key, TypeEnum variableType, bool shouldTrackEvent) => null; + public string GetConfigMetadata(string sdkKey) => null; + public byte[] GetVariableForUserProtobuf(byte[] serializedParams) => null; + public void SetClientCustomData(string sdkKey, string customData) { } + } } } \ No newline at end of file diff --git a/DevCycle.SDK.Server.Local/Api/EventQueue.cs b/DevCycle.SDK.Server.Local/Api/EventQueue.cs index 573e1d5..eedbe25 100644 --- a/DevCycle.SDK.Server.Local/Api/EventQueue.cs +++ b/DevCycle.SDK.Server.Local/Api/EventQueue.cs @@ -64,16 +64,51 @@ private async Task> GetPayloadResult(FlushPayl public virtual async Task FlushEvents() { localBucketing.StartFlush(); - var flushPayloads = GetPayloads(); + try + { + await FlushEventsInternal(); + } + finally + { + // Ensure the FlushMutex is always released, even if the WASM + // bucketing engine throws inside flushEventQueue or any of + // the downstream calls. Without this, a single WASM trap + // during a flush would permanently leak the mutex and all + // subsequent flushes would deadlock - causing events to + // accumulate indefinitely in the WASM heap. + localBucketing.EndFlush(); + } + } + + private async Task FlushEventsInternal() + { var flushResultEvent = new DevCycleEventArgs { Success = true }; + List flushPayloads; + try + { + flushPayloads = GetPayloads(); + } + catch (Exception ex) + { + // GetPayloads can throw if the WASM bucketing engine traps + // (e.g. its event queue state is corrupted, or a flush-path + // throw fires inside the AssemblyScript). Surface the + // failure to flush subscribers and return cleanly so the + // mutex gets released by the outer finally. + flushResultEvent.Success = false; + flushResultEvent.Errors.Add(ex as DevCycleException + ?? new DevCycleException(new ErrorResponse(ex.Message))); + OnFlushedEvents(flushResultEvent); + return; + } + if (flushPayloads.Count == 0) { OnFlushedEvents(flushResultEvent); - localBucketing.EndFlush(); return; } @@ -115,7 +150,6 @@ public virtual async Task FlushEvents() } OnFlushedEvents(flushResultEvent); - localBucketing.EndFlush(); } private List GetPayloads()