diff --git a/src/Exceptionless.Core/Bootstrapper.cs b/src/Exceptionless.Core/Bootstrapper.cs index 751d8e0880..334f2b07ae 100644 --- a/src/Exceptionless.Core/Bootstrapper.cs +++ b/src/Exceptionless.Core/Bootstrapper.cs @@ -43,6 +43,7 @@ using Foundatio.Serializer; using Foundatio.Storage; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using DataDictionary = Exceptionless.Core.Models.DataDictionary; using MaintainIndexesJob = Foundatio.Repositories.Elasticsearch.Jobs.MaintainIndexesJob; @@ -128,6 +129,8 @@ public static void RegisterServices(IServiceCollection services, AppOptions appO services.AddSingleton(s => CreateQueue(s)); services.AddSingleton(s => CreateQueue(s, TimeSpan.FromHours(1))); + services.TryAddEnumerable(ServiceDescriptor.Singleton, WorkItemDuplicateDetectionQueueBehavior>()); + services.AddSingleton(); services.AddSingleton(); services.AddStartupAction(); @@ -301,10 +304,14 @@ private static IQueue CreateQueue(IServiceProvider container, TimeSpan? wo return new InMemoryQueue(new InMemoryQueueOptions { WorkItemTimeout = workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5.0)), + Behaviors = container.GetServices>().ToList(), Serializer = container.GetRequiredService(), TimeProvider = container.GetRequiredService(), ResiliencePolicyProvider = container.GetRequiredService(), LoggerFactory = loggerFactory }); } + + private sealed class WorkItemDuplicateDetectionQueueBehavior(ICacheClient cacheClient, ILoggerFactory loggerFactory) + : DuplicateDetectionQueueBehavior(cacheClient, loggerFactory, TimeSpan.FromHours(24)); } diff --git a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs index 840f2bad38..fbf30ea43a 100644 --- a/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs +++ b/src/Exceptionless.Core/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandler.cs @@ -1,16 +1,16 @@ +using Exceptionless.Core.Extensions; using Exceptionless.Core.Mail; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; using Exceptionless.Core.Models.WorkItems; using Exceptionless.Core.Repositories; -using Foundatio.Caching; +using Exceptionless.Core.Services; using Foundatio.Extensions.Hosting.Startup; using Foundatio.Jobs; using Foundatio.Lock; using Foundatio.Messaging; using Foundatio.Queues; using Foundatio.Repositories; -using Foundatio.Resilience; using Microsoft.Extensions.Logging; namespace Exceptionless.Core.Jobs.WorkItemHandlers; @@ -44,37 +44,104 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem } } +/// +/// Handles by sending a plan-overage email at most +/// once per overage state per organization. +/// +/// Root cause of duplicate emails (fixed here): +/// Every web pod registers the same +/// startup action, so a single PlanOverage message fanned out to all pods and each pod +/// enqueued its own copy of the work item. The previous +/// ThrottlingLockProvider(slotsPerPeriod: 1, period: 1 hour) allowed exactly one item +/// through per calendar-hour bucket. Abandoned duplicates were re-queued and reprocessed once +/// each new bucket opened — producing one email per hour for as many duplicate items existed. +/// +/// Fix layers (both required): +/// +/// +/// Queue-level dedup: plus +/// DuplicateDetectionQueueBehavior collapses identical fanout enqueues to a single +/// queue entry, preventing duplicates from being enqueued in the first place. +/// +/// +/// Handler-level idempotency: a distributed lock serializes concurrent processing, and a +/// sent marker ensures that any stale duplicates +/// already in the queue before the fix deployed cannot re-trigger an email in the same UTC +/// month. The marker is reset when a monthly plan limit change re-evaluates the overage state. +/// +/// +/// +/// Only monthly overages send email. Hourly items use the base work-item lock and return from +/// before touching the monthly notification lock/sent-key path, so +/// they can never block or suppress a later monthly notification. +/// Monthly items also re-check the current organization usage before sending so delayed queue +/// entries do not email after a plan or usage-state change leaves the organization under limit. +/// public class OrganizationNotificationWorkItemHandler : WorkItemHandlerBase { private readonly IOrganizationRepository _organizationRepository; private readonly IUserRepository _userRepository; private readonly IMailer _mailer; - private readonly ILockProvider _lockProvider; + private readonly NotificationService _notificationService; + private readonly TimeProvider _timeProvider; - public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, ICacheClient cacheClient, TimeProvider timeProvider, IResiliencePolicyProvider resiliencePolicyProvider, ILoggerFactory loggerFactory) : base(loggerFactory) + public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, NotificationService notificationService, TimeProvider timeProvider, ILoggerFactory loggerFactory) : base(loggerFactory) { _organizationRepository = organizationRepository; _userRepository = userRepository; _mailer = mailer; - _lockProvider = new ThrottlingLockProvider(cacheClient, 1, TimeSpan.FromHours(1), timeProvider, resiliencePolicyProvider, loggerFactory); + _notificationService = notificationService; + _timeProvider = timeProvider; } - public override Task HandleItemAsync(WorkItemContext context) + public override Task GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default) + { + var wi = (OrganizationNotificationWorkItem)workItem; + + // Hourly overages do not send email. Return the base EmptyLock so WorkItemJob calls + // HandleItemAsync and then marks the queue entry as completed. Returning null instead + // would cause WorkItemJob to call AbandonAsync, creating an infinite retry loop where + // the item is re-queued and abandoned on every dequeue attempt. + if (!ShouldSendNotificationEmail(wi)) + return base.GetWorkItemLockAsync(workItem, cancellationToken); + + // The notification service uses a 90-minute lease, comfortably exceeding the 1-hour + // work-item queue timeout so a slow send does not let a duplicate worker acquire the + // notification lock at the queue visibility boundary. + // + // acquireTimeout: TimeSpan.Zero — if another worker already holds the lock, return null + // immediately so WorkItemJob calls AbandonAsync. The item is retried later, at which point + // the sent marker is already set and the handler skips. Blocking here instead would stall + // a worker slot for up to the lock timeout with no correctness benefit. + return _notificationService.TryAcquireOrganizationNotificationLockAsync(wi.OrganizationId, wi.IsOverMonthlyLimit); + } + + public override async Task HandleItemAsync(WorkItemContext context) { var wi = context.GetData()!; + Log.LogInformation("Received organization notification work item for: {OrganizationId} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); + + if (!ShouldSendNotificationEmail(wi)) + return; + + var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache()); + if (organization is null) + return; - string cacheKey = $"{nameof(OrganizationNotificationWorkItemHandler)}:{wi.OrganizationId}"; + if (!organization.IsOverMonthlyLimit(_timeProvider)) + { + Log.LogInformation("Skipping stale monthly overage notification for organization: {OrganizationId} because it is no longer over the monthly limit", wi.OrganizationId); + return; + } - return _lockProvider.TryUsingAsync(cacheKey, async () => + if (await _notificationService.IsOrganizationNotificationSentAsync(wi.OrganizationId, wi.IsOverMonthlyLimit)) { - Log.LogInformation("Received organization notification work item for: {Organization} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache()); - if (organization is null) - return; - - if (wi.IsOverMonthlyLimit) - await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); - }, TimeSpan.FromMinutes(15), context.CancellationToken); + Log.LogInformation("Skipping duplicate monthly overage notification for organization: {OrganizationId}", wi.OrganizationId); + return; + } + + await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit); + await _notificationService.SetOrganizationNotificationSentAsync(wi.OrganizationId, wi.IsOverMonthlyLimit); } private async Task SendOverageNotificationsAsync(Organization organization, bool isOverHourlyLimit, bool isOverMonthlyLimit) @@ -100,4 +167,13 @@ private async Task SendOverageNotificationsAsync(Organization organization, bool Log.LogTrace("Done sending email"); } + + // Only monthly overages send email. Hourly overages still trigger real-time websocket + // budget updates in the UI but do not warrant a separate notification email. + // Keeping hourly items out of the lock/sent-key path also means a burst of hourly events + // cannot suppress the monthly notification that follows. + private static bool ShouldSendNotificationEmail(OrganizationNotificationWorkItem workItem) + { + return workItem.IsOverMonthlyLimit; + } } diff --git a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs index 2d734ea6c2..5c14e87743 100644 --- a/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs +++ b/src/Exceptionless.Core/Models/WorkItems/OrganizationNotificationWorkItem.cs @@ -1,8 +1,22 @@ -namespace Exceptionless.Core.Models.WorkItems; +using Foundatio.Queues; -public record OrganizationNotificationWorkItem +namespace Exceptionless.Core.Models.WorkItems; + +public record OrganizationNotificationWorkItem : IHaveUniqueIdentifier { + public const string HourlyNotificationType = "hourly"; + public const string MonthlyNotificationType = "monthly"; + public required string OrganizationId { get; init; } public required bool IsOverHourlyLimit { get; init; } public required bool IsOverMonthlyLimit { get; init; } + + public string UniqueIdentifier => GetNotificationKey(OrganizationId, IsOverMonthlyLimit); + + public static string GetNotificationKey(string organizationId, bool isOverMonthlyLimit) + { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + return $"Organization:{organizationId}:notification:{(isOverMonthlyLimit ? MonthlyNotificationType : HourlyNotificationType)}"; + } } diff --git a/src/Exceptionless.Core/Services/NotificationService.cs b/src/Exceptionless.Core/Services/NotificationService.cs index 10b0f093b6..52f6db4024 100644 --- a/src/Exceptionless.Core/Services/NotificationService.cs +++ b/src/Exceptionless.Core/Services/NotificationService.cs @@ -1,12 +1,16 @@ using Exceptionless.Core.Messaging.Models; +using Exceptionless.Core.Models.WorkItems; +using Exceptionless.DateTimeExtensions; using Foundatio.Caching; +using Foundatio.Lock; using Foundatio.Messaging; namespace Exceptionless.Core.Services; -public class NotificationService(ICacheClient cacheClient, IMessagePublisher messagePublisher, TimeProvider timeProvider) +public class NotificationService(ICacheClient cacheClient, IMessagePublisher messagePublisher, TimeProvider timeProvider, CacheLockProvider lockProvider) { private const string SystemNotificationCacheKey = "system-notification"; + private static readonly TimeSpan OrganizationNotificationLockTimeout = TimeSpan.FromMinutes(90); public async Task GetSystemNotificationAsync() { @@ -37,4 +41,63 @@ public async Task SendReleaseNotificationAsync(string? mess await messagePublisher.PublishAsync(notification); return notification; } + + public async Task IsOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + var sent = await cacheClient.GetAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit)); + return sent.HasValue && sent.Value; + } + + public Task SetOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + return cacheClient.SetAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit), true, GetOrganizationNotificationSentExpiresAtUtc()); + } + + public Task RemoveOrganizationNotificationSentAsync(string organizationId, bool isOverMonthlyLimit) + { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + return cacheClient.RemoveAsync(GetOrganizationNotificationSentKey(organizationId, isOverMonthlyLimit)); + } + + public Task TryAcquireOrganizationNotificationLockAsync(string organizationId, bool isOverMonthlyLimit) + { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + return lockProvider.TryAcquireAsync(GetOrganizationNotificationLockKey(organizationId, isOverMonthlyLimit), OrganizationNotificationLockTimeout, TimeSpan.Zero); + } + + public Task IsOrganizationNotificationLockedAsync(string organizationId, bool isOverMonthlyLimit) + { + ArgumentException.ThrowIfNullOrEmpty(organizationId); + + return lockProvider.IsLockedAsync(GetOrganizationNotificationLockKey(organizationId, isOverMonthlyLimit)); + } + + private static string GetOrganizationNotificationLockKey(string organizationId, bool isOverMonthlyLimit) + { + return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-lock"; + } + + private static string GetOrganizationNotificationSentKey(string organizationId, bool isOverMonthlyLimit) + { + return $"{OrganizationNotificationWorkItem.GetNotificationKey(organizationId, isOverMonthlyLimit)}-sent"; + } + + private DateTime GetOrganizationNotificationSentExpiresAtUtc() + { + var utcNow = timeProvider.GetUtcNow().UtcDateTime; + var expiresAtUtc = utcNow.StartOfMonth().AddMonths(1); + + // Foundatio treats absolute cache expirations less than 5ms in the future as already expired. + // Keep the marker observable for sends that complete in the final milliseconds of the UTC month. + if (expiresAtUtc - utcNow < CacheClientExtensions.MinimumExpiration) + return utcNow.Add(CacheClientExtensions.MinimumExpiration); + + return expiresAtUtc; + } } diff --git a/src/Exceptionless.Core/Services/UsageService.cs b/src/Exceptionless.Core/Services/UsageService.cs index 8de074c3e8..4a12491675 100644 --- a/src/Exceptionless.Core/Services/UsageService.cs +++ b/src/Exceptionless.Core/Services/UsageService.cs @@ -16,11 +16,13 @@ public class UsageService private readonly IProjectRepository _projectRepository; private readonly ICacheClient _cache; private readonly IMessagePublisher _messagePublisher; + private readonly NotificationService _notificationService; private readonly TimeProvider _timeProvider; private readonly ILogger _logger; private readonly TimeSpan _bucketSize = TimeSpan.FromMinutes(5); public UsageService(IOrganizationRepository organizationRepository, IProjectRepository projectRepository, ICacheClient cache, IMessagePublisher messagePublisher, + NotificationService notificationService, TimeProvider timeProvider, ILoggerFactory loggerFactory) { @@ -28,6 +30,7 @@ public UsageService(IOrganizationRepository organizationRepository, IProjectRepo _projectRepository = projectRepository; _cache = cache; _messagePublisher = messagePublisher; + _notificationService = notificationService; _timeProvider = timeProvider; _logger = loggerFactory.CreateLogger(); } @@ -223,28 +226,39 @@ public async Task HandleOrganizationChangeAsync(Organization modified, Organizat if (modifiedMaxEvents == originalMaxEvents) return; - if (modifiedMaxEvents > originalMaxEvents) + bool isMonthlyLimitIncrease = modifiedMaxEvents < 0 || (originalMaxEvents >= 0 && modifiedMaxEvents > originalMaxEvents); + if (isMonthlyLimitIncrease) { - // remove is throttled flag + // A higher monthly limit only resets monthly notification state when it actually ends the current overage. + if (!modified.IsOverMonthlyLimit(_timeProvider)) + await _notificationService.RemoveOrganizationNotificationSentAsync(modified.Id, isOverMonthlyLimit: true); + await _cache.RemoveAsync(GetThrottledKey(utcNow, modified.Id)); + return; } - else + + bool wasOverMonthlyLimit = original.IsOverMonthlyLimit(_timeProvider); + bool isOverMonthlyLimit = modified.IsOverMonthlyLimit(_timeProvider); + if (!wasOverMonthlyLimit && isOverMonthlyLimit) { - var bucketTotal = await _cache.GetAsync(GetBucketTotalCacheKey(utcNow, modified.Id)); - if (!bucketTotal.HasValue) - return; + await _notificationService.RemoveOrganizationNotificationSentAsync(modified.Id, isOverMonthlyLimit: true); + await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id }); + } - int bucketLimit = GetBucketEventLimit(modifiedMaxEvents); + var bucketTotal = await _cache.GetAsync(GetBucketTotalCacheKey(utcNow, modified.Id)); + if (!bucketTotal.HasValue) + return; - // unlimited - if (bucketLimit < 0) - return; + int bucketLimit = GetBucketEventLimit(modifiedMaxEvents); - if (bucketTotal.Value >= bucketLimit) - { - await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id, IsHourly = true }); - await _cache.SetAsync(GetThrottledKey(utcNow, modified.Id), true, TimeSpan.FromMinutes(5)); - } + // unlimited + if (bucketLimit < 0) + return; + + if (bucketTotal.Value >= bucketLimit) + { + await _messagePublisher.PublishAsync(new PlanOverage { OrganizationId = modified.Id, IsHourly = true }); + await _cache.SetAsync(GetThrottledKey(utcNow, modified.Id), true, TimeSpan.FromMinutes(5)); } } diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs new file mode 100644 index 0000000000..7925aed41e --- /dev/null +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerIntegrationTests.cs @@ -0,0 +1,433 @@ +using Exceptionless.Core; +using Exceptionless.Core.Billing; +using Exceptionless.Core.Extensions; +using Exceptionless.Core.Jobs.WorkItemHandlers; +using Exceptionless.Core.Mail; +using Exceptionless.Core.Models; +using Exceptionless.Core.Models.WorkItems; +using Exceptionless.Core.Repositories; +using Exceptionless.Core.Services; +using Exceptionless.Tests.Mail; +using Exceptionless.Tests.Utility; +using Foundatio.Jobs; +using Foundatio.Repositories; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Exceptionless.Tests.Jobs.WorkItemHandlers; + +public class OrganizationNotificationWorkItemHandlerIntegrationTests : IntegrationTestsBase +{ + private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d1001"; + private const string SecondaryOrganizationId = "664ec4c1f12e4f2b7a0d1002"; + private const string MissingOrganizationId = "664ec4c1f12e4f2b7a0d1003"; + private const string PrimaryUserId = "664ec4c1f12e4f2b7a0d2001"; + private const string SecondaryUserId = "664ec4c1f12e4f2b7a0d2002"; + private const string UnverifiedUserId = "664ec4c1f12e4f2b7a0d2003"; + private const string DisabledNotificationsUserId = "664ec4c1f12e4f2b7a0d2004"; + + public OrganizationNotificationWorkItemHandlerIntegrationTests(ITestOutputHelper output, AppWebHostFactory factory) : base(output, factory) { } + + private CountingMailer Mailer => GetService(); + private NotificationService NotificationService => GetService(); + private OrganizationNotificationWorkItemHandler Handler => GetService(); + private IOrganizationRepository OrganizationRepository => GetService(); + private IUserRepository UserRepository => GetService(); + private BillingManager BillingManager => GetService(); + private BillingPlans BillingPlans => GetService(); + private OrganizationData OrganizationData => GetService(); + private UserData UserData => GetService(); + + private Task SetMonthlySentMarkerAsync(string organizationId) + { + return NotificationService.SetOrganizationNotificationSentAsync(organizationId, isOverMonthlyLimit: true); + } + + protected override void RegisterServices(IServiceCollection services) + { + base.RegisterServices(services); + services.AddSingleton(); + services.ReplaceSingleton(sp => sp.GetRequiredService()); + } + + protected override async Task ResetDataAsync() + { + await base.ResetDataAsync(); + Mailer.Reset(); + + var organizations = new[] + { + OrganizationData.GenerateOrganization(BillingManager, BillingPlans, id: PrimaryOrganizationId, name: "Primary Organization", plan: BillingPlans.SmallPlan), + OrganizationData.GenerateOrganization(BillingManager, BillingPlans, id: SecondaryOrganizationId, name: "Secondary Organization", plan: BillingPlans.SmallPlan) + }; + + foreach (var organization in organizations) + organization.GetCurrentUsage(TimeProvider).Total = organization.GetMaxEventsPerMonthWithBonus(TimeProvider); + + await OrganizationRepository.AddAsync(organizations, options => options.ImmediateConsistency()); + + var primaryUser = UserData.GenerateUser(id: PrimaryUserId, organizationId: PrimaryOrganizationId, emailAddress: "primary-owner@example.org"); + primaryUser.FullName = "Primary Owner"; + primaryUser.IsEmailAddressVerified = true; + + var secondaryUser = UserData.GenerateUser(id: SecondaryUserId, organizationId: SecondaryOrganizationId, emailAddress: "secondary-owner@example.org"); + secondaryUser.FullName = "Secondary Owner"; + secondaryUser.IsEmailAddressVerified = true; + + await UserRepository.AddAsync([primaryUser, secondaryUser], options => options.ImmediateConsistency()); + } + + [Fact] + public async Task HandleItemAsync_WhenDuplicateMonthlyNotificationsAreProcessedAcrossHours_ShouldSendOneEmail() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 15, 12, 0, 0, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyNotificationsTargetDifferentOrganizations_ShouldSendOneEmailPerOrganization() + { + // Arrange + // Act + for (int i = 0; i < 3; i++) + { + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(SecondaryOrganizationId)); + } + + // Assert + var callsByOrganization = Mailer.OrganizationNoticeCalls + .GroupBy(call => call.OrganizationId) + .ToDictionary(group => group.Key, group => group.Count()); + + Assert.Equal(2, callsByOrganization.Count); + Assert.Equal(1, callsByOrganization[PrimaryOrganizationId]); + Assert.Equal(1, callsByOrganization[SecondaryOrganizationId]); + } + + [Fact] + public async Task HandleItemAsync_WhenTwentyFourHoursPass_ShouldNotSendAnotherMonthlyNotification() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 15, 12, 0, 0, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(25)); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenUtcMonthEndsAndOrganizationIsStillOverMonthlyLimit_ShouldAllowAnotherMonthlyNotification() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 31, 23, 50, 0, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromMinutes(20)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(2, Mailer.OrganizationNoticeCount); + } + + /// + /// Regression test: returning null from GetWorkItemLockAsync causes WorkItemJob to call + /// AbandonAsync instead of HandleItemAsync, creating an infinite retry loop for hourly items. + /// The fix returns EmptyLock (via base) so the item is completed normally. + /// + [Fact] + public async Task GetWorkItemLockAsync_WhenWorkItemIsHourly_ShouldReturnNonNullLockSoItemIsNotAbandoned() + { + // Arrange + var workItem = CreateHourlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + // null → WorkItemJob calls AbandonAsync (infinite retry loop) + // non-null → WorkItemJob calls HandleItemAsync and completes the entry + await using var workItemLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Assert + Assert.NotNull(workItemLock); + } + + [Fact] + public async Task GetWorkItemLockAsync_WhenMonthlyLockIsAlreadyHeld_ShouldReturnNullSoConcurrentItemIsAbandoned() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + await using var firstLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + await using var secondLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Assert + Assert.NotNull(firstLock); // First worker acquires the lock + Assert.Null(secondLock); // Concurrent duplicate is correctly abandoned + } + + /// + /// Regression test for the helper bug: + /// the helper must NOT call + /// when the lock is null, because in production WorkItemJob calls AbandonAsync instead. + /// Without this guard a concurrent item could bypass the lock, find no sent marker, and send + /// a duplicate email — a bug that would be invisible to all other integration tests. + /// + [Fact] + public async Task HandleItemAsync_WhenConcurrentWorkerHoldsMonthlyLock_ShouldNotSendEmail() + { + // Arrange: a concurrent worker already holds the monthly notification lock + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + await using var concurrentWorkerLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + Assert.NotNull(concurrentWorkerLock); + + // Act: a second worker attempts to process the same item while the lock is held; + // GetWorkItemLockAsync returns null so WorkItemJob abandons the item (never calls HandleItemAsync) + await HandleWorkItemAsync(workItem); + + // Assert: no email was sent because the item was abandoned, not processed + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenOnlyAnHourlyOverageIsProcessed_ShouldNotSendEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateHourlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenTheMonthlySentMarkerAlreadyExists_ShouldNotSendEmail() + { + // Arrange + await SetMonthlySentMarkerAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyEmailIsSent_ShouldWriteSentMarker() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(workItem); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyEmailIsSentInLastMillisecondOfUtcMonth_ShouldWriteObservableSentMarker() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 31, 23, 59, 59, 999, DateTimeKind.Utc)); + await SetOrganizationOverMonthlyLimitAsync(PrimaryOrganizationId); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenMonthlyWorkItemIsStaleAndOrganizationIsNoLongerOverMonthlyLimit_ShouldNotSendEmailOrWriteSentMarker() + { + // Arrange + var organization = await OrganizationRepository.GetByIdAsync(PrimaryOrganizationId, options => options.Cache()); + Assert.NotNull(organization); + + organization.GetCurrentUsage(TimeProvider).Total = 0; + await OrganizationRepository.SaveAsync(organization, options => options.ImmediateConsistency().Cache()); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.False(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenOrganizationNoLongerExists_ShouldNotWriteSentMarkerOrSendEmail() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(MissingOrganizationId); + + // Act + await HandleWorkItemAsync(workItem); + + // Assert + Assert.False(await NotificationService.IsOrganizationNotificationSentAsync(MissingOrganizationId, isOverMonthlyLimit: true)); + Assert.Equal(0, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenUsersAreUnverifiedOrDisabled_ShouldOnlySendToEligibleUsers() + { + // Arrange + var unverifiedUser = UserData.GenerateUser(id: UnverifiedUserId, organizationId: PrimaryOrganizationId, emailAddress: "unverified-owner@example.org"); + unverifiedUser.FullName = "Unverified Owner"; + unverifiedUser.IsEmailAddressVerified = false; + unverifiedUser.EmailNotificationsEnabled = true; + unverifiedUser.ResetVerifyEmailAddressTokenAndExpiration(TimeProvider); + + var disabledNotificationsUser = UserData.GenerateUser(id: DisabledNotificationsUserId, organizationId: PrimaryOrganizationId, emailAddress: "disabled-owner@example.org"); + disabledNotificationsUser.FullName = "Disabled Owner"; + disabledNotificationsUser.IsEmailAddressVerified = true; + disabledNotificationsUser.EmailNotificationsEnabled = false; + + await UserRepository.AddAsync([unverifiedUser, disabledNotificationsUser], options => options.ImmediateConsistency()); + + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + var call = Assert.Single(Mailer.OrganizationNoticeCalls); + Assert.Equal(PrimaryUserId, call.UserId); + Assert.Equal(PrimaryOrganizationId, call.OrganizationId); + Assert.True(call.IsOverMonthlyLimit); + Assert.False(call.IsOverHourlyLimit); + } + + [Fact] + public async Task HandleItemAsync_WhenHourlyOveragePrecedesMonthlyOverage_ShouldSendTheMonthlyEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateHourlyNotificationWorkItem(PrimaryOrganizationId)); + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + [Fact] + public async Task HandleItemAsync_WhenHourlyOverageArrivesAfterMonthlyEmail_ShouldNotSendAnotherEmail() + { + // Arrange + // Act + await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId)); + + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + await HandleWorkItemAsync(CreateHourlyNotificationWorkItem(PrimaryOrganizationId)); + + // Assert + var call = Assert.Single(Mailer.OrganizationNoticeCalls); + Assert.True(call.IsOverMonthlyLimit); + Assert.False(call.IsOverHourlyLimit); + } + + [Fact] + public async Task GetWorkItemLockAsync_WhenMonthlyWorkerReachesWorkItemTimeout_ShouldKeepConcurrentItemAbandoned() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + await using var workerALock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + Assert.NotNull(workerALock); + + TimeProvider.Advance(TimeSpan.FromMinutes(61)); + + // Act + await using var workerBLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Assert + Assert.Null(workerBLock); + } + + [Fact] + public async Task HandleItemAsync_WhenEmailSendFails_ShouldNotWriteSentMarkerSoRetryCanSend() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + Mailer.ShouldThrow = true; + + // Act + await Assert.ThrowsAsync(() => HandleWorkItemAsync(workItem)); + + var sentMarkerExists = await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + + Mailer.ShouldThrow = false; + await HandleWorkItemAsync(workItem); + + // Assert + Assert.False(sentMarkerExists); + Assert.Equal(1, Mailer.OrganizationNoticeCount); + } + + private async Task HandleWorkItemAsync(OrganizationNotificationWorkItem workItem) + { + await using var workItemLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken); + + // Mirror production WorkItemJob semantics: when GetWorkItemLockAsync returns null, + // WorkItemJob calls AbandonAsync and never calls HandleItemAsync. Omitting this guard + // would let tests call HandleItemAsync without the lock, masking concurrent-access bugs. + if (workItemLock is null) + return; + + var context = new WorkItemContext(workItem, "test-job", workItemLock, TestContext.Current.CancellationToken, static (_, _) => Task.CompletedTask); + await Handler.HandleItemAsync(context); + } + + private async Task SetOrganizationOverMonthlyLimitAsync(string organizationId) + { + var organization = await OrganizationRepository.GetByIdAsync(organizationId, options => options.Cache()); + Assert.NotNull(organization); + + organization.GetCurrentUsage(TimeProvider).Total = organization.GetMaxEventsPerMonthWithBonus(TimeProvider); + await OrganizationRepository.SaveAsync(organization, options => options.ImmediateConsistency().Cache()); + } + + private static OrganizationNotificationWorkItem CreateMonthlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = false, + IsOverMonthlyLimit = true + }; + } + + private static OrganizationNotificationWorkItem CreateHourlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = true, + IsOverMonthlyLimit = false + }; + } +} diff --git a/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs new file mode 100644 index 0000000000..f1812b61cc --- /dev/null +++ b/tests/Exceptionless.Tests/Jobs/WorkItemHandlers/OrganizationNotificationWorkItemHandlerTests.cs @@ -0,0 +1,249 @@ +using Exceptionless.Core; +using Exceptionless.Core.Extensions; +using Exceptionless.Core.Jobs.WorkItemHandlers; +using Exceptionless.Core.Messaging.Models; +using Exceptionless.Core.Models.WorkItems; +using Exceptionless.Tests.Extensions; +using Foundatio.AsyncEx; +using Foundatio.Caching; +using Foundatio.Jobs; +using Foundatio.Messaging; +using Foundatio.Queues; +using Foundatio.Resilience; +using Foundatio.Serializer; +using Xunit; + +namespace Exceptionless.Tests.Jobs.WorkItemHandlers; + +/// +/// RCA-focused tests for duplicate plan-limit email notifications. +/// +/// Root cause: every web pod registers the same +/// startup action. Because Foundatio pub/sub delivers each message to all subscribers, a single +/// monthly PlanOverage event enqueued one work item per running web pod. The original +/// ThrottlingLockProvider(slotsPerPeriod: 1, period: 1 hour) allowed exactly one item +/// through per calendar-hour bucket. Duplicate items were abandoned back to the queue and +/// reprocessed once each new bucket opened — producing one email per hour. +/// +/// Fix: queue-level dedup ( + +/// DuplicateDetectionQueueBehavior) collapses the fanout at enqueue time, and +/// handler-level idempotency (per-organization distributed lock + sent marker) ensures that stale +/// duplicates already in the queue when the fix deployed cannot retrigger an email. +/// +public class OrganizationNotificationWorkItemHandlerTests : TestWithServices +{ + private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d1001"; + private const string QueueDuplicateDetectionOrganizationId = "664ec4c1f12e4f2b7a0d1002"; + private const string RegisteredQueueOrganizationId = "664ec4c1f12e4f2b7a0d1003"; + private const string DequeuedNotificationOrganizationId = "664ec4c1f12e4f2b7a0d1004"; + private const string HourlyOrganizationId = "664ec4c1f12e4f2b7a0d1005"; + + public OrganizationNotificationWorkItemHandlerTests(ITestOutputHelper output) : base(output) { } + + private IMessagePublisher MessagePublisher => GetService(); + private IMessageSubscriber MessageSubscriber => GetService(); + private ICacheClient CacheClient => GetService(); + + [Fact] + public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithoutQueueDuplicateDetection_ShouldEnqueueSixWorkItems() + { + // Arrange + using var workItemQueue = CreateWorkItemQueue(); + await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 6); + + // Act + await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = PrimaryOrganizationId }, expectedEnqueueAttempts: 6, expectedEnqueuedCount: 6); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(6, queueStats.Enqueued); + } + + [Fact] + public async Task RunAsync_WhenOnePlanOverageIsObservedBySixSubscribersWithQueueDuplicateDetection_ShouldEnqueueOneWorkItem() + { + // Arrange + using var dedupBehavior = new DuplicateDetectionQueueBehavior(CacheClient, GetService(), TimeSpan.FromHours(24)); + using var workItemQueue = CreateWorkItemQueue(dedupBehavior); + await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 6); + + // Act + await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = QueueDuplicateDetectionOrganizationId }, expectedEnqueueAttempts: 6, expectedEnqueuedCount: 1); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(1, queueStats.Enqueued); + } + + [Fact] + public async Task EnqueueAsync_WhenDuplicateNotificationUsesRegisteredQueueBehavior_ShouldEnqueueOneWorkItem() + { + // Arrange + var workItemQueue = GetService>(); + var workItem = CreateMonthlyNotificationWorkItem(RegisteredQueueOrganizationId); + + // Act + await workItemQueue.EnqueueAsync(workItem); + await workItemQueue.EnqueueAsync(workItem); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(1, queueStats.Enqueued); + } + + [Fact] + public async Task EnqueueAsync_WhenDuplicateNotificationIsDequeued_ShouldAllowFutureEnqueue() + { + // Arrange + using var dedupBehavior = new DuplicateDetectionQueueBehavior(CacheClient, GetService(), TimeSpan.FromHours(24)); + using var workItemQueue = CreateWorkItemQueue(dedupBehavior); + var workItem = CreateMonthlyNotificationWorkItem(DequeuedNotificationOrganizationId); + + await workItemQueue.EnqueueAsync(workItem); + var queueEntry = await workItemQueue.DequeueAsync(TestCancellationToken); + Assert.NotNull(queueEntry); + await queueEntry.CompleteAsync(); + + // Act + await workItemQueue.EnqueueAsync(workItem); + + // Assert + var queueStats = await workItemQueue.GetQueueStatsAsync(); + Assert.Equal(2, queueStats.Enqueued); + } + + [Fact] + public async Task RunAsync_WhenHourlyPlanOverageIsObserved_ShouldEnqueueHourlyWorkItem() + { + // Arrange + using var workItemQueue = CreateWorkItemQueue(); + await SubscribeToPlanOverageAsync(workItemQueue, subscriberCount: 1); + + // Act + await PublishPlanOverageAndWaitForQueueAsync(workItemQueue, new PlanOverage { OrganizationId = HourlyOrganizationId, IsHourly = true }, expectedEnqueueAttempts: 1, expectedEnqueuedCount: 1); + + // Assert + var queueEntry = await workItemQueue.DequeueAsync(TestCancellationToken); + Assert.NotNull(queueEntry); + + var workItem = GetService().Deserialize(queueEntry.Value.Data)!; + Assert.Equal(HourlyOrganizationId, workItem.OrganizationId); + Assert.True(workItem.IsOverHourlyLimit); + Assert.False(workItem.IsOverMonthlyLimit); + + await queueEntry.CompleteAsync(); + } + + [Fact] + public void UniqueIdentifier_WhenMonthlyNotificationIsCreated_ShouldMatchCanonicalNotificationKey() + { + // Arrange + var workItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + var uniqueIdentifier = workItem.UniqueIdentifier; + + // Assert + Assert.Equal(OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: true), uniqueIdentifier); + } + + [Fact] + public void UniqueIdentifier_WhenHourlyNotificationIsCreated_ShouldMatchCanonicalNotificationKey() + { + // Arrange + var workItem = CreateHourlyNotificationWorkItem(PrimaryOrganizationId); + + // Act + var uniqueIdentifier = workItem.UniqueIdentifier; + + // Assert + Assert.Equal(OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: false), uniqueIdentifier); + } + + [Fact] + public void GetNotificationKey_WhenMonthlyAndHourlyNotificationsAreCreated_ShouldUseDifferentKeys() + { + // Arrange + // Act + var monthlyKey = OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: true); + var hourlyKey = OrganizationNotificationWorkItem.GetNotificationKey(PrimaryOrganizationId, isOverMonthlyLimit: false); + + // Assert + Assert.Equal($"Organization:{PrimaryOrganizationId}:notification:monthly", monthlyKey); + Assert.Equal($"Organization:{PrimaryOrganizationId}:notification:hourly", hourlyKey); + Assert.NotEqual(monthlyKey, hourlyKey); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public void GetNotificationKey_WhenOrganizationIdIsNullOrEmpty_ShouldThrowArgumentException(string? organizationId) + { + // Arrange + // Act & Assert + Assert.ThrowsAny(() => OrganizationNotificationWorkItem.GetNotificationKey(organizationId!, isOverMonthlyLimit: true)); + } + + private async Task SubscribeToPlanOverageAsync(IQueue workItemQueue, int subscriberCount) + { + for (int i = 0; i < subscriberCount; i++) + { + var startupAction = new EnqueueOrganizationNotificationOnPlanOverage(workItemQueue, MessageSubscriber, GetService()); + await startupAction.RunAsync(); + } + } + + private async Task PublishPlanOverageAndWaitForQueueAsync(IQueue workItemQueue, PlanOverage overage, int expectedEnqueueAttempts, int expectedEnqueuedCount) + { + var enqueueAttempts = new AsyncCountdownEvent(expectedEnqueueAttempts); + var enqueued = new AsyncCountdownEvent(expectedEnqueuedCount); + using var enqueueAttemptSubscription = workItemQueue.Enqueuing.AddHandler((_, _) => + { + enqueueAttempts.Signal(); + return Task.CompletedTask; + }); + using var enqueuedSubscription = workItemQueue.Enqueued.AddHandler((_, _) => + { + enqueued.Signal(); + return Task.CompletedTask; + }); + + await MessagePublisher.PublishAsync(overage, cancellationToken: TestCancellationToken); + await enqueueAttempts.WaitAsync(TimeSpan.FromSeconds(5)); + await enqueued.WaitAsync(TimeSpan.FromSeconds(5)); + } + + private InMemoryQueue CreateWorkItemQueue(params IQueueBehavior[] behaviors) + { + var options = new InMemoryQueueOptions + { + Behaviors = behaviors, + Serializer = GetService(), + TimeProvider = TimeProvider, + ResiliencePolicyProvider = GetService(), + LoggerFactory = GetService() + }; + + return new InMemoryQueue(options); + } + + private static OrganizationNotificationWorkItem CreateMonthlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = false, + IsOverMonthlyLimit = true + }; + } + + private static OrganizationNotificationWorkItem CreateHourlyNotificationWorkItem(string organizationId) + { + return new OrganizationNotificationWorkItem + { + OrganizationId = organizationId, + IsOverHourlyLimit = true, + IsOverMonthlyLimit = false + }; + } +} diff --git a/tests/Exceptionless.Tests/Mail/CountingMailer.cs b/tests/Exceptionless.Tests/Mail/CountingMailer.cs new file mode 100644 index 0000000000..96d5671c3f --- /dev/null +++ b/tests/Exceptionless.Tests/Mail/CountingMailer.cs @@ -0,0 +1,79 @@ +using Exceptionless.Core.Mail; +using Exceptionless.Core.Models; + +namespace Exceptionless.Tests.Mail; + +public class CountingMailer : IMailer +{ + private int _organizationNoticeCount; + + public int OrganizationNoticeCount => _organizationNoticeCount; + + public List OrganizationNoticeCalls { get; } = []; + + /// + /// When true, throws instead of recording a call. + /// Reset by . + /// + public bool ShouldThrow { get; set; } + + public Task SendEventNoticeAsync(User user, PersistentEvent ev, Project project, bool isNew, bool isRegression, int totalOccurrences) + { + return Task.FromResult(true); + } + + public Task SendOrganizationAddedAsync(User sender, Organization organization, User user) + { + return Task.CompletedTask; + } + + public Task SendOrganizationInviteAsync(User sender, Organization organization, Invite invite) + { + return Task.CompletedTask; + } + + public Task SendOrganizationNoticeAsync(User user, Organization organization, bool isOverMonthlyLimit, bool isOverHourlyLimit) + { + if (ShouldThrow) + throw new InvalidOperationException("Simulated mailer failure."); + + Interlocked.Increment(ref _organizationNoticeCount); + lock (OrganizationNoticeCalls) + { + OrganizationNoticeCalls.Add(new OrganizationNoticeCall(user.Id, organization.Id, isOverMonthlyLimit, isOverHourlyLimit)); + } + return Task.CompletedTask; + } + + public Task SendOrganizationPaymentFailedAsync(User owner, Organization organization) + { + return Task.CompletedTask; + } + + public Task SendProjectDailySummaryAsync(User user, Project project, IEnumerable? mostFrequent, IEnumerable? newest, DateTime startDate, bool hasSubmittedEvents, double count, double uniqueCount, double newCount, double fixedCount, int blockedCount, int tooBigCount, bool isFreePlan) + { + return Task.CompletedTask; + } + + public Task SendUserEmailVerifyAsync(User user) + { + return Task.CompletedTask; + } + + public Task SendUserPasswordResetAsync(User user) + { + return Task.CompletedTask; + } + + public void Reset() + { + Interlocked.Exchange(ref _organizationNoticeCount, 0); + lock (OrganizationNoticeCalls) + { + OrganizationNoticeCalls.Clear(); + } + ShouldThrow = false; + } +} + +public record OrganizationNoticeCall(string UserId, string OrganizationId, bool IsOverMonthlyLimit, bool IsOverHourlyLimit); diff --git a/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs b/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs new file mode 100644 index 0000000000..e472c9c7d5 --- /dev/null +++ b/tests/Exceptionless.Tests/Services/NotificationServiceTests.cs @@ -0,0 +1,81 @@ +using System.Threading; +using Exceptionless.Core.Services; +using Foundatio.Lock; +using Foundatio.Messaging; +using Xunit; + +namespace Exceptionless.Tests.Services; + +public sealed class NotificationServiceTests : TestWithServices +{ + private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d4001"; + + public NotificationServiceTests(ITestOutputHelper output) : base(output) { } + + private NotificationService NotificationService => GetService(); + + [Fact] + public async Task IsOrganizationNotificationLockedAsync_WhenNoLockIsHeld_ShouldNotAcquireAndReleaseLock() + { + // Arrange + var messageBus = GetService(); + var releaseCount = 0; + await messageBus.SubscribeAsync(_ => + { + Interlocked.Increment(ref releaseCount); + return Task.CompletedTask; + }, TestCancellationToken); + + // Act + var isLocked = await NotificationService.IsOrganizationNotificationLockedAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + + // Assert + Assert.False(isLocked); + Assert.Equal(0, Volatile.Read(ref releaseCount)); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public async Task OrganizationNotificationMethods_WhenOrganizationIdIsNullOrEmpty_ShouldThrowArgumentException(string? organizationId) + { + // Arrange + const bool IsOverMonthlyLimit = true; + + // Act & Assert + await Assert.ThrowsAnyAsync(() => NotificationService.IsOrganizationNotificationSentAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.SetOrganizationNotificationSentAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.RemoveOrganizationNotificationSentAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.TryAcquireOrganizationNotificationLockAsync(organizationId!, IsOverMonthlyLimit)); + await Assert.ThrowsAnyAsync(() => NotificationService.IsOrganizationNotificationLockedAsync(organizationId!, IsOverMonthlyLimit)); + } + + [Fact] + public async Task SetOrganizationNotificationSentAsync_WhenCalledInLastMillisecondOfUtcMonth_ShouldWriteObservableMarker() + { + // Arrange + TimeProvider.SetUtcNow(new DateTime(2026, 5, 31, 23, 59, 59, 999, DateTimeKind.Utc)); + + // Act + await NotificationService.SetOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationSentAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + } + + [Fact] + public async Task TryAcquireOrganizationNotificationLockAsync_WhenOldHourlyBucketBoundaryPasses_ShouldRemainLocked() + { + // Arrange + await using var firstLock = await NotificationService.TryAcquireOrganizationNotificationLockAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + Assert.NotNull(firstLock); + + // Act + TimeProvider.Advance(TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1))); + + // Assert + Assert.True(await NotificationService.IsOrganizationNotificationLockedAsync(PrimaryOrganizationId, isOverMonthlyLimit: true)); + await using var secondLock = await NotificationService.TryAcquireOrganizationNotificationLockAsync(PrimaryOrganizationId, isOverMonthlyLimit: true); + Assert.Null(secondLock); + } +} diff --git a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs index a4ea26928c..8a54c6cbd7 100644 --- a/tests/Exceptionless.Tests/Services/UsageServiceTests.cs +++ b/tests/Exceptionless.Tests/Services/UsageServiceTests.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using Exceptionless.Core.Billing; +using Exceptionless.Core.Extensions; using Exceptionless.Core.Messaging.Models; using Exceptionless.Core.Models; using Exceptionless.Core.Repositories; @@ -18,6 +19,7 @@ public sealed class UsageServiceTests : IntegrationTestsBase private readonly IOrganizationRepository _organizationRepository; private readonly IProjectRepository _projectRepository; private readonly UsageService _usageService; + private readonly NotificationService _notificationService; private readonly BillingPlans _plans; public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : base(output, factory) @@ -25,11 +27,194 @@ public UsageServiceTests(ITestOutputHelper output, AppWebHostFactory factory) : TimeProvider.SetUtcNow(new DateTime(2015, 2, 13, 0, 0, 0, DateTimeKind.Utc)); Log.SetLogLevel(LogLevel.Information); _usageService = GetService(); + _notificationService = GetService(); _organizationRepository = GetService(); _projectRepository = GetService(); _plans = GetService(); } + private Task SetMonthlySentMarkerAsync(string organizationId) + { + return _notificationService.SetOrganizationNotificationSentAsync(organizationId, isOverMonthlyLimit: true); + } + + private Task MonthlySentMarkerExistsAsync(string organizationId) + { + return _notificationService.IsOrganizationNotificationSentAsync(organizationId, isOverMonthlyLimit: true); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenMonthlyPlanLimitIncreases_ShouldResetOverageNotificationSentMarker() + { + // Arrange + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3001", + Name = "Primary Organization", + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenMonthlyPlanLimitIncreasesButOrganizationIsStillOverLimit_ShouldKeepOverageNotificationSentMarker() + { + // Arrange + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3005", + Name = "Primary Organization", + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + original.GetCurrentUsage(TimeProvider).Total = 250; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + modified.GetCurrentUsage(TimeProvider).Total = 250; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + Assert.True(await MonthlySentMarkerExistsAsync(original.Id)); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenPlanChangesToUnlimited_ShouldResetOverageNotificationSentMarker() + { + // Arrange + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3002", + Name = "Primary Organization", + PlanId = _plans.EnterprisePlan.Id, + MaxEventsPerMonth = _plans.EnterprisePlan.MaxEventsPerMonth + }; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.UnlimitedPlan.Id, + MaxEventsPerMonth = _plans.UnlimitedPlan.MaxEventsPerMonth + }; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenPlanLimitDecreasesBelowCurrentUsage_ShouldResetMarkerAndPublishMonthlyOverage() + { + // Arrange + var messageBus = GetService(); + var countdown = new AsyncCountdownEvent(1); + PlanOverage? overage = null; + await messageBus.SubscribeAsync(po => + { + overage = po; + countdown.Signal(); + }, TestCancellationToken); + + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3003", + Name = "Primary Organization", + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + modified.GetCurrentUsage(TimeProvider).Total = 150; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + await countdown.WaitAsync(TimeSpan.FromSeconds(5)); + + // Assert + Assert.False(await MonthlySentMarkerExistsAsync(original.Id)); + Assert.NotNull(overage); + Assert.Equal(modified.Id, overage.OrganizationId); + Assert.False(overage.IsHourly); + } + + [Fact] + public async Task HandleOrganizationChangeAsync_WhenAlreadyOverMonthlyLimitAndPlanLimitDecreases_ShouldKeepMarkerAndNotPublishMonthlyOverage() + { + // Arrange + var messageBus = GetService(); + var countdown = new AsyncCountdownEvent(1); + await messageBus.SubscribeAsync(po => + { + if (!po.IsHourly) + countdown.Signal(); + }, TestCancellationToken); + + var original = new Organization + { + Id = "664ec4c1f12e4f2b7a0d3004", + Name = "Primary Organization", + PlanId = _plans.MediumPlan.Id, + MaxEventsPerMonth = 200 + }; + original.GetCurrentUsage(TimeProvider).Total = 250; + + var modified = new Organization + { + Id = original.Id, + Name = original.Name, + PlanId = _plans.SmallPlan.Id, + MaxEventsPerMonth = 100 + }; + modified.GetCurrentUsage(TimeProvider).Total = 250; + + await SetMonthlySentMarkerAsync(original.Id); + + // Act + await _usageService.HandleOrganizationChangeAsync(modified, original); + + // Assert + await Assert.ThrowsAsync(() => countdown.WaitAsync(TimeSpan.FromSeconds(1))); + Assert.True(await MonthlySentMarkerExistsAsync(original.Id)); + } + [Fact] public async Task CanIncrementUsageAsync() {