diff --git a/src/Packages/Audience/Runtime/ImmutableAudience.cs b/src/Packages/Audience/Runtime/ImmutableAudience.cs index 9eaa98503..ca2f1c1cf 100644 --- a/src/Packages/Audience/Runtime/ImmutableAudience.cs +++ b/src/Packages/Audience/Runtime/ImmutableAudience.cs @@ -543,8 +543,9 @@ private static void SyncConsentToBackend(AudienceConfig config, ConsentLevel lev // Flush / Shutdown // ----------------------------------------------------------------- - // Sends all pending events now. - public static async Task FlushAsync() + // Sends all pending events now. Respects cancellationToken for both + // the gate wait and the HTTP send. + public static async Task FlushAsync(CancellationToken cancellationToken = default) { if (!_initialized) return; @@ -554,9 +555,31 @@ public static async Task FlushAsync() queue.FlushSync(); - while (!transport.IsInBackoffWindow && - await transport.SendBatchAsync().ConfigureAwait(false)) + // Only one send runs at a time. Without this, two FlushAsync + // callers would both read the same batch from disk and send it + // twice. Yield while another caller (the timer or another + // FlushAsync) holds the in-flight slot. + while (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0) { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + } + + try + { + while (!transport.IsInBackoffWindow && + await transport.SendBatchAsync(cancellationToken).ConfigureAwait(false)) + { + } + } + catch (ObjectDisposedException) + { + // Concurrent Shutdown disposed the transport. Exit silently — + // caller is tearing down. + } + finally + { + Interlocked.Exchange(ref _sendInFlight, 0); } } diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 67750ccb7..80c2dc9ed 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -22,6 +22,7 @@ internal sealed class HttpTransport : IDisposable private readonly Action? _onError; private readonly Func _getUtcNow; + private readonly object _backoffLock = new object(); private int _consecutiveFailures; private DateTime? _nextAttemptAt; @@ -40,7 +41,11 @@ internal HttpTransport( _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); _url = Constants.MessagesUrl(publishableKey); _onError = onError; - _client = handler != null ? new HttpClient(handler) : new HttpClient(); + // disposeHandler: false so the consumer can reuse their handler + // across Init/Shutdown cycles (matches _controlClient's policy). + _client = handler != null + ? new HttpClient(handler, disposeHandler: false) + : new HttpClient(); _client.Timeout = TimeSpan.FromSeconds(30); _getUtcNow = getUtcNow ?? (() => DateTime.UtcNow); } @@ -94,9 +99,18 @@ internal async Task SendBatchAsync(CancellationToken ct = default) if (statusCode >= 200 && statusCode < 300) { - // 2xx: server acked, drop the batch, healthy state. + // Server accepted the batch. Count how many messages it + // rejected; if any, tell the studio via onError. Rejected + // messages are validation failures — retrying won't help, + // so the batch is deleted either way. + var rejected = await ParseRejectedCount(response).ConfigureAwait(false); _store.Delete(batch); ResetBackoff(); + if (rejected > 0) + { + NotifyError(AudienceErrorCode.ValidationRejected, + $"Batch partially rejected: {rejected} of {batch.Count} events dropped"); + } } else if (statusCode >= 400 && statusCode < 500) { @@ -117,10 +131,15 @@ internal async Task SendBatchAsync(CancellationToken ct = default) } catch (OperationCanceledException) when (ct.IsCancellationRequested) { - // Caller cancelled the token (e.g. on shutdown). Events stay on - // disk, no failure recorded. HttpClient timeouts throw the same - // exception but without ct.IsCancellationRequested set, so they - // fall through to the Exception branch below and trigger backoff. + // Caller cancelled the token (e.g. on shutdown). Events stay + // on disk, no failure recorded. Rethrow so the caller's send + // loop exits — swallowing here returns `true`, and the loop + // would re-enter on the same cancelled token and spin because + // the batch is still on disk. HttpClient timeouts throw the + // same exception but without ct.IsCancellationRequested set, + // so they fall through to the Exception branch below and + // trigger backoff. + throw; } catch (Exception ex) { @@ -131,7 +150,15 @@ internal async Task SendBatchAsync(CancellationToken ct = default) return true; } - internal int BackoffMs => _consecutiveFailures switch + internal int BackoffMs + { + get + { + lock (_backoffLock) return BackoffMsLocked(); + } + } + + private int BackoffMsLocked() => _consecutiveFailures switch { <= 0 => 0, 1 => 5_000, @@ -143,10 +170,16 @@ internal async Task SendBatchAsync(CancellationToken ct = default) // Earliest UTC time at which the next attempt may run. // Null when no backoff is active. - internal DateTime? NextAttemptAt => _nextAttemptAt; + internal DateTime? NextAttemptAt + { + get { lock (_backoffLock) return _nextAttemptAt; } + } // True while UtcNow < NextAttemptAt. Flips false as the clock advances. - internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt; + internal bool IsInBackoffWindow + { + get { lock (_backoffLock) return _getUtcNow() < _nextAttemptAt; } + } public void Dispose() { @@ -155,17 +188,22 @@ public void Dispose() private void RecordFailure() { - var now = _getUtcNow(); - if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff - - _consecutiveFailures++; - _nextAttemptAt = now.AddMilliseconds(BackoffMs); + lock (_backoffLock) + { + var now = _getUtcNow(); + if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff + _consecutiveFailures++; + _nextAttemptAt = now.AddMilliseconds(BackoffMsLocked()); + } } private void ResetBackoff() { - _consecutiveFailures = 0; - _nextAttemptAt = null; + lock (_backoffLock) + { + _consecutiveFailures = 0; + _nextAttemptAt = null; + } } // Reads each path and wraps the concatenated JSON bodies in @@ -201,6 +239,39 @@ private void ResetBackoff() return sb.ToString(); } + // Reads the response body and pulls out the "rejected" count. Returns + // 0 if the body is missing or unreadable — the body is only for + // reporting, so failing to read it must not break the success path. + private static async Task ParseRejectedCount(HttpResponseMessage response) + { + string body; + try + { + body = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + } + catch + { + return 0; + } + if (string.IsNullOrEmpty(body)) return 0; + + try + { + var parsed = JsonReader.DeserializeObject(body); + if (!parsed.TryGetValue("rejected", out var raw)) return 0; + return raw switch + { + int i => i, + long l => (int)l, + _ => 0, + }; + } + catch (FormatException) + { + return 0; + } + } + private void NotifyError(AudienceErrorCode code, string message) { if (_onError == null) return; diff --git a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs index e71f19c76..287f32d37 100644 --- a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs +++ b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs @@ -1145,6 +1145,96 @@ public void SendBatch_ConcurrentTicks_OnlyOneReachesTransport() "overlapping tick must not issue a second HTTP request"); } + [Test] + public void FlushAsync_ConcurrentCallers_OnlyOneReachesTransport() + { + // Two parallel FlushAsync calls must serialise on _sendInFlight so + // ReadBatch/POST pairs do not double-send. Sabotage: remove the + // gate in FlushAsync and this test sees RequestCount > 1. + var handler = new BlockingHandler(); + var config = MakeConfig(); + config.HttpHandler = handler; + + ImmutableAudience.Init(config); + ImmutableAudience.Track("event_to_send"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + // First caller enters SendAsync and blocks on handler.Release. + var flush1 = Task.Run(() => ImmutableAudience.FlushAsync()); + Assert.IsTrue(handler.EnteredSendAsync.Wait(TimeSpan.FromSeconds(2)), + "first FlushAsync should reach the HTTP handler"); + + // Second caller starts while the first holds the gate — it must + // wait, not issue a second request. + var flush2 = Task.Run(() => ImmutableAudience.FlushAsync()); + + // Give the second caller a moment to try (and back off). + Thread.Sleep(200); + Assert.AreEqual(1, handler.RequestCount, + "second FlushAsync must not issue a second HTTP request while the first is in-flight"); + + handler.Release.Set(); + Assert.IsTrue(Task.WaitAll(new[] { flush1, flush2 }, TimeSpan.FromSeconds(10)), + "both FlushAsync calls should complete after release"); + } + + [Test] + public async Task FlushAsync_CancelledToken_Terminates_DoesNotHotLoop() + { + // Regression for PR #701 review (@nattb8): if SendBatchAsync + // silently swallowed caller cancellation, the inner while-loop + // here would re-enter on the same cancelled token and spin + // because the batch is never deleted on that code path. The + // task below would never complete. After the fix, cancellation + // propagates and the task faults quickly. + var handler = new CancellingHandler(); + var config = MakeConfig(); + config.HttpHandler = handler; + + ImmutableAudience.Init(config); + ImmutableAudience.Track("event_to_send"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var flush = ImmutableAudience.FlushAsync(cts.Token); + var finishedFirst = await Task.WhenAny(flush, Task.Delay(TimeSpan.FromSeconds(2))); + + Assert.AreSame(flush, finishedFirst, + "FlushAsync must terminate (not hot-loop) when the token is cancelled"); + Assert.IsTrue(flush.IsCanceled || flush.IsFaulted, + "FlushAsync must propagate the cancellation, not return normally"); + Assert.LessOrEqual(handler.CallCount, 1, + "a cancelled token must not drive repeated SendAsync attempts"); + + // Gate must be released by the finally block — a follow-up flush + // on an uncancelled token should proceed, proving _sendInFlight + // is not stranded at 1. + handler.AcceptNextAsSuccess = true; + var followUp = ImmutableAudience.FlushAsync(); + Assert.IsTrue(followUp.Wait(TimeSpan.FromSeconds(2)), + "_sendInFlight must be released after a cancelled flush"); + } + + private class CancellingHandler : HttpMessageHandler + { + public int CallCount; + public bool AcceptNextAsSuccess; + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + Interlocked.Increment(ref CallCount); + ct.ThrowIfCancellationRequested(); + var status = AcceptNextAsSuccess ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable; + var body = AcceptNextAsSuccess ? "{\"accepted\":1,\"rejected\":0}" : ""; + return Task.FromResult(new HttpResponseMessage(status) + { + Content = new StringContent(body) + }); + } + } + private class BlockingHandler : HttpMessageHandler { public readonly ManualResetEventSlim EnteredSendAsync = new ManualResetEventSlim(false); diff --git a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs index 30a29f165..a848679cf 100644 --- a/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/Transport/HttpTransportTests.cs @@ -182,6 +182,61 @@ public async Task SendBatchAsync_4xx_DeletesFilesAndResetsBackoff() Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); } + [Test] + public async Task SendBatchAsync_200_WithRejected_DeletesFilesAndSurfacesValidationRejected() + { + // Per Unity Implementation Plan §4.6, a 200 with rejected>0 means + // per-message validation errors. The batch is deleted (retries + // would not help) and the count is surfaced via onError so + // studios can observe silently dropped events. + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + _store.Write("{\"type\":\"track\",\"eventName\":\"b\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":1}"); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(0, _store.Count(), "200 with rejected>0 should still delete the batch"); + Assert.IsNotNull(reportedError, "partial rejection must surface via onError"); + Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); + StringAssert.Contains("1", reportedError.Message, "message should include the rejected count"); + } + + [Test] + public async Task SendBatchAsync_200_ZeroRejected_DoesNotFireOnError() + { + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}"); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.IsNull(reportedError, "zero rejected must not fire onError"); + } + + [Test] + public async Task SendBatchAsync_200_MalformedBody_TreatsAsZeroRejected() + { + // Malformed diagnostic body must not block the success path. + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "not-json"); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(0, _store.Count(), "files should still be deleted on 200"); + Assert.IsNull(reportedError, "malformed body must not surface an error"); + } + [Test] public async Task SendBatchAsync_5xx_KeepsFilesAndIncreasesBackoff() { @@ -341,6 +396,36 @@ public async Task SendBatchAsync_HttpClientTimeout_TreatedAsNetworkError() Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); } + [Test] + public void SendBatchAsync_CallerCancelled_Throws_DoesNotDeleteOrRecordFailure() + { + // Regression guard for PR #701 review: caller cancellation must + // propagate. If the `when (ct.IsCancellationRequested)` branch + // swallowed the exception, SendBatchAsync would return `true` + // with the batch still on disk, and a FlushAsync loop watching + // that return value would re-enter on the same cancelled token + // forever — nothing ever drains, nothing ever throws. + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(() => throw new OperationCanceledException("simulated")); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // CatchAsync (not ThrowsAsync) because HttpClient re-wraps our mock's + // OperationCanceledException as TaskCanceledException before rethrowing. + // We want to assert the cancellation *family*, not a specific subclass. + Assert.CatchAsync( + async () => await transport.SendBatchAsync(cts.Token)); + + Assert.AreEqual(1, _store.Count(), "cancelled send must not delete the batch"); + Assert.IsFalse(transport.IsInBackoffWindow, "cancel is not a failure — no backoff engaged"); + Assert.IsNull(reportedError, "cancel is caller-initiated — no onError fires"); + } + [Test] public async Task IsInBackoffWindow_ClearsAfterNextAttemptAtElapses() {