From eefa6729098a4c4ea627b60cf6f222b8bf9ef75e Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Fri, 17 Apr 2026 01:05:40 +1000 Subject: [PATCH 1/7] feat(audience): add Gzip utility and HttpTransport with retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gzip: compresses batch payloads using GZipStream (System.IO.Compression, available in Unity 2021+ .NET Standard 2.1). Pure C#, all platforms. HttpTransport: reads batches from DiskStore, wraps in {"batch":[...]}, gzips, POSTs to /v1/audience/messages with x-immutable-publishable-key header. Derives sandbox vs production URL from key prefix. Retry policy: - 200: delete batch from disk - 4xx: delete batch (validation error, won't succeed on retry) - 5xx: keep on disk, exponential backoff (5s → 10s → 20s → 40s → 60s cap) - Network error: same as 5xx - Backoff resets after success Testable via injected HttpMessageHandler — 14 tests, no network calls. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Runtime/Transport/HttpTransport.cs | 185 +++++++++++ src/Packages/Audience/Runtime/Utility/Gzip.cs | 26 ++ .../Audience/Tests/Runtime/GzipTests.cs | 59 ++++ .../Tests/Runtime/HttpTransportTests.cs | 305 ++++++++++++++++++ 4 files changed, 575 insertions(+) create mode 100644 src/Packages/Audience/Runtime/Transport/HttpTransport.cs create mode 100644 src/Packages/Audience/Runtime/Utility/Gzip.cs create mode 100644 src/Packages/Audience/Tests/Runtime/GzipTests.cs create mode 100644 src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs new file mode 100644 index 000000000..137abd3ed --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -0,0 +1,185 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Immutable.Audience +{ + /// + /// Reads event batches from , gzip-compresses them, + /// and POSTs to /v1/audience/messages. Runs entirely on background + /// threads via — no main thread involvement. + /// + /// Retry policy: 5xx and network errors keep events on disk with + /// exponential backoff (5s → 10s → 20s → 40s → 60s cap). 4xx and 200 + /// with rejected events are dropped — they won't succeed on retry. + /// + internal sealed class HttpTransport : IDisposable + { + private readonly DiskStore _store; + private readonly string _url; + private readonly string _publishableKey; + private readonly HttpClient _client; + private readonly Action _onError; + + private int _consecutiveFailures; + + /// Disk store to read batches from. + /// Sent as x-immutable-publishable-key header. + /// Optional error callback. Never throws to the caller. + /// Optional HttpMessageHandler for testing. + internal HttpTransport( + DiskStore store, + string publishableKey, + Action onError = null, + HttpMessageHandler handler = null) + { + _store = store ?? throw new ArgumentNullException(nameof(store)); + _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); + _url = Constants.BaseUrl(publishableKey) + Constants.MessagesPath; + _onError = onError; + _client = handler != null ? new HttpClient(handler) : new HttpClient(); + _client.Timeout = TimeSpan.FromSeconds(30); + } + + /// + /// Reads one batch from disk and sends it to the backend. + /// Returns true if a batch was sent (regardless of outcome), false if the queue was empty. + /// + internal async Task SendBatchAsync(CancellationToken ct = default) + { + var batch = _store.ReadBatch(Constants.DefaultFlushSize); + if (batch.Count == 0) + return false; + + var payload = BuildPayload(batch); + if (payload == null) + { + // All files were unreadable — delete them and move on. + _store.Delete(batch); + return true; + } + + var compressed = Gzip.Compress(payload); + + try + { + using var request = new HttpRequestMessage(HttpMethod.Post, _url); + request.Headers.Add("x-immutable-publishable-key", _publishableKey); + request.Content = new ByteArrayContent(compressed); + request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + request.Content.Headers.Add("Content-Encoding", "gzip"); + + using var response = await _client.SendAsync(request, ct).ConfigureAwait(false); + + var statusCode = (int)response.StatusCode; + + if (statusCode >= 200 && statusCode < 300) + { + // 200 — events accepted. Any rejected ones had validation errors + // and won't succeed on retry, so delete the whole batch. + _store.Delete(batch); + _consecutiveFailures = 0; + } + else if (statusCode >= 400 && statusCode < 500) + { + // 4xx — malformed request, won't succeed on retry. + _store.Delete(batch); + _consecutiveFailures = 0; + NotifyError(AudienceErrorCode.ValidationRejected, + $"Batch rejected with {statusCode}"); + } + else + { + // 5xx — transient, keep on disk for retry. + _consecutiveFailures++; + NotifyError(AudienceErrorCode.FlushFailed, $"Server error {statusCode}, will retry"); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Shutdown requested via cancellation token — don't increment failures, + // events stay on disk. HttpClient timeouts throw TaskCanceledException too; + // the `when` guard ensures those fall through to the general Exception + // handler so backoff and the NetworkError callback fire correctly. + } + catch (Exception ex) + { + _consecutiveFailures++; + NotifyError(AudienceErrorCode.NetworkError, ex.Message); + } + + return true; + } + + /// + /// Backoff delay in milliseconds based on consecutive failures. + /// 0 → 5s → 10s → 20s → 40s → 60s cap. + /// + internal int BackoffMs + { + get + { + if (_consecutiveFailures <= 0) return 0; + return Math.Min(5000 * (1 << (_consecutiveFailures - 1)), 60000); + } + } + + /// Whether the transport is currently backing off after failures. + internal bool IsBackingOff => _consecutiveFailures > 0; + + public void Dispose() + { + _client.Dispose(); + } + + /// + /// Reads file contents for each path and builds the batch JSON payload: + /// {"batch":[msg1,msg2,...]} + /// Returns null if no files could be read. + /// + private static string BuildPayload(IReadOnlyList paths) + { + var sb = new StringBuilder("{\"batch\":["); + var count = 0; + + for (var i = 0; i < paths.Count; i++) + { + try + { + var json = File.ReadAllText(paths[i]); + if (count > 0) sb.Append(','); + sb.Append(json); + count++; + } + catch (Exception) + { + // File disappeared between ReadBatch and now — skip it. + } + } + + if (count == 0) return null; + + sb.Append("]}"); + return sb.ToString(); + } + + private void NotifyError(AudienceErrorCode code, string message) + { + if (_onError == null) return; + try + { + _onError(new AudienceError(code, message)); + } + catch + { + // Error callback itself threw — swallow to protect the SDK. + } + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Runtime/Utility/Gzip.cs b/src/Packages/Audience/Runtime/Utility/Gzip.cs new file mode 100644 index 000000000..c54d4cd33 --- /dev/null +++ b/src/Packages/Audience/Runtime/Utility/Gzip.cs @@ -0,0 +1,26 @@ +using System.IO; +using System.IO.Compression; +using System.Text; + +namespace Immutable.Audience +{ + /// + /// Gzip compression using from System.IO.Compression. + /// Available in Unity 2021+ (.NET Standard 2.1). Pure C#, works on all desktop platforms. + /// + internal static class Gzip + { + internal static byte[] Compress(string text) + { + var raw = Encoding.UTF8.GetBytes(text); + + using var output = new MemoryStream(); + using (var gzip = new GZipStream(output, CompressionLevel.Fastest)) + { + gzip.Write(raw, 0, raw.Length); + } + + return output.ToArray(); + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Tests/Runtime/GzipTests.cs b/src/Packages/Audience/Tests/Runtime/GzipTests.cs new file mode 100644 index 000000000..8d509be44 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/GzipTests.cs @@ -0,0 +1,59 @@ +using System.IO; +using System.IO.Compression; +using System.Text; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class GzipTests + { + [Test] + public void Compress_ProducesValidGzip_ThatDecompressesToOriginal() + { + const string original = "{\"type\":\"track\",\"eventName\":\"test\"}"; + + var compressed = Gzip.Compress(original); + + // Decompress and verify round-trip + using var input = new MemoryStream(compressed); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + var decompressed = reader.ReadToEnd(); + + Assert.AreEqual(original, decompressed); + } + + [Test] + public void Compress_OutputIsSmallerThanInput_ForRealisticPayload() + { + // Repeated field names compress well in JSON batches. + var sb = new StringBuilder("{\"batch\":["); + for (var i = 0; i < 20; i++) + { + if (i > 0) sb.Append(','); + sb.Append($"{{\"type\":\"track\",\"eventName\":\"level_complete\",\"anonymousId\":\"anon-{i}\"}}"); + } + + sb.Append("]}"); + var payload = sb.ToString(); + + var compressed = Gzip.Compress(payload); + + Assert.Less(compressed.Length, Encoding.UTF8.GetByteCount(payload), "gzip should compress a batch of similar JSON events"); + } + + [Test] + public void Compress_EmptyString_ProducesValidGzip() + { + var compressed = Gzip.Compress(""); + + using var input = new MemoryStream(compressed); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + var decompressed = reader.ReadToEnd(); + + Assert.AreEqual("", decompressed); + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs new file mode 100644 index 000000000..7f9f3040e --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs @@ -0,0 +1,305 @@ +using System; +using System.IO; +using System.IO.Compression; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class HttpTransportTests + { + private string _testDir; + private DiskStore _store; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + _store = new DiskStore(_testDir); + } + + [TearDown] + public void TearDown() + { + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + [Test] + public async Task SendBatchAsync_200_DeletesFilesFromDisk() + { + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + _store.Write("{\"type\":\"track\",\"eventName\":\"b\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":2,\"rejected\":0}"); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + var sent = await transport.SendBatchAsync(); + + Assert.IsTrue(sent); + Assert.AreEqual(0, _store.Count(), "files should be deleted after 200"); + } + + [Test] + public async Task SendBatchAsync_200_SendsGzippedPayloadWithCorrectHeaders() + { + _store.Write("{\"type\":\"track\",\"eventName\":\"test\"}"); + + byte[] capturedBody = null; + string capturedKey = null; + string capturedContentType = null; + // Read body inside the callback — the request content is disposed after SendAsync returns. + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}", + onRequest: req => + { + capturedKey = string.Join("", req.Headers.GetValues("x-immutable-publishable-key")); + capturedContentType = req.Content.Headers.ContentType.MediaType; + capturedBody = req.Content.ReadAsByteArrayAsync().Result; + }); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual("pk_imapik-test-key1", capturedKey); + Assert.AreEqual("application/json", capturedContentType); + + var decompressed = DecompressGzip(capturedBody); + StringAssert.StartsWith("{\"batch\":[", decompressed); + StringAssert.EndsWith("]}", decompressed); + StringAssert.Contains("\"eventName\":\"test\"", decompressed); + } + + [Test] + public async Task SendBatchAsync_200_UsesCorrectUrlForTestKey() + { + _store.Write("{\"type\":\"track\"}"); + + HttpRequestMessage captured = null; + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}", + onRequest: req => captured = req); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + await transport.SendBatchAsync(); + + StringAssert.StartsWith(Constants.SandboxBaseUrl, captured.RequestUri.ToString()); + } + + [Test] + public async Task SendBatchAsync_200_UsesCorrectUrlForProdKey() + { + _store.Write("{\"type\":\"track\"}"); + + HttpRequestMessage captured = null; + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}", + onRequest: req => captured = req); + using var transport = new HttpTransport(_store, "pk_imapik-prodkey", handler: handler); + + await transport.SendBatchAsync(); + + StringAssert.StartsWith(Constants.ProductionBaseUrl, captured.RequestUri.ToString()); + } + + [Test] + public async Task SendBatchAsync_EmptyQueue_ReturnsFalse() + { + var handler = new MockHandler(HttpStatusCode.OK, "{}"); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + var sent = await transport.SendBatchAsync(); + + Assert.IsFalse(sent); + Assert.AreEqual(0, handler.CallCount, "should not make HTTP call when queue is empty"); + } + + [Test] + public async Task SendBatchAsync_4xx_DeletesFilesAndResetsBackoff() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(HttpStatusCode.BadRequest, ""); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(0, _store.Count(), "4xx should delete files — won't succeed on retry"); + Assert.IsFalse(transport.IsBackingOff); + Assert.IsNotNull(reportedError); + Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); + } + + [Test] + public async Task SendBatchAsync_5xx_KeepsFilesAndIncreasesBackoff() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(1, _store.Count(), "5xx should keep files for retry"); + Assert.IsTrue(transport.IsBackingOff); + Assert.AreEqual(5000, transport.BackoffMs, "first failure = 5s backoff"); + Assert.IsNotNull(reportedError); + Assert.AreEqual(AudienceErrorCode.FlushFailed, reportedError.Code); + } + + [Test] + public async Task BackoffMs_ExponentialWithCap() + { + _store.Write("{\"type\":\"track\"}"); + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + // Each SendBatch re-reads the same file (5xx doesn't delete it) and increments backoff. + await transport.SendBatchAsync(); + Assert.AreEqual(5000, transport.BackoffMs); + await transport.SendBatchAsync(); + Assert.AreEqual(10000, transport.BackoffMs); + await transport.SendBatchAsync(); + Assert.AreEqual(20000, transport.BackoffMs); + await transport.SendBatchAsync(); + Assert.AreEqual(40000, transport.BackoffMs); + await transport.SendBatchAsync(); + Assert.AreEqual(60000, transport.BackoffMs, "capped at 60s"); + await transport.SendBatchAsync(); + Assert.AreEqual(60000, transport.BackoffMs, "stays at cap"); + } + + [Test] + public async Task BackoffMs_ResetsAfterSuccess() + { + _store.Write("{\"type\":\"track\"}"); + + var callCount = 0; + var handler = new MockHandler(() => + { + callCount++; + // Fail twice, then succeed. + return callCount <= 2 + ? new HttpResponseMessage(HttpStatusCode.InternalServerError) + : new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent("{\"accepted\":1,\"rejected\":0}") }; + }); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + await transport.SendBatchAsync(); + Assert.AreEqual(5000, transport.BackoffMs); + + await transport.SendBatchAsync(); + Assert.AreEqual(10000, transport.BackoffMs); + + await transport.SendBatchAsync(); + Assert.AreEqual(0, transport.BackoffMs, "backoff resets after success"); + Assert.IsFalse(transport.IsBackingOff); + } + + [Test] + public async Task SendBatchAsync_NetworkError_KeepsFilesAndBacksOff() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(() => throw new HttpRequestException("connection refused")); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(1, _store.Count(), "network error should keep files for retry"); + Assert.IsTrue(transport.IsBackingOff); + Assert.IsNotNull(reportedError); + Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); + } + + [Test] + public async Task SendBatchAsync_HttpClientTimeout_TreatedAsNetworkError() + { + // Regression guard: HttpClient.Timeout throws TaskCanceledException, which + // derives from OperationCanceledException. Without a `when (ct.IsCancellationRequested)` + // guard, timeouts would be silently swallowed as "shutdown" — no backoff, no error + // callback, next cycle hot-loops. This test ensures timeouts flow through the + // NetworkError path. + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(() => throw new TaskCanceledException("Request timed out")); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + // Pass default CancellationToken so ct.IsCancellationRequested is false — this + // simulates a real HttpClient timeout (not a caller-initiated cancellation). + await transport.SendBatchAsync(); + + Assert.AreEqual(1, _store.Count(), "timeout should keep files for retry"); + Assert.IsTrue(transport.IsBackingOff, "timeout must increment failures and engage backoff"); + Assert.IsNotNull(reportedError, "NetworkError callback must fire on timeout"); + Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); + } + + [Test] + public async Task SendBatchAsync_ErrorCallbackThrows_DoesNotCrash() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(HttpStatusCode.BadRequest, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: _ => throw new InvalidOperationException("callback bug"), + handler: handler); + + Assert.DoesNotThrowAsync(() => transport.SendBatchAsync()); + } + + private static string DecompressGzip(byte[] data) + { + using var input = new MemoryStream(data); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + return reader.ReadToEnd(); + } + + /// + /// Minimal HttpMessageHandler that returns a canned response. + /// Optionally captures the request for inspection. + /// + private class MockHandler : HttpMessageHandler + { + private readonly Func _factory; + private readonly Action _onRequest; + + internal int CallCount { get; private set; } + + internal MockHandler(HttpStatusCode status, string body, Action onRequest = null) + { + _factory = () => new HttpResponseMessage(status) + { + Content = new StringContent(body) + }; + _onRequest = onRequest; + } + + internal MockHandler(Func factory, Action onRequest = null) + { + _factory = factory; + _onRequest = onRequest; + } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + CallCount++; + _onRequest?.Invoke(request); + return Task.FromResult(_factory()); + } + } + } +} \ No newline at end of file From 0e6c1215f7512ebe587fca41d765a88c6e326986 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Fri, 17 Apr 2026 02:05:37 +1000 Subject: [PATCH 2/7] fix(audience): align HttpTransport backoff to plan schedule MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plan specifies: 5s → 10s → 20s → 60s cap. Implementation had: 5s → 10s → 20s → 40s → 60s cap. Replace the bitshift formula with an explicit switch expression so the schedule is readable and matches the plan exactly. Update the test to verify the jump from 20s directly to 60s. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Audience/Runtime/Transport/HttpTransport.cs | 16 ++++++++-------- .../Audience/Tests/Runtime/HttpTransportTests.cs | 5 ++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 137abd3ed..ebebb7012 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -119,16 +119,16 @@ internal async Task SendBatchAsync(CancellationToken ct = default) /// /// Backoff delay in milliseconds based on consecutive failures. - /// 0 → 5s → 10s → 20s → 40s → 60s cap. + /// Schedule per plan: 0 → 5s → 10s → 20s → 60s cap. /// - internal int BackoffMs + internal int BackoffMs => _consecutiveFailures switch { - get - { - if (_consecutiveFailures <= 0) return 0; - return Math.Min(5000 * (1 << (_consecutiveFailures - 1)), 60000); - } - } + <= 0 => 0, + 1 => 5000, + 2 => 10000, + 3 => 20000, + _ => 60000, + }; /// Whether the transport is currently backing off after failures. internal bool IsBackingOff => _consecutiveFailures > 0; diff --git a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs index 7f9f3040e..c66ad4f66 100644 --- a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs @@ -162,6 +162,7 @@ public async Task BackoffMs_ExponentialWithCap() using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); // Each SendBatch re-reads the same file (5xx doesn't delete it) and increments backoff. + // Schedule per plan: 5s → 10s → 20s → 60s cap (no 40s step). await transport.SendBatchAsync(); Assert.AreEqual(5000, transport.BackoffMs); await transport.SendBatchAsync(); @@ -169,9 +170,7 @@ public async Task BackoffMs_ExponentialWithCap() await transport.SendBatchAsync(); Assert.AreEqual(20000, transport.BackoffMs); await transport.SendBatchAsync(); - Assert.AreEqual(40000, transport.BackoffMs); - await transport.SendBatchAsync(); - Assert.AreEqual(60000, transport.BackoffMs, "capped at 60s"); + Assert.AreEqual(60000, transport.BackoffMs, "jumps to 60s cap after 20s"); await transport.SendBatchAsync(); Assert.AreEqual(60000, transport.BackoffMs, "stays at cap"); } From 52381a49a820639bc6ddec03c0b02d0acd94a671 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Fri, 17 Apr 2026 02:11:35 +1000 Subject: [PATCH 3/7] style(audience): use digit separators in backoff literals Match the pattern used elsewhere in the codebase (Session.cs uses 60_000 for heartbeat interval). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/Packages/Audience/Runtime/Transport/HttpTransport.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index ebebb7012..4f888506c 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -124,10 +124,10 @@ internal async Task SendBatchAsync(CancellationToken ct = default) internal int BackoffMs => _consecutiveFailures switch { <= 0 => 0, - 1 => 5000, - 2 => 10000, - 3 => 20000, - _ => 60000, + 1 => 5_000, + 2 => 10_000, + 3 => 20_000, + _ => 60_000, }; /// Whether the transport is currently backing off after failures. From c9d301c5d0fd4a468c6047793491e0e8de56edda Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Fri, 17 Apr 2026 14:28:54 +1000 Subject: [PATCH 4/7] refactor(audience): time-aware backoff in HttpTransport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the binary IsBackingOff flag with a time-aware window derived from NextAttemptAt. IsBackingOff returned true as long as any failure had occurred, which would cause a caller that skips-when-backing-off to skip forever after the first failure. Changes: - Add NextAttemptAt (DateTime?) — set on failure to now + BackoffMs, cleared on success. Null when there is no active backoff. - Replace IsBackingOff with IsInBackoffWindow, which compares UtcNow (via injectable clock) against NextAttemptAt. Naturally expires. - Inject clock via constructor for deterministic timing tests — falls back to DateTime.UtcNow when omitted. - Factor failure/success transitions into RecordFailure/ResetBackoff helpers so the two state fields stay consistent. - Rename IsBackingOff to IsInBackoffWindow in tests; add a timing test that advances the injected clock to verify the window closes at NextAttemptAt. Not fixup into the HttpTransport commit — kept separate so the scope of this design change is reviewable on its own and revertable if the simpler binary flag is preferred. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Runtime/Transport/HttpTransport.cs | 61 ++++++++-- .../Tests/Runtime/HttpTransportTests.cs | 111 +++++++++++++++--- 2 files changed, 143 insertions(+), 29 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 4f888506c..c2fce519d 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -16,8 +16,10 @@ namespace Immutable.Audience /// threads via — no main thread involvement. /// /// Retry policy: 5xx and network errors keep events on disk with - /// exponential backoff (5s → 10s → 20s → 40s → 60s cap). 4xx and 200 - /// with rejected events are dropped — they won't succeed on retry. + /// exponential backoff (5s → 10s → 20s → 60s cap). Escalation only + /// occurs after the previous backoff window has elapsed — premature + /// retries don't grow the backoff further. 4xx and 200 with rejected + /// events are dropped — they won't succeed on retry. /// internal sealed class HttpTransport : IDisposable { @@ -26,18 +28,22 @@ internal sealed class HttpTransport : IDisposable private readonly string _publishableKey; private readonly HttpClient _client; private readonly Action _onError; + private readonly Func _getUtcNow; private int _consecutiveFailures; + private DateTime? _nextAttemptAt; /// Disk store to read batches from. /// Sent as x-immutable-publishable-key header. /// Optional error callback. Never throws to the caller. /// Optional HttpMessageHandler for testing. + /// Optional UTC clock for deterministic testing of backoff timing. internal HttpTransport( DiskStore store, string publishableKey, Action onError = null, - HttpMessageHandler handler = null) + HttpMessageHandler handler = null, + Func getUtcNow = null) { _store = store ?? throw new ArgumentNullException(nameof(store)); _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); @@ -45,6 +51,7 @@ internal HttpTransport( _onError = onError; _client = handler != null ? new HttpClient(handler) : new HttpClient(); _client.Timeout = TimeSpan.FromSeconds(30); + _getUtcNow = getUtcNow ?? (() => DateTime.UtcNow); } /// @@ -81,23 +88,19 @@ internal async Task SendBatchAsync(CancellationToken ct = default) if (statusCode >= 200 && statusCode < 300) { - // 200 — events accepted. Any rejected ones had validation errors - // and won't succeed on retry, so delete the whole batch. _store.Delete(batch); - _consecutiveFailures = 0; + ResetBackoff(); } else if (statusCode >= 400 && statusCode < 500) { - // 4xx — malformed request, won't succeed on retry. _store.Delete(batch); - _consecutiveFailures = 0; + ResetBackoff(); NotifyError(AudienceErrorCode.ValidationRejected, $"Batch rejected with {statusCode}"); } else { - // 5xx — transient, keep on disk for retry. - _consecutiveFailures++; + RecordFailure(); NotifyError(AudienceErrorCode.FlushFailed, $"Server error {statusCode}, will retry"); } } @@ -110,7 +113,7 @@ internal async Task SendBatchAsync(CancellationToken ct = default) } catch (Exception ex) { - _consecutiveFailures++; + RecordFailure(); NotifyError(AudienceErrorCode.NetworkError, ex.Message); } @@ -130,14 +133,46 @@ internal async Task SendBatchAsync(CancellationToken ct = default) _ => 60_000, }; - /// Whether the transport is currently backing off after failures. - internal bool IsBackingOff => _consecutiveFailures > 0; + /// + /// Timestamp after which the next send attempt should run. Null when there's + /// no active backoff (never failed, or last attempt succeeded). Callers should + /// skip sending while UtcNow < NextAttemptAt. + /// + internal DateTime? NextAttemptAt => _nextAttemptAt; + + /// + /// True if a failure occurred recently and the backoff window has not yet + /// elapsed. Becomes false naturally once enough time passes, allowing the + /// next send attempt to proceed. + /// + internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt; public void Dispose() { _client.Dispose(); } + private void RecordFailure() + { + var now = _getUtcNow(); + + // If we're still inside the previous backoff window, the caller retried + // before honoring the wait. Don't escalate — the existing NextAttemptAt + // deadline stands. When _nextAttemptAt is null (no prior failure), + // `now < null` is false via lifted nullable comparison, so we escalate. + if (now < _nextAttemptAt) + return; + + _consecutiveFailures++; + _nextAttemptAt = now.AddMilliseconds(BackoffMs); + } + + private void ResetBackoff() + { + _consecutiveFailures = 0; + _nextAttemptAt = null; + } + /// /// Reads file contents for each path and builds the batch JSON payload: /// {"batch":[msg1,msg2,...]} diff --git a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs index c66ad4f66..6e05804d5 100644 --- a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs @@ -16,14 +16,25 @@ internal class HttpTransportTests private string _testDir; private DiskStore _store; + // Controllable clock for timing-sensitive tests. Tests that care about + // backoff windows or NextAttemptAt pass `getUtcNow: _getUtcNow` to the transport + // and use Advance(ms) to move time forward deterministically. + private DateTime _utcNow; + private Func _getUtcNow; + [SetUp] public void SetUp() { _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); Directory.CreateDirectory(_testDir); _store = new DiskStore(_testDir); + _utcNow = new DateTime(2026, 4, 18, 12, 0, 0, DateTimeKind.Utc); + _getUtcNow = () => _utcNow; } + private void Advance(int milliseconds) => + _utcNow = _utcNow.AddMilliseconds(milliseconds); + [TearDown] public void TearDown() { @@ -130,7 +141,7 @@ public async Task SendBatchAsync_4xx_DeletesFilesAndResetsBackoff() await transport.SendBatchAsync(); Assert.AreEqual(0, _store.Count(), "4xx should delete files — won't succeed on retry"); - Assert.IsFalse(transport.IsBackingOff); + Assert.IsFalse(transport.IsInBackoffWindow); Assert.IsNotNull(reportedError); Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); } @@ -148,31 +159,72 @@ public async Task SendBatchAsync_5xx_KeepsFilesAndIncreasesBackoff() await transport.SendBatchAsync(); Assert.AreEqual(1, _store.Count(), "5xx should keep files for retry"); - Assert.IsTrue(transport.IsBackingOff); + Assert.IsTrue(transport.IsInBackoffWindow); Assert.AreEqual(5000, transport.BackoffMs, "first failure = 5s backoff"); Assert.IsNotNull(reportedError); Assert.AreEqual(AudienceErrorCode.FlushFailed, reportedError.Code); } [Test] - public async Task BackoffMs_ExponentialWithCap() + public async Task BackoffMs_EscalatesOnlyAfterWindowElapsed() { _store.Write("{\"type\":\"track\"}"); var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); - using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: _getUtcNow); - // Each SendBatch re-reads the same file (5xx doesn't delete it) and increments backoff. // Schedule per plan: 5s → 10s → 20s → 60s cap (no 40s step). + // Each escalation requires the previous window to have elapsed. + await transport.SendBatchAsync(); + Assert.AreEqual(5_000, transport.BackoffMs); + + Advance(5_001); + await transport.SendBatchAsync(); + Assert.AreEqual(10_000, transport.BackoffMs); + + Advance(10_001); + await transport.SendBatchAsync(); + Assert.AreEqual(20_000, transport.BackoffMs); + + Advance(20_001); await transport.SendBatchAsync(); - Assert.AreEqual(5000, transport.BackoffMs); + Assert.AreEqual(60_000, transport.BackoffMs, "jumps to 60s cap after 20s"); + + Advance(60_001); + await transport.SendBatchAsync(); + Assert.AreEqual(60_000, transport.BackoffMs, "stays at cap"); + } + + [Test] + public async Task BackoffMs_DoesNotEscalateWhileInsidePreviousWindow() + { + _store.Write("{\"type\":\"track\"}"); + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: _getUtcNow); + await transport.SendBatchAsync(); - Assert.AreEqual(10000, transport.BackoffMs); + Assert.AreEqual(5_000, transport.BackoffMs); + var firstDeadline = transport.NextAttemptAt; + Assert.IsNotNull(firstDeadline); + + // Caller ignores the window and retries immediately. Must not escalate. + Advance(100); await transport.SendBatchAsync(); - Assert.AreEqual(20000, transport.BackoffMs); + Assert.AreEqual(5_000, transport.BackoffMs, + "failures inside the previous window must not escalate backoff"); + Assert.AreEqual(firstDeadline, transport.NextAttemptAt, + "NextAttemptAt should not move when the window hasn't elapsed"); + + // Another premature retry — still no escalation. + Advance(3_000); await transport.SendBatchAsync(); - Assert.AreEqual(60000, transport.BackoffMs, "jumps to 60s cap after 20s"); + Assert.AreEqual(5_000, transport.BackoffMs); + + // Wait out the window, fail again → now we escalate. + _utcNow = firstDeadline.Value.AddMilliseconds(1); await transport.SendBatchAsync(); - Assert.AreEqual(60000, transport.BackoffMs, "stays at cap"); + Assert.AreEqual(10_000, transport.BackoffMs); } [Test] @@ -190,17 +242,20 @@ public async Task BackoffMs_ResetsAfterSuccess() : new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("{\"accepted\":1,\"rejected\":0}") }; }); - using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: _getUtcNow); await transport.SendBatchAsync(); - Assert.AreEqual(5000, transport.BackoffMs); + Assert.AreEqual(5_000, transport.BackoffMs); + Advance(5_001); await transport.SendBatchAsync(); - Assert.AreEqual(10000, transport.BackoffMs); + Assert.AreEqual(10_000, transport.BackoffMs); + Advance(10_001); await transport.SendBatchAsync(); Assert.AreEqual(0, transport.BackoffMs, "backoff resets after success"); - Assert.IsFalse(transport.IsBackingOff); + Assert.IsFalse(transport.IsInBackoffWindow); } [Test] @@ -216,7 +271,7 @@ public async Task SendBatchAsync_NetworkError_KeepsFilesAndBacksOff() await transport.SendBatchAsync(); Assert.AreEqual(1, _store.Count(), "network error should keep files for retry"); - Assert.IsTrue(transport.IsBackingOff); + Assert.IsTrue(transport.IsInBackoffWindow); Assert.IsNotNull(reportedError); Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); } @@ -241,11 +296,35 @@ public async Task SendBatchAsync_HttpClientTimeout_TreatedAsNetworkError() await transport.SendBatchAsync(); Assert.AreEqual(1, _store.Count(), "timeout should keep files for retry"); - Assert.IsTrue(transport.IsBackingOff, "timeout must increment failures and engage backoff"); + Assert.IsTrue(transport.IsInBackoffWindow, "timeout must increment failures and engage backoff"); Assert.IsNotNull(reportedError, "NetworkError callback must fire on timeout"); Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); } + [Test] + public async Task IsInBackoffWindow_ClearsAfterNextAttemptAtElapses() + { + _store.Write("{\"type\":\"track\"}"); + + var now = new DateTime(2026, 4, 17, 12, 0, 0, DateTimeKind.Utc); + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: () => now); + + await transport.SendBatchAsync(); + + Assert.IsTrue(transport.IsInBackoffWindow, "within window immediately after failure"); + Assert.AreEqual(now.AddMilliseconds(5_000), transport.NextAttemptAt); + + // Advance the clock just before NextAttemptAt — still backing off. + now = now.AddMilliseconds(4_999); + Assert.IsTrue(transport.IsInBackoffWindow); + + // Advance past NextAttemptAt — window closed, next send may proceed. + now = now.AddMilliseconds(2); + Assert.IsFalse(transport.IsInBackoffWindow, "window closes at NextAttemptAt"); + } + [Test] public async Task SendBatchAsync_ErrorCallbackThrows_DoesNotCrash() { From 9985194e68fb0cee2ecb9fae339060fec5134714 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Sat, 18 Apr 2026 12:12:54 +1000 Subject: [PATCH 5/7] fix(audience): narrow BuildPayload catch to IOException MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on PR #691. The previous catch (Exception) in BuildPayload swallowed every failure as if it were the expected file-disappeared race. Permissions errors (UnauthorizedAccessException) and similar non-IO faults were silently dropped, masking real problems. Narrowing to IOException keeps the file-disappeared / locked / path- gone cases skipped — which is what the retry loop needs — while letting genuinely broken states propagate. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Packages/Audience/Runtime/Transport/HttpTransport.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index c2fce519d..2c7e435ad 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -192,9 +192,11 @@ private static string BuildPayload(IReadOnlyList paths) sb.Append(json); count++; } - catch (Exception) + catch (IOException) { - // File disappeared between ReadBatch and now — skip it. + // File disappeared, locked, or path vanished between ReadBatch + // and now — skip it. Non-IO errors like UnauthorizedAccessException + // indicate real problems and are allowed to propagate. } } From 995886dc025594b3b86116e2182de58fd8ad2996 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Sat, 18 Apr 2026 12:13:10 +1000 Subject: [PATCH 6/7] docs(audience): document BuildPayload null return contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on PR #691. Makes the nullable return value explicit in the XML doc — the reviewer asked for Passport-style clarity on what's nullable. Enabling nullable reference types package-wide is a larger follow-up; this commit covers the specific case flagged. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Packages/Audience/Runtime/Transport/HttpTransport.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 2c7e435ad..9aa538ef0 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -176,8 +176,12 @@ private void ResetBackoff() /// /// Reads file contents for each path and builds the batch JSON payload: /// {"batch":[msg1,msg2,...]} - /// Returns null if no files could be read. /// + /// + /// The batch JSON string, or null when every path failed to read + /// (files disappeared, empty, or locked). The caller treats null + /// as "nothing to send" and deletes the path list. + /// private static string BuildPayload(IReadOnlyList paths) { var sb = new StringBuilder("{\"batch\":["); From d68e4e7f91b0355ab4b1e4cbdc9e2d41b00e8868 Mon Sep 17 00:00:00 2001 From: ImmutableJeffrey Date: Tue, 21 Apr 2026 12:58:13 +1000 Subject: [PATCH 7/7] refactor(audience): address HttpTransport review feedback (SDK-141) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses nattb8's review on PR #691 plus follow-up cleanups. - Adopt Passport's per-file `#nullable enable` pattern. BuildPayload's return type becomes `string?`; optional ctor params (`onError`, `handler`, `getUtcNow`) and the `_onError` field are nullable-annotated. - Restore the 40s step in the backoff schedule (5s → 10s → 20s → 40s → 60s cap) so doubling stays clean and matches the original bitshift formula's natural shape. - Rewrite HttpTransport.cs comments as direct, plain-English facts. Class summary trimmed to a one-line role description; per-branch intent annotated only where non-obvious (4xx reset-backoff rationale, cancellation `when` guard, IOException narrow catch, NotifyError swallow). Drop the redundant BackoffMs summary that duplicated the switch arms below it. - Fix the cancellation-catch comment that incorrectly attributed token tripping to HttpTransport.Dispose (it's the caller's responsibility). - Handle non-IOException storage failures in SendBatchAsync. BuildPayload narrows its catch to IOException; everything else (e.g. UnauthorizedAccessException from stripped permissions) used to escape SendBatchAsync entirely and would silently kill the flush loop. Wrap the BuildPayload call: on any non-IOException failure, delete the batch, report via onError with FlushFailed, return true. Backoff test updated to cover the restored 40s step. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Runtime/Transport/HttpTransport.cs | 110 +++++++++--------- .../Tests/Runtime/HttpTransportTests.cs | 8 +- 2 files changed, 63 insertions(+), 55 deletions(-) diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs index 9aa538ef0..093d15c65 100644 --- a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -1,3 +1,5 @@ +#nullable enable + using System; using System.Collections.Generic; using System.IO; @@ -11,15 +13,7 @@ namespace Immutable.Audience { /// - /// Reads event batches from , gzip-compresses them, - /// and POSTs to /v1/audience/messages. Runs entirely on background - /// threads via — no main thread involvement. - /// - /// Retry policy: 5xx and network errors keep events on disk with - /// exponential backoff (5s → 10s → 20s → 60s cap). Escalation only - /// occurs after the previous backoff window has elapsed — premature - /// retries don't grow the backoff further. 4xx and 200 with rejected - /// events are dropped — they won't succeed on retry. + /// Sends queued events from to the Audience backend. /// internal sealed class HttpTransport : IDisposable { @@ -27,23 +21,23 @@ internal sealed class HttpTransport : IDisposable private readonly string _url; private readonly string _publishableKey; private readonly HttpClient _client; - private readonly Action _onError; + private readonly Action? _onError; private readonly Func _getUtcNow; private int _consecutiveFailures; private DateTime? _nextAttemptAt; - /// Disk store to read batches from. - /// Sent as x-immutable-publishable-key header. - /// Optional error callback. Never throws to the caller. - /// Optional HttpMessageHandler for testing. - /// Optional UTC clock for deterministic testing of backoff timing. + /// Source of event batches. + /// Studio API key. Sent as x-immutable-publishable-key on every request. + /// Optional failure callback. Exceptions thrown inside it are caught and ignored. + /// Optional . Callers can supply a custom pipeline (e.g. specific for test purposes). Defaults to the standard handler when null. + /// Optional UTC clock source used for backoff timing (e.g. swappable for deterministic time). Defaults to DateTime.UtcNow when null. internal HttpTransport( DiskStore store, string publishableKey, - Action onError = null, - HttpMessageHandler handler = null, - Func getUtcNow = null) + Action? onError = null, + HttpMessageHandler? handler = null, + Func? getUtcNow = null) { _store = store ?? throw new ArgumentNullException(nameof(store)); _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); @@ -55,8 +49,8 @@ internal HttpTransport( } /// - /// Reads one batch from disk and sends it to the backend. - /// Returns true if a batch was sent (regardless of outcome), false if the queue was empty. + /// Attempts to process one batch: reads it from disk, gzips it, and POSTs it. + /// Returns true if a batch was consumed (outcome irrelevant), false if the queue was empty. /// internal async Task SendBatchAsync(CancellationToken ct = default) { @@ -64,10 +58,24 @@ internal async Task SendBatchAsync(CancellationToken ct = default) if (batch.Count == 0) return false; - var payload = BuildPayload(batch); + string? payload; + try + { + payload = BuildPayload(batch); + } + catch (Exception ex) + { + // Non-IOException = unrecoverable storage failure (e.g. permissions); + // retry won't help. Drop the batch, report via onError. + _store.Delete(batch); + NotifyError(AudienceErrorCode.FlushFailed, $"Local storage read failed: {ex.Message}"); + return true; + } + if (payload == null) { - // All files were unreadable — delete them and move on. + // Every file was unreadable (deleted or locked between ReadBatch and now). + // Drop the refs, return. _store.Delete(batch); return true; } @@ -88,11 +96,14 @@ internal async Task SendBatchAsync(CancellationToken ct = default) if (statusCode >= 200 && statusCode < 300) { + // 2xx: server acked, drop the batch, healthy state. _store.Delete(batch); ResetBackoff(); } else if (statusCode >= 400 && statusCode < 500) { + // 4xx: server rejected the payload. Drop it (retry won't help) and + // reset backoff — server is healthy, our data was the problem. _store.Delete(batch); ResetBackoff(); NotifyError(AudienceErrorCode.ValidationRejected, @@ -100,16 +111,18 @@ internal async Task SendBatchAsync(CancellationToken ct = default) } else { + // 5xx (or other non-2xx/4xx): server is unhealthy or the response + // is anomalous. Keep batch on disk, back off, retry later. RecordFailure(); NotifyError(AudienceErrorCode.FlushFailed, $"Server error {statusCode}, will retry"); } } catch (OperationCanceledException) when (ct.IsCancellationRequested) { - // Shutdown requested via cancellation token — don't increment failures, - // events stay on disk. HttpClient timeouts throw TaskCanceledException too; - // the `when` guard ensures those fall through to the general Exception - // handler so backoff and the NetworkError callback fire correctly. + // Caller cancelled the token (e.g. on shutdown). Events stay on + // disk, no failure recorded. HttpClient timeouts throw the same + // exception but without ct.IsCancellationRequested set, so they + // fall through to the Exception branch below and trigger backoff. } catch (Exception ex) { @@ -120,30 +133,25 @@ internal async Task SendBatchAsync(CancellationToken ct = default) return true; } - /// - /// Backoff delay in milliseconds based on consecutive failures. - /// Schedule per plan: 0 → 5s → 10s → 20s → 60s cap. - /// internal int BackoffMs => _consecutiveFailures switch { <= 0 => 0, 1 => 5_000, 2 => 10_000, 3 => 20_000, + 4 => 40_000, _ => 60_000, }; /// - /// Timestamp after which the next send attempt should run. Null when there's - /// no active backoff (never failed, or last attempt succeeded). Callers should - /// skip sending while UtcNow < NextAttemptAt. + /// Earliest UTC time at which the next attempt may run. + /// Null when no backoff is active (never failed, or last attempt succeeded). /// internal DateTime? NextAttemptAt => _nextAttemptAt; /// - /// True if a failure occurred recently and the backoff window has not yet - /// elapsed. Becomes false naturally once enough time passes, allowing the - /// next send attempt to proceed. + /// True while UtcNow < NextAttemptAt. Flips false as the clock + /// advances; no reset required. /// internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt; @@ -155,13 +163,7 @@ public void Dispose() private void RecordFailure() { var now = _getUtcNow(); - - // If we're still inside the previous backoff window, the caller retried - // before honoring the wait. Don't escalate — the existing NextAttemptAt - // deadline stands. When _nextAttemptAt is null (no prior failure), - // `now < null` is false via lifted nullable comparison, so we escalate. - if (now < _nextAttemptAt) - return; + if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff _consecutiveFailures++; _nextAttemptAt = now.AddMilliseconds(BackoffMs); @@ -174,15 +176,14 @@ private void ResetBackoff() } /// - /// Reads file contents for each path and builds the batch JSON payload: - /// {"batch":[msg1,msg2,...]} + /// Reads each path and wraps the concatenated JSON bodies in + /// {"batch":[msg1,msg2,...]}. /// /// - /// The batch JSON string, or null when every path failed to read - /// (files disappeared, empty, or locked). The caller treats null - /// as "nothing to send" and deletes the path list. + /// The batched JSON, or null if every path was unreadable. Caller + /// treats null as "nothing to send" and deletes the path list. /// - private static string BuildPayload(IReadOnlyList paths) + private static string? BuildPayload(IReadOnlyList paths) { var sb = new StringBuilder("{\"batch\":["); var count = 0; @@ -198,9 +199,11 @@ private static string BuildPayload(IReadOnlyList paths) } catch (IOException) { - // File disappeared, locked, or path vanished between ReadBatch - // and now — skip it. Non-IO errors like UnauthorizedAccessException - // indicate real problems and are allowed to propagate. + // Transient disk race: the file was deleted or locked between + // ReadBatch and now. Safe to skip — the remaining paths in the + // batch may still read fine. Non-IOException failures escape + // and are handled by the caller (SendBatchAsync) as a batch- + // wide storage error. } } @@ -219,7 +222,8 @@ private void NotifyError(AudienceErrorCode code, string message) } catch { - // Error callback itself threw — swallow to protect the SDK. + // Consumer callback threw. Swallow: the SDK must not surface + // exceptions through the error-reporting path itself. } } } diff --git a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs index 6e05804d5..8e5f18620 100644 --- a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs +++ b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs @@ -173,7 +173,7 @@ public async Task BackoffMs_EscalatesOnlyAfterWindowElapsed() using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler, getUtcNow: _getUtcNow); - // Schedule per plan: 5s → 10s → 20s → 60s cap (no 40s step). + // Schedule: 5s → 10s → 20s → 40s → 60s cap. // Each escalation requires the previous window to have elapsed. await transport.SendBatchAsync(); Assert.AreEqual(5_000, transport.BackoffMs); @@ -188,7 +188,11 @@ public async Task BackoffMs_EscalatesOnlyAfterWindowElapsed() Advance(20_001); await transport.SendBatchAsync(); - Assert.AreEqual(60_000, transport.BackoffMs, "jumps to 60s cap after 20s"); + Assert.AreEqual(40_000, transport.BackoffMs); + + Advance(40_001); + await transport.SendBatchAsync(); + Assert.AreEqual(60_000, transport.BackoffMs, "reaches 60s cap after 40s step"); Advance(60_001); await transport.SendBatchAsync();