Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/Packages/Audience/Runtime/ImmutableAudience.cs
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,9 @@ private static void SyncConsentToBackend(AudienceConfig config, ConsentLevel lev
// Flush / Shutdown
// -----------------------------------------------------------------

// Sends all pending events now.
public static async Task FlushAsync()
// Sends all pending events now. Respects cancellationToken for both
// the gate wait and the HTTP send.
public static async Task FlushAsync(CancellationToken cancellationToken = default)
{
if (!_initialized) return;

Expand All @@ -554,9 +555,31 @@ public static async Task FlushAsync()

queue.FlushSync();

while (!transport.IsInBackoffWindow &&
await transport.SendBatchAsync().ConfigureAwait(false))
// Only one send runs at a time. Without this, two FlushAsync
// callers would both read the same batch from disk and send it
// twice. Yield while another caller (the timer or another
// FlushAsync) holds the in-flight slot.
while (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
}

try
{
while (!transport.IsInBackoffWindow &&
await transport.SendBatchAsync(cancellationToken).ConfigureAwait(false))
Comment thread
ImmutableJeffrey marked this conversation as resolved.
{
}
}
catch (ObjectDisposedException)
{
// Concurrent Shutdown disposed the transport. Exit silently —
// caller is tearing down.
}
finally
{
Interlocked.Exchange(ref _sendInFlight, 0);
}
}

Expand Down
103 changes: 87 additions & 16 deletions src/Packages/Audience/Runtime/Transport/HttpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal sealed class HttpTransport : IDisposable
private readonly Action<AudienceError>? _onError;
private readonly Func<DateTime> _getUtcNow;

private readonly object _backoffLock = new object();
private int _consecutiveFailures;
private DateTime? _nextAttemptAt;

Expand All @@ -40,7 +41,11 @@ internal HttpTransport(
_publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey));
_url = Constants.MessagesUrl(publishableKey);
_onError = onError;
_client = handler != null ? new HttpClient(handler) : new HttpClient();
// disposeHandler: false so the consumer can reuse their handler
// across Init/Shutdown cycles (matches _controlClient's policy).
_client = handler != null
? new HttpClient(handler, disposeHandler: false)
: new HttpClient();
_client.Timeout = TimeSpan.FromSeconds(30);
_getUtcNow = getUtcNow ?? (() => DateTime.UtcNow);
}
Expand Down Expand Up @@ -94,9 +99,18 @@ internal async Task<bool> SendBatchAsync(CancellationToken ct = default)

if (statusCode >= 200 && statusCode < 300)
{
// 2xx: server acked, drop the batch, healthy state.
// Server accepted the batch. Count how many messages it
// rejected; if any, tell the studio via onError. Rejected
// messages are validation failures — retrying won't help,
// so the batch is deleted either way.
var rejected = await ParseRejectedCount(response).ConfigureAwait(false);
_store.Delete(batch);
ResetBackoff();
if (rejected > 0)
{
NotifyError(AudienceErrorCode.ValidationRejected,
$"Batch partially rejected: {rejected} of {batch.Count} events dropped");
}
}
else if (statusCode >= 400 && statusCode < 500)
{
Expand All @@ -117,10 +131,15 @@ internal async Task<bool> SendBatchAsync(CancellationToken ct = default)
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Caller cancelled the token (e.g. on shutdown). Events stay on
// disk, no failure recorded. HttpClient timeouts throw the same
// exception but without ct.IsCancellationRequested set, so they
// fall through to the Exception branch below and trigger backoff.
// Caller cancelled the token (e.g. on shutdown). Events stay
// on disk, no failure recorded. Rethrow so the caller's send
// loop exits — swallowing here returns `true`, and the loop
// would re-enter on the same cancelled token and spin because
// the batch is still on disk. HttpClient timeouts throw the
// same exception but without ct.IsCancellationRequested set,
// so they fall through to the Exception branch below and
// trigger backoff.
throw;
}
catch (Exception ex)
{
Expand All @@ -131,7 +150,15 @@ internal async Task<bool> SendBatchAsync(CancellationToken ct = default)
return true;
}

internal int BackoffMs => _consecutiveFailures switch
internal int BackoffMs
{
get
{
lock (_backoffLock) return BackoffMsLocked();
}
}

private int BackoffMsLocked() => _consecutiveFailures switch
{
<= 0 => 0,
1 => 5_000,
Expand All @@ -143,10 +170,16 @@ internal async Task<bool> SendBatchAsync(CancellationToken ct = default)

// Earliest UTC time at which the next attempt may run.
// Null when no backoff is active.
internal DateTime? NextAttemptAt => _nextAttemptAt;
internal DateTime? NextAttemptAt
{
get { lock (_backoffLock) return _nextAttemptAt; }
}

// True while UtcNow < NextAttemptAt. Flips false as the clock advances.
internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt;
internal bool IsInBackoffWindow
{
get { lock (_backoffLock) return _getUtcNow() < _nextAttemptAt; }
}

public void Dispose()
{
Expand All @@ -155,17 +188,22 @@ public void Dispose()

private void RecordFailure()
{
var now = _getUtcNow();
if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff

_consecutiveFailures++;
_nextAttemptAt = now.AddMilliseconds(BackoffMs);
lock (_backoffLock)
{
var now = _getUtcNow();
if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff
_consecutiveFailures++;
_nextAttemptAt = now.AddMilliseconds(BackoffMsLocked());
}
}

private void ResetBackoff()
{
_consecutiveFailures = 0;
_nextAttemptAt = null;
lock (_backoffLock)
{
_consecutiveFailures = 0;
_nextAttemptAt = null;
}
}

// Reads each path and wraps the concatenated JSON bodies in
Expand Down Expand Up @@ -201,6 +239,39 @@ private void ResetBackoff()
return sb.ToString();
}

// Reads the response body and pulls out the "rejected" count. Returns
// 0 if the body is missing or unreadable — the body is only for
// reporting, so failing to read it must not break the success path.
private static async Task<int> ParseRejectedCount(HttpResponseMessage response)
{
string body;
try
{
body = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
}
catch
{
return 0;
}
if (string.IsNullOrEmpty(body)) return 0;

try
{
var parsed = JsonReader.DeserializeObject(body);
if (!parsed.TryGetValue("rejected", out var raw)) return 0;
return raw switch
{
int i => i,
long l => (int)l,
_ => 0,
};
}
catch (FormatException)
{
return 0;
}
}

private void NotifyError(AudienceErrorCode code, string message)
{
if (_onError == null) return;
Expand Down
90 changes: 90 additions & 0 deletions src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,96 @@ public void SendBatch_ConcurrentTicks_OnlyOneReachesTransport()
"overlapping tick must not issue a second HTTP request");
}

[Test]
public void FlushAsync_ConcurrentCallers_OnlyOneReachesTransport()
{
// Two parallel FlushAsync calls must serialise on _sendInFlight so
// ReadBatch/POST pairs do not double-send. Sabotage: remove the
// gate in FlushAsync and this test sees RequestCount > 1.
var handler = new BlockingHandler();
var config = MakeConfig();
config.HttpHandler = handler;

ImmutableAudience.Init(config);
ImmutableAudience.Track("event_to_send");
ImmutableAudience.FlushQueueToDiskForTesting();

// First caller enters SendAsync and blocks on handler.Release.
var flush1 = Task.Run(() => ImmutableAudience.FlushAsync());
Assert.IsTrue(handler.EnteredSendAsync.Wait(TimeSpan.FromSeconds(2)),
"first FlushAsync should reach the HTTP handler");

// Second caller starts while the first holds the gate — it must
// wait, not issue a second request.
var flush2 = Task.Run(() => ImmutableAudience.FlushAsync());

// Give the second caller a moment to try (and back off).
Thread.Sleep(200);
Assert.AreEqual(1, handler.RequestCount,
"second FlushAsync must not issue a second HTTP request while the first is in-flight");

handler.Release.Set();
Assert.IsTrue(Task.WaitAll(new[] { flush1, flush2 }, TimeSpan.FromSeconds(10)),
"both FlushAsync calls should complete after release");
}

[Test]
public async Task FlushAsync_CancelledToken_Terminates_DoesNotHotLoop()
{
// Regression for PR #701 review (@nattb8): if SendBatchAsync
// silently swallowed caller cancellation, the inner while-loop
// here would re-enter on the same cancelled token and spin
// because the batch is never deleted on that code path. The
// task below would never complete. After the fix, cancellation
// propagates and the task faults quickly.
var handler = new CancellingHandler();
var config = MakeConfig();
config.HttpHandler = handler;

ImmutableAudience.Init(config);
ImmutableAudience.Track("event_to_send");
ImmutableAudience.FlushQueueToDiskForTesting();

using var cts = new CancellationTokenSource();
cts.Cancel();

var flush = ImmutableAudience.FlushAsync(cts.Token);
var finishedFirst = await Task.WhenAny(flush, Task.Delay(TimeSpan.FromSeconds(2)));

Assert.AreSame(flush, finishedFirst,
"FlushAsync must terminate (not hot-loop) when the token is cancelled");
Assert.IsTrue(flush.IsCanceled || flush.IsFaulted,
"FlushAsync must propagate the cancellation, not return normally");
Assert.LessOrEqual(handler.CallCount, 1,
"a cancelled token must not drive repeated SendAsync attempts");

// Gate must be released by the finally block — a follow-up flush
// on an uncancelled token should proceed, proving _sendInFlight
// is not stranded at 1.
handler.AcceptNextAsSuccess = true;
var followUp = ImmutableAudience.FlushAsync();
Assert.IsTrue(followUp.Wait(TimeSpan.FromSeconds(2)),
"_sendInFlight must be released after a cancelled flush");
}

private class CancellingHandler : HttpMessageHandler
{
public int CallCount;
public bool AcceptNextAsSuccess;

protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken ct)
{
Interlocked.Increment(ref CallCount);
ct.ThrowIfCancellationRequested();
var status = AcceptNextAsSuccess ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable;
var body = AcceptNextAsSuccess ? "{\"accepted\":1,\"rejected\":0}" : "";
return Task.FromResult(new HttpResponseMessage(status)
{
Content = new StringContent(body)
});
}
}

private class BlockingHandler : HttpMessageHandler
{
public readonly ManualResetEventSlim EnteredSendAsync = new ManualResetEventSlim(false);
Expand Down
Loading
Loading