diff --git a/src/Packages/Audience/Runtime/AudienceConfig.cs b/src/Packages/Audience/Runtime/AudienceConfig.cs index f6085bca1..c4b2ae9af 100644 --- a/src/Packages/Audience/Runtime/AudienceConfig.cs +++ b/src/Packages/Audience/Runtime/AudienceConfig.cs @@ -1,16 +1,20 @@ +#nullable enable + +using System; + namespace Immutable.Audience { // Configuration passed to ImmutableAudience.Init. public class AudienceConfig { - // Studio API key. - public string PublishableKey { get; set; } + // Studio API key. Required — Init throws if null. + public string? PublishableKey { get; set; } // Initial consent level. public ConsentLevel Consent { get; set; } = ConsentLevel.None; // Distribution platform the game is running on. - public string DistributionPlatform { get; set; } + public string? DistributionPlatform { get; set; } // Enable debug logging. public bool Debug { get; set; } = false; @@ -20,5 +24,21 @@ public class AudienceConfig // Flush as soon as this many events are queued. public int FlushSize { get; set; } = Constants.DefaultFlushSize; + + // Optional error callback. + public Action? OnError { get; set; } + + // Directory the SDK uses for identity, consent, and queued events. + // Unity hooks populate this from Application.persistentDataPath. + public string? PersistentDataPath { get; set; } + + // Library version sent on every message. + public string PackageVersion { get; set; } = Constants.LibraryVersion; + + // Maximum time Shutdown waits for the final flush. + public int ShutdownFlushTimeoutMs { get; set; } = 2_000; + + // Test seam for HttpTransport; not part of the public API. + internal System.Net.Http.HttpMessageHandler? HttpHandler { get; set; } } } diff --git a/src/Packages/Audience/Runtime/ConsentLevel.cs b/src/Packages/Audience/Runtime/ConsentLevel.cs index 197836d75..deb0b189e 100644 --- a/src/Packages/Audience/Runtime/ConsentLevel.cs +++ b/src/Packages/Audience/Runtime/ConsentLevel.cs @@ -1,3 +1,5 @@ +#nullable enable + namespace Immutable.Audience { // How much data the Audience SDK is allowed to collect. @@ -10,4 +12,18 @@ public enum ConsentLevel // Full tracking, including identity. Full } + + internal static class ConsentLevelExtensions + { + // Throws on unknown casts rather than emitting null: a null value + // would poison the backend consent log. + internal static string ToLowercaseString(this ConsentLevel level) => level switch + { + ConsentLevel.None => "none", + ConsentLevel.Anonymous => "anonymous", + ConsentLevel.Full => "full", + _ => throw new System.ArgumentOutOfRangeException( + nameof(level), level, "Unhandled ConsentLevel"), + }; + } } diff --git a/src/Packages/Audience/Runtime/Core/Constants.cs b/src/Packages/Audience/Runtime/Core/Constants.cs index 9cb2a6de0..f57a8f3bd 100644 --- a/src/Packages/Audience/Runtime/Core/Constants.cs +++ b/src/Packages/Audience/Runtime/Core/Constants.cs @@ -17,6 +17,7 @@ internal static class Constants internal const int MaxBatchSize = 100; internal const int StaleEventDays = 30; internal const int MaxFieldLength = 256; // Backend schema limit. + internal const int ControlPlaneRequestTimeoutSeconds = 30; internal const string LibraryName = "com.immutable.audience"; internal const string LibraryVersion = "0.1.0"; diff --git a/src/Packages/Audience/Runtime/ImmutableAudience.cs b/src/Packages/Audience/Runtime/ImmutableAudience.cs index d8b78aed2..ae29c3f28 100644 --- a/src/Packages/Audience/Runtime/ImmutableAudience.cs +++ b/src/Packages/Audience/Runtime/ImmutableAudience.cs @@ -1,15 +1,666 @@ +#nullable enable + +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; + namespace Immutable.Audience { // Entry point for the Immutable Audience SDK. public static class ImmutableAudience { - // Scaffold only -- implementation follows in subsequent sub-issues (see SDK-99). + // Reference fields are written inside _initLock; readers fence off the volatile _initialized load. + // _consent and _userId are mutated outside the lock and need volatile themselves. + private static AudienceConfig? _config; + private static DiskStore? _store; + private static EventQueue? _queue; + private static HttpTransport? _transport; + private static HttpClient? _controlClient; + private static CancellationTokenSource? _shutdownCancellationSource; + private static Timer? _sendTimer; + private static volatile ConsentLevel _consent; + private static volatile string? _userId; + private static volatile bool _initialized; + private static readonly object _initLock = new object(); + + // Guard against overlapping timer ticks. System.Threading.Timer fires + // callbacks on independent ThreadPool threads and does not serialise + // them; without this gate, a slow SendBatchAsync (up to the HTTP + // timeout) would stack on every interval tick, each tick holding its + // own thread blocked on a pending request. + private static int _sendInFlight; + + // AudienceUnityHooks sets this at SubsystemRegistration so Unity studios + // can omit PersistentDataPath from AudienceConfig and Init will fill it + // from Application.persistentDataPath. Non-Unity callers must still set + // PersistentDataPath on the config. + internal static Func? DefaultPersistentDataPathProvider; // Starts the SDK. Call once at launch. public static void Init(AudienceConfig config) { - throw new System.NotImplementedException( - "Immutable Audience SDK: implementation pending. See SDK-99."); + if (config == null) throw new ArgumentNullException(nameof(config)); + if (string.IsNullOrEmpty(config.PublishableKey)) + throw new ArgumentException("PublishableKey is required", nameof(config)); + + if (string.IsNullOrEmpty(config.PersistentDataPath)) + config.PersistentDataPath = DefaultPersistentDataPathProvider?.Invoke(); + if (string.IsNullOrEmpty(config.PersistentDataPath)) + throw new ArgumentException("PersistentDataPath is required", nameof(config)); + + ConsentLevel consentAtInit; + lock (_initLock) + { + if (_initialized) + { + Log.Warn("Init called more than once — ignoring; original config retained. " + + "Call Shutdown() first if reconfiguring is intended."); + return; + } + + _config = config; + Log.Enabled = config.Debug; + // Persisted consent overrides the config default so a prior runtime downgrade survives restart. + _consent = ConsentStore.Load(config.PersistentDataPath) ?? config.Consent; + + _store = new DiskStore(config.PersistentDataPath); + _queue = new EventQueue(_store, config.FlushIntervalSeconds, config.FlushSize); + _transport = new HttpTransport(_store, config.PublishableKey, config.OnError, config.HttpHandler); + _controlClient = config.HttpHandler != null + ? new HttpClient(config.HttpHandler, disposeHandler: false) + : new HttpClient(); + _controlClient.Timeout = TimeSpan.FromSeconds(Constants.ControlPlaneRequestTimeoutSeconds); + _shutdownCancellationSource = new CancellationTokenSource(); + + // Disk → network timer. EventQueue owns the separate memory → disk drain. + var sendIntervalMs = Math.Max(1, config.FlushIntervalSeconds) * 1000; + _sendTimer = new Timer(_ => SendBatch(), null, sendIntervalMs, sendIntervalMs); + + _initialized = true; + + // Snapshot under the lock so a racing SetConsent(None) can't drop the launch event. + consentAtInit = _consent; + } + + FireGameLaunch(config, consentAtInit); + } + + // ----------------------------------------------------------------- + // Track + // ----------------------------------------------------------------- + + // Send a typed event. + // + // Prefer this overload for predefined event names (e.g. purchase) — the + // IEvent implementation enforces required fields and value types at + // compile time. The string overload accepts any property shape and + // cannot catch missing or mistyped fields. + public static void Track(IEvent evt) + { + if (!CanTrack()) return; + if (evt == null) + { + Log.Warn("Track(IEvent) called with null event — dropping."); + return; + } + + var config = _config; + if (config == null) return; + + // Consumer-supplied impl; catch so a buggy IEvent cannot crash the game. + string eventName; + Dictionary properties; + try + { + eventName = evt.EventName; + properties = evt.ToProperties(); + } + catch (Exception ex) + { + Log.Warn($"Track(IEvent) — {evt.GetType().Name}.ToProperties()/EventName threw {ex.GetType().Name}: {ex.Message}. Dropping."); + return; + } + + if (string.IsNullOrEmpty(eventName)) + { + Log.Warn($"Track(IEvent) — {evt.GetType().Name}.EventName returned null or empty. Dropping."); + return; + } + + var anonymousId = Identity.GetOrCreate(config.PersistentDataPath!, _consent); + // ToProperties returns a fresh dict per call, so no snapshot needed. + var msg = MessageBuilder.Track(eventName, anonymousId, _userId, config.PackageVersion, properties); + Enqueue(msg); + } + + // Send a custom event. + // + // For predefined event names (e.g. purchase), prefer the typed + // overload Track(new Purchase { ... }) — it enforces required fields + // and value types at compile time. This overload does not validate + // property shapes, so missing or mistyped fields can break + // attribution/conversion reporting. + public static void Track(string eventName, Dictionary? properties = null) + { + if (!CanTrack()) return; + if (string.IsNullOrEmpty(eventName)) + { + Log.Warn("Track(string) called with null or empty event name — dropping."); + return; + } + + var config = _config; + if (config == null) return; + + var anonymousId = Identity.GetOrCreate(config.PersistentDataPath!, _consent); + var msg = MessageBuilder.Track(eventName, anonymousId, _userId, config.PackageVersion, + SnapshotCallerDict(properties)); + Enqueue(msg); + } + + // ----------------------------------------------------------------- + // Identity + // ----------------------------------------------------------------- + + // Attach a known user id to subsequent events. + public static void Identify(string userId, IdentityType identityType, Dictionary? traits = null) => + Identify(userId, identityType.ToLowercaseString(), traits); + + // Attach a known user id to subsequent events. String overload for + // providers not in IdentityType. + public static void Identify(string userId, string? identityType, Dictionary? traits = null) + { + if (!_initialized) return; + + // Validate inputs before consent so null-arg callers get the right warning. + if (string.IsNullOrEmpty(userId)) + { + Log.Warn("Identify called with null or empty userId — dropping."); + return; + } + if (string.IsNullOrEmpty(identityType)) + { + Log.Warn("Identify called with null or empty identityType — dropping."); + return; + } + if (_consent != ConsentLevel.Full) + { + Log.Warn($"Identify discarded — requires Full consent, current is {_consent}"); + return; + } + + var config = _config; + if (config == null) return; + + _userId = userId; + + var anonymousId = Identity.GetOrCreate(config.PersistentDataPath!, _consent); + var msg = MessageBuilder.Identify(anonymousId, userId, identityType, config.PackageVersion, + SnapshotCallerDict(traits)); + Enqueue(msg); + } + + // Attach or update traits for the current anonymous user without + // supplying a user id. Useful when only the anonymous profile is + // known, or when only traits have changed since a prior Identify(). + // + // Does not modify the current user id — a subsequent Track() still + // carries whatever id was set by a previous Identify(userId, ...) call. + public static void Identify(Dictionary traits) + { + if (!_initialized) return; + + if (traits == null) + { + Log.Warn("Identify(traits) called with null traits — dropping."); + return; + } + if (_consent != ConsentLevel.Full) + { + Log.Warn($"Identify discarded — requires Full consent, current is {_consent}"); + return; + } + + var config = _config; + if (config == null) return; + + var anonymousId = Identity.GetOrCreate(config.PersistentDataPath!, _consent); + var msg = MessageBuilder.Identify(anonymousId, userId: null, identityType: null, + config.PackageVersion, SnapshotCallerDict(traits)); + Enqueue(msg); + } + + // Link two user ids for the same player. + public static void Alias(string fromId, IdentityType fromType, string toId, IdentityType toType) => + Alias(fromId, fromType.ToLowercaseString(), toId, toType.ToLowercaseString()); + + // Link two user ids for the same player. String overload for + // providers not in IdentityType. + public static void Alias(string fromId, string? fromType, string toId, string? toType) + { + if (!_initialized) return; + + if (string.IsNullOrEmpty(fromId) || string.IsNullOrEmpty(toId)) + { + Log.Warn("Alias called with null or empty fromId/toId — dropping."); + return; + } + if (string.IsNullOrEmpty(fromType) || string.IsNullOrEmpty(toType)) + { + Log.Warn("Alias called with null or empty fromType/toType — dropping."); + return; + } + if (_consent != ConsentLevel.Full) + { + Log.Warn($"Alias discarded — requires Full consent, current is {_consent}"); + return; + } + + var config = _config; + if (config == null) return; + + var msg = MessageBuilder.Alias(fromId, fromType, toId, toType, config.PackageVersion); + Enqueue(msg); + } + + // Log out the current player. Clears the user id, generates a fresh + // anonymous id, and discards queued events (in-memory and on-disk) + // so the next player on this device isn't attributed to the previous + // one. + // + // To send queued events before they're discarded, + // invoke await FlushAsync() first: + // + // await ImmutableAudience.FlushAsync(); + // ImmutableAudience.Reset(); + public static void Reset() + { + if (!_initialized) return; + + var config = _config; + if (config == null) return; + + _userId = null; + _queue?.PurgeAll(); + Identity.Reset(config.PersistentDataPath!); + } + + // Ask the backend to erase this player's data. + public static void DeleteData(string? userId = null) + { + if (!_initialized) return; + + var config = _config; + var client = _controlClient; + if (config == null || client == null) return; + + string query; + if (!string.IsNullOrEmpty(userId)) + { + query = "userId=" + Uri.EscapeDataString(userId); + } + else + { + // Get, not GetOrCreate — a brand-new install must not register an ID just to delete it. + var anonymousId = Identity.Get(config.PersistentDataPath!); + if (string.IsNullOrEmpty(anonymousId)) + return; + query = "anonymousId=" + Uri.EscapeDataString(anonymousId); + } + + var url = Constants.DataUrl(config.PublishableKey) + "?" + query; + var onError = config.OnError; + var publishableKey = config.PublishableKey; + var cancellationToken = _shutdownCancellationSource?.Token ?? CancellationToken.None; + + Task.Run(async () => + { + try + { + using var request = new HttpRequestMessage(HttpMethod.Delete, url); + request.Headers.Add(Constants.PublishableKeyHeader, publishableKey); + using var response = await client.SendAsync(request, cancellationToken).ConfigureAwait(false); + + if (!response.IsSuccessStatusCode) + { + NotifyErrorCallback(onError, AudienceErrorCode.NetworkError, + $"Data delete failed with status {(int)response.StatusCode}"); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Shutdown cancelled the request — no error fired; caller is tearing down. + } + catch (Exception ex) + { + NotifyErrorCallback(onError, AudienceErrorCode.NetworkError, + $"Data delete threw: {ex.Message}"); + } + }); + } + + private static void NotifyErrorCallback(Action? onError, AudienceErrorCode code, string message) + { + if (onError == null) return; + try + { + onError(new AudienceError(code, message)); + } + catch + { + // Swallow: a buggy OnError must not crash the SDK surface. + } + } + + // ----------------------------------------------------------------- + // Consent + // ----------------------------------------------------------------- + + // Change the player's consent level. + public static void SetConsent(ConsentLevel level) + { + if (!_initialized) return; + + var config = _config; + var queue = _queue; + if (config == null) return; + + var previous = _consent; + if (level == previous) return; + + // Snapshot the anonymousId BEFORE Identity.Reset (on downgrade to + // None) wipes the file. The PUT audit trail needs it to record + // whose consent changed. + var anonymousIdForPut = previous == ConsentLevel.None + ? Identity.GetOrCreate(config.PersistentDataPath!, level) + : Identity.Get(config.PersistentDataPath!); + + _consent = level; + + try + { + // PersistentDataPath is validated non-null in Init; compiler can't propagate that. + ConsentStore.Save(config.PersistentDataPath!, level); + } + catch (Exception ex) when (ex is IOException || ex is UnauthorizedAccessException) + { + Log.Warn($"SetConsent — failed to persist consent level: {ex.GetType().Name}: {ex.Message}. " + + "In-memory level is updated but will revert on next launch."); + } + + if (level == ConsentLevel.None) + { + queue?.PurgeAll(); + Identity.Reset(config.PersistentDataPath!); + } + else if (previous == ConsentLevel.Full && level == ConsentLevel.Anonymous) + { + _userId = null; + queue?.ApplyAnonymousDowngrade(); + } + + SyncConsentToBackend(config, level, anonymousIdForPut); + } + + // Fire-and-forget PUT /v1/audience/tracking-consent. Failures do not + // block or surface; the local consent change has already applied. + private static void SyncConsentToBackend(AudienceConfig config, ConsentLevel level, string? anonymousId) + { + var client = _controlClient; + if (client == null) return; + + var url = Constants.ConsentUrl(config.PublishableKey); + var publishableKey = config.PublishableKey; + var onError = config.OnError; + var cancellationToken = _shutdownCancellationSource?.Token ?? CancellationToken.None; + + var body = Json.Serialize(new Dictionary + { + ["status"] = level.ToLowercaseString(), + ["source"] = Constants.ConsentSource, + // Json.Serialize emits null → "anonymousId": null. Preserves the backend's ability to distinguish "unknown" from a missing field. + ["anonymousId"] = anonymousId!, + }); + + Task.Run(async () => + { + try + { + using var request = new HttpRequestMessage(HttpMethod.Put, url); + request.Headers.Add(Constants.PublishableKeyHeader, publishableKey); + request.Content = new StringContent(body, System.Text.Encoding.UTF8, "application/json"); + using var response = await client.SendAsync(request, cancellationToken).ConfigureAwait(false); + + if (!response.IsSuccessStatusCode) + { + NotifyErrorCallback(onError, AudienceErrorCode.ConsentSyncFailed, + $"Consent sync failed with status {(int)response.StatusCode}"); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Shutdown cancelled the request — no error fired. + } + catch (Exception ex) + { + NotifyErrorCallback(onError, AudienceErrorCode.ConsentSyncFailed, + $"Consent sync threw: {ex.Message}"); + } + }); + } + + // ----------------------------------------------------------------- + // Flush / Shutdown + // ----------------------------------------------------------------- + + // Send pending events now. + public static async Task FlushAsync() + { + if (!_initialized) return; + + var queue = _queue; + var transport = _transport; + if (queue == null || transport == null) return; + + queue.FlushSync(); + + while (!transport.IsInBackoffWindow && + await transport.SendBatchAsync().ConfigureAwait(false)) + { + } + } + + // Flush and stop the SDK. + public static void Shutdown() + { + if (!_initialized) return; + + // Drain in-flight timer callbacks before disposing dependents. + // Parameterless Timer.Dispose returns immediately and would race SendBatch. + var timer = _sendTimer; + if (timer != null) + { + using var disposed = new ManualResetEvent(false); + if (timer.Dispose(disposed)) + { + disposed.WaitOne(TimeSpan.FromSeconds(2)); + } + _sendTimer = null; + } + + // Clear the in-flight guard in case the WaitOne above timed out + // with a SendBatch callback still running: without this, a later + // Init would leave _sendInFlight stranded at 1 and suppress every + // tick of the new timer. + Interlocked.Exchange(ref _sendInFlight, 0); + + _queue?.Shutdown(); + + // Best-effort final send, capped so a slow network can't hang quit. + if (_transport != null) + { + var timeoutMs = _config?.ShutdownFlushTimeoutMs ?? 2_000; + try + { + var send = _transport.SendBatchAsync(); + if (!send.Wait(timeoutMs)) + { + Log.Warn($"Shutdown flush exceeded {timeoutMs}ms — abandoning. " + + "Queued events remain on disk and will retry on next startup."); + } + } + catch (Exception ex) + { + Log.Warn($"Shutdown flush threw: {ex.GetType().Name}: {ex.Message}"); + } + } + + // Cancel in-flight control-plane HTTP requests (DeleteData / SyncConsentToBackend) + // before disposing the client so awaiting callers observe OperationCanceledException + // rather than ObjectDisposedException. + _shutdownCancellationSource?.Cancel(); + + _transport?.Dispose(); + _queue?.Dispose(); + _controlClient?.Dispose(); + _shutdownCancellationSource?.Dispose(); + _shutdownCancellationSource = null; + + // Drop Identity's in-memory cache so a subsequent Init with a + // different persistentDataPath reads the file from the new path + // instead of returning the previous session's id. + Identity.ClearCache(); + + _initialized = false; + _config = null; + _store = null; + _queue = null; + _transport = null; + _controlClient = null; + _userId = null; + } + + // ----------------------------------------------------------------- + // Internal — shared with tests and AudienceUnityHooks + // ----------------------------------------------------------------- + + // Shuts down (if initialised) and clears per-session state so a + // fresh Init starts clean. Used on test teardown and by Unity + // SubsystemRegistration to survive "disable domain reload". + internal static void ResetState() + { + if (_initialized) + Shutdown(); + + _consent = ConsentLevel.None; + // Drop Identity's static cache so a subsequent Init with a different + // persistentDataPath (tests, domain reload with changed config) reads + // the file from the new path, not the previous session's cached id. + Identity.ClearCache(); + } + + internal static ConsentLevel CurrentConsentForTesting => _consent; + + internal static void FlushQueueToDiskForTesting() => _queue?.FlushSync(); + + // Invokes the timer callback body directly so the overlapping-tick + // guard can be exercised without a real timer. + internal static void SendBatchForTesting() => SendBatch(); + + // ----------------------------------------------------------------- + // Private + // ----------------------------------------------------------------- + + private static bool CanTrack() + { + return _initialized && _consent != ConsentLevel.None; + } + + // Shallow-copy the caller's dict so a post-call mutation cannot race the drain-thread serialiser. + private static Dictionary? SnapshotCallerDict(Dictionary? src) => + src != null ? new Dictionary(src) : null; + + private static void Enqueue(Dictionary? msg) + { + var queue = _queue; + if (queue == null) return; + + // Re-check consent inside the drain lock so a SetConsent(None) racing + // the caller's CanTrack cannot leak this event past the purge. + queue.EnqueueChecked(msg, () => _consent != ConsentLevel.None); + } + + private static void SendBatch() + { + // CAS in the guard before doing any work; a previous tick still + // running means skip entirely, including the reschedule — the + // in-flight tick will reschedule on its own finally path. + if (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0) + return; + + try + { + var transport = _transport; + if (transport == null) return; + + if (!transport.IsInBackoffWindow) + { + try + { + transport.SendBatchAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + // ThreadPool timer thread; no caller above to catch. + Log.Warn($"SendBatch unexpected exception: {ex.GetType().Name}: {ex.Message}"); + } + } + + RescheduleSendTimer(transport); + } + finally + { + Interlocked.Exchange(ref _sendInFlight, 0); + } + } + + // Realigns the timer to NextAttemptAt so we don't repoll through a long backoff window. + private static void RescheduleSendTimer(HttpTransport transport) + { + var timer = _sendTimer; + var config = _config; + if (timer == null || config == null || transport == null) return; + + var sendIntervalMs = Math.Max(1, config.FlushIntervalSeconds) * 1000; + var nextMs = sendIntervalMs; + + if (transport.NextAttemptAt is DateTime scheduled) + { + var delayMs = (scheduled - DateTime.UtcNow).TotalMilliseconds; + if (delayMs > sendIntervalMs) + nextMs = (int)Math.Min(int.MaxValue, delayMs); + } + + timer.Change(nextMs, sendIntervalMs); + } + + // consentAtInit snapshot is only used to skip the launch event under None; + // Track still consults live _consent via CanTrack, so a SetConsent(None) + // landing between Init returning and here still drops the event. + private static void FireGameLaunch(AudienceConfig config, ConsentLevel consentAtInit) + { + if (consentAtInit == ConsentLevel.None) return; + + var properties = new Dictionary(); + + if (config.DistributionPlatform != null) + properties["distributionPlatform"] = config.DistributionPlatform; + + // Device-derived fields (platform, version, buildGuid, unityVersion) land with DeviceCollector. + Track("game_launch", properties.Count > 0 ? properties : null); } } } diff --git a/src/Packages/Audience/Runtime/Transport/DiskStore.cs b/src/Packages/Audience/Runtime/Transport/DiskStore.cs index e57e09137..7aefee056 100644 --- a/src/Packages/Audience/Runtime/Transport/DiskStore.cs +++ b/src/Packages/Audience/Runtime/Transport/DiskStore.cs @@ -1,5 +1,8 @@ +#nullable enable + using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; @@ -95,5 +98,91 @@ private static void TryDelete(string path) catch (IOException) { } catch (UnauthorizedAccessException) { } } + internal void DeleteAll() + { + string[] paths; + try { paths = Directory.GetFiles(_queueDir, "*.json"); } + catch (DirectoryNotFoundException) { return; } + + foreach (var path in paths) + TryDelete(path); + } + + // Drops queued identify/alias files, strips userId from track files. + // Unparseable files are deleted to fail closed. + internal void ApplyAnonymousDowngrade() + { + string[] paths; + try { paths = Directory.GetFiles(_queueDir, "*.json"); } + catch (DirectoryNotFoundException) { return; } + + foreach (var path in paths) + ApplyAnonymousDowngradeToFile(path); + } + + private void ApplyAnonymousDowngradeToFile(string path) + { + if (!TryReadMessage(path, out var msg) || + !msg.TryGetValue(MessageFields.Type, out var typeObj) || + !(typeObj is string type)) + { + TryDelete(path); + return; + } + + if (IsIdentityMessage(type)) + { + TryDelete(path); + return; + } + + if (type == MessageTypes.Track && msg.ContainsKey(MessageFields.UserId)) + RewriteTrackWithoutUserId(path, msg); + } + + private static bool IsIdentityMessage(string type) => + type == MessageTypes.Identify || type == MessageTypes.Alias; + + private static bool TryReadMessage(string path, [NotNullWhen(true)] out Dictionary? msg) + { + msg = null; + string json; + try { json = File.ReadAllText(path); } + catch (IOException) { return false; } + catch (UnauthorizedAccessException) { return false; } + + try { msg = JsonReader.DeserializeObject(json); } + catch (FormatException) { return false; } + + return true; + } + + private void RewriteTrackWithoutUserId(string path, Dictionary msg) + { + msg.Remove(MessageFields.UserId); + + try + { + var rewritten = Json.Serialize(msg); + var tmp = path + ".tmp"; + File.WriteAllText(tmp, rewritten); + try { File.Move(tmp, path); } + catch (IOException) + { + File.Delete(path); + File.Move(tmp, path); + } + } + catch (IOException) + { + // Delete rather than leave the old userId-bearing payload. + TryDelete(path); + } + catch (UnauthorizedAccessException) + { + TryDelete(path); + } + } + } } diff --git a/src/Packages/Audience/Runtime/Transport/EventQueue.cs b/src/Packages/Audience/Runtime/Transport/EventQueue.cs index 8f4ae023b..e81fcde43 100644 --- a/src/Packages/Audience/Runtime/Transport/EventQueue.cs +++ b/src/Packages/Audience/Runtime/Transport/EventQueue.cs @@ -1,5 +1,9 @@ +#nullable enable + using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; using System.Threading; namespace Immutable.Audience @@ -15,12 +19,17 @@ internal sealed class EventQueue : IDisposable private readonly int _flushIntervalMs; private readonly int _flushSize; - private readonly ConcurrentQueue _memory = new ConcurrentQueue(); + // Dictionaries rather than serialised strings: Json.Serialize runs on the drain thread. + private readonly ConcurrentQueue> _memory + = new ConcurrentQueue>(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private readonly Thread _drainThread; private readonly ManualResetEventSlim _flushGate = new ManualResetEventSlim(false); - // Volatile so all threads see the shutdown signal immediately. + // Serialises drain vs PurgeAll / ApplyAnonymousDowngrade. Without it, a + // TryDequeue'd event could hit disk after DeleteAll already cleared it. + private readonly object _drainLock = new object(); + private volatile bool _disposed; // store: destination for drained events. @@ -40,18 +49,38 @@ internal EventQueue(DiskStore store, int flushIntervalSeconds, int flushSize) _drainThread.Start(); } - // Enqueues a JSON-serialised event. Lock-free; safe from any thread. - internal void Enqueue(string json) + // Enqueues a message dictionary. Lock-free; safe from any thread. + // The dictionary is not copied -- callers must not mutate it after + // enqueue. Serialisation happens on the drain thread so Track() stays + // allocation-light. + internal void Enqueue(Dictionary? msg) { - if (_disposed) return; + if (_disposed || msg == null) return; - _memory.Enqueue(json); + _memory.Enqueue(msg); // Signal the drain thread early if we've hit the flush-size threshold if (_memory.Count >= _flushSize) _flushGate.Set(); } + // Enqueues under _drainLock, re-checking stillAllowed inside the lock. + // Closes the window where a concurrent PurgeAll could complete between + // the caller's check and the enqueue, leaking the event past revocation. + internal void EnqueueChecked(Dictionary? msg, Func? stillAllowed) + { + if (_disposed || msg == null) return; + + lock (_drainLock) + { + if (stillAllowed != null && !stillAllowed()) return; + _memory.Enqueue(msg); + } + + if (_memory.Count >= _flushSize) + _flushGate.Set(); + } + // Drains the in-memory queue and persists all events to disk // immediately. Blocks until the drain is complete. internal void FlushSync() @@ -59,6 +88,40 @@ internal void FlushSync() DrainMemoryToDisk(); } + // Discards every pending event, in-memory and on disk. Used on + // consent revocation. + internal void PurgeAll() + { + // Hold _drainLock so the background drain can't sneak a TryDequeue'd + // event onto disk after our DeleteAll. See _drainLock declaration for + // the full race description. + lock (_drainLock) + { + while (_memory.TryDequeue(out _)) { } + _store.DeleteAll(); + } + } + + // Synchronous: a Task.Run offload would race HttpTransport, which + // does not take _drainLock, opening a window where userId-bearing + // track files could ship during the rewrite. + internal void ApplyAnonymousDowngrade() + { + lock (_drainLock) + { + // Drain any pending in-memory events first so they hit disk and + // get the same filtering as everything already persisted. + while (_memory.TryDequeue(out var msg)) + { + try { _store.Write(Json.Serialize(msg)); } + catch (IOException) { /* best-effort */ } + catch (UnauthorizedAccessException) { /* best-effort */ } + } + + _store.ApplyAnonymousDowngrade(); + } + } + // Flushes all pending events to disk and stops the drain thread. // Safe to call multiple times. internal void Shutdown() @@ -71,7 +134,7 @@ internal void Shutdown() // Signal the drain thread to exit, then wait for it. _cts.Cancel(); - _flushGate.Set(); + _flushGate.Set(); // Wake drain thread so it exits promptly _drainThread.Join(TimeSpan.FromSeconds(5)); // Final drain: anything enqueued before _disposed was set. @@ -99,15 +162,25 @@ private void DrainLoop() private void DrainMemoryToDisk() { - while (_memory.TryDequeue(out var json)) + // Take _drainLock so PurgeAll can't run between our TryDequeue and Write + // and leave the just-written event orphaned on disk after the wipe. + lock (_drainLock) { - try - { - _store.Write(json); - } - catch (Exception) + while (_memory.TryDequeue(out var msg)) { - // Best-effort: if we can't write, discard rather than block the drain + try + { + // Serialise on the drain thread, not on the caller thread — + // keeps Track() lock-free and allocation-light. + _store.Write(Json.Serialize(msg)); + } + catch (IOException) + { + // Best-effort: if we can't write, discard rather than block the drain + } + catch (UnauthorizedAccessException) + { + } } } } diff --git a/src/Packages/Audience/Tests/Runtime/ConsentSyncTests.cs b/src/Packages/Audience/Tests/Runtime/ConsentSyncTests.cs new file mode 100644 index 000000000..2bd7a8e22 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/ConsentSyncTests.cs @@ -0,0 +1,149 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class ConsentSyncTests + { + private string _testDir; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + } + + [TearDown] + public void TearDown() + { + ImmutableAudience.ResetState(); + Identity.Reset(_testDir); + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + [Test] + public void SetConsent_FiresPut_WithExpectedBodyShape() + { + var handler = new CapturingHandler(); + var config = MakeConfig(handler, ConsentLevel.Anonymous); + ImmutableAudience.Init(config); + + ImmutableAudience.SetConsent(ConsentLevel.Full); + + var put = WaitForPut(handler); + var body = JsonReader.DeserializeObject(put.Body); + + Assert.AreEqual(Constants.ConsentUrl("pk_imapik-test-key1"), put.Url); + Assert.AreEqual("full", body["status"]); + Assert.AreEqual(Constants.ConsentSource, body["source"]); + Assert.IsTrue(body.ContainsKey("anonymousId")); + Assert.IsNotNull(body["anonymousId"], "upgrade PUT must carry the current anonymousId"); + } + + [Test] + public void SetConsent_None_PutCarriesOldAnonymousId_AfterReset() + { + // Regression guard: Identity.Reset runs before SyncConsentToBackend, + // so the PUT must have captured the anonymousId beforehand. + var handler = new CapturingHandler(); + var config = MakeConfig(handler, ConsentLevel.Anonymous); + ImmutableAudience.Init(config); + + var seeded = Identity.Get(_testDir); + Assert.IsNotNull(seeded, "Init under Anonymous should have minted an anonymousId"); + + ImmutableAudience.SetConsent(ConsentLevel.None); + + var put = WaitForPut(handler); + var body = JsonReader.DeserializeObject(put.Body); + + Assert.AreEqual("none", body["status"]); + Assert.AreEqual(seeded, body["anonymousId"], + "revocation PUT must carry the id that was revoked, not null"); + Assert.IsFalse(File.Exists(AudiencePaths.IdentityFile(_testDir)), + "precondition: Identity.Reset ran"); + } + + [Test] + public void SetConsent_PutFailure_InvokesOnErrorWithConsentSyncFailed() + { + var handler = new CapturingHandler { Status = HttpStatusCode.InternalServerError }; + var received = new ManualResetEventSlim(false); + AudienceError captured = null; + + var config = MakeConfig(handler, ConsentLevel.Anonymous); + config.OnError = err => + { + if (err.Code == AudienceErrorCode.ConsentSyncFailed) + { + captured = err; + received.Set(); + } + }; + ImmutableAudience.Init(config); + + ImmutableAudience.SetConsent(ConsentLevel.Full); + + Assert.IsTrue(received.Wait(TimeSpan.FromSeconds(5)), + "OnError(ConsentSyncFailed) should fire on non-2xx"); + StringAssert.Contains("500", captured.Message); + } + + private AudienceConfig MakeConfig(CapturingHandler handler, ConsentLevel consent) => + new AudienceConfig + { + PublishableKey = "pk_imapik-test-key1", + Consent = consent, + PersistentDataPath = _testDir, + FlushIntervalSeconds = 600, + FlushSize = 1000, + HttpHandler = handler, + }; + + private static CapturedRequest WaitForPut(CapturingHandler handler) + { + Assert.IsTrue(handler.PutReceived.Wait(TimeSpan.FromSeconds(5)), + "consent PUT never fired"); + return handler.LastPut; + } + + private class CapturedRequest + { + internal string Url; + internal string Body; + } + + private class CapturingHandler : HttpMessageHandler + { + internal readonly ManualResetEventSlim PutReceived = new ManualResetEventSlim(false); + internal CapturedRequest LastPut; + internal HttpStatusCode Status { get; set; } = HttpStatusCode.NoContent; + + protected override async Task SendAsync( + HttpRequestMessage request, CancellationToken ct) + { + if (request.Method == HttpMethod.Put) + { + LastPut = new CapturedRequest + { + Url = request.RequestUri!.ToString(), + Body = request.Content != null + ? await request.Content.ReadAsStringAsync().ConfigureAwait(false) + : null, + }; + PutReceived.Set(); + } + return new HttpResponseMessage(Status); + } + } + } +} diff --git a/src/Packages/Audience/Tests/Runtime/DeleteDataTests.cs b/src/Packages/Audience/Tests/Runtime/DeleteDataTests.cs new file mode 100644 index 000000000..e874512b0 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/DeleteDataTests.cs @@ -0,0 +1,170 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class DeleteDataTests + { + private string _testDir; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + } + + [TearDown] + public void TearDown() + { + ImmutableAudience.ResetState(); + Identity.Reset(_testDir); + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + private AudienceConfig MakeConfig(CapturingHandler handler, ConsentLevel consent = ConsentLevel.Full) + { + return new AudienceConfig + { + PublishableKey = "pk_imapik-test-key1", + Consent = consent, + PersistentDataPath = _testDir, + FlushIntervalSeconds = 600, + FlushSize = 1000, + HttpHandler = handler + }; + } + + /// + /// Records every request and returns a caller-configurable status. + /// Signals when a request lands so tests can await the async Task.Run path. + /// + private class CapturingHandler : HttpMessageHandler + { + internal readonly List Requests = new List(); + internal readonly ManualResetEventSlim RequestSent = new ManualResetEventSlim(false); + internal HttpStatusCode Status { get; set; } = HttpStatusCode.Accepted; + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + Requests.Add(request); + RequestSent.Set(); + return Task.FromResult(new HttpResponseMessage(Status)); + } + } + + private static void WaitForRequest(CapturingHandler handler) + { + Assert.IsTrue(handler.RequestSent.Wait(TimeSpan.FromSeconds(5)), + "DeleteData's background HTTP call never fired"); + } + + [Test] + public void DeleteData_WithUserId_FiresDelete_WithUserIdQuery() + { + var handler = new CapturingHandler(); + ImmutableAudience.Init(MakeConfig(handler)); + + ImmutableAudience.DeleteData(userId: "player-42"); + WaitForRequest(handler); + + // Filter out the game_launch POST from Init. + HttpRequestMessage deleteRequest = null; + foreach (var r in handler.Requests) + if (r.Method == HttpMethod.Delete) { deleteRequest = r; break; } + + Assert.IsNotNull(deleteRequest, "expected a DELETE request"); + StringAssert.Contains(Constants.DataPath, deleteRequest.RequestUri!.ToString()); + StringAssert.Contains("userId=player-42", deleteRequest.RequestUri.Query); + Assert.IsTrue(deleteRequest.Headers.Contains("x-immutable-publishable-key"), + "publishable key header must be attached"); + } + + [Test] + public void DeleteData_NoUserId_WithExistingAnonymousId_FiresDelete_WithAnonymousIdQuery() + { + // Seed an anonymousId as if the player had tracked in a prior session. + var seeded = Identity.GetOrCreate(_testDir, ConsentLevel.Anonymous); + Assert.IsNotNull(seeded); + + var handler = new CapturingHandler(); + ImmutableAudience.Init(MakeConfig(handler)); + + ImmutableAudience.DeleteData(); + WaitForRequest(handler); + + HttpRequestMessage deleteRequest = null; + foreach (var r in handler.Requests) + if (r.Method == HttpMethod.Delete) { deleteRequest = r; break; } + + Assert.IsNotNull(deleteRequest); + StringAssert.Contains($"anonymousId={seeded}", deleteRequest.RequestUri!.Query); + } + + [Test] + public void DeleteData_NoUserId_NoAnonymousId_DoesNotFireRequest() + { + // Use Consent.None so Init's game_launch is suppressed — the only way + // to guarantee no HTTP request fires at all. + var handler = new CapturingHandler(); + ImmutableAudience.Init(MakeConfig(handler, ConsentLevel.None)); + + Assert.IsFalse(handler.RequestSent.IsSet, + "precondition: no request yet"); + + ImmutableAudience.DeleteData(); + + // Give any errant background task a moment to fire. + Thread.Sleep(250); + + Assert.IsFalse(handler.RequestSent.IsSet, + "no anonymousId and no userId must mean no request"); + } + + [Test] + public void DeleteData_DoesNotCreateAnonymousIdFile() + { + var handler = new CapturingHandler(); + ImmutableAudience.Init(MakeConfig(handler, ConsentLevel.None)); + + ImmutableAudience.DeleteData(userId: "some-user"); + // Even with a userId request, the anonymousId file must not materialise. + Thread.Sleep(250); + + var identityPath = AudiencePaths.IdentityFile(_testDir); + Assert.IsFalse(File.Exists(identityPath), + "DeleteData must not create the anonymousId file as a side effect"); + } + + [Test] + public void DeleteData_ServerError_InvokesOnError() + { + var handler = new CapturingHandler { Status = HttpStatusCode.InternalServerError }; + var received = new ManualResetEventSlim(false); + AudienceError captured = null; + + var config = MakeConfig(handler); + config.OnError = err => + { + captured = err; + received.Set(); + }; + ImmutableAudience.Init(config); + + ImmutableAudience.DeleteData(userId: "player-42"); + + Assert.IsTrue(received.Wait(TimeSpan.FromSeconds(5)), + "OnError should fire when DeleteData's response is non-2xx"); + Assert.AreEqual(AudienceErrorCode.NetworkError, captured.Code); + StringAssert.Contains("500", captured.Message); + } + } +} diff --git a/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs new file mode 100644 index 000000000..49250a8fc --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs @@ -0,0 +1,905 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class ImmutableAudienceTests + { + private string _testDir; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + } + + [TearDown] + public void TearDown() + { + ImmutableAudience.ResetState(); + ImmutableAudience.DefaultPersistentDataPathProvider = null; + Identity.Reset(_testDir); + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + private AudienceConfig MakeConfig(ConsentLevel consent = ConsentLevel.Anonymous) + { + return new AudienceConfig + { + PublishableKey = "pk_imapik-test-key1", + Consent = consent, + PersistentDataPath = _testDir, + FlushIntervalSeconds = 600, // large — we flush manually in tests + FlushSize = 1000, + HttpHandler = new KeepOnDiskHandler() + }; + } + + /// + /// Returns 503 so the transport keeps files on disk for inspection. + /// Tests verify queuing behavior, not sending behavior. + /// + private class KeepOnDiskHandler : HttpMessageHandler + { + protected override Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.ServiceUnavailable)); + } + } + + // ----------------------------------------------------------------- + // Init + // ----------------------------------------------------------------- + + [Test] + public void Init_NullConfig_Throws() + { + Assert.Throws(() => ImmutableAudience.Init(null)); + } + + [Test] + public void Init_MissingPublishableKey_Throws() + { + var config = MakeConfig(); + config.PublishableKey = null; + Assert.Throws(() => ImmutableAudience.Init(config)); + } + + [Test] + public void Init_MissingPersistentDataPath_Throws() + { + var config = MakeConfig(); + config.PersistentDataPath = null; + Assert.Throws(() => ImmutableAudience.Init(config)); + } + + [Test] + public void Init_CalledTwice_IgnoresSecondCall() + { + ImmutableAudience.Init(MakeConfig()); + Assert.DoesNotThrow(() => ImmutableAudience.Init(MakeConfig())); + } + + [Test] + public void Track_NullEvent_DoesNotThrow_AndLogsWarning() + { + ImmutableAudience.Init(MakeConfig()); + + var lines = new List(); + Log.Writer = lines.Add; + try + { + Assert.DoesNotThrow(() => ImmutableAudience.Track((IEvent)null)); + Assert.That(lines, Has.Some.Contains("null event")); + } + finally { Log.Writer = null; } + } + + [Test] + public void Track_NullOrEmptyEventName_DoesNotEnqueue() + { + ImmutableAudience.Init(MakeConfig()); + + var beforeQueue = AudiencePaths.QueueDir(_testDir); + var beforeCount = Directory.Exists(beforeQueue) ? Directory.GetFiles(beforeQueue, "*.json").Length : 0; + + Assert.DoesNotThrow(() => ImmutableAudience.Track((string)null)); + Assert.DoesNotThrow(() => ImmutableAudience.Track("")); + + ImmutableAudience.Shutdown(); + var afterCount = Directory.GetFiles(beforeQueue, "*.json").Length; + // Only game_launch should have been enqueued; null/empty Track calls dropped. + Assert.AreEqual(beforeCount + 1, afterCount, "null/empty event names must be dropped, not enqueued"); + } + + [Test] + public void Identify_NullUserId_DoesNotEnqueue() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + Assert.DoesNotThrow(() => ImmutableAudience.Identify(null, IdentityType.Passport)); + Assert.DoesNotThrow(() => ImmutableAudience.Identify("", IdentityType.Passport)); + } + + [Test] + public void Identify_InvalidIdentityTypeCast_DoesNotThrow_AndDropsEvent() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + var invalid = (IdentityType)999; + + Assert.DoesNotThrow(() => ImmutableAudience.Identify("user1", invalid)); + + ImmutableAudience.Shutdown(); + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json").Select(File.ReadAllText); + Assert.IsFalse(contents.Any(c => c.Contains("\"identify\"")), + "invalid enum cast must drop the identify event, not enqueue it"); + } + + [Test] + public void Alias_InvalidIdentityTypeCast_DoesNotThrow_AndDropsEvent() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + var invalid = (IdentityType)999; + + Assert.DoesNotThrow(() => + ImmutableAudience.Alias("fromId", invalid, "toId", IdentityType.Steam)); + + ImmutableAudience.Shutdown(); + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json").Select(File.ReadAllText); + Assert.IsFalse(contents.Any(c => c.Contains("\"alias\"")), + "invalid enum cast must drop the alias event, not enqueue it"); + } + + [Test] + public void Alias_NullIds_DoesNotEnqueue() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + Assert.DoesNotThrow(() => ImmutableAudience.Alias(null, IdentityType.Passport, "to", IdentityType.Steam)); + Assert.DoesNotThrow(() => ImmutableAudience.Alias("from", IdentityType.Passport, "", IdentityType.Steam)); + } + + [Test] + public void Init_CalledTwice_LogsWarning() + { + var lines = new List(); + Log.Writer = lines.Add; + try + { + ImmutableAudience.Init(MakeConfig()); + ImmutableAudience.Init(MakeConfig()); + + Assert.That(lines, Has.Some.Contains("Init called more than once"), + "second Init must surface a warning so a developer notices the silent no-op"); + } + finally + { + Log.Writer = null; + } + } + + [Test] + public void Init_ConcurrentCalls_OnlyOneSucceeds_OthersWarn() + { + // Spin up N threads that all race to call Init. With the lock in place, + // exactly one initialises; the rest hit the duplicate-call warning branch. + // Without the lock, all of them would pass the _initialized check and + // double-allocate Timer + HttpClient + EventQueue, leaking the first set. + const int threadCount = 16; + var lines = new System.Collections.Concurrent.ConcurrentBag(); + Log.Writer = msg => lines.Add(msg); + + try + { + var barrier = new System.Threading.Barrier(threadCount); + var threads = new Thread[threadCount]; + + for (int i = 0; i < threadCount; i++) + { + threads[i] = new Thread(() => + { + barrier.SignalAndWait(); + ImmutableAudience.Init(MakeConfig()); + }); + threads[i].Start(); + } + + foreach (var t in threads) t.Join(TimeSpan.FromSeconds(5)); + + var warningCount = lines.Count(l => l.Contains("Init called more than once")); + Assert.AreEqual(threadCount - 1, warningCount, + "exactly one thread should initialise; the other (threadCount - 1) should hit the duplicate-call warning branch"); + } + finally + { + Log.Writer = null; + } + } + + // ----------------------------------------------------------------- + // Track — custom events + // ----------------------------------------------------------------- + + [Test] + public void Track_CustomEvent_WritesEventToDisk() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track("crafting_started", new Dictionary + { + { "recipe_id", "iron_sword" } + }); + + // Flush memory → disk + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var files = Directory.GetFiles(queueDir, "*.json"); + // game_launch + crafting_started + Assert.GreaterOrEqual(files.Length, 2); + + var contents = files.Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => c.Contains("\"crafting_started\"")), + "should contain the custom event"); + } + + [Test] + public void Track_NoProperties_WritesEvent() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track("main_menu_opened"); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => c.Contains("\"main_menu_opened\""))); + } + + [Test] + public void Track_ConsentNone_DoesNotEnqueue() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.None)); + + ImmutableAudience.Track("should_not_appear"); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + if (!Directory.Exists(queueDir)) + { + Assert.Pass("queue directory not created — no events"); + return; + } + + Assert.AreEqual(0, Directory.GetFiles(queueDir, "*.json").Length); + } + + // ----------------------------------------------------------------- + // Track — typed events + // ----------------------------------------------------------------- + + private class NullNameEvent : IEvent + { + public string EventName => null; + public Dictionary ToProperties() => new Dictionary(); + } + + private class EmptyNameEvent : IEvent + { + public string EventName => ""; + public Dictionary ToProperties() => new Dictionary(); + } + + [Test] + public void Track_TypedEvent_NullEventName_IsDropped() + { + ImmutableAudience.Init(MakeConfig()); + + // Sanity: game_launch is already on disk; drain it first so the + // assertion counts only our test event. + ImmutableAudience.FlushQueueToDiskForTesting(); + var queueDir = AudiencePaths.QueueDir(_testDir); + foreach (var f in Directory.GetFiles(queueDir, "*.json")) File.Delete(f); + + Assert.DoesNotThrow(() => ImmutableAudience.Track(new NullNameEvent())); + Assert.DoesNotThrow(() => ImmutableAudience.Track(new EmptyNameEvent())); + + ImmutableAudience.FlushQueueToDiskForTesting(); + Assert.AreEqual(0, Directory.GetFiles(queueDir, "*.json").Length, + "IEvent with null/empty EventName must be dropped, not enqueued"); + } + + [Test] + public void Track_TypedProgression_WritesCorrectEventName() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track(new Progression + { + Status = ProgressionStatus.Complete, + World = "tutorial", + Level = "1" + }); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => + c.Contains("\"progression\"") && c.Contains("\"complete\""))); + } + + [Test] + public void Track_TypedPurchase_WritesCorrectEventName() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track(new Purchase + { + Currency = "USD", + Value = 9.99m + }); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => c.Contains("\"purchase\""))); + } + + // ----------------------------------------------------------------- + // Identity + // ----------------------------------------------------------------- + + [Test] + public void Identify_FullConsent_WritesIdentifyEvent() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + ImmutableAudience.Identify("76561198012345", "steam"); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => + c.Contains("\"identify\"") && c.Contains("\"76561198012345\""))); + } + + [Test] + public void Identify_AnonymousConsent_IsIgnored() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + + ImmutableAudience.Identify("user1", "steam"); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsFalse(contents.Any(c => c.Contains("\"identify\"")), + "identify should be discarded at Anonymous consent"); + } + + [Test] + public void IdentifyTraits_FullConsent_WritesIdentifyWithTraitsAndNoUserIdField() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + ImmutableAudience.Identify(new Dictionary { ["plan"] = "pro" }); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + var identifyMsg = contents.FirstOrDefault(c => c.Contains("\"identify\"")); + Assert.IsNotNull(identifyMsg, "traits-only identify should enqueue an identify event"); + Assert.IsTrue(identifyMsg.Contains("\"plan\"") && identifyMsg.Contains("\"pro\""), + "traits payload should be present"); + Assert.IsFalse(identifyMsg.Contains("\"userId\""), + "traits-only identify must not attach a userId field"); + Assert.IsFalse(identifyMsg.Contains("\"identityType\""), + "traits-only identify must not attach an identityType field"); + } + + [Test] + public void IdentifyTraits_AnonymousConsent_IsIgnored() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + + ImmutableAudience.Identify(new Dictionary { ["plan"] = "pro" }); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsFalse(contents.Any(c => c.Contains("\"identify\"")), + "Identify(traits) should be discarded at Anonymous consent"); + } + + [Test] + public void IdentifyTraits_NoneConsent_IsIgnored() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.None)); + + ImmutableAudience.Identify(new Dictionary { ["plan"] = "pro" }); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var files = Directory.Exists(queueDir) + ? Directory.GetFiles(queueDir, "*.json") + : Array.Empty(); + Assert.IsFalse(files.Select(File.ReadAllText).Any(c => c.Contains("\"identify\"")), + "Identify(traits) should be discarded at None consent"); + } + + [Test] + public void IdentifyTraits_NotInitialised_IsIgnored() + { + Assert.DoesNotThrow(() => ImmutableAudience.Identify(new Dictionary())); + } + + [Test] + public void IdentifyTraits_NullTraits_DropsAndWarns() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + var lines = new List(); + Log.Writer = lines.Add; + try + { + Assert.DoesNotThrow(() => ImmutableAudience.Identify((Dictionary)null)); + Assert.That(lines, Has.Some.Contains("null traits")); + } + finally { Log.Writer = null; } + + ImmutableAudience.Shutdown(); + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsFalse(contents.Any(c => c.Contains("\"identify\"")), + "null traits must not produce an identify event"); + } + + [Test] + public void IdentifyTraits_DoesNotOverwritePriorUserId() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + ImmutableAudience.Identify("user-123", IdentityType.Passport); + ImmutableAudience.Identify(new Dictionary { ["plan"] = "pro" }); + ImmutableAudience.Track("after_traits_identify"); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var trackMsg = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText) + .FirstOrDefault(c => c.Contains("\"after_traits_identify\"")); + Assert.IsNotNull(trackMsg, "track event should be enqueued"); + Assert.IsTrue(trackMsg.Contains("\"user-123\""), + "Track after traits-only Identify must still carry the prior userId"); + } + + [Test] + public void Alias_FullConsent_WritesAliasEvent() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + ImmutableAudience.Alias("steam123", "steam", "user_456", "passport"); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => + c.Contains("\"alias\"") && c.Contains("\"steam123\""))); + } + + // ----------------------------------------------------------------- + // Reset + // ----------------------------------------------------------------- + + [Test] + public void Reset_GeneratesNewAnonymousId() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track("before_reset"); + var id1 = Identity.GetOrCreate(_testDir, ConsentLevel.Anonymous); + + ImmutableAudience.Reset(); + + ImmutableAudience.Track("after_reset"); + var id2 = Identity.GetOrCreate(_testDir, ConsentLevel.Anonymous); + + Assert.AreNotEqual(id1, id2, "Reset should generate a new anonymousId"); + } + + [Test] + public void Reset_DiscardsQueuedEventsOnDisk() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track("before_reset"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + Assert.Greater(Directory.GetFiles(queueDir, "*.json").Length, 0, + "precondition: queued event should be on disk before reset"); + + ImmutableAudience.Reset(); + + Assert.AreEqual(0, Directory.GetFiles(queueDir, "*.json").Length, + "Reset must discard queued events on disk to match the Web SDK"); + } + + // ----------------------------------------------------------------- + // SetConsent — purge + persistence + // ----------------------------------------------------------------- + + [Test] + public void SetConsent_DowngradeToNone_PurgesQueueOnDiskAndInMemory() + { + ImmutableAudience.Init(MakeConfig()); + + ImmutableAudience.Track("event_under_old_consent"); + + var queueDir = AudiencePaths.QueueDir(_testDir); + // Force memory → disk so we can verify the purge wipes both layers. + ImmutableAudience.FlushQueueToDiskForTesting(); + Assert.Greater(Directory.GetFiles(queueDir, "*.json").Length, 0, + "precondition: events queued before downgrade exist on disk"); + + ImmutableAudience.SetConsent(ConsentLevel.None); + + Assert.AreEqual(0, Directory.GetFiles(queueDir, "*.json").Length, + "downgrade to None must purge queued events from disk so they can't leak after revocation"); + } + + [Test] + public void SetConsent_DowngradeToNone_DropsInFlightTrack_ThatRacesThePurge() + { + // Reproduces the window where a Track call observed consent=Anonymous, + // built its message, and is about to enqueue — while a concurrent + // SetConsent(None) sets consent and purges. Without the re-check inside + // the drain lock, the enqueue lands after the purge and the event leaks + // to disk past revocation. + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + + // Drain the game_launch that Init auto-fires so the assertion below is + // about our race event only. + ImmutableAudience.FlushQueueToDiskForTesting(); + var queueDir = AudiencePaths.QueueDir(_testDir); + foreach (var f in Directory.GetFiles(queueDir, "*.json")) File.Delete(f); + + // Gate the Track thread so it's poised to enqueue at the moment SetConsent + // completes its purge. We approximate the race by kicking Track off a + // threadpool thread and racing SetConsent after a tiny stagger — if the + // re-check is missing, this leaks deterministically under contention over + // repeated runs. + var trackStarted = new ManualResetEventSlim(false); + var trackTask = Task.Run(() => + { + trackStarted.Set(); + ImmutableAudience.Track("racing_event"); + }); + + trackStarted.Wait(); + ImmutableAudience.SetConsent(ConsentLevel.None); + trackTask.Wait(TimeSpan.FromSeconds(5)); + + // Flush any residue that a faulty Enqueue may have pushed to memory. + ImmutableAudience.FlushQueueToDiskForTesting(); + + var leaked = Directory.Exists(queueDir) + ? Directory.GetFiles(queueDir, "*.json").Select(File.ReadAllText) + .Count(c => c.Contains("\"racing_event\"")) + : 0; + + Assert.AreEqual(0, leaked, + "Track that raced SetConsent(None) must not leak past the purge"); + } + + [Test] + public void SetConsent_DowngradeToNone_StressTest_NoLeak() + { + // The single-shot race test above can pass trivially if Track finishes + // before SetConsent starts on a fast machine. This stress variant runs + // the race many times with many concurrent Track threads so at least + // some iterations are guaranteed to land the enqueue inside the + // _consent=None/PurgeAll window. + // + // Without the EnqueueChecked re-check, this test leaks events + // reproducibly. With the fix, zero leaks across all iterations. + const int iterations = 200; + const int trackersPerIteration = 4; + + for (int iter = 0; iter < iterations; iter++) + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + + // Clear game_launch so only race events can leak. + ImmutableAudience.FlushQueueToDiskForTesting(); + var queueDir = AudiencePaths.QueueDir(_testDir); + if (Directory.Exists(queueDir)) + foreach (var f in Directory.GetFiles(queueDir, "*.json")) File.Delete(f); + + // All trackers spin up and block on the barrier so they all release + // simultaneously. The main thread joins the barrier too and fires + // SetConsent immediately after release — maximising contention. + var barrier = new Barrier(trackersPerIteration + 1); + var trackers = new Task[trackersPerIteration]; + for (int t = 0; t < trackersPerIteration; t++) + { + trackers[t] = Task.Run(() => + { + barrier.SignalAndWait(); + ImmutableAudience.Track("race_stress"); + }); + } + + barrier.SignalAndWait(); + ImmutableAudience.SetConsent(ConsentLevel.None); + Task.WaitAll(trackers, TimeSpan.FromSeconds(5)); + + // Anything the drain loop hasn't picked up yet → force it. + ImmutableAudience.FlushQueueToDiskForTesting(); + + int leaked = 0; + if (Directory.Exists(queueDir)) + { + leaked = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText) + .Count(c => c.Contains("\"race_stress\"")); + } + + if (leaked > 0) + { + Assert.Fail( + $"iteration {iter}: {leaked} race_stress events leaked past SetConsent(None)"); + } + + ImmutableAudience.ResetState(); + // Clean state for next iteration so consent isn't carried via disk. + if (Directory.Exists(AudiencePaths.AudienceDir(_testDir))) + Directory.Delete(AudiencePaths.AudienceDir(_testDir), recursive: true); + } + } + + [Test] + public void ResetState_ClearsIdentityCache_AcrossInitWithDifferentPath() + { + // First init: mints and caches an anonymousId under _testDir. + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + var firstId = Identity.GetOrCreate(_testDir, ConsentLevel.Anonymous); + ImmutableAudience.Shutdown(); + + // Second init with a different persistentDataPath. If Identity's + // static cache survives Shutdown, GetOrCreate returns firstId + // even though the new path has its own (yet-to-be-written) file. + var otherDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(otherDir); + try + { + var config2 = MakeConfig(ConsentLevel.Anonymous); + config2.PersistentDataPath = otherDir; + ImmutableAudience.Init(config2); + + var secondId = Identity.GetOrCreate(otherDir, ConsentLevel.Anonymous); + Assert.AreNotEqual(firstId, secondId, + "ResetState must drop Identity's in-memory cache so the new path mints its own id"); + } + finally + { + Identity.Reset(otherDir); + if (Directory.Exists(otherDir)) + Directory.Delete(otherDir, recursive: true); + } + } + + [Test] + public void SetConsent_PersistsAcrossInit() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + ImmutableAudience.SetConsent(ConsentLevel.Full); + ImmutableAudience.Shutdown(); + + // Re-init with the *original* (Anonymous) config — persisted Full should win. + ImmutableAudience.Init(MakeConfig(ConsentLevel.Anonymous)); + + Assert.AreEqual(ConsentLevel.Full, ImmutableAudience.CurrentConsentForTesting, + "persisted consent must override the config default after restart"); + } + + // ----------------------------------------------------------------- + // game_launch auto-fire + // ----------------------------------------------------------------- + + [Test] + public void Init_FiresGameLaunch_Automatically() + { + ImmutableAudience.Init(MakeConfig()); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => c.Contains("\"game_launch\"")), + "Init should auto-fire game_launch"); + } + + [Test] + public void Init_GameLaunch_IncludesDistributionPlatform() + { + var config = MakeConfig(); + config.DistributionPlatform = DistributionPlatforms.Steam; + ImmutableAudience.Init(config); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsTrue(contents.Any(c => + c.Contains("\"game_launch\"") && c.Contains("\"steam\""))); + } + + [Test] + public void Init_ConsentNone_DoesNotFireGameLaunch() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.None)); + ImmutableAudience.Shutdown(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + if (!Directory.Exists(queueDir)) + { + Assert.Pass(); + return; + } + + var contents = Directory.GetFiles(queueDir, "*.json") + .Select(File.ReadAllText).ToList(); + Assert.IsFalse(contents.Any(c => c.Contains("\"game_launch\""))); + } + + // ----------------------------------------------------------------- + // Shutdown + // ----------------------------------------------------------------- + + [Test] + public void Shutdown_CalledTwice_DoesNotThrow() + { + ImmutableAudience.Init(MakeConfig()); + ImmutableAudience.Shutdown(); + Assert.DoesNotThrow(() => ImmutableAudience.Shutdown()); + } + + [Test] + public void Track_AfterShutdown_IsIgnored() + { + ImmutableAudience.Init(MakeConfig()); + ImmutableAudience.Shutdown(); + + Assert.DoesNotThrow(() => ImmutableAudience.Track("should_not_crash")); + } + + // ----------------------------------------------------------------- + // Full -> Anonymous consent downgrade + // ----------------------------------------------------------------- + + [Test] + public void FullToAnonymous_StripsUserIdFromQueuedTrackAndDropsIdentifyAlias() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + + ImmutableAudience.Identify("player_steam", IdentityType.Steam); + ImmutableAudience.Alias("player_steam", IdentityType.Steam, "player_passport", IdentityType.Passport); + ImmutableAudience.Track("tracked_before_downgrade"); + + ImmutableAudience.FlushQueueToDiskForTesting(); + + ImmutableAudience.SetConsent(ConsentLevel.Anonymous); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var files = Directory.GetFiles(queueDir, "*.json"); + + foreach (var f in files) + { + var msg = JsonReader.DeserializeObject(File.ReadAllText(f)); + var type = (string)msg["type"]; + Assert.AreNotEqual("identify", type, "identify must be purged on Full -> Anonymous"); + Assert.AreNotEqual("alias", type, "alias must be purged on Full -> Anonymous"); + if (type == "track") + Assert.IsFalse(msg.ContainsKey("userId"), "userId must be stripped from queued track on Full -> Anonymous"); + } + } + + [Test] + public void FullToAnonymous_FutureTracksOmitUserId() + { + ImmutableAudience.Init(MakeConfig(ConsentLevel.Full)); + ImmutableAudience.Identify("player_steam", IdentityType.Steam); + ImmutableAudience.SetConsent(ConsentLevel.Anonymous); + + ImmutableAudience.Track("tracked_after_downgrade"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + var queueDir = AudiencePaths.QueueDir(_testDir); + var trackFiles = Directory.GetFiles(queueDir, "*.json") + .Select(f => JsonReader.DeserializeObject(File.ReadAllText(f))) + .Where(m => (string)m["type"] == "track" + && m.ContainsKey("eventName") + && (string)m["eventName"] == "tracked_after_downgrade") + .ToList(); + + Assert.AreEqual(1, trackFiles.Count); + Assert.IsFalse(trackFiles[0].ContainsKey("userId"), + "Track under Anonymous consent must not carry userId"); + } + + // ----------------------------------------------------------------- + // SendBatch — overlapping timer tick guard + // ----------------------------------------------------------------- + + [Test] + public void SendBatch_ConcurrentTicks_OnlyOneReachesTransport() + { + var handler = new BlockingHandler(); + var config = MakeConfig(); + config.HttpHandler = handler; + + ImmutableAudience.Init(config); + ImmutableAudience.Track("event_to_send"); + ImmutableAudience.FlushQueueToDiskForTesting(); + + // Kick off one SendBatch on a worker — it will block inside the + // handler until we signal, holding _sendInFlight = 1. + var blocked = Task.Run(() => ImmutableAudience.SendBatchForTesting()); + + // Give the worker enough time to enter the handler's SendAsync. + Assert.IsTrue(handler.EnteredSendAsync.Wait(TimeSpan.FromSeconds(2)), + "first SendBatch should have reached the HTTP handler"); + + // Second tick while the first is still in flight — must return + // immediately without issuing another request. + ImmutableAudience.SendBatchForTesting(); + + // Release the blocked send and let it finish. + handler.Release.Set(); + Assert.IsTrue(blocked.Wait(TimeSpan.FromSeconds(5)), + "blocked SendBatch should finish after release"); + + Assert.AreEqual(1, handler.RequestCount, + "overlapping tick must not issue a second HTTP request"); + } + + private class BlockingHandler : HttpMessageHandler + { + public readonly ManualResetEventSlim EnteredSendAsync = new ManualResetEventSlim(false); + public readonly ManualResetEventSlim Release = new ManualResetEventSlim(false); + public int RequestCount; + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + Interlocked.Increment(ref RequestCount); + EnteredSendAsync.Set(); + await Task.Run(() => Release.Wait(ct), ct).ConfigureAwait(false); + return new HttpResponseMessage(HttpStatusCode.ServiceUnavailable); + } + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Tests/Runtime/Transport/DiskStoreTests.cs b/src/Packages/Audience/Tests/Runtime/Transport/DiskStoreTests.cs index d9c37dbc7..3a6d6378e 100644 --- a/src/Packages/Audience/Tests/Runtime/Transport/DiskStoreTests.cs +++ b/src/Packages/Audience/Tests/Runtime/Transport/DiskStoreTests.cs @@ -32,7 +32,7 @@ public void Write_CreatesJsonFile_InQueueDirectory() { _store.Write("{\"event\":\"test\"}"); - var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var queueDir = AudiencePaths.QueueDir(_testDir); var files = Directory.GetFiles(queueDir, "*.json"); Assert.AreEqual(1, files.Length, "should have written exactly one event file"); } @@ -43,7 +43,7 @@ public void Write_FileContents_MatchInputJson() const string json = "{\"event\":\"pageview\",\"userId\":\"u1\"}"; _store.Write(json); - var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var queueDir = AudiencePaths.QueueDir(_testDir); var file = Directory.GetFiles(queueDir, "*.json").Single(); Assert.AreEqual(json, File.ReadAllText(file)); } @@ -97,7 +97,7 @@ public void ReadBatch_ExcludesAndDeletesStaleFiles() // Manually plant a stale file (ticks from 31 days ago) var staleTime = DateTime.UtcNow.AddDays(-(Constants.StaleEventDays + 1)); var staleName = $"{staleTime.Ticks}_{Guid.NewGuid():N}.json"; - var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var queueDir = AudiencePaths.QueueDir(_testDir); File.WriteAllText(Path.Combine(queueDir, staleName), "{\"stale\":true}"); var batch = _store.ReadBatch(10); @@ -150,7 +150,7 @@ public void ReadBatch_ZeroMaxSize_ReturnsEmpty() public void CrashRecovery_PicksUpFilesFromPreviousRun() { // Simulate a previous run by writing a file directly - var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var queueDir = AudiencePaths.QueueDir(_testDir); var survivingName = $"{DateTime.UtcNow.Ticks}_{Guid.NewGuid():N}.json"; File.WriteAllText(Path.Combine(queueDir, survivingName), "{\"survived\":true}"); @@ -160,5 +160,70 @@ public void CrashRecovery_PicksUpFilesFromPreviousRun() Assert.AreEqual(1, batch.Count, "crash-surviving file should be picked up on next init"); } + + [Test] + public void ApplyAnonymousDowngrade_DeletesIdentifyAndAlias_StripsUserIdFromTrack() + { + _store.Write("{\"type\":\"identify\",\"anonymousId\":\"a\",\"userId\":\"u\"}"); + _store.Write("{\"type\":\"alias\",\"fromId\":\"a\",\"toId\":\"u\"}"); + _store.Write("{\"type\":\"track\",\"eventName\":\"x\",\"anonymousId\":\"a\",\"userId\":\"u\"}"); + _store.Write("{\"type\":\"track\",\"eventName\":\"y\",\"anonymousId\":\"a\"}"); + + _store.ApplyAnonymousDowngrade(); + + var remaining = _store.ReadBatch(100); + Assert.AreEqual(2, remaining.Count, "identify and alias files should be deleted"); + + foreach (var path in remaining) + { + var json = File.ReadAllText(path); + var msg = JsonReader.DeserializeObject(json); + Assert.AreEqual("track", msg["type"]); + Assert.IsFalse(msg.ContainsKey("userId"), "userId must be stripped from queued track messages"); + } + } + + [Test] + public void ApplyAnonymousDowngrade_PurchaseValue_RoundsTripsExactlyForRealisticPrices() + { + // Pinning test: the JsonReader -> Json.Serialize rewrite path + // turns decimals into doubles. Assert that the realistic range of + // Purchase.Value amounts (typical two-decimal prices, free item, + // AAA-tier bundle) survives the rewrite without drift. + string[] realisticAmounts = { "0.99", "4.99", "9.99", "19.99", "49.99", "99.99", "149.99" }; + + foreach (var amount in realisticAmounts) + { + // Fresh store per iteration to keep assertions clean. + TearDown(); + SetUp(); + + var json = "{\"type\":\"track\",\"eventName\":\"purchase\",\"anonymousId\":\"a\",\"userId\":\"u\"," + + "\"properties\":{\"currency\":\"USD\",\"value\":" + amount + "}}"; + _store.Write(json); + + _store.ApplyAnonymousDowngrade(); + + var rewritten = _store.ReadBatch(10); + Assert.AreEqual(1, rewritten.Count); + var rewrittenJson = File.ReadAllText(rewritten[0]); + StringAssert.Contains("\"value\":" + amount, rewrittenJson, + $"Purchase.Value {amount} must round-trip exactly through the downgrade rewrite"); + } + } + + [Test] + public void ApplyAnonymousDowngrade_DeletesMalformedFiles() + { + // Seed the queue directory with a file that is not valid JSON so the + // downgrade cannot leave it to potentially leak identified data. + var queueDir = AudiencePaths.QueueDir(_testDir); + var badName = $"{DateTime.UtcNow.Ticks}_{Guid.NewGuid():N}.json"; + File.WriteAllText(Path.Combine(queueDir, badName), "{not valid json"); + + _store.ApplyAnonymousDowngrade(); + + Assert.AreEqual(0, _store.ReadBatch(10).Count, "malformed file must not survive downgrade"); + } } } diff --git a/src/Packages/Audience/Tests/Runtime/Transport/EventQueueTests.cs b/src/Packages/Audience/Tests/Runtime/Transport/EventQueueTests.cs index c9ec7ccd3..9dd48c580 100644 --- a/src/Packages/Audience/Tests/Runtime/Transport/EventQueueTests.cs +++ b/src/Packages/Audience/Tests/Runtime/Transport/EventQueueTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.Threading; using NUnit.Framework; @@ -26,12 +27,15 @@ public void TearDown() Directory.Delete(_testDir, recursive: true); } + private static Dictionary Msg(string evt) => + new Dictionary { ["event"] = evt }; + [Test] public void Enqueue_ThenFlushSync_PersistesEventToDisk() { using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); - queue.Enqueue("{\"event\":\"track\"}"); + queue.Enqueue(Msg("track")); queue.FlushSync(); Assert.AreEqual(1, _store.Count(), "event should be on disk after FlushSync"); @@ -43,7 +47,7 @@ public void Enqueue_MultipleEvents_AllPersistedAfterFlush() using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); for (var i = 0; i < 10; i++) - queue.Enqueue($"{{\"i\":{i}}}"); + queue.Enqueue(new Dictionary { ["i"] = i }); queue.FlushSync(); @@ -57,7 +61,7 @@ public void FlushSize_Trigger_DrainsToDiskAutomatically() using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: flushSize); for (var i = 0; i < flushSize; i++) - queue.Enqueue($"{{\"i\":{i}}}"); + queue.Enqueue(new Dictionary { ["i"] = i }); // Give the background thread time to drain var deadline = DateTime.UtcNow.AddSeconds(3); @@ -73,8 +77,8 @@ public void Shutdown_FlushesRemainingEvents() { var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); - queue.Enqueue("{\"event\":\"a\"}"); - queue.Enqueue("{\"event\":\"b\"}"); + queue.Enqueue(Msg("a")); + queue.Enqueue(Msg("b")); queue.Shutdown(); @@ -95,7 +99,7 @@ public void Enqueue_AfterShutdown_IsIgnored() var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); queue.Shutdown(); - queue.Enqueue("{\"event\":\"ignored\"}"); + queue.Enqueue(Msg("ignored")); Assert.AreEqual(0, _store.Count(), "events enqueued after Shutdown should be discarded"); } @@ -106,7 +110,7 @@ public void IntervalFlush_DrainsToDiskWithoutExplicitCall() // Very short interval to make the test fast using var queue = new EventQueue(_store, flushIntervalSeconds: 1, flushSize: 100); - queue.Enqueue("{\"event\":\"interval_flush\"}"); + queue.Enqueue(Msg("interval_flush")); // Wait slightly longer than the flush interval var deadline = DateTime.UtcNow.AddSeconds(4); @@ -121,10 +125,30 @@ public void Dispose_FlushesAndStopsDrainThread() { using (var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100)) { - queue.Enqueue("{\"event\":\"dispose_test\"}"); + queue.Enqueue(Msg("dispose_test")); } // Dispose called here Assert.AreEqual(1, _store.Count(), "Dispose should flush events to disk"); } + + [Test] + public void SerialisationHappensOnDrainThread_CallerMutationDoesNotCorrupt() + { + using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); + + // Caller passes a dict, then -- simulating a misuse -- mutates it + // after enqueue but before FlushSync. Because the queue stores the + // reference and serialises on drain, the written payload reflects the + // mutated state. This test pins down that behaviour so callers know + // they must snapshot (which ImmutableAudience.Track does). + var shared = new Dictionary { ["v"] = 1 }; + queue.Enqueue(shared); + shared["v"] = 99; + + queue.FlushSync(); + + var written = File.ReadAllText(_store.ReadBatch(10)[0]); + StringAssert.Contains("\"v\":99", written); + } } }