From 7ee5a401fc375ef98ec9365a2bd41b4fd2acbc1d Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:19:18 +1000 Subject: [PATCH 01/11] fix(audience-http): lock HttpTransport backoff state _consecutiveFailures and _nextAttemptAt had no synchronisation. Readers (RescheduleSendTimer, FlushAsync.IsInBackoffWindow) could observe torn state while SendBatchAsync wrote them. Add a dedicated _backoffLock. All reads and writes go through it. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Runtime/Transport/HttpTransport.cs | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 67750ccb7..b04195d67 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -22,6 +22,7 @@ internal sealed class HttpTransport : IDisposable private readonly Action? _onError; private readonly Func _getUtcNow; + private readonly object _backoffLock = new object(); private int _consecutiveFailures; private DateTime? _nextAttemptAt; @@ -131,7 +132,15 @@ internal async Task SendBatchAsync(CancellationToken ct = default) return true; } - internal int BackoffMs => _consecutiveFailures switch + internal int BackoffMs + { + get + { + lock (_backoffLock) return BackoffMsLocked(); + } + } + + private int BackoffMsLocked() => _consecutiveFailures switch { <= 0 => 0, 1 => 5_000, @@ -143,10 +152,16 @@ internal async Task SendBatchAsync(CancellationToken ct = default) // Earliest UTC time at which the next attempt may run. // Null when no backoff is active. - internal DateTime? NextAttemptAt => _nextAttemptAt; + internal DateTime? NextAttemptAt + { + get { lock (_backoffLock) return _nextAttemptAt; } + } // True while UtcNow < NextAttemptAt. Flips false as the clock advances. 
- internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt; + internal bool IsInBackoffWindow + { + get { lock (_backoffLock) return _getUtcNow() < _nextAttemptAt; } + } public void Dispose() { @@ -155,17 +170,22 @@ public void Dispose() private void RecordFailure() { - var now = _getUtcNow(); - if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff - - _consecutiveFailures++; - _nextAttemptAt = now.AddMilliseconds(BackoffMs); + lock (_backoffLock) + { + var now = _getUtcNow(); + if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff + _consecutiveFailures++; + _nextAttemptAt = now.AddMilliseconds(BackoffMsLocked()); + } } private void ResetBackoff() { - _consecutiveFailures = 0; - _nextAttemptAt = null; + lock (_backoffLock) + { + _consecutiveFailures = 0; + _nextAttemptAt = null; + } } // Reads each path and wraps the concatenated JSON bodies in From 6f30b3adf74cbd7cd1e67d099a43ce29dd03e905 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:19:46 +1000 Subject: [PATCH 02/11] fix(audience-http): do not dispose consumer-supplied handler HttpTransport's HttpClient constructor used the default disposeHandler:true, which meant Shutdown disposed the consumer's HttpMessageHandler. A caller who shared the handler across Init cycles (tests, proxy config, connection pooling) saw it fail on the second Init. Matches _controlClient which already used disposeHandler:false. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Packages/Audience/Runtime/Transport/HttpTransport.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index b04195d67..22688f179 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -41,7 +41,11 @@ internal HttpTransport( _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); _url = Constants.MessagesUrl(publishableKey); _onError = onError; - _client = handler != null ? new HttpClient(handler) : new HttpClient(); + // disposeHandler: false so the consumer can reuse their handler + // across Init/Shutdown cycles (matches _controlClient's policy). + _client = handler != null + ? new HttpClient(handler, disposeHandler: false) + : new HttpClient(); _client.Timeout = TimeSpan.FromSeconds(30); _getUtcNow = getUtcNow ?? (() => DateTime.UtcNow); } From 143d690cb799eb701f0843ec7ffd4e5b7c39dec2 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:23:08 +1000 Subject: [PATCH 03/11] fix(audience-http): surface rejected count from 200 response MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the Unity Implementation Plan §4.6, the backend may return {accepted, rejected} on a 2xx to signal per-message validation errors. The old code deleted the batch silently on any 2xx — studios had no way to see that some events were being dropped. Parse the response body, surface via OnError(ValidationRejected, ...) when rejected > 0. The batch is still deleted (rejections are validation errors; retries would not help). Body parse failures fall through as zero-rejected — a malformed diagnostic must not block the success path. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Runtime/Transport/HttpTransport.cs | 43 ++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 22688f179..0134eee59 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -99,9 +99,17 @@ internal async Task SendBatchAsync(CancellationToken ct = default) if (statusCode >= 200 && statusCode < 300) { - // 2xx: server acked, drop the batch, healthy state. + // 2xx: server acked. Parse {accepted, rejected} and surface + // any partial rejections via onError — rejected events are + // validation errors, won't succeed on retry. + var rejected = await ParseRejectedCount(response).ConfigureAwait(false); _store.Delete(batch); ResetBackoff(); + if (rejected > 0) + { + NotifyError(AudienceErrorCode.ValidationRejected, + $"Batch partially rejected: {rejected} of {batch.Count} events dropped"); + } } else if (statusCode >= 400 && statusCode < 500) { @@ -225,6 +233,39 @@ private void ResetBackoff() return sb.ToString(); } + // Reads the body and extracts "rejected". Returns 0 on any parse or + // read failure — the body is diagnostic, so a malformed one must not + // block the success path. 
+ private static async Task ParseRejectedCount(HttpResponseMessage response) + { + string body; + try + { + body = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + } + catch + { + return 0; + } + if (string.IsNullOrEmpty(body)) return 0; + + try + { + var parsed = JsonReader.DeserializeObject(body); + if (!parsed.TryGetValue("rejected", out var raw)) return 0; + return raw switch + { + int i => i, + long l => (int)l, + _ => 0, + }; + } + catch (FormatException) + { + return 0; + } + } + private void NotifyError(AudienceErrorCode code, string message) { if (_onError == null) return; From c033c6c442359d0eb37a5a358144959a3d949553 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:30:20 +1000 Subject: [PATCH 04/11] test(audience-http): partial-rejection warning on 200 with rejected>0 Pin the I9 fix (PATCH 03/11 in this series, "surface rejected count from 200 response"; originally commit 09b23cdf): - 200 with rejected>0: batch deleted, ValidationRejected surfaced via onError with the rejected count. - 200 with rejected=0: onError silent. - 200 with malformed body: falls through as zero-rejected, batch still deleted. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Runtime/Transport/HttpTransportTests.cs | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs index 30a29f165..290028df5 100644 --- a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs @@ -182,6 +182,61 @@ public async Task SendBatchAsync_4xx_DeletesFilesAndResetsBackoff() Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); } + [Test] + public async Task SendBatchAsync_200_WithRejected_DeletesFilesAndSurfacesValidationRejected() + { + // Per Unity Implementation Plan §4.6, a 200 with rejected>0 means + // per-message validation errors. 
The batch is deleted (retries + // would not help) and the count is surfaced via onError so + // studios can observe silently dropped events. + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + _store.Write("{\"type\":\"track\",\"eventName\":\"b\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":1}"); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(0, _store.Count(), "200 with rejected>0 should still delete the batch"); + Assert.IsNotNull(reportedError, "partial rejection must surface via onError"); + Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); + StringAssert.Contains("1", reportedError.Message, "message should include the rejected count"); + } + + [Test] + public async Task SendBatchAsync_200_ZeroRejected_DoesNotFireOnError() + { + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}"); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.IsNull(reportedError, "zero rejected must not fire onError"); + } + + [Test] + public async Task SendBatchAsync_200_MalformedBody_TreatsAsZeroRejected() + { + // Malformed diagnostic body must not block the success path. 
+ _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "not-json"); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(0, _store.Count(), "files should still be deleted on 200"); + Assert.IsNull(reportedError, "malformed body must not surface an error"); + } + [Test] public async Task SendBatchAsync_5xx_KeepsFilesAndIncreasesBackoff() { From 17c8c7529fc8c67c6fd5d0b2bb9b4a68873d920b Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:18:26 +1000 Subject: [PATCH 05/11] fix(audience): gate FlushAsync on _sendInFlight Two concurrent FlushAsync callers would both call ReadBatch with the same paths and double-POST. Reuse the timer-tick gate so at most one SendBatchAsync runs at a time; other callers poll cheaply until the gate clears. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Audience/Runtime/ImmutableAudience.cs | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/Packages/Audience/Runtime/ImmutableAudience.cs b/src/Packages/Audience/Runtime/ImmutableAudience.cs index 9eaa98503..55fc8b79f 100644 --- a/src/Packages/Audience/Runtime/ImmutableAudience.cs +++ b/src/Packages/Audience/Runtime/ImmutableAudience.cs @@ -554,9 +554,25 @@ public static async Task FlushAsync() queue.FlushSync(); - while (!transport.IsInBackoffWindow && - await transport.SendBatchAsync().ConfigureAwait(false)) + // Serialise SendBatchAsync via _sendInFlight. Without the gate, + // two concurrent FlushAsync callers both call ReadBatch with the + // same paths and double-POST. Poll cheaply while another caller + // (timer SendBatch or a racing FlushAsync) holds the gate. 
+ while (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0) { + await Task.Yield(); + } + + try + { + while (!transport.IsInBackoffWindow && + await transport.SendBatchAsync().ConfigureAwait(false)) + { + } + } + finally + { + Interlocked.Exchange(ref _sendInFlight, 0); } } From 626ec69a9aced995ba4cc4733a5d49126f75d821 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:25:18 +1000 Subject: [PATCH 06/11] fix(audience): flushAsync accepts CancellationToken, swallows ObjectDisposedException MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add optional CancellationToken. Caller can cancel the gate-wait and the in-flight HTTP send (default is CancellationToken.None — no behaviour change for existing callers). - Catch ObjectDisposedException thrown when a concurrent Shutdown disposed the transport mid-flush. Previously, the exception propagated to the awaiter. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Packages/Audience/Runtime/ImmutableAudience.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/Packages/Audience/Runtime/ImmutableAudience.cs b/src/Packages/Audience/Runtime/ImmutableAudience.cs index 55fc8b79f..8fc60fe8f 100644 --- a/src/Packages/Audience/Runtime/ImmutableAudience.cs +++ b/src/Packages/Audience/Runtime/ImmutableAudience.cs @@ -543,8 +543,9 @@ private static void SyncConsentToBackend(AudienceConfig config, ConsentLevel lev // Flush / Shutdown // ----------------------------------------------------------------- - // Sends all pending events now. - public static async Task FlushAsync() + // Sends all pending events now. Respects cancellationToken for both + // the gate wait and the HTTP send. + public static async Task FlushAsync(CancellationToken cancellationToken = default) { if (!_initialized) return; @@ -560,16 +561,22 @@ public static async Task FlushAsync() // (timer SendBatch or a racing FlushAsync) holds the gate. 
while (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0) { + cancellationToken.ThrowIfCancellationRequested(); await Task.Yield(); } try { while (!transport.IsInBackoffWindow && - await transport.SendBatchAsync().ConfigureAwait(false)) + await transport.SendBatchAsync(cancellationToken).ConfigureAwait(false)) { } } + catch (ObjectDisposedException) + { + // Concurrent Shutdown disposed the transport. Exit silently — + // caller is tearing down. + } finally { Interlocked.Exchange(ref _sendInFlight, 0); From 5804ad5862162d91098509d698e552bcf8c191a8 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 14:28:22 +1000 Subject: [PATCH 07/11] test(audience): concurrent FlushAsync must serialise on _sendInFlight Pin the H2-A fix (PATCH 05/11 in this series, "gate FlushAsync on _sendInFlight"; originally commit f58260d5): two parallel FlushAsync calls must not both issue HTTP POSTs against the same on-disk batch. BlockingHandler blocks in SendAsync until released. First FlushAsync enters; second starts and must wait on the gate; RequestCount stays at 1. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Tests/Runtime/ImmutableAudienceTests.cs | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs index e71f19c76..e40e4f570 100644 --- a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs +++ b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs @@ -1145,6 +1145,39 @@ public void SendBatch_ConcurrentTicks_OnlyOneReachesTransport() "overlapping tick must not issue a second HTTP request"); } + [Test] + public void FlushAsync_ConcurrentCallers_OnlyOneReachesTransport() + { + // Two parallel FlushAsync calls must serialise on _sendInFlight so + // ReadBatch/POST pairs do not double-send. Sabotage: remove the + // gate in FlushAsync and this test sees RequestCount > 1. 
+ var handler = new BlockingHandler(); + var config = MakeConfig(); + config.HttpHandler = handler; + + ImmutableAudience.Init(config); + ImmutableAudience.Track("event_to_send"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + // First caller enters SendAsync and blocks on handler.Release. + var flush1 = Task.Run(() => ImmutableAudience.FlushAsync()); + Assert.IsTrue(handler.EnteredSendAsync.Wait(TimeSpan.FromSeconds(2)), + "first FlushAsync should reach the HTTP handler"); + + // Second caller starts while the first holds the gate — it must + // wait, not issue a second request. + var flush2 = Task.Run(() => ImmutableAudience.FlushAsync()); + + // Give the second caller a moment to try (and back off). + Thread.Sleep(200); + Assert.AreEqual(1, handler.RequestCount, + "second FlushAsync must not issue a second HTTP request while the first is in-flight"); + + handler.Release.Set(); + Assert.IsTrue(Task.WaitAll(new[] { flush1, flush2 }, TimeSpan.FromSeconds(10)), + "both FlushAsync calls should complete after release"); + } + private class BlockingHandler : HttpMessageHandler { public readonly ManualResetEventSlim EnteredSendAsync = new ManualResetEventSlim(false); From 20b250b91aef96c0960245bb04392d055c3e5357 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 15:19:38 +1000 Subject: [PATCH 08/11] docs(audience-http): plain-language comment pass Rewrites jargon (2xx server acked, serialise, diagnostic must not block the success path) into plain language. No code changes. All 184 tests pass. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Packages/Audience/Runtime/ImmutableAudience.cs | 8 ++++---- .../Audience/Runtime/Transport/HttpTransport.cs | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/Packages/Audience/Runtime/ImmutableAudience.cs b/src/Packages/Audience/Runtime/ImmutableAudience.cs index 8fc60fe8f..ca2f1c1cf 100644 --- a/src/Packages/Audience/Runtime/ImmutableAudience.cs +++ b/src/Packages/Audience/Runtime/ImmutableAudience.cs @@ -555,10 +555,10 @@ public static async Task FlushAsync(CancellationToken cancellationToken = defaul queue.FlushSync(); - // Serialise SendBatchAsync via _sendInFlight. Without the gate, - // two concurrent FlushAsync callers both call ReadBatch with the - // same paths and double-POST. Poll cheaply while another caller - // (timer SendBatch or a racing FlushAsync) holds the gate. + // Only one send runs at a time. Without this, two FlushAsync + // callers would both read the same batch from disk and send it + // twice. Yield while another caller (the timer or another + // FlushAsync) holds the in-flight slot. while (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0) { cancellationToken.ThrowIfCancellationRequested(); diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 0134eee59..16dcf6a3a 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -99,9 +99,10 @@ internal async Task SendBatchAsync(CancellationToken ct = default) if (statusCode >= 200 && statusCode < 300) { - // 2xx: server acked. Parse {accepted, rejected} and surface - // any partial rejections via onError — rejected events are - // validation errors, won't succeed on retry. + // Server accepted the batch. Count how many messages it + // rejected; if any, tell the studio via onError. 
Rejected + // messages are validation failures — retrying won't help, + // so the batch is deleted either way. var rejected = await ParseRejectedCount(response).ConfigureAwait(false); _store.Delete(batch); ResetBackoff(); @@ -233,9 +234,9 @@ private void ResetBackoff() return sb.ToString(); } - // Reads the body and extracts "rejected". Returns 0 on any parse or - // read failure — the body is diagnostic, so a malformed one must not - // block the success path. + // Reads the response body and pulls out the "rejected" count. Returns + // 0 if the body is missing or unreadable — the body is only for + // reporting, so failing to read it must not break the success path. private static async Task ParseRejectedCount(HttpResponseMessage response) { string body; From c58e86121a94b3b9d0d3a05e34facf49cd2f9ed9 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 17:46:00 +1000 Subject: [PATCH 09/11] fix(audience-http): propagate caller cancellation from SendBatchAsync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR #701 review from @nattb8. The catch-filter branch for caller cancellation used to silently swallow the exception and let the method return `true` — its normal "batch sent, ask me again" signal. FlushAsync's send loop takes that return value at face value and immediately re-enters on the same cancelled token. HttpClient throws on entry (token still cancelled), the same branch swallows it, `true` is returned again. The batch is never deleted on this path, so ReadBatch keeps handing back the same events — a tight infinite loop. Rethrow instead. The caller's send loop exits via the exception; no behaviour change for HttpClient's internal timeout path (still filtered out by the `when (ct.IsCancellationRequested)` guard) so timeouts still trigger backoff. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Audience/Runtime/Transport/HttpTransport.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 16dcf6a3a..80c2dc9ed 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -131,10 +131,15 @@ internal async Task SendBatchAsync(CancellationToken ct = default) } catch (OperationCanceledException) when (ct.IsCancellationRequested) { - // Caller cancelled the token (e.g. on shutdown). Events stay on - // disk, no failure recorded. HttpClient timeouts throw the same - // exception but without ct.IsCancellationRequested set, so they - // fall through to the Exception branch below and trigger backoff. + // Caller cancelled the token (e.g. on shutdown). Events stay + // on disk, no failure recorded. Rethrow so the caller's send + // loop exits — swallowing here returns `true`, and the loop + // would re-enter on the same cancelled token and spin because + // the batch is still on disk. HttpClient timeouts throw the + // same exception but without ct.IsCancellationRequested set, + // so they fall through to the Exception branch below and + // trigger backoff. + throw; } catch (Exception ex) { From 632902c2febeb3c609b690d36824d97ad91bfa5a Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 17:46:09 +1000 Subject: [PATCH 10/11] test(audience): cancellation exits FlushAsync and SendBatchAsync cleanly Two regression guards for PR #701 review from @nattb8. SendBatchAsync_CallerCancelled_Throws: pre-cancel the token, confirm the method throws OperationCanceledException, confirm the batch stays on disk, confirm no backoff and no onError. Sabotage: re-add the empty catch body and this fails because SendBatchAsync returns true silently. 
FlushAsync_CancelledToken_Terminates_DoesNotHotLoop: pre-cancel the token, start FlushAsync, race against a 2s timeout. With the fix the task faults quickly; without it the task never completes. Also flips the handler to 200 and runs a follow-up FlushAsync to prove _sendInFlight was released (the finally block didn't get stranded). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Tests/Runtime/ImmutableAudienceTests.cs | 57 +++++++++++++++++++ .../Runtime/Transport/HttpTransportTests.cs | 27 +++++++++ 2 files changed, 84 insertions(+) diff --git a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs index e40e4f570..287f32d37 100644 --- a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs +++ b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs @@ -1178,6 +1178,63 @@ public void FlushAsync_ConcurrentCallers_OnlyOneReachesTransport() "both FlushAsync calls should complete after release"); } + [Test] + public async Task FlushAsync_CancelledToken_Terminates_DoesNotHotLoop() + { + // Regression for PR #701 review (@nattb8): if SendBatchAsync + // silently swallowed caller cancellation, the inner while-loop + // here would re-enter on the same cancelled token and spin + // because the batch is never deleted on that code path. The + // task below would never complete. After the fix, cancellation + // propagates and the task faults quickly. 
+ var handler = new CancellingHandler(); + var config = MakeConfig(); + config.HttpHandler = handler; + + ImmutableAudience.Init(config); + ImmutableAudience.Track("event_to_send"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var flush = ImmutableAudience.FlushAsync(cts.Token); + var finishedFirst = await Task.WhenAny(flush, Task.Delay(TimeSpan.FromSeconds(2))); + + Assert.AreSame(flush, finishedFirst, + "FlushAsync must terminate (not hot-loop) when the token is cancelled"); + Assert.IsTrue(flush.IsCanceled || flush.IsFaulted, + "FlushAsync must propagate the cancellation, not return normally"); + Assert.LessOrEqual(handler.CallCount, 1, + "a cancelled token must not drive repeated SendAsync attempts"); + + // Gate must be released by the finally block — a follow-up flush + // on an uncancelled token should proceed, proving _sendInFlight + // is not stranded at 1. + handler.AcceptNextAsSuccess = true; + var followUp = ImmutableAudience.FlushAsync(); + Assert.IsTrue(followUp.Wait(TimeSpan.FromSeconds(2)), + "_sendInFlight must be released after a cancelled flush"); + } + + private class CancellingHandler : HttpMessageHandler + { + public int CallCount; + public bool AcceptNextAsSuccess; + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + Interlocked.Increment(ref CallCount); + ct.ThrowIfCancellationRequested(); + var status = AcceptNextAsSuccess ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable; + var body = AcceptNextAsSuccess ? 
"{\"accepted\":1,\"rejected\":0}" : ""; + return Task.FromResult(new HttpResponseMessage(status) + { + Content = new StringContent(body) + }); + } + } + private class BlockingHandler : HttpMessageHandler { public readonly ManualResetEventSlim EnteredSendAsync = new ManualResetEventSlim(false); diff --git a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs index 290028df5..c3e311faa 100644 --- a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs @@ -396,6 +396,33 @@ public async Task SendBatchAsync_HttpClientTimeout_TreatedAsNetworkError() Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); } + [Test] + public void SendBatchAsync_CallerCancelled_Throws_DoesNotDeleteOrRecordFailure() + { + // Regression guard for PR #701 review: caller cancellation must + // propagate. If the `when (ct.IsCancellationRequested)` branch + // swallowed the exception, SendBatchAsync would return `true` + // with the batch still on disk, and a FlushAsync loop watching + // that return value would re-enter on the same cancelled token + // forever — nothing ever drains, nothing ever throws. 
+ _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(() => throw new OperationCanceledException("simulated")); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + Assert.ThrowsAsync( + async () => await transport.SendBatchAsync(cts.Token)); + + Assert.AreEqual(1, _store.Count(), "cancelled send must not delete the batch"); + Assert.IsFalse(transport.IsInBackoffWindow, "cancel is not a failure — no backoff engaged"); + Assert.IsNull(reportedError, "cancel is caller-initiated — no onError fires"); + } + [Test] public async Task IsInBackoffWindow_ClearsAfterNextAttemptAtElapses() { From b588bdf10ec43a569dda993dccc81697a67b6d07 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Thu, 23 Apr 2026 17:57:54 +1000 Subject: [PATCH 11/11] test(audience-http): accept HttpClient's TaskCanceledException wrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SendBatchAsync_CallerCancelled_Throws was asserting the exact type `OperationCanceledException` via `Assert.ThrowsAsync`. HttpClient internally catches the OCE our mock throws and rethrows it as `TaskCanceledException` (a subclass), so `ThrowsAsync` — which is exact-type — missed. Switch to `CatchAsync`, which accepts the whole cancellation family. This is what we actually want to assert: "cancellation propagated", not "HttpClient happens to throw this exact subclass". 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Audience/Tests/Runtime/Transport/HttpTransportTests.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs index c3e311faa..a848679cf 100644 --- a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs @@ -415,7 +415,10 @@ public void SendBatchAsync_CallerCancelled_Throws_DoesNotDeleteOrRecordFailure() using var cts = new CancellationTokenSource(); cts.Cancel(); - Assert.ThrowsAsync( + // CatchAsync (not ThrowsAsync) because HttpClient re-wraps our mock's + // OperationCanceledException as TaskCanceledException before rethrowing. + // We want to assert the cancellation *family*, not a specific subclass. + Assert.CatchAsync( async () => await transport.SendBatchAsync(cts.Token)); Assert.AreEqual(1, _store.Count(), "cancelled send must not delete the batch");