Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions DevCycle.SDK.Server.Local.MSTests/EventQueueTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,183 @@ private void AssertFailedEvent(object sender, DevCycleEventArgs e)
Assert.AreNotEqual(0, e.Errors.Count);
Assert.IsFalse(e.Success);
}

// ---------------------------------------------------------------
// Tests for the FlushMutex leak fix.
//
// Background: when the WASM bucketing engine traps inside
// flushEventQueue (e.g. its event queue state is corrupted from
// accumulated AssemblyScript throws), the previous implementation
// of FlushEvents() let the exception escape AFTER calling
// localBucketing.StartFlush() but BEFORE calling EndFlush(). That
// permanently leaked the FlushMutex and every subsequent flush
// would deadlock, causing events to accumulate in the WASM heap
// indefinitely. The fix wraps StartFlush/EndFlush in try/finally
// and swallows WASM exceptions so the mutex is always released.
// ---------------------------------------------------------------

[TestMethod]
public async Task FlushEvents_WhenFlushEventQueueThrows_DoesNotPropagate()
{
var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true);
var eventQueue = BuildEventQueueWithFakeBucketing(bucketing);

// Should NOT throw - the fix swallows the exception so the
// mutex outer try/finally can release the FlushMutex.
await eventQueue.FlushEvents();
}

[TestMethod]
public async Task FlushEvents_WhenFlushEventQueueThrows_StillCallsEndFlush()
{
var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true);
var eventQueue = BuildEventQueueWithFakeBucketing(bucketing);

await eventQueue.FlushEvents();

Assert.AreEqual(1, bucketing.StartFlushCount,
"StartFlush should have been called exactly once.");
Assert.AreEqual(1, bucketing.EndFlushCount,
"EndFlush MUST be called even when FlushEventQueue throws, "
+ "otherwise the FlushMutex semaphore is permanently leaked.");
}

[TestMethod]
public async Task FlushEvents_WhenFlushEventQueueThrows_RaisesFailureToSubscribers()
{
var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true);
var eventQueue = BuildEventQueueWithFakeBucketing(bucketing);

DevCycleEventArgs received = null;
var completion = new ManualResetEvent(false);
eventQueue.AddFlushedEventsSubscriber((sender, e) =>
{
received = e;
completion.Set();
});

await eventQueue.FlushEvents();
completion.WaitOne(TimeSpan.FromSeconds(2));

Assert.IsNotNull(received, "FlushedEvents subscriber should have been invoked.");
Assert.IsFalse(received.Success);
Assert.IsTrue(received.Errors.Count > 0,
"Errors collection should contain the underlying exception.");
}

[TestMethod]
public async Task FlushEvents_AfterFailedFlush_NextFlushDoesNotDeadlock()
{
// The fake's StartFlush uses a real SemaphoreSlim with a 5s
// timeout, so a regression that skips EndFlush will surface as
// an InvalidOperationException ("StartFlush deadlocked") on
// the second StartFlush call. The first flush is wrapped in
// try/catch so the test specifically exercises the second
// flush behaviour.
var bucketing = new TrappingLocalBucketing(throwOnFlushEventQueue: true);
var eventQueue = BuildEventQueueWithFakeBucketing(bucketing);

try
{
await eventQueue.FlushEvents();
}
catch (Exception)
{
// A regression where FlushEvents propagates the WASM
// exception is OK for THIS test - we still want to verify
// the second flush isn't blocked by a leaked mutex.
}

// Now turn off the fault and try again. With the fix, this
// completes promptly. Without the fix, the fake's StartFlush
// detects the leaked semaphore and throws.
bucketing.ThrowOnFlushEventQueue = false;
await eventQueue.FlushEvents();

Assert.AreEqual(2, bucketing.StartFlushCount);
Assert.AreEqual(2, bucketing.EndFlushCount);
}

private EventQueue BuildEventQueueWithFakeBucketing(TrappingLocalBucketing bucketing)
{
var sdkKey = $"server-{Guid.NewGuid()}";
var loggerFactory = LoggerFactory.Create(b => b.SetMinimumLevel(LogLevel.None));
var options = new DevCycleLocalOptions(10, 10);

var mockHttp = new MockHttpMessageHandler();
mockHttp.When("https://*").Respond(HttpStatusCode.Created, "application/json", "{}");

return new EventQueue(sdkKey, options, loggerFactory, bucketing,
new DevCycleRestClientOptions { ConfigureMessageHandler = _ => mockHttp });
}

/// <summary>
/// Test fake of <see cref="ILocalBucketing"/> that simulates a WASM
/// trap inside <c>FlushEventQueue</c>. Uses a real
/// <see cref="SemaphoreSlim"/> for Start/EndFlush so that a missed
/// EndFlush actually deadlocks subsequent StartFlush calls (matching
/// the behaviour of the real <c>WASMLocalBucketing.FlushMutex</c>).
/// </summary>
private class TrappingLocalBucketing : ILocalBucketing
{
private readonly SemaphoreSlim flushMutex = new SemaphoreSlim(1, 1);

public bool ThrowOnFlushEventQueue { get; set; }
public int StartFlushCount { get; private set; }
public int EndFlushCount { get; private set; }

public TrappingLocalBucketing(bool throwOnFlushEventQueue)
{
ThrowOnFlushEventQueue = throwOnFlushEventQueue;
}

public string ClientUUID => "test-client-uuid";

public List<FlushPayload> FlushEventQueue(string sdkKey)
{
if (ThrowOnFlushEventQueue)
{
// Mimics a wasmtime trap surfaced as LocalBucketingException.
throw new LocalBucketingException("Simulated WASM trap during flushEventQueue");
}
return new List<FlushPayload>();
}

public void StartFlush()
{
// Block (with a 5s timeout so a regression doesn't hang the
// test runner forever) - mirrors WasmLocalBucketing.FlushMutex.Wait().
if (!flushMutex.Wait(TimeSpan.FromSeconds(5)))
{
throw new InvalidOperationException(
"TrappingLocalBucketing.StartFlush deadlocked - "
+ "EndFlush was not called for a previous flush. "
+ "This indicates the FlushMutex leak regression.");
}
StartFlushCount++;
}

public void EndFlush()
{
EndFlushCount++;
flushMutex.Release();
}

// Unused on the flush path - return safe defaults / no-ops.
public void InitEventQueue(string sdkKey, string options) { }
public BucketedUserConfig GenerateBucketedConfig(string sdkKey, string user) =>
new BucketedUserConfig();
public int EventQueueSize(string sdkKey) => 0;
public void QueueEvent(string sdkKey, string user, string eventString) { }
public void QueueAggregateEvent(string sdkKey, string eventString, string variableVariationMapStr) { }
public void OnPayloadSuccess(string sdkKey, string payloadId) { }
public void OnPayloadFailure(string sdkKey, string payloadId, bool retryable) { }
public void StoreConfig(string sdkKey, string config) { }
public void SetPlatformData(string platformData) { }
public string GetVariable(string sdkKey, string userJSON, string key, TypeEnum variableType, bool shouldTrackEvent) => null;
public string GetConfigMetadata(string sdkKey) => null;
public byte[] GetVariableForUserProtobuf(byte[] serializedParams) => null;
public void SetClientCustomData(string sdkKey, string customData) { }
}
}
}
40 changes: 37 additions & 3 deletions DevCycle.SDK.Server.Local/Api/EventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
DevCycleLocalOptions localOptions,
ILoggerFactory loggerFactory,
ILocalBucketing localBucketing,
DevCycleRestClientOptions restClientOptions = null

Check warning on line 36 in DevCycle.SDK.Server.Local/Api/EventQueue.cs

View workflow job for this annotation

GitHub Actions / run-example

Cannot convert null literal to non-nullable reference type.

Check warning on line 36 in DevCycle.SDK.Server.Local/Api/EventQueue.cs

View workflow job for this annotation

GitHub Actions / run-example

Cannot convert null literal to non-nullable reference type.

Check warning on line 36 in DevCycle.SDK.Server.Local/Api/EventQueue.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
)
{
devCycleEventsApiClient = new DevCycleEventsApiClient(sdkKey, localOptions, restClientOptions);
Expand Down Expand Up @@ -64,16 +64,51 @@
public virtual async Task FlushEvents()
{
localBucketing.StartFlush();
var flushPayloads = GetPayloads();
try
{
await FlushEventsInternal();
}
finally
{
// Ensure the FlushMutex is always released, even if the WASM
// bucketing engine throws inside flushEventQueue or any of
// the downstream calls. Without this, a single WASM trap
// during a flush would permanently leak the mutex and all
// subsequent flushes would deadlock - causing events to
// accumulate indefinitely in the WASM heap.
localBucketing.EndFlush();
}
}

private async Task FlushEventsInternal()
{
var flushResultEvent = new DevCycleEventArgs
{
Success = true
};

List<FlushPayload> flushPayloads;
try
{
flushPayloads = GetPayloads();
}
catch (Exception ex)
{
// GetPayloads can throw if the WASM bucketing engine traps
// (e.g. its event queue state is corrupted, or a flush-path
// throw fires inside the AssemblyScript). Surface the
// failure to flush subscribers and return cleanly so the
// mutex gets released by the outer finally.
flushResultEvent.Success = false;
flushResultEvent.Errors.Add(ex as DevCycleException
?? new DevCycleException(new ErrorResponse(ex.Message)));
OnFlushedEvents(flushResultEvent);
return;
}

if (flushPayloads.Count == 0)
{
OnFlushedEvents(flushResultEvent);
localBucketing.EndFlush();
return;
}

Expand Down Expand Up @@ -115,7 +150,6 @@
}

OnFlushedEvents(flushResultEvent);
localBucketing.EndFlush();
}

private List<FlushPayload> GetPayloads()
Expand Down
Loading