From ddc7f24c7dfdc2cb13b586ddf41ed0b0413fb3a9 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 12:30:18 -0500 Subject: [PATCH 01/10] Fix duplicate plan limit overage emails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: every web pod subscribes to PlanOverage at startup via EnqueueOrganizationNotificationOnPlanOverage. Foundatio pub/sub delivers each message to all subscribers, so a single monthly overage event enqueued one work item per running web pod. The original ThrottlingLockProvider(1/hour) allowed exactly one item through per calendar-hour bucket; abandoned duplicates were re-queued and reprocessed once each new bucket opened — producing one email per hour for each duplicate item. Fix: - Queue-level dedup: OrganizationNotificationWorkItem implements IHaveUniqueIdentifier and DuplicateDetectionQueueBehavior is registered so fanout enqueues collapse to one item. - Handler-level idempotency: per-org distributed lock (30 min) + 24-hour sent marker ensure stale duplicates already in the queue at deploy time cannot retrigger an email. - Hourly items short-circuit at GetWorkItemLockAsync and never enter the lock/sent-key path, preventing hourly overages from suppressing subsequent monthly notifications. Also add RCA-pinning unit tests (TestWithServices) and integration tests (IntegrationTestsBase) covering fanout dedup, legacy hourly throttle regression, per-org isolation, 24h resend window, hourly-before-monthly ordering, and idempotency via existing sent marker. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> fix: Refine plan limit email notification idempotency and lock management This commit refines the idempotency strategy and lock management for plan limit overage email notifications, addressing potential race conditions and improving resilience to transient failures. Key changes include: - **Revised Sent Marker Timing**: The notification sent marker is now written *after* the email has been successfully dispatched. This changes the idempotency guarantee from at-most-once (where the marker was written before sending) to at-least-once for successful notifications. This allows transient email send failures to be retried by subsequent work item processing attempts, without prematurely marking the notification as "sent." - **Extended Notification Lock Timeout**: The distributed lock for monthly overage notifications now uses a 65-minute timeout. This duration exceeds the 1-hour work-item queue visibility timeout, preventing the lock from expiring while a slow email send is in progress. This mitigates the risk of a second worker acquiring the lock and sending a duplicate email before the sent marker can be written. - **Formalized Queue Duplicate Detection**: The `DuplicateDetectionQueueBehavior` is now registered using `services.TryAddEnumerable` with a dedicated `WorkItemDuplicateDetectionQueueBehavior` class. This improves clarity in service registration and ensures robust queue-level deduplication, preventing multiple workers from processing the same notification item if enqueued in quick succession. New integration and unit tests validate the corrected idempotency behavior (marker written post-send, retries on failure), the extended lock timeout, and proper queue duplicate detection. fix: keep monthly overage email marker until limit reset fix: bound overage sent marker to billing month fix: refine overage marker reset on plan changes --- src/Exceptionless.Core/Bootstrapper.cs | 7 + ...OrganizationNotificationWorkItemHandler.cs | 133 +++++- .../OrganizationNotificationWorkItem.cs | 21 +- .../Services/UsageService.cs | 39 +- ...ficationWorkItemHandlerIntegrationTests.cs | 426 ++++++++++++++++++ ...izationNotificationWorkItemHandlerTests.cs | 273 +++++++++++ .../Mail/CountingMailer.cs | 79 ++++ .../Services/UsageServiceTests.cs | 160 +++++++ 8 files changed, 1107 insertions(+), 31 deletions(-) create mode 100644 tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs create mode 100644 tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs create mode 100644 tests/Exceptionless.Tests/Mail/CountingMailer.cs diff --git a/src/Exceptionless.Core/Bootstrapper.cs b/src/Exceptionless.Core/Bootstrapper.cs index 751d8e0880..334f2b07ae 100644 --- a/src/Exceptionless.Core/Bootstrapper.cs +++ b/src/Exceptionless.Core/Bootstrapper.cs @@ -43,6 +43,7 @@ using Foundatio.Serializer; using Foundatio.Storage; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using DataDictionary = Exceptionless.Core.Models.DataDictionary; using MaintainIndexesJob = Foundatio.Repositories.Elasticsearch.Jobs.MaintainIndexesJob; @@ -128,6 +129,8 @@ public static void RegisterServices(IServiceCollection services, AppOptions appO services.AddSingleton(s => CreateQueue(s)); services.AddSingleton(s => CreateQueue(s, TimeSpan.FromHours(1))); + services.TryAddEnumerable(ServiceDescriptor.Singleton, WorkItemDuplicateDetectionQueueBehavior>()); + services.AddSingleton(); services.AddSingleton(); services.AddStartupAction(); @@ -301,10 +304,14 @@ private static IQueue CreateQueue(IServiceProvider container, TimeSpan? wo return new InMemoryQueue(new InMemoryQueueOptions { WorkItemTimeout = workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5.0)), + Behaviors = container.GetServices>().ToList(), Serializer = container.GetRequiredService(), TimeProvider = container.GetRequiredService(), ResiliencePolicyProvider = container.GetRequiredService(), LoggerFactory = loggerFactory }); } + + private sealed class WorkItemDuplicateDetectionQueueBehavior(ICacheClient cacheClient, ILoggerFactory loggerFactory) + : DuplicateDetectionQueueBehavior(cacheClient, loggerFactory, TimeSpan.FromHours(24)); } diff --git a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs index 840f2bad38..1eeef4b95c 100644 --- a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs +++ b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs @@ -1,8 +1,10 @@ +using Exceptionless.Core.Extensions; using Exceptionless.Core.Mail; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; +using Exceptionless.DateTimeExtensions; using Foundatio.Caching; using Foundatio.Extensions.Hosting.Startup; using Foundatio.Jobs; @@ -10,7 +12,6 @@ using Foundatio.Messaging; using Foundatio.Queues; using Foundatio.Repositories; -using Foundatio.Resilience; using Microsoft.Extensions.Logging; namespace Exceptionless.Core.Jobs.WorkItemHandlers; @@ -44,37 +45,111 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem } } +/// +/// Handles by sending a plan-overage email at most +/// once per overage state per organization. +/// +/// Root cause of duplicate emails (fixed here): +/// Every web pod registers the same +/// startup action, so a single PlanOverage message fanned out to all pods and each pod +/// enqueued its own copy of the work item. The previous +/// ThrottlingLockProvider(slotsPerPeriod: 1, period: 1 hour) allowed exactly one item +/// through per calendar-hour bucket. Abandoned duplicates were re-queued and reprocessed once +/// each new bucket opened — producing one email per hour for as many duplicate items existed. +/// +/// Fix layers (both required): +/// +/// +/// Queue-level dedup: plus +/// DuplicateDetectionQueueBehavior collapses identical fanout enqueues to a single +/// queue entry, preventing duplicates from being enqueued in the first place. +/// +/// +/// Handler-level idempotency: a distributed lock serialises concurrent processing, and a +/// sent marker () ensures that any stale duplicates +/// already in the queue before the fix deployed cannot re-trigger an email in the same UTC +/// month. The marker is reset when a monthly plan limit change re-evaluates the overage state. +/// +/// +/// +/// Only monthly overages send email. Hourly items use the base work-item lock and return from +/// before touching the monthly notification lock/sent-key path, so +/// they can never block or suppress a later monthly notification. +/// Monthly items also re-check the current organization usage before sending so delayed queue +/// entries do not email after a plan or usage-state change leaves the organization under limit. +/// public class OrganizationNotificationWorkItemHandler : WorkItemHandlerBase { + private static readonly TimeSpan WorkItemLockTimeout = TimeSpan.FromMinutes(65); + private readonly IOrganizationRepository _organizationRepository; private readonly IUserRepository _userRepository; private readonly IMailer _mailer; + private readonly ICacheClient _cacheClient; + private readonly TimeProvider _timeProvider; + + // ILockProvider is kept local rather than pushed to WorkItemHandlerBase because the + // lock/sent-key contract is specific to plan-limit email idempotency. Passing it through + // the base class would force every unrelated handler to acquire unnecessary plan-limit locks. private readonly ILockProvider _lockProvider; - public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, ICacheClient cacheClient, TimeProvider timeProvider, IResiliencePolicyProvider resiliencePolicyProvider, ILoggerFactory loggerFactory) : base(loggerFactory) + public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, ICacheClient cacheClient, TimeProvider timeProvider, ILockProvider lockProvider, ILoggerFactory loggerFactory) : base(loggerFactory) { _organizationRepository = organizationRepository; _userRepository = userRepository; _mailer = mailer; - _lockProvider = new ThrottlingLockProvider(cacheClient, 1, TimeSpan.FromHours(1), timeProvider, resiliencePolicyProvider, loggerFactory); + _cacheClient = cacheClient; + _timeProvider = timeProvider; + _lockProvider = lockProvider; + } + + public override Task GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default) + { + var wi = (OrganizationNotificationWorkItem)workItem; + + // Hourly overages do not send email. Return the base EmptyLock so WorkItemJob calls + // HandleItemAsync and then marks the queue entry as completed. Returning null instead + // would cause WorkItemJob to call AbandonAsync, creating an infinite retry loop where + // the item is re-queued and abandoned on every dequeue attempt. + if (!ShouldSendNotificationEmail(wi)) + return base.GetWorkItemLockAsync(workItem, cancellationToken); + + // timeUntilExpires: exceed the 1-hour work-item queue timeout so a slow send does not let + // a duplicate worker acquire the notification lock at the queue visibility boundary. + // + // acquireTimeout: TimeSpan.Zero — if another worker already holds the lock, return null + // immediately so WorkItemJob calls AbandonAsync. The item is retried later, at which point + // the sent marker is already set and the handler skips. Blocking here instead would stall + // a worker slot for up to the lock timeout with no correctness benefit. + return _lockProvider.TryAcquireAsync(GetNotificationLockKey(wi.OrganizationId, wi.IsOverMonthlyLimit), WorkItemLockTimeout, TimeSpan.Zero); } - public override Task HandleItemAsync(WorkItemContext context) + public override async Task HandleItemAsync(WorkItemContext context) { var wi = context.GetData()!; + if (!ShouldSendNotificationEmail(wi)) + return; - string cacheKey = $"{nameof(OrganizationNotificationWorkItemHandler)}:{wi.OrganizationId}"; + Log.LogInformation("Received organization notification work item for: {OrganizationId} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - return _lockProvider.TryUsingAsync(cacheKey, async () => + var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache()); + if (organization is null) + return; + + if (!organization.IsOverMonthlyLimit(_timeProvider)) + { + Log.LogInformation("Skipping stale monthly overage notification for organization: {OrganizationId} because it is no longer over the monthly limit", wi.OrganizationId); + return; + } + + if (await WasNotificationSentAsync(wi.OrganizationId, wi.IsOverMonthlyLimit)) { - Log.LogInformation("Received organization notification work item for: {Organization} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache()); - if (organization is null) - return; - - if (wi.IsOverMonthlyLimit) - await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - }, TimeSpan.FromMinutes(15), context.CancellationToken); + Log.LogInformation("Skipping duplicate monthly overage notification for organization: {OrganizationId}", wi.OrganizationId); + return; + } + + await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); + await _cacheClient.SetAsync(GetNotificationSentKey(wi.OrganizationId, wi.IsOverMonthlyLimit), true, GetNotificationSentExpiresAtUtc()); } private async Task SendOverageNotificationsAsync(Organization organization, bool isOverHourlyLimit, bool isOverMonthlyLimit) @@ -100,4 +175,34 @@ private async Task SendOverageNotificationsAsync(Organization organization, bool Log.LogTrace("Done sending email"); } + + // Only monthly overages send email. Hourly overages still trigger real-time websocket + // budget updates in the UI but do not warrant a separate notification email. + // Keeping hourly items out of the lock/sent-key path also means a burst of hourly events + // cannot suppress the monthly notification that follows. + private static bool ShouldSendNotificationEmail(OrganizationNotificationWorkItem workItem) + { + return workItem.IsOverMonthlyLimit; + } + + private async Task WasNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + var sent = await _cacheClient.GetAsync(GetNotificationSentKey(organizationId, isOverMonthlyLimit)); + return sent.HasValue && sent.Value; + } + + private DateTime GetNotificationSentExpiresAtUtc() + { + return _timeProvider.GetUtcNow().UtcDateTime.EndOfMonth(); + } + + public static string GetNotificationLockKey(string organizationId, bool isOverMonthlyLimit) + { + return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-lock"; + } + + public static string GetNotificationSentKey(string organizationId, bool isOverMonthlyLimit) + { + return OrganizationNotificationWorkItem.GetNotificationSentKey(organizationId, isOverMonthlyLimit); + } } diff --git a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs index 2d734ea6c2..6e6401d2f4 100644 --- a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs +++ b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs @@ -1,8 +1,25 @@ -namespace Exceptionless.Core.Models.WorkItems; +using Foundatio.Queues; -public record OrganizationNotificationWorkItem +namespace Exceptionless.Core.Models.WorkItems; + +public record OrganizationNotificationWorkItem : IHaveUniqueIdentifier { + public const string HourlyNotificationType = "hourly"; + public const string MonthlyNotificationType = "monthly"; + public required string OrganizationId { get; init; } public required bool IsOverHourlyLimit { get; init; } public required bool IsOverMonthlyLimit { get; init; } + + public string? UniqueIdentifier => GetNotificationKey(OrganizationId, IsOverMonthlyLimit); + + public static string GetNotificationKey(string organizationId, bool isOverMonthlyLimit) + { + return $"Organization:{organizationId}:notification:{(isOverMonthlyLimit ? MonthlyNotificationType : HourlyNotificationType)}"; + } + + public static string GetNotificationSentKey(string organizationId, bool isOverMonthlyLimit) + { + return $"{GetNotificationKey(organizationId, isOverMonthlyLimit)}-sent"; + } } diff --git a/src/Exceptionless.Core/Services/UsageService.cs b/src/Exceptionless.Core/Services/UsageService.cs index 8de074c3e8..0ba03190df 100644 --- a/src/Exceptionless.Core/Services/UsageService.cs +++ b/src/Exceptionless.Core/Services/UsageService.cs @@ -1,6 +1,7 @@ using Exceptionless.Core.Extensions; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; +using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; using Exceptionless.DateTimeExtensions; using Foundatio.Caching; @@ -223,28 +224,36 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat if (modifiedMaxEvents == originalMaxEvents) return; - if (modifiedMaxEvents > originalMaxEvents) + bool isMonthlyLimitIncrease = modifiedMaxEvents < 0 || (originalMaxEvents >= 0 && modifiedMaxEvents > originalMaxEvents); + if (isMonthlyLimitIncrease) { - // remove is throttled flag + await _cache.RemoveAsync(OrganizationNotificationWorkItem.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); await _cache.RemoveAsync(GetThrottledKey(utcNow, modified.Id)); + return; } - else + + bool wasOverMonthlyLimit = original.IsOverMonthlyLimit(_timeProvider); + bool isOverMonthlyLimit = modified.IsOverMonthlyLimit(_timeProvider); + if (!wasOverMonthlyLimit && isOverMonthlyLimit) { - var bucketTotal = await _cache.GetAsync(GetBucketTotalCacheKey(utcNow, modified.Id)); - if (!bucketTotal.HasValue) - return; + await _cache.RemoveAsync(OrganizationNotificationWorkItem.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); + await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id }); + } + + var bucketTotal = await _cache.GetAsync(GetBucketTotalCacheKey(utcNow, modified.Id)); + if (!bucketTotal.HasValue) + return; - int bucketLimit = GetBucketEventLimit(modifiedMaxEvents); + int bucketLimit = GetBucketEventLimit(modifiedMaxEvents); - // unlimited - if (bucketLimit < 0) - return; + // unlimited + if (bucketLimit < 0) + return; - if (bucketTotal.Value >= bucketLimit) - { - await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id, IsHourly = true }); - await _cache.SetAsync(GetThrottledKey(utcNow, modified.Id), true, TimeSpan.FromMinutes(5)); - } + if (bucketTotal.Value >= bucketLimit) + { + await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id, IsHourly = true }); + await _cache.SetAsync(GetThrottledKey(utcNow, modified.Id), true, TimeSpan.FromMinutes(5)); } } diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs new file mode 100644 index 0000000000..72fb560dcb --- /dev/null +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs @@ -0,0 +1,426 @@ +using Exceptionless.Core; +using Exceptionless.Core.Billing; +using Exceptionless.Core.Extensions; +using Exceptionless.Core.Jobs.WorkItemHandlers; +using Exceptionless.Core.Mail; +using Exceptionless.Core.Models; +using Exceptionless.Core.Models.WorkItems; +using Exceptionless.Core.Repositories; +using Exceptionless.DateTimeExtensions; +using Exceptionless.Tests.Mail; +using Exceptionless.Tests.Utility; +using Foundatio.Caching; +using Foundatio.Jobs; +using Foundatio.Repositories; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Exceptionless.Tests.Jobs.WorkItemHandlers; + +public class OrganizationNotificationWorkItemHandlerIntegrationTests : IntegrationTestsBase +{ + private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d1001"; + private const string SecondaryOrganizationId = "664ec4c1f12e4f2b7a0d1002"; + private const string MissingOrganizationId = "664ec4c1f12e4f2b7a0d1003"; + private const string PrimaryUserId = "664ec4c1f12e4f2b7a0d2001"; + private const string SecondaryUserId = "664ec4c1f12e4f2b7a0d2002"; + private const string UnverifiedUserId = "664ec4c1f12e4f2b7a0d2003"; + private const string DisabledNotificationsUserId = "664ec4c1f12e4f2b7a0d2004"; + + public OrganizationNotificationWorkItemHandlerIntegrationTests(ITestOutputHelper output, AppWebHostFactory factory) : base(output, factory) { } + + private CountingMailer Mailer => GetService(); + private ICacheClient CacheClient => GetService(); + private OrganizationNotificationWorkItemHandler Handler => GetService(); + private IOrganizationRepository OrganizationRepository => GetService(); + private IUserRepository UserRepository => GetService(); + private BillingManager BillingManager => GetService(); + private BillingPlans BillingPlans => GetService(); + private OrganizationData OrganizationData => GetService(); + private UserData UserData => GetService(); + + private Task SetMonthlySentMarkerAsync(string organizationId) + { + return CacheClient.SetAsync( + OrganizationNotificationWorkItemHandler.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true), + true, + TimeProvider.GetUtcNow().UtcDateTime.EndOfMonth()); + } + + protected override void RegisterServices(IServiceCollection services) + { + base.RegisterServices(services); + services.AddSingleton(); + services.ReplaceSingleton(sp => sp.GetRequiredService()); + } + + protected override async Task ResetDataAsync() + { + await base.ResetDataAsync(); + Mailer.Reset(); + + var organizations = new[] + { + OrganizationData.GenerateOrganization(BillingManager, BillingPlans, id: PrimaryOrganizationId, name: "Primary Organization", plan: BillingPlans.SmallPlan), + OrganizationData.GenerateOrganization(BillingManager, BillingPlans, id: SecondaryOrganizationId, name: "Secondary Organization", plan: BillingPlans.SmallPlan) + }; + + foreach (var organization in organizations) + organization.GetCurrentUsage(TimeProvider).Total = organization.GetMaxEventsPerMonthWithBonus(TimeProvider); + + await OrganizationRepository.AddAsync(organizations, options => options.ImmediateConsistency()); + + var primaryUser = UserData.GenerateUser(id: PrimaryUserId, organizationId: PrimaryOrganizationId, emailAddress: "primary-owner@example.org"); + primaryUser.FullName = "Primary Owner"; + primaryUser.IsEmailAddressVerified = true; + + var secondaryUser = UserData.GenerateUser(id: SecondaryUserId, organizationId: SecondaryOrganizationId, emailAddress: "secondary-owner@example.org"); + secondaryUser.FullName = "Secondary Owner"; + secondaryUser.IsEmailAddressVerified = true; + + await UserRepository.AddAsync([primaryUser, secondaryUser], options => options.ImmediateConsistency()); + } + + [Fact] + public async Task HandleItemAsync_WhenDuplicateMonthlyNotificationsAreProcessedAcrossHours_ShouldSendOneEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyNotificationsTargetDifferentOrganizations_ShouldSendOneEmailPerOrganization() + { + // Arrange + // Act + for (int i = 0; i < 3; i++) + { + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(SecondaryOrganizationId)); + } + + // Assert + var callsByOrganization = Mailer.OrganizationNoticeCalls + .GroupBy(call => call.OrganizationId) + .ToDictionary(group => group.Key, group => group.Count()); + + Assert.Equal(2, callsByOrganization.Count); + Assert.Equal(1, callsByOrganization[PrimaryOrganizationId]); + Assert.Equal(1, callsByOrganization[SecondaryOrganizationId]); + } + + [Fact] + public async Task HandleItemAsync_WhenTwentyFourHoursPass_ShouldNotSendAnotherMonthlyNotification() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 15, 12, 0, 0, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(25)); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenUtcMonthEndsAndOrganizationIsStillOverMonthlyLimit_ShouldAllowAnotherMonthlyNotification() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 31, 23, 50, 0, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromMinutes(20)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(2, Mailer.OrganizationNoticeCount); + } + + /// + /// Regression test: returning null from GetWorkItemLockAsync causes WorkItemJob to call + /// AbandonAsync instead of HandleItemAsync, creating an infinite retry loop for hourly items. + /// The fix returns EmptyLock (via base) so the item is completed normally. + /// + [Fact] + public async Task GetWorkItemLockAsync_WhenWorkItemIsHourly_ShouldReturnNonNullLockSoItemIsNotAbandoned() + { + // Arrange + var workItem = CreateHourlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + // null → WorkItemJob calls AbandonAsync (infinite retry loop) + // non-null → WorkItemJob calls HandleItemAsync and completes the entry + await using var workItemLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Assert + Assert.NotNull(workItemLock); + } + + [Fact] + public async Task GetWorkItemLockAsync_WhenMonthlyLockIsAlreadyHeld_ShouldReturnNullSoConcurrentItemIsAbandoned() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + await using var firstLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + await using var secondLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Assert + Assert.NotNull(firstLock); // First worker acquires the lock + Assert.Null(secondLock); // Concurrent duplicate is correctly abandoned + } + + /// + /// Regression test for the helper bug: + /// the helper must NOT call + /// when the lock is null, because in production WorkItemJob calls AbandonAsync instead. + /// Without this guard a concurrent item could bypass the lock, find no sent marker, and send + /// a duplicate email — a bug that would be invisible to all other integration tests. + /// + [Fact] + public async Task HandleItemAsync_WhenConcurrentWorkerHoldsMonthlyLock_ShouldNotSendEmail() + { + // Arrange: a concurrent worker already holds the monthly notification lock + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + await using var concurrentWorkerLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + Assert.NotNull(concurrentWorkerLock); + + // Act: a second worker attempts to process the same item while the lock is held; + // GetWorkItemLockAsync returns null so WorkItemJob abandons the item (never calls HandleItemAsync) + await HandleWorkItemAsync(workItem); + + // Assert: no email was sent because the item was abandoned, not processed + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenOnlyAnHourlyOverageIsProcessed_ShouldNotSendEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateHourlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenTheMonthlySentMarkerAlreadyExists_ShouldNotSendEmail() + { + // Arrange + await SetMonthlySentMarkerAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyEmailIsSent_ShouldWriteSentMarker() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(workItem); + + // Assert + var sentMarkerExists = await CacheClient.ExistsAsync( + OrganizationNotificationWorkItemHandler.GetNotificationSentKey(PrimaryOrganizationId, isOverMonthlyLimit: true)); + Assert.True(sentMarkerExists); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyWorkItemIsStaleAndOrganizationIsNoLongerOverMonthlyLimit_ShouldNotSendEmailOrWriteSentMarker() + { + // Arrange + var organization = await OrganizationRepository.GetByIdAsync(PrimaryOrganizationId, options => options.Cache()); + Assert.NotNull(organization); + + organization.GetCurrentUsage(TimeProvider).Total = 0; + await OrganizationRepository.SaveAsync(organization, options => options.ImmediateConsistency().Cache()); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + var sentMarkerExists = await CacheClient.ExistsAsync( + OrganizationNotificationWorkItemHandler.GetNotificationSentKey(PrimaryOrganizationId, isOverMonthlyLimit: true)); + Assert.False(sentMarkerExists); + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenOrganizationNoLongerExists_ShouldNotWriteSentMarkerOrSendEmail() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(MissingOrganizationId); + + // Act + await HandleWorkItemAsync(workItem); + + // Assert + var sentMarkerExists = await CacheClient.ExistsAsync( + OrganizationNotificationWorkItemHandler.GetNotificationSentKey(MissingOrganizationId, isOverMonthlyLimit: true)); + Assert.False(sentMarkerExists); + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenUsersAreUnverifiedOrDisabled_ShouldOnlySendToEligibleUsers() + { + // Arrange + var unverifiedUser = UserData.GenerateUser(id: UnverifiedUserId, organizationId: PrimaryOrganizationId, emailAddress: "unverified-owner@example.org"); + unverifiedUser.FullName = "Unverified Owner"; + unverifiedUser.IsEmailAddressVerified = false; + unverifiedUser.EmailNotificationsEnabled = true; + unverifiedUser.ResetVerifyEmailAddressTokenAndExpiration(TimeProvider); + + var disabledNotificationsUser = UserData.GenerateUser(id: DisabledNotificationsUserId, organizationId: PrimaryOrganizationId, emailAddress: "disabled-owner@example.org"); + disabledNotificationsUser.FullName = "Disabled Owner"; + disabledNotificationsUser.IsEmailAddressVerified = true; + disabledNotificationsUser.EmailNotificationsEnabled = false; + + await UserRepository.AddAsync([unverifiedUser, disabledNotificationsUser], options => options.ImmediateConsistency()); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + var call = Assert.Single(Mailer.OrganizationNoticeCalls); + Assert.Equal(PrimaryUserId, call.UserId); + Assert.Equal(PrimaryOrganizationId, call.OrganizationId); + Assert.True(call.IsOverMonthlyLimit); + Assert.False(call.IsOverHourlyLimit); + } + + [Fact] + public async Task HandleItemAsync_WhenHourlyOveragePrecedesMonthlyOverage_ShouldSendTheMonthlyEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateHourlyNotificationWorkItem(PrimaryOrganizationId)); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenHourlyOverageArrivesAfterMonthlyEmail_ShouldNotSendAnotherEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await HandleWorkItemAsync(CreateHourlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + var call = Assert.Single(Mailer.OrganizationNoticeCalls); + Assert.True(call.IsOverMonthlyLimit); + Assert.False(call.IsOverHourlyLimit); + } + + [Fact] + public async Task GetWorkItemLockAsync_WhenMonthlyWorkerReachesWorkItemTimeout_ShouldKeepConcurrentItemAbandoned() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + await using var workerALock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + Assert.NotNull(workerALock); + + TimeProvider.Advance(TimeSpan.FromMinutes(61)); + + // Act + await using var workerBLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Assert + Assert.Null(workerBLock); + } + + [Fact] + public async Task HandleItemAsync_WhenEmailSendFails_ShouldNotWriteSentMarkerSoRetryCanSend() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + Mailer.ShouldThrow = true; + + // Act + await Assert.ThrowsAsync(() => HandleWorkItemAsync(workItem)); + + var sentMarkerExists = await CacheClient.ExistsAsync( + OrganizationNotificationWorkItemHandler.GetNotificationSentKey(PrimaryOrganizationId, isOverMonthlyLimit: true)); + + Mailer.ShouldThrow = false; + await HandleWorkItemAsync(workItem); + + // Assert + Assert.False(sentMarkerExists); + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + private async Task HandleWorkItemAsync(OrganizationNotificationWorkItem workItem) + { + await using var workItemLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Mirror production WorkItemJob semantics: when GetWorkItemLockAsync returns null, + // WorkItemJob calls AbandonAsync and never calls HandleItemAsync. Omitting this guard + // would let tests call HandleItemAsync without the lock, masking concurrent-access bugs. + if (workItemLock is null) + return; + + var context = new WorkItemContext(workItem, "test-job", workItemLock, TestContext.Current.CancellationToken, static (_, _) => Task.CompletedTask); + await Handler.HandleItemAsync(context); + } + + private async Task SetOrganizationOverMonthlyLimitAsync(string organizationId) + { + var organization = await OrganizationRepository.GetByIdAsync(organizationId, options => options.Cache()); + Assert.NotNull(organization); + + organization.GetCurrentUsage(TimeProvider).Total = organization.GetMaxEventsPerMonthWithBonus(TimeProvider); + await OrganizationRepository.SaveAsync(organization, options => options.ImmediateConsistency().Cache()); + } + + private static OrganizationNotificationWorkItem CreateMonthlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = false, + IsOverMonthlyLimit = true + }; + } + + private static OrganizationNotificationWorkItem CreateHourlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = true, + IsOverMonthlyLimit = false + }; + } +} diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs new file mode 100644 index 0000000000..4b04b5e905 --- /dev/null +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -0,0 +1,273 @@ +using Exceptionless.Core; +using Exceptionless.Core.Extensions; +using Exceptionless.Core.Jobs.WorkItemHandlers; +using Exceptionless.Core.Mail; +using Exceptionless.Core.Messaging.Models; +using Exceptionless.Core.Models; +using Exceptionless.Core.Models.WorkItems; +using Exceptionless.Tests.Mail; +using Foundatio.Caching; +using Foundatio.Jobs; +using Foundatio.Lock; +using Foundatio.Messaging; +using Foundatio.Queues; +using Foundatio.Resilience; +using Foundatio.Serializer; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Exceptionless.Tests.Jobs.WorkItemHandlers; + +/// +/// RCA-focused tests for duplicate plan-limit email notifications. +/// +/// Root cause: every web pod registers the same +/// startup action. Because Foundatio pub/sub delivers each message to all subscribers, a single +/// monthly PlanOverage event enqueued one work item per running web pod. The original +/// ThrottlingLockProvider(slotsPerPeriod: 1, period: 1 hour) allowed exactly one item +/// through per calendar-hour bucket. Duplicate items were abandoned back to the queue and +/// reprocessed once each new bucket opened — producing one email per hour. +/// +/// Fix: queue-level dedup ( + +/// DuplicateDetectionQueueBehavior) collapses the fanout at enqueue time, and +/// handler-level idempotency (per-org distributed lock + sent marker) ensures that stale +/// duplicates already in the queue when the fix deployed cannot retrigger an email. +/// +public class OrganizationNotificationWorkItemHandlerTests : TestWithServices +{ + private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d1001"; + private const string QueueDuplicateDetectionOrganizationId = "664ec4c1f12e4f2b7a0d1002"; + private const string RegisteredQueueOrganizationId = "664ec4c1f12e4f2b7a0d1003"; + private const string DequeuedNotificationOrganizationId = "664ec4c1f12e4f2b7a0d1004"; + private const string HourlyOrganizationId = "664ec4c1f12e4f2b7a0d1005"; + + public OrganizationNotificationWorkItemHandlerTests(ITestOutputHelper output) : base(output) { } + + private CountingMailer Mailer => GetService(); + private IMessagePublisher MessagePublisher => GetService(); + private IMessageSubscriber MessageSubscriber => GetService(); + private ICacheClient CacheClient => GetService(); + private IResiliencePolicyProvider ResiliencePolicyProvider => GetService(); + + protected override void RegisterServices(IServiceCollection services, AppOptions options) + { + base.RegisterServices(services, options); + services.AddSingleton(); + services.ReplaceSingleton(sp => sp.GetRequiredService()); + } + + [Fact] + public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithoutQueueDuplicateDetection_ShouldEnqueueSixWorkItems() + { + // Arrange + using var workItemQueue = CreateWorkItemQueue(); + await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 6); + + // Act + await MessagePublisher.PublishAsync(new PlanOverage { OrganizationId = PrimaryOrganizationId }, cancellationToken: TestContext.Current.CancellationToken); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(6, queueStats.Enqueued); + } + + [Fact] + public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithQueueDuplicateDetection_ShouldEnqueueOneWorkItem() + { + // Arrange + using var dedupBehavior = new DuplicateDetectionQueueBehavior(CacheClient, GetService(), TimeSpan.FromHours(24)); + using var workItemQueue = CreateWorkItemQueue(dedupBehavior); + await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 6); + + // Act + await MessagePublisher.PublishAsync(new PlanOverage { OrganizationId = QueueDuplicateDetectionOrganizationId }, cancellationToken: TestContext.Current.CancellationToken); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(1, queueStats.Enqueued); + } + + [Fact] + public async Task EnqueueAsync_WhenDuplicateNotificationUsesRegisteredQueueBehavior_ShouldEnqueueOneWorkItem() + { + // Arrange + var workItemQueue = GetService>(); + var workItem = CreateMonthlyNotificationWorkItem(RegisteredQueueOrganizationId); + + // Act + await workItemQueue.EnqueueAsync(workItem); + await workItemQueue.EnqueueAsync(workItem); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(1, queueStats.Enqueued); + } + + [Fact] + public async Task EnqueueAsync_WhenDuplicateNotificationIsDequeued_ShouldAllowFutureEnqueue() + { + // Arrange + using var dedupBehavior = new DuplicateDetectionQueueBehavior(CacheClient, GetService(), TimeSpan.FromHours(24)); + using var workItemQueue = CreateWorkItemQueue(dedupBehavior); + var workItem = CreateMonthlyNotificationWorkItem(DequeuedNotificationOrganizationId); + + await workItemQueue.EnqueueAsync(workItem); + var queueEntry = await workItemQueue.DequeueAsync(TestContext.Current.CancellationToken); + Assert.NotNull(queueEntry); + await queueEntry.CompleteAsync(); + + // Act + await workItemQueue.EnqueueAsync(workItem); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(2, queueStats.Enqueued); + } + + [Fact] + public async Task RunAsync_WhenHourlyPlanOverageIsObserved_ShouldEnqueueHourlyWorkItem() + { + // Arrange + using var workItemQueue = CreateWorkItemQueue(); + await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 1); + + // Act + await MessagePublisher.PublishAsync(new PlanOverage { OrganizationId = HourlyOrganizationId, IsHourly = true }, cancellationToken: TestContext.Current.CancellationToken); + + // Assert + var queueEntry = await workItemQueue.DequeueAsync(TestContext.Current.CancellationToken); + Assert.NotNull(queueEntry); + + var workItem = GetService().Deserialize(queueEntry.Value.Data)!; + Assert.Equal(HourlyOrganizationId, workItem.OrganizationId); + Assert.True(workItem.IsOverHourlyLimit); + Assert.False(workItem.IsOverMonthlyLimit); + + await queueEntry.CompleteAsync(); + } + + [Fact] + public void UniqueIdentifier_WhenMonthlyNotificationIsCreated_ShouldMatchCanonicalNotificationKey() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + var uniqueIdentifier = workItem.UniqueIdentifier; + + // Assert + Assert.Equal(OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: true), uniqueIdentifier); + } + + [Fact] + public void UniqueIdentifier_WhenHourlyNotificationIsCreated_ShouldMatchCanonicalNotificationKey() + { + // Arrange + var workItem = CreateHourlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + var uniqueIdentifier = workItem.UniqueIdentifier; + + // Assert + Assert.Equal(OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: false), uniqueIdentifier); + } + + [Fact] + public void GetNotificationKey_WhenMonthlyAndHourlyNotificationsAreCreated_ShouldUseDifferentKeys() + { + // Arrange + // Act + var monthlyKey = OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: true); + var hourlyKey = OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: false); + + // Assert + Assert.Equal($"Organization:{PrimaryOrganizationId}:notification:monthly", monthlyKey); + Assert.Equal($"Organization:{PrimaryOrganizationId}:notification:hourly", hourlyKey); + Assert.NotEqual(monthlyKey, hourlyKey); + } + + [Fact] + public async Task HandleItemAsync_WhenLegacyHourlyThrottleProcessesStaleMonthlyDuplicates_ShouldSendOneEmailPerHourBucket() + { + // Arrange + // Reproduce the pre-fix behavior: ThrottlingLockProvider(1/hour) allowed exactly one + // item through per calendar-hour bucket. When a duplicate was abandoned and re-queued, + // it could acquire a fresh lock once the next hour bucket opened — one email per hour. + var organization = new Organization { Id = PrimaryOrganizationId, Name = "Acme Corp" }; + var user = new User + { + Id = "664ec4c1f12e4f2b7a0d2001", + FullName = "Jane Smith", + EmailAddress = "jane.smith@acmecorp.example", + IsEmailAddressVerified = true, + EmailNotificationsEnabled = true + }; + + var lockKey = OrganizationNotificationWorkItemHandler.GetNotificationLockKey(PrimaryOrganizationId, isOverMonthlyLimit: true); + var lockProvider = new ThrottlingLockProvider(CacheClient, 1, TimeSpan.FromHours(1), TimeProvider, ResiliencePolicyProvider, GetService()); + + Task ProcessDuplicateWithLegacyLockAsync() + { + return lockProvider.TryUsingAsync(lockKey, async () => + { + await Mailer.SendOrganizationNoticeAsync(user, organization, isOverMonthlyLimit: true, isOverHourlyLimit: false); + }, TimeSpan.FromMinutes(15), TestContext.Current.CancellationToken); + } + + // Act: each call simulates a stale duplicate item being dequeued in a new hour bucket + await ProcessDuplicateWithLegacyLockAsync(); // Hour 0: lock acquired, email sent + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await ProcessDuplicateWithLegacyLockAsync(); // Hour 1: new bucket, email sent again + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await ProcessDuplicateWithLegacyLockAsync(); // Hour 2: new bucket, email sent again + + // Assert: the old code allowed one email per hour — this is the bug + Assert.Equal(3, Mailer.OrganizationNoticeCount); + } + + private async Task SubscribeToPlanOverageAsync(IQueue workItemQueue, int subscriberCount) + { + for (int i = 0; i < subscriberCount; i++) + { + var startupAction = new EnqueueOrganizationNotificationOnPlanOverage(workItemQueue, MessageSubscriber, GetService()); + await startupAction.RunAsync(); + } + } + + private InMemoryQueue CreateWorkItemQueue(params IQueueBehavior[] behaviors) + { + var options = new InMemoryQueueOptions + { + Serializer = GetService(), + TimeProvider = TimeProvider, + LoggerFactory = GetService() + }; + + if (behaviors.Length > 0) + options.Behaviors = behaviors; + + return new InMemoryQueue(options); + } + + private static OrganizationNotificationWorkItem CreateMonthlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = false, + IsOverMonthlyLimit = true + }; + } + + private static OrganizationNotificationWorkItem CreateHourlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = true, + IsOverMonthlyLimit = false + }; + } +} diff --git a/tests/Exceptionless.Tests/Mail/CountingMailer.cs b/tests/Exceptionless.Tests/Mail/CountingMailer.cs new file mode 100644 index 0000000000..96d5671c3f --- /dev/null +++ b/tests/Exceptionless.Tests/Mail/CountingMailer.cs @@ -0,0 +1,79 @@ +using Exceptionless.Core.Mail; +using Exceptionless.Core.Models; + +namespace Exceptionless.Tests.Mail; + +public class CountingMailer : IMailer +{ + private int _organizationNoticeCount; + + public int OrganizationNoticeCount => _organizationNoticeCount; + + public List OrganizationNoticeCalls { get; } = []; + + /// + /// When true, throws instead of recording a call. + /// Reset by . + /// + public bool ShouldThrow { get; set; } + + public Task SendEventNoticeAsync(User user, PersistentEvent ev, Project project, bool isNew, bool isRegression, int totalOccurrences) + { + return Task.FromResult(true); + } + + public Task SendOrganizationAddedAsync(User sender, Organization organization, User user) + { + return Task.CompletedTask; + } + + public Task SendOrganizationInviteAsync(User sender, Organization organization, Invite invite) + { + return Task.CompletedTask; + } + + public Task SendOrganizationNoticeAsync(User user, Organization organization, bool isOverMonthlyLimit, bool isOverHourlyLimit) + { + if (ShouldThrow) + throw new InvalidOperationException("Simulated mailer failure."); + + Interlocked.Increment(ref _organizationNoticeCount); + lock (OrganizationNoticeCalls) + { + OrganizationNoticeCalls.Add(new OrganizationNoticeCall(user.Id, organization.Id, isOverMonthlyLimit, isOverHourlyLimit)); + } + return Task.CompletedTask; + } + + public Task SendOrganizationPaymentFailedAsync(User owner, Organization organization) + { + return Task.CompletedTask; + } + + public Task SendProjectDailySummaryAsync(User user, Project project, IEnumerable? mostFrequent, IEnumerable? newest, DateTime startDate, bool hasSubmittedEvents, double count, double uniqueCount, double newCount, double fixedCount, int blockedCount, int tooBigCount, bool isFreePlan) + { + return Task.CompletedTask; + } + + public Task SendUserEmailVerifyAsync(User user) + { + return Task.CompletedTask; + } + + public Task SendUserPasswordResetAsync(User user) + { + return Task.CompletedTask; + } + + public void Reset() + { + Interlocked.Exchange(ref _organizationNoticeCount, 0); + lock (OrganizationNoticeCalls) + { + OrganizationNoticeCalls.Clear(); + } + ShouldThrow = false; + } +} + +public record OrganizationNoticeCall(string UserId, string OrganizationId, bool IsOverMonthlyLimit, bool IsOverHourlyLimit); diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index a4ea26928c..a07143ea98 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -1,11 +1,15 @@ using System.Diagnostics; using Exceptionless.Core.Billing; +using Exceptionless.Core.Extensions; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; +using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; using Exceptionless.Core.Services; +using Exceptionless.DateTimeExtensions; using Exceptionless.Tests.Extensions; using Foundatio.AsyncEx; +using Foundatio.Caching; using Foundatio.Messaging; using Foundatio.Repositories; using Xunit; @@ -19,6 +23,7 @@ public sealed class UsageServiceTests : IntegrationTestsBase private readonly IProjectRepository _projectRepository; private readonly UsageService _usageService; private readonly BillingPlans _plans; + private readonly ICacheClient _cache; public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : base(output, factory) { @@ -28,6 +33,161 @@ public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : _organizationRepository = GetService(); _projectRepository = GetService(); _plans = GetService(); + _cache = GetService(); + } + + private Task SetMonthlySentMarkerAsync(string organizationId) + { + return _cache.SetAsync( + OrganizationNotificationWorkItem.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true), + true, + TimeProvider.GetUtcNow().UtcDateTime.EndOfMonth()); + } + + private Task MonthlySentMarkerExistsAsync(string organizationId) + { + return _cache.ExistsAsync(OrganizationNotificationWorkItem.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true)); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenMonthlyPlanLimitIncreases_ShouldResetOverageNotificationSentMarker() + { + // Arrange + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3001", + Name = "Primary Organization", + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenPlanChangesToUnlimited_ShouldResetOverageNotificationSentMarker() + { + // Arrange + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3002", + Name = "Primary Organization", + PlanId = _plans.EnterprisePlan.Id, + MaxEventsPerMonth = _plans.EnterprisePlan.MaxEventsPerMonth + }; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.UnlimitedPlan.Id, + MaxEventsPerMonth = _plans.UnlimitedPlan.MaxEventsPerMonth + }; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenPlanLimitDecreasesBelowCurrentUsage_ShouldResetMarkerAndPublishMonthlyOverage() + { + // Arrange + var messageBus = GetService(); + var countdown = new AsyncCountdownEvent(1); + PlanOverage? overage = null; + await messageBus.SubscribeAsync(po => + { + overage = po; + countdown.Signal(); + }, TestCancellationToken); + + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3003", + Name = "Primary Organization", + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + modified.GetCurrentUsage(TimeProvider).Total = 150; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + await countdown.WaitAsync(TimeSpan.FromMilliseconds(150)); + + // Assert + Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); + Assert.NotNull(overage); + Assert.Equal(modified.Id, overage.OrganizationId); + Assert.False(overage.IsHourly); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenAlreadyOverMonthlyLimitAndPlanLimitDecreases_ShouldKeepMarkerAndNotPublishMonthlyOverage() + { + // Arrange + var messageBus = GetService(); + var countdown = new AsyncCountdownEvent(1); + await messageBus.SubscribeAsync(po => + { + if (!po.IsHourly) + countdown.Signal(); + }, TestCancellationToken); + + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3004", + Name = "Primary Organization", + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + original.GetCurrentUsage(TimeProvider).Total = 250; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + modified.GetCurrentUsage(TimeProvider).Total = 250; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + await Assert.ThrowsAsync(() => countdown.WaitAsync(TimeSpan.FromMilliseconds(150))); + Assert.True(await MonthlySentMarkerExistsAsync(original.Id)); } [Fact] From b52b222cfe5dcb4408267d9e9a72d785261a5092 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 14:17:03 -0500 Subject: [PATCH 02/10] Address plan overage review feedback --- .../OrganizationNotificationWorkItemHandler.cs | 4 ++-- .../Models/WorkItems/OrganizationNotificationWorkItem.cs | 5 ----- src/Exceptionless.Core/Services/UsageService.cs | 6 +++--- .../OrganizationNotificationWorkItemHandlerTests.cs | 7 +++---- tests/Exceptionless.Tests/Services/UsageServiceTests.cs | 6 +++--- 5 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs index 1eeef4b95c..372eb0a25a 100644 --- a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs +++ b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs @@ -65,7 +65,7 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem /// queue entry, preventing duplicates from being enqueued in the first place. /// /// -/// Handler-level idempotency: a distributed lock serialises concurrent processing, and a +/// Handler-level idempotency: a distributed lock serializes concurrent processing, and a /// sent marker () ensures that any stale duplicates /// already in the queue before the fix deployed cannot re-trigger an email in the same UTC /// month. The marker is reset when a monthly plan limit change re-evaluates the overage state. @@ -203,6 +203,6 @@ public static string GetNotificationLockKey(string organizationId, bool isOverMo public static string GetNotificationSentKey(string organizationId, bool isOverMonthlyLimit) { - return OrganizationNotificationWorkItem.GetNotificationSentKey(organizationId, isOverMonthlyLimit); + return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-sent"; } } diff --git a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs index 6e6401d2f4..89f80e3434 100644 --- a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs +++ b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs @@ -17,9 +17,4 @@ public static string GetNotificationKey(string organizationId, bool isOverMonthl { return $"Organization:{organizationId}:notification:{(isOverMonthlyLimit ? MonthlyNotificationType : HourlyNotificationType)}"; } - - public static string GetNotificationSentKey(string organizationId, bool isOverMonthlyLimit) - { - return $"{GetNotificationKey(organizationId, isOverMonthlyLimit)}-sent"; - } } diff --git a/src/Exceptionless.Core/Services/UsageService.cs b/src/Exceptionless.Core/Services/UsageService.cs index 0ba03190df..0ad54e444d 100644 --- a/src/Exceptionless.Core/Services/UsageService.cs +++ b/src/Exceptionless.Core/Services/UsageService.cs @@ -1,7 +1,7 @@ using Exceptionless.Core.Extensions; +using Exceptionless.Core.Jobs.WorkItemHandlers; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; -using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; using Exceptionless.DateTimeExtensions; using Foundatio.Caching; @@ -227,7 +227,7 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat bool isMonthlyLimitIncrease = modifiedMaxEvents < 0 || (originalMaxEvents >= 0 && modifiedMaxEvents > originalMaxEvents); if (isMonthlyLimitIncrease) { - await _cache.RemoveAsync(OrganizationNotificationWorkItem.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); + await _cache.RemoveAsync(OrganizationNotificationWorkItemHandler.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); await _cache.RemoveAsync(GetThrottledKey(utcNow, modified.Id)); return; } @@ -236,7 +236,7 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat bool isOverMonthlyLimit = modified.IsOverMonthlyLimit(_timeProvider); if (!wasOverMonthlyLimit && isOverMonthlyLimit) { - await _cache.RemoveAsync(OrganizationNotificationWorkItem.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); + await _cache.RemoveAsync(OrganizationNotificationWorkItemHandler.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id }); } diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs index 4b04b5e905..7b62ad519c 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -30,7 +30,7 @@ namespace Exceptionless.Tests.Jobs.WorkItemHandlers; /// /// Fix: queue-level dedup ( + /// DuplicateDetectionQueueBehavior) collapses the fanout at enqueue time, and -/// handler-level idempotency (per-org distributed lock + sent marker) ensures that stale +/// handler-level idempotency (per-organization distributed lock + sent marker) ensures that stale /// duplicates already in the queue when the fix deployed cannot retrigger an email. /// public class OrganizationNotificationWorkItemHandlerTests : TestWithServices @@ -240,14 +240,13 @@ private InMemoryQueue CreateWorkItemQueue(params IQueueBehavior { + Behaviors = behaviors, Serializer = GetService(), TimeProvider = TimeProvider, + ResiliencePolicyProvider = GetService(), LoggerFactory = GetService() }; - if (behaviors.Length > 0) - options.Behaviors = behaviors; - return new InMemoryQueue(options); } diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index a07143ea98..46d065cea3 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -1,9 +1,9 @@ using System.Diagnostics; using Exceptionless.Core.Billing; using Exceptionless.Core.Extensions; +using Exceptionless.Core.Jobs.WorkItemHandlers; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; -using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; using Exceptionless.Core.Services; using Exceptionless.DateTimeExtensions; @@ -39,14 +39,14 @@ public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : private Task SetMonthlySentMarkerAsync(string organizationId) { return _cache.SetAsync( - OrganizationNotificationWorkItem.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true), + OrganizationNotificationWorkItemHandler.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true), true, TimeProvider.GetUtcNow().UtcDateTime.EndOfMonth()); } private Task MonthlySentMarkerExistsAsync(string organizationId) { - return _cache.ExistsAsync(OrganizationNotificationWorkItem.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true)); + return _cache.ExistsAsync(OrganizationNotificationWorkItemHandler.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true)); } [Fact] From 21b51053645cd3af9253cd0a608c3bd2c3a60f07 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 14:27:22 -0500 Subject: [PATCH 03/10] Tighten overage notification review fixes --- .../OrganizationNotificationWorkItemHandler.cs | 6 +++--- .../Models/WorkItems/OrganizationNotificationWorkItem.cs | 2 +- tests/Exceptionless.Tests/Services/UsageServiceTests.cs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs index 372eb0a25a..a2c4e1cb14 100644 --- a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs +++ b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs @@ -80,7 +80,7 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem /// public class OrganizationNotificationWorkItemHandler : WorkItemHandlerBase { - private static readonly TimeSpan WorkItemLockTimeout = TimeSpan.FromMinutes(65); + private static readonly TimeSpan WorkItemLockTimeout = TimeSpan.FromMinutes(90); private readonly IOrganizationRepository _organizationRepository; private readonly IUserRepository _userRepository; @@ -114,8 +114,8 @@ public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizat if (!ShouldSendNotificationEmail(wi)) return base.GetWorkItemLockAsync(workItem, cancellationToken); - // timeUntilExpires: exceed the 1-hour work-item queue timeout so a slow send does not let - // a duplicate worker acquire the notification lock at the queue visibility boundary. + // timeUntilExpires: comfortably exceed the 1-hour work-item queue timeout so a slow send + // does not let a duplicate worker acquire the notification lock at the queue visibility boundary. // // acquireTimeout: TimeSpan.Zero — if another worker already holds the lock, return null // immediately so WorkItemJob calls AbandonAsync. The item is retried later, at which point diff --git a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs index 89f80e3434..89a766a51c 100644 --- a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs +++ b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs @@ -11,7 +11,7 @@ public record OrganizationNotificationWorkItem : IHaveUniqueIdentifier public required bool IsOverHourlyLimit { get; init; } public required bool IsOverMonthlyLimit { get; init; } - public string? UniqueIdentifier => GetNotificationKey(OrganizationId, IsOverMonthlyLimit); + public string UniqueIdentifier => GetNotificationKey(OrganizationId, IsOverMonthlyLimit); public static string GetNotificationKey(string organizationId, bool isOverMonthlyLimit) { diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index 46d065cea3..d0216debef 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -141,7 +141,7 @@ await messageBus.SubscribeAsync(po => // Act await _usageService.HandleOrganizationChangeAsync(modified, original); - await countdown.WaitAsync(TimeSpan.FromMilliseconds(150)); + await countdown.WaitAsync(TimeSpan.FromSeconds(5)); // Assert Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); From 0e9b68751c820ca341de5f4fec6dc1932c727b5b Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 14:30:02 -0500 Subject: [PATCH 04/10] Address overage notification follow-up feedback --- .../OrganizationNotificationWorkItemHandler.cs | 4 ++-- tests/Exceptionless.Tests/Services/UsageServiceTests.cs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs index a2c4e1cb14..c7614bd47d 100644 --- a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs +++ b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs @@ -127,11 +127,11 @@ public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizat public override async Task HandleItemAsync(WorkItemContext context) { var wi = context.GetData()!; + Log.LogInformation("Received organization notification work item for: {OrganizationId} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); + if (!ShouldSendNotificationEmail(wi)) return; - Log.LogInformation("Received organization notification work item for: {OrganizationId} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache()); if (organization is null) return; diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index d0216debef..e0da7b0b5c 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -186,7 +186,7 @@ await messageBus.SubscribeAsync(po => await _usageService.HandleOrganizationChangeAsync(modified, original); // Assert - await Assert.ThrowsAsync(() => countdown.WaitAsync(TimeSpan.FromMilliseconds(150))); + await Assert.ThrowsAsync(() => countdown.WaitAsync(TimeSpan.FromSeconds(1))); Assert.True(await MonthlySentMarkerExistsAsync(original.Id)); } From 7476bfae341a5e1c37c93cd8425ac6e269fb5a31 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 15:08:44 -0500 Subject: [PATCH 05/10] Address overage notification review concerns --- ...OrganizationNotificationWorkItemHandler.cs | 51 ++++------------- .../Services/NotificationService.cs | 56 ++++++++++++++++++- .../Services/UsageService.cs | 9 ++- ...ficationWorkItemHandlerIntegrationTests.cs | 43 ++++++++------ ...izationNotificationWorkItemHandlerTests.cs | 55 ------------------ .../Services/NotificationServiceTests.cs | 42 ++++++++++++++ .../Services/UsageServiceTests.cs | 11 ++-- 7 files changed, 143 insertions(+), 124 deletions(-) create mode 100644 tests/Exceptionless.Tests/Services/NotificationServiceTests.cs diff --git a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs index c7614bd47d..fbf30ea43a 100644 --- a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs +++ b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs @@ -4,8 +4,7 @@ using Exceptionless.Core.Models; using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; -using Exceptionless.DateTimeExtensions; -using Foundatio.Caching; +using Exceptionless.Core.Services; using Foundatio.Extensions.Hosting.Startup; using Foundatio.Jobs; using Foundatio.Lock; @@ -66,7 +65,7 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem /// /// /// Handler-level idempotency: a distributed lock serializes concurrent processing, and a -/// sent marker () ensures that any stale duplicates +/// sent marker ensures that any stale duplicates /// already in the queue before the fix deployed cannot re-trigger an email in the same UTC /// month. The marker is reset when a monthly plan limit change re-evaluates the overage state. /// @@ -80,27 +79,19 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem /// public class OrganizationNotificationWorkItemHandler : WorkItemHandlerBase { - private static readonly TimeSpan WorkItemLockTimeout = TimeSpan.FromMinutes(90); - private readonly IOrganizationRepository _organizationRepository; private readonly IUserRepository _userRepository; private readonly IMailer _mailer; - private readonly ICacheClient _cacheClient; + private readonly NotificationService _notificationService; private readonly TimeProvider _timeProvider; - // ILockProvider is kept local rather than pushed to WorkItemHandlerBase because the - // lock/sent-key contract is specific to plan-limit email idempotency. Passing it through - // the base class would force every unrelated handler to acquire unnecessary plan-limit locks. - private readonly ILockProvider _lockProvider; - - public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, ICacheClient cacheClient, TimeProvider timeProvider, ILockProvider lockProvider, ILoggerFactory loggerFactory) : base(loggerFactory) + public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, NotificationService notificationService, TimeProvider timeProvider, ILoggerFactory loggerFactory) : base(loggerFactory) { _organizationRepository = organizationRepository; _userRepository = userRepository; _mailer = mailer; - _cacheClient = cacheClient; + _notificationService = notificationService; _timeProvider = timeProvider; - _lockProvider = lockProvider; } public override Task GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default) @@ -114,14 +105,15 @@ public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizat if (!ShouldSendNotificationEmail(wi)) return base.GetWorkItemLockAsync(workItem, cancellationToken); - // timeUntilExpires: comfortably exceed the 1-hour work-item queue timeout so a slow send - // does not let a duplicate worker acquire the notification lock at the queue visibility boundary. + // The notification service uses a 90-minute lease, comfortably exceeding the 1-hour + // work-item queue timeout so a slow send does not let a duplicate worker acquire the + // notification lock at the queue visibility boundary. // // acquireTimeout: TimeSpan.Zero — if another worker already holds the lock, return null // immediately so WorkItemJob calls AbandonAsync. The item is retried later, at which point // the sent marker is already set and the handler skips. Blocking here instead would stall // a worker slot for up to the lock timeout with no correctness benefit. - return _lockProvider.TryAcquireAsync(GetNotificationLockKey(wi.OrganizationId, wi.IsOverMonthlyLimit), WorkItemLockTimeout, TimeSpan.Zero); + return _notificationService.TryAcquireOrganizationNotificationLockAsync(wi.OrganizationId, wi.IsOverMonthlyLimit); } public override async Task HandleItemAsync(WorkItemContext context) @@ -142,14 +134,14 @@ public override async Task HandleItemAsync(WorkItemContext context) return; } - if (await WasNotificationSentAsync(wi.OrganizationId, wi.IsOverMonthlyLimit)) + if (await _notificationService.IsOrganizationNotificationSentAsync(wi.OrganizationId, wi.IsOverMonthlyLimit)) { Log.LogInformation("Skipping duplicate monthly overage notification for organization: {OrganizationId}", wi.OrganizationId); return; } await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - await _cacheClient.SetAsync(GetNotificationSentKey(wi.OrganizationId, wi.IsOverMonthlyLimit), true, GetNotificationSentExpiresAtUtc()); + await _notificationService.SetOrganizationNotificationSentAsync(wi.OrganizationId, wi.IsOverMonthlyLimit); } private async Task SendOverageNotificationsAsync(Organization organization, bool isOverHourlyLimit, bool isOverMonthlyLimit) @@ -184,25 +176,4 @@ private static bool ShouldSendNotificationEmail(OrganizationNotificationWorkItem { return workItem.IsOverMonthlyLimit; } - - private async Task WasNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) - { - var sent = await _cacheClient.GetAsync(GetNotificationSentKey(organizationId, isOverMonthlyLimit)); - return sent.HasValue && sent.Value; - } - - private DateTime GetNotificationSentExpiresAtUtc() - { - return _timeProvider.GetUtcNow().UtcDateTime.EndOfMonth(); - } - - public static string GetNotificationLockKey(string organizationId, bool isOverMonthlyLimit) - { - return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-lock"; - } - - public static string GetNotificationSentKey(string organizationId, bool isOverMonthlyLimit) - { - return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-sent"; - } } diff --git a/src/Exceptionless.Core/Services/NotificationService.cs b/src/Exceptionless.Core/Services/NotificationService.cs index 10b0f093b6..fc35f82c65 100644 --- a/src/Exceptionless.Core/Services/NotificationService.cs +++ b/src/Exceptionless.Core/Services/NotificationService.cs @@ -1,12 +1,16 @@ using Exceptionless.Core.Messaging.Models; +using Exceptionless.Core.Models.WorkItems; +using Exceptionless.DateTimeExtensions; using Foundatio.Caching; +using Foundatio.Lock; using Foundatio.Messaging; namespace Exceptionless.Core.Services; -public class NotificationService(ICacheClient cacheClient, IMessagePublisher messagePublisher, TimeProvider timeProvider) +public class NotificationService(ICacheClient cacheClient, IMessagePublisher messagePublisher, TimeProvider timeProvider, CacheLockProvider lockProvider) { private const string SystemNotificationCacheKey = "system-notification"; + private static readonly TimeSpan OrganizationNotificationLockTimeout = TimeSpan.FromMinutes(90); public async Task GetSystemNotificationAsync() { @@ -37,4 +41,54 @@ public async Task SendReleaseNotificationAsync(string? mess await messagePublisher.PublishAsync(notification); return notification; } + + public async Task IsOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + var sent = await cacheClient.GetAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit)); + return sent.HasValue && sent.Value; + } + + public Task SetOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + return cacheClient.SetAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit), true, GetOrganizationNotificationSentExpiresAtUtc()); + } + + public Task RemoveOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + return cacheClient.RemoveAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit)); + } + + public Task TryAcquireOrganizationNotificationLockAsync(string organizationId, bool isOverMonthlyLimit) + { + return lockProvider.TryAcquireAsync(GetOrganizationNotificationLockKey(organizationId, isOverMonthlyLimit), OrganizationNotificationLockTimeout, TimeSpan.Zero); + } + + public async Task IsOrganizationNotificationLockedAsync(string organizationId, bool isOverMonthlyLimit) + { + await using var notificationLock = await TryAcquireOrganizationNotificationLockAsync(organizationId, isOverMonthlyLimit); + return notificationLock is null; + } + + private static string GetOrganizationNotificationLockKey(string organizationId, bool isOverMonthlyLimit) + { + return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-lock"; + } + + private static string GetOrganizationNotificationSentKey(string organizationId, bool isOverMonthlyLimit) + { + return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-sent"; + } + + private DateTime GetOrganizationNotificationSentExpiresAtUtc() + { + var utcNow = timeProvider.GetUtcNow().UtcDateTime; + var expiresAtUtc = utcNow.StartOfMonth().AddMonths(1); + + // Foundatio treats absolute cache expirations less than 5ms in the future as already expired. + // Keep the marker observable for sends that complete in the final milliseconds of the UTC month. + if (expiresAtUtc - utcNow < CacheClientExtensions.MinimumExpiration) + return utcNow.Add(CacheClientExtensions.MinimumExpiration); + + return expiresAtUtc; + } } diff --git a/src/Exceptionless.Core/Services/UsageService.cs b/src/Exceptionless.Core/Services/UsageService.cs index 0ad54e444d..b069c0e1f5 100644 --- a/src/Exceptionless.Core/Services/UsageService.cs +++ b/src/Exceptionless.Core/Services/UsageService.cs @@ -1,5 +1,4 @@ using Exceptionless.Core.Extensions; -using Exceptionless.Core.Jobs.WorkItemHandlers; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; using Exceptionless.Core.Repositories; @@ -17,11 +16,13 @@ public class UsageService private readonly IProjectRepository _projectRepository; private readonly ICacheClient _cache; private readonly IMessagePublisher _messagePublisher; + private readonly NotificationService _notificationService; private readonly TimeProvider _timeProvider; private readonly ILogger _logger; private readonly TimeSpan _bucketSize = TimeSpan.FromMinutes(5); public UsageService(IOrganizationRepository organizationRepository, IProjectRepository projectRepository, ICacheClient cache, IMessagePublisher messagePublisher, + NotificationService notificationService, TimeProvider timeProvider, ILoggerFactory loggerFactory) { @@ -29,6 +30,7 @@ public UsageService(IOrganizationRepository organizationRepository, IProjectRepo _projectRepository = projectRepository; _cache = cache; _messagePublisher = messagePublisher; + _notificationService = notificationService; _timeProvider = timeProvider; _logger = loggerFactory.CreateLogger(); } @@ -227,7 +229,8 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat bool isMonthlyLimitIncrease = modifiedMaxEvents < 0 || (originalMaxEvents >= 0 && modifiedMaxEvents > originalMaxEvents); if (isMonthlyLimitIncrease) { - await _cache.RemoveAsync(OrganizationNotificationWorkItemHandler.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); + // A higher monthly limit ends the current overage state: allow a future monthly overage email and clear the hourly throttle window. + await _notificationService.RemoveOrganizationNotificationSentAsync(modified.Id, isOverMonthlyLimit: true); await _cache.RemoveAsync(GetThrottledKey(utcNow, modified.Id)); return; } @@ -236,7 +239,7 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat bool isOverMonthlyLimit = modified.IsOverMonthlyLimit(_timeProvider); if (!wasOverMonthlyLimit && isOverMonthlyLimit) { - await _cache.RemoveAsync(OrganizationNotificationWorkItemHandler.GetNotificationSentKey(modified.Id, isOverMonthlyLimit: true)); + await _notificationService.RemoveOrganizationNotificationSentAsync(modified.Id, isOverMonthlyLimit: true); await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id }); } diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs index 72fb560dcb..7925aed41e 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs @@ -6,10 +6,9 @@ using Exceptionless.Core.Models; using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; -using Exceptionless.DateTimeExtensions; +using Exceptionless.Core.Services; using Exceptionless.Tests.Mail; using Exceptionless.Tests.Utility; -using Foundatio.Caching; using Foundatio.Jobs; using Foundatio.Repositories; using Microsoft.Extensions.DependencyInjection; @@ -30,7 +29,7 @@ public class OrganizationNotificationWorkItemHandlerIntegrationTests : Integrati public OrganizationNotificationWorkItemHandlerIntegrationTests(ITestOutputHelper output, AppWebHostFactory factory) : base(output, factory) { } private CountingMailer Mailer => GetService(); - private ICacheClient CacheClient => GetService(); + private NotificationService NotificationService => GetService(); private OrganizationNotificationWorkItemHandler Handler => GetService(); private IOrganizationRepository OrganizationRepository => GetService(); private IUserRepository UserRepository => GetService(); @@ -41,10 +40,7 @@ public OrganizationNotificationWorkItemHandlerIntegrationTests(ITestOutputHelper private Task SetMonthlySentMarkerAsync(string organizationId) { - return CacheClient.SetAsync( - OrganizationNotificationWorkItemHandler.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true), - true, - TimeProvider.GetUtcNow().UtcDateTime.EndOfMonth()); + return NotificationService.SetOrganizationNotificationSentAsync(organizationId, isOverMonthlyLimit: true); } protected override void RegisterServices(IServiceCollection services) @@ -85,6 +81,9 @@ protected override async Task ResetDataAsync() public async Task HandleItemAsync_WhenDuplicateMonthlyNotificationsAreProcessedAcrossHours_ShouldSendOneEmail() { // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 15, 12, 0, 0, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + // Act await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); @@ -246,9 +245,22 @@ public async Task HandleItemAsync_WhenMonthlyEmailIsSent_ShouldWriteSentMarker() await HandleWorkItemAsync(workItem); // Assert - var sentMarkerExists = await CacheClient.ExistsAsync( - OrganizationNotificationWorkItemHandler.GetNotificationSentKey(PrimaryOrganizationId, isOverMonthlyLimit: true)); - Assert.True(sentMarkerExists); + Assert.True(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyEmailIsSentInLastMillisecondOfUtcMonth_ShouldWriteObservableSentMarker() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 31, 23, 59, 59, 999, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + Assert.Equal(1, Mailer.OrganizationNoticeCount); } [Fact] @@ -265,9 +277,7 @@ public async Task HandleItemAsync_WhenMonthlyWorkItemIsStaleAndOrganizationIsNoL await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); // Assert - var sentMarkerExists = await CacheClient.ExistsAsync( - OrganizationNotificationWorkItemHandler.GetNotificationSentKey(PrimaryOrganizationId, isOverMonthlyLimit: true)); - Assert.False(sentMarkerExists); + Assert.False(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); Assert.Equal(0, Mailer.OrganizationNoticeCount); } @@ -281,9 +291,7 @@ public async Task HandleItemAsync_WhenOrganizationNoLongerExists_ShouldNotWriteS await HandleWorkItemAsync(workItem); // Assert - var sentMarkerExists = await CacheClient.ExistsAsync( - OrganizationNotificationWorkItemHandler.GetNotificationSentKey(MissingOrganizationId, isOverMonthlyLimit: true)); - Assert.False(sentMarkerExists); + Assert.False(await NotificationService.IsOrganizationNotificationSentAsync(MissingOrganizationId, isOverMonthlyLimit: true)); Assert.Equal(0, Mailer.OrganizationNoticeCount); } @@ -370,8 +378,7 @@ public async Task HandleItemAsync_WhenEmailSendFails_ShouldNotWriteSentMarkerSoR // Act await Assert.ThrowsAsync(() => HandleWorkItemAsync(workItem)); - var sentMarkerExists = await CacheClient.ExistsAsync( - OrganizationNotificationWorkItemHandler.GetNotificationSentKey(PrimaryOrganizationId, isOverMonthlyLimit: true)); + var sentMarkerExists = await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); Mailer.ShouldThrow = false; await HandleWorkItemAsync(workItem); diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs index 7b62ad519c..4b6c17df1d 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -1,19 +1,14 @@ using Exceptionless.Core; using Exceptionless.Core.Extensions; using Exceptionless.Core.Jobs.WorkItemHandlers; -using Exceptionless.Core.Mail; using Exceptionless.Core.Messaging.Models; -using Exceptionless.Core.Models; using Exceptionless.Core.Models.WorkItems; -using Exceptionless.Tests.Mail; using Foundatio.Caching; using Foundatio.Jobs; -using Foundatio.Lock; using Foundatio.Messaging; using Foundatio.Queues; using Foundatio.Resilience; using Foundatio.Serializer; -using Microsoft.Extensions.DependencyInjection; using Xunit; namespace Exceptionless.Tests.Jobs.WorkItemHandlers; @@ -43,18 +38,9 @@ public class OrganizationNotificationWorkItemHandlerTests : TestWithServices public OrganizationNotificationWorkItemHandlerTests(ITestOutputHelper output) : base(output) { } - private CountingMailer Mailer => GetService(); private IMessagePublisher MessagePublisher => GetService(); private IMessageSubscriber MessageSubscriber => GetService(); private ICacheClient CacheClient => GetService(); - private IResiliencePolicyProvider ResiliencePolicyProvider => GetService(); - - protected override void RegisterServices(IServiceCollection services, AppOptions options) - { - base.RegisterServices(services, options); - services.AddSingleton(); - services.ReplaceSingleton(sp => sp.GetRequiredService()); - } [Fact] public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithoutQueueDuplicateDetection_ShouldEnqueueSixWorkItems() @@ -186,47 +172,6 @@ public void GetNotificationKey_WhenMonthlyAndHourlyNotificationsAreCreated_Shoul Assert.NotEqual(monthlyKey, hourlyKey); } - [Fact] - public async Task HandleItemAsync_WhenLegacyHourlyThrottleProcessesStaleMonthlyDuplicates_ShouldSendOneEmailPerHourBucket() - { - // Arrange - // Reproduce the pre-fix behavior: ThrottlingLockProvider(1/hour) allowed exactly one - // item through per calendar-hour bucket. When a duplicate was abandoned and re-queued, - // it could acquire a fresh lock once the next hour bucket opened — one email per hour. - var organization = new Organization { Id = PrimaryOrganizationId, Name = "Acme Corp" }; - var user = new User - { - Id = "664ec4c1f12e4f2b7a0d2001", - FullName = "Jane Smith", - EmailAddress = "jane.smith@acmecorp.example", - IsEmailAddressVerified = true, - EmailNotificationsEnabled = true - }; - - var lockKey = OrganizationNotificationWorkItemHandler.GetNotificationLockKey(PrimaryOrganizationId, isOverMonthlyLimit: true); - var lockProvider = new ThrottlingLockProvider(CacheClient, 1, TimeSpan.FromHours(1), TimeProvider, ResiliencePolicyProvider, GetService()); - - Task ProcessDuplicateWithLegacyLockAsync() - { - return lockProvider.TryUsingAsync(lockKey, async () => - { - await Mailer.SendOrganizationNoticeAsync(user, organization, isOverMonthlyLimit: true, isOverHourlyLimit: false); - }, TimeSpan.FromMinutes(15), TestContext.Current.CancellationToken); - } - - // Act: each call simulates a stale duplicate item being dequeued in a new hour bucket - await ProcessDuplicateWithLegacyLockAsync(); // Hour 0: lock acquired, email sent - - TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); - await ProcessDuplicateWithLegacyLockAsync(); // Hour 1: new bucket, email sent again - - TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); - await ProcessDuplicateWithLegacyLockAsync(); // Hour 2: new bucket, email sent again - - // Assert: the old code allowed one email per hour — this is the bug - Assert.Equal(3, Mailer.OrganizationNoticeCount); - } - private async Task SubscribeToPlanOverageAsync(IQueue workItemQueue, int subscriberCount) { for (int i = 0; i < subscriberCount; i++) diff --git a/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs b/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs new file mode 100644 index 0000000000..7d0ef61d5f --- /dev/null +++ b/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs @@ -0,0 +1,42 @@ +using Exceptionless.Core.Services; +using Xunit; + +namespace Exceptionless.Tests.Services; + +public sealed class NotificationServiceTests : TestWithServices +{ + private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d4001"; + + public NotificationServiceTests(ITestOutputHelper output) : base(output) { } + + private NotificationService NotificationService => GetService(); + + [Fact] + public async Task SetOrganizationNotificationSentAsync_WhenCalledInLastMillisecondOfUtcMonth_ShouldWriteObservableMarker() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 31, 23, 59, 59, 999, DateTimeKind.Utc)); + + // Act + await NotificationService.SetOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + } + + [Fact] + public async Task TryAcquireOrganizationNotificationLockAsync_WhenOldHourlyBucketBoundaryPasses_ShouldRemainLocked() + { + // Arrange + await using var firstLock = await NotificationService.TryAcquireOrganizationNotificationLockAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + Assert.NotNull(firstLock); + + // Act + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationLockedAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + await using var secondLock = await NotificationService.TryAcquireOrganizationNotificationLockAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + Assert.Null(secondLock); + } +} diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index e0da7b0b5c..5c9bcfbca0 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -1,12 +1,10 @@ using System.Diagnostics; using Exceptionless.Core.Billing; using Exceptionless.Core.Extensions; -using Exceptionless.Core.Jobs.WorkItemHandlers; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; using Exceptionless.Core.Repositories; using Exceptionless.Core.Services; -using Exceptionless.DateTimeExtensions; using Exceptionless.Tests.Extensions; using Foundatio.AsyncEx; using Foundatio.Caching; @@ -22,6 +20,7 @@ public sealed class UsageServiceTests : IntegrationTestsBase private readonly IOrganizationRepository _organizationRepository; private readonly IProjectRepository _projectRepository; private readonly UsageService _usageService; + private readonly NotificationService _notificationService; private readonly BillingPlans _plans; private readonly ICacheClient _cache; @@ -30,6 +29,7 @@ public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : TimeProvider.SetUtcNow(new DateTime(2015, 2, 13, 0, 0, 0, DateTimeKind.Utc)); Log.SetLogLevel(LogLevel.Information); _usageService = GetService(); + _notificationService = GetService(); _organizationRepository = GetService(); _projectRepository = GetService(); _plans = GetService(); @@ -38,15 +38,12 @@ public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : private Task SetMonthlySentMarkerAsync(string organizationId) { - return _cache.SetAsync( - OrganizationNotificationWorkItemHandler.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true), - true, - TimeProvider.GetUtcNow().UtcDateTime.EndOfMonth()); + return _notificationService.SetOrganizationNotificationSentAsync(organizationId, isOverMonthlyLimit: true); } private Task MonthlySentMarkerExistsAsync(string organizationId) { - return _cache.ExistsAsync(OrganizationNotificationWorkItemHandler.GetNotificationSentKey(organizationId, isOverMonthlyLimit: true)); + return _notificationService.IsOrganizationNotificationSentAsync(organizationId, isOverMonthlyLimit: true); } [Fact] From ab701452e4126df669ec9950e0c3de14931c8363 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 15:30:23 -0500 Subject: [PATCH 06/10] fix: Add input validation and refine notification lock checks This commit introduces robust input validation for `organizationId` across notification-related methods to prevent operations with null or empty IDs. Additionally, it refines the `IsOrganizationNotificationLockedAsync` method. Previously, this method would attempt to acquire a lock to determine its status. It now directly checks the lock provider's state using `IsLockedAsync`, which is more efficient and semantically correct, avoiding unnecessary lock acquisition attempts and potential side effects. These changes enhance the stability and correctness of the notification system, crucial for reliable plan limit overage handling. --- .../OrganizationNotificationWorkItem.cs | 2 + .../Services/NotificationService.cs | 15 +++++-- ...izationNotificationWorkItemHandlerTests.cs | 10 +++++ .../Services/NotificationServiceTests.cs | 39 +++++++++++++++++++ 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs index 89a766a51c..5c14e87743 100644 --- a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs +++ b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs @@ -15,6 +15,8 @@ public record OrganizationNotificationWorkItem : IHaveUniqueIdentifier public static string GetNotificationKey(string organizationId, bool isOverMonthlyLimit) { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + return $"Organization:{organizationId}:notification:{(isOverMonthlyLimit ? MonthlyNotificationType : HourlyNotificationType)}"; } } diff --git a/src/Exceptionless.Core/Services/NotificationService.cs b/src/Exceptionless.Core/Services/NotificationService.cs index fc35f82c65..52f6db4024 100644 --- a/src/Exceptionless.Core/Services/NotificationService.cs +++ b/src/Exceptionless.Core/Services/NotificationService.cs @@ -44,29 +44,38 @@ public async Task SendReleaseNotificationAsync(string? mess public async Task IsOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + var sent = await cacheClient.GetAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit)); return sent.HasValue && sent.Value; } public Task SetOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + return cacheClient.SetAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit), true, GetOrganizationNotificationSentExpiresAtUtc()); } public Task RemoveOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + return cacheClient.RemoveAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit)); } public Task TryAcquireOrganizationNotificationLockAsync(string organizationId, bool isOverMonthlyLimit) { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + return lockProvider.TryAcquireAsync(GetOrganizationNotificationLockKey(organizationId, isOverMonthlyLimit), OrganizationNotificationLockTimeout, TimeSpan.Zero); } - public async Task IsOrganizationNotificationLockedAsync(string organizationId, bool isOverMonthlyLimit) + public Task IsOrganizationNotificationLockedAsync(string organizationId, bool isOverMonthlyLimit) { - await using var notificationLock = await TryAcquireOrganizationNotificationLockAsync(organizationId, isOverMonthlyLimit); - return notificationLock is null; + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + return lockProvider.IsLockedAsync(GetOrganizationNotificationLockKey(organizationId, isOverMonthlyLimit)); } private static string GetOrganizationNotificationLockKey(string organizationId, bool isOverMonthlyLimit) diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs index 4b6c17df1d..03ee40af0b 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -172,6 +172,16 @@ public void GetNotificationKey_WhenMonthlyAndHourlyNotificationsAreCreated_Shoul Assert.NotEqual(monthlyKey, hourlyKey); } + [Theory] + [InlineData(null)] + [InlineData("")] + public void GetNotificationKey_WhenOrganizationIdIsNullOrEmpty_ShouldThrowArgumentException(string? organizationId) + { + // Arrange + // Act & Assert + Assert.ThrowsAny(() => OrganizationNotificationWorkItem.GetNotificationKey(organizationId!, isOverMonthlyLimit: true)); + } + private async Task SubscribeToPlanOverageAsync(IQueue workItemQueue, int subscriberCount) { for (int i = 0; i < subscriberCount; i++) diff --git a/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs b/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs index 7d0ef61d5f..e472c9c7d5 100644 --- a/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs @@ -1,4 +1,7 @@ +using System.Threading; using Exceptionless.Core.Services; +using Foundatio.Lock; +using Foundatio.Messaging; using Xunit; namespace Exceptionless.Tests.Services; @@ -11,6 +14,42 @@ public NotificationServiceTests(ITestOutputHelper output) : base(output) { } private NotificationService NotificationService => GetService(); + [Fact] + public async Task IsOrganizationNotificationLockedAsync_WhenNoLockIsHeld_ShouldNotAcquireAndReleaseLock() + { + // Arrange + var messageBus = GetService(); + var releaseCount = 0; + await messageBus.SubscribeAsync(_ => + { + Interlocked.Increment(ref releaseCount); + return Task.CompletedTask; + }, TestCancellationToken); + + // Act + var isLocked = await NotificationService.IsOrganizationNotificationLockedAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + + // Assert + Assert.False(isLocked); + Assert.Equal(0, Volatile.Read(ref releaseCount)); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public async Task OrganizationNotificationMethods_WhenOrganizationIdIsNullOrEmpty_ShouldThrowArgumentException(string? organizationId) + { + // Arrange + const bool IsOverMonthlyLimit = true; + + // Act & Assert + await Assert.ThrowsAnyAsync(() => NotificationService.IsOrganizationNotificationSentAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.SetOrganizationNotificationSentAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.RemoveOrganizationNotificationSentAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.TryAcquireOrganizationNotificationLockAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.IsOrganizationNotificationLockedAsync(organizationId!, IsOverMonthlyLimit)); + } + [Fact] public async Task SetOrganizationNotificationSentAsync_WhenCalledInLastMillisecondOfUtcMonth_ShouldWriteObservableMarker() { From 592c2738478928a1f0b3a9d255124300a5a0c19c Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 15:31:35 -0500 Subject: [PATCH 07/10] fix: Ensure monthly overage notification only resets when truly below new limit Previously, increasing an organization's monthly event limit would always clear the monthly overage notification marker. This was problematic if the organization's current usage still exceeded the new, higher limit, potentially causing them to remain in overage without further notifications. This change modifies the logic to only remove the overage notification marker if the organization's total usage has actually fallen below the newly increased monthly limit. This prevents premature resets and ensures that overage notifications continue until the usage is truly compliant. --- .../Services/UsageService.cs | 6 ++-- .../Services/UsageServiceTests.cs | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/Exceptionless.Core/Services/UsageService.cs b/src/Exceptionless.Core/Services/UsageService.cs index b069c0e1f5..4a12491675 100644 --- a/src/Exceptionless.Core/Services/UsageService.cs +++ b/src/Exceptionless.Core/Services/UsageService.cs @@ -229,8 +229,10 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat bool isMonthlyLimitIncrease = modifiedMaxEvents < 0 || (originalMaxEvents >= 0 && modifiedMaxEvents > originalMaxEvents); if (isMonthlyLimitIncrease) { - // A higher monthly limit ends the current overage state: allow a future monthly overage email and clear the hourly throttle window. - await _notificationService.RemoveOrganizationNotificationSentAsync(modified.Id, isOverMonthlyLimit: true); + // A higher monthly limit only resets monthly notification state when it actually ends the current overage. + if (!modified.IsOverMonthlyLimit(_timeProvider)) + await _notificationService.RemoveOrganizationNotificationSentAsync(modified.Id, isOverMonthlyLimit: true); + await _cache.RemoveAsync(GetThrottledKey(utcNow, modified.Id)); return; } diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index 5c9bcfbca0..015f449866 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -75,6 +75,37 @@ public async Task HandleOrganizationChangeAsync_WhenMonthlyPlanLimitIncreases_Sh Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); } + [Fact] + public async Task HandleOrganizationChangeAsync_WhenMonthlyPlanLimitIncreasesButOrganizationIsStillOverLimit_ShouldKeepOverageNotificationSentMarker() + { + // Arrange + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3005", + Name = "Primary Organization", + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + original.GetCurrentUsage(TimeProvider).Total = 250; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + modified.GetCurrentUsage(TimeProvider).Total = 250; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + Assert.True(await MonthlySentMarkerExistsAsync(original.Id)); + } + [Fact] public async Task HandleOrganizationChangeAsync_WhenPlanChangesToUnlimited_ShouldResetOverageNotificationSentMarker() { From 005793617e989f44afaf81bd50b47a473b437cbc Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 15:46:32 -0500 Subject: [PATCH 08/10] test: Stabilize overage notification review tests --- ...izationNotificationWorkItemHandlerTests.cs | 27 ++++++++++++++++--- .../Services/UsageServiceTests.cs | 3 --- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs index 03ee40af0b..f228a08ea0 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -3,6 +3,7 @@ using Exceptionless.Core.Jobs.WorkItemHandlers; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models.WorkItems; +using Foundatio.AsyncEx; using Foundatio.Caching; using Foundatio.Jobs; using Foundatio.Messaging; @@ -50,7 +51,7 @@ public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithoutQu await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 6); // Act - await MessagePublisher.PublishAsync(new PlanOverage { OrganizationId = PrimaryOrganizationId }, cancellationToken: TestContext.Current.CancellationToken); + await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = PrimaryOrganizationId }, expectedEnqueueAttempts: 6, expectedEnqueuedCount: 6); // Assert var queueStats = await workItemQueue.GetQueueStatsAsync(); @@ -66,7 +67,7 @@ public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithQueue await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 6); // Act - await MessagePublisher.PublishAsync(new PlanOverage { OrganizationId = QueueDuplicateDetectionOrganizationId }, cancellationToken: TestContext.Current.CancellationToken); + await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = QueueDuplicateDetectionOrganizationId }, expectedEnqueueAttempts: 6, expectedEnqueuedCount: 1); // Assert var queueStats = await workItemQueue.GetQueueStatsAsync(); @@ -118,7 +119,7 @@ public async Task RunAsync_WhenHourlyPlanOverageIsObserved_ShouldEnqueueHourlyWo await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 1); // Act - await MessagePublisher.PublishAsync(new PlanOverage { OrganizationId = HourlyOrganizationId, IsHourly = true }, cancellationToken: TestContext.Current.CancellationToken); + await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = HourlyOrganizationId, IsHourly = true }, expectedEnqueueAttempts: 1, expectedEnqueuedCount: 1); // Assert var queueEntry = await workItemQueue.DequeueAsync(TestContext.Current.CancellationToken); @@ -191,6 +192,26 @@ private async Task SubscribeToPlanOverageAsync(IQueue workItemQueu } } + private async Task PublishPlanOverageAndWaitForQueueAsync(IQueue workItemQueue, PlanOverage overage, int expectedEnqueueAttempts, int expectedEnqueuedCount) + { + using var enqueueAttempts = new AsyncCountdownEvent(expectedEnqueueAttempts); + using var enqueued = new AsyncCountdownEvent(expectedEnqueuedCount); + using var enqueueAttemptSubscription = workItemQueue.Enqueuing.AddHandler((_, _) => + { + enqueueAttempts.Signal(); + return Task.CompletedTask; + }); + using var enqueuedSubscription = workItemQueue.Enqueued.AddHandler((_, _) => + { + enqueued.Signal(); + return Task.CompletedTask; + }); + + await MessagePublisher.PublishAsync(overage, cancellationToken: TestContext.Current.CancellationToken); + await enqueueAttempts.WaitAsync(TimeSpan.FromSeconds(5)); + await enqueued.WaitAsync(TimeSpan.FromSeconds(5)); + } + private InMemoryQueue CreateWorkItemQueue(params IQueueBehavior[] behaviors) { var options = new InMemoryQueueOptions diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index 015f449866..8a54c6cbd7 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -7,7 +7,6 @@ using Exceptionless.Core.Services; using Exceptionless.Tests.Extensions; using Foundatio.AsyncEx; -using Foundatio.Caching; using Foundatio.Messaging; using Foundatio.Repositories; using Xunit; @@ -22,7 +21,6 @@ public sealed class UsageServiceTests : IntegrationTestsBase private readonly UsageService _usageService; private readonly NotificationService _notificationService; private readonly BillingPlans _plans; - private readonly ICacheClient _cache; public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : base(output, factory) { @@ -33,7 +31,6 @@ public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : _organizationRepository = GetService(); _projectRepository = GetService(); _plans = GetService(); - _cache = GetService(); } private Task SetMonthlySentMarkerAsync(string organizationId) From 85dbed5ce994766350e09598f54c946f4797ae45 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 15:47:20 -0500 Subject: [PATCH 09/10] test: Fix overage notification test wait helper --- .../OrganizationNotificationWorkItemHandlerTests.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs index f228a08ea0..652bd69aee 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -3,6 +3,7 @@ using Exceptionless.Core.Jobs.WorkItemHandlers; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models.WorkItems; +using Exceptionless.Tests.Extensions; using Foundatio.AsyncEx; using Foundatio.Caching; using Foundatio.Jobs; @@ -194,8 +195,8 @@ private async Task SubscribeToPlanOverageAsync(IQueue workItemQueu private async Task PublishPlanOverageAndWaitForQueueAsync(IQueue workItemQueue, PlanOverage overage, int expectedEnqueueAttempts, int expectedEnqueuedCount) { - using var enqueueAttempts = new AsyncCountdownEvent(expectedEnqueueAttempts); - using var enqueued = new AsyncCountdownEvent(expectedEnqueuedCount); + var enqueueAttempts = new AsyncCountdownEvent(expectedEnqueueAttempts); + var enqueued = new AsyncCountdownEvent(expectedEnqueuedCount); using var enqueueAttemptSubscription = workItemQueue.Enqueuing.AddHandler((_, _) => { enqueueAttempts.Signal(); From 5ad397ca6d9995aaf2dc824e6ae8d19d33a9a1e9 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Sat, 30 May 2026 15:48:12 -0500 Subject: [PATCH 10/10] test: Use fixture cancellation token in notification tests --- .../OrganizationNotificationWorkItemHandlerTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs index 652bd69aee..f1812b61cc 100644 --- a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -100,7 +100,7 @@ public async Task EnqueueAsync_WhenDuplicateNotificationIsDequeued_ShouldAllowFu var workItem = CreateMonthlyNotificationWorkItem(DequeuedNotificationOrganizationId); await workItemQueue.EnqueueAsync(workItem); - var queueEntry = await workItemQueue.DequeueAsync(TestContext.Current.CancellationToken); + var queueEntry = await workItemQueue.DequeueAsync(TestCancellationToken); Assert.NotNull(queueEntry); await queueEntry.CompleteAsync(); @@ -123,7 +123,7 @@ public async Task RunAsync_WhenHourlyPlanOverageIsObserved_ShouldEnqueueHourlyWo await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = HourlyOrganizationId, IsHourly = true }, expectedEnqueueAttempts: 1, expectedEnqueuedCount: 1); // Assert - var queueEntry = await workItemQueue.DequeueAsync(TestContext.Current.CancellationToken); + var queueEntry = await workItemQueue.DequeueAsync(TestCancellationToken); Assert.NotNull(queueEntry); var workItem = GetService().Deserialize(queueEntry.Value.Data)!; @@ -208,7 +208,7 @@ private async Task PublishPlanOverageAndWaitForQueueAsync(IQueue w return Task.CompletedTask; }); - await MessagePublisher.PublishAsync(overage, cancellationToken: TestContext.Current.CancellationToken); + await MessagePublisher.PublishAsync(overage, cancellationToken: TestCancellationToken); await enqueueAttempts.WaitAsync(TimeSpan.FromSeconds(5)); await enqueued.WaitAsync(TimeSpan.FromSeconds(5)); }