Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Exceptionless.Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ public static void RegisterServices(IServiceCollection services, AppOptions appO
services.AddSingleton(s => CreateQueue<MailMessage>(s));
services.AddSingleton(s => CreateQueue<WorkItemData>(s, TimeSpan.FromHours(1)));

services.AddSingleton<IQueueBehavior<WorkItemData>>(s =>
new DuplicateDetectionQueueBehavior<WorkItemData>(
s.GetRequiredService<ICacheClient>(),
s.GetRequiredService<ILoggerFactory>(),
TimeSpan.FromHours(24)));

services.AddSingleton<IConnectionMapping, ConnectionMapping>();
services.AddSingleton<MessageService>();
services.AddStartupAction<MessageService>();
Expand Down Expand Up @@ -301,6 +307,7 @@ private static IQueue<T> CreateQueue<T>(IServiceProvider container, TimeSpan? wo
return new InMemoryQueue<T>(new InMemoryQueueOptions<T>
{
WorkItemTimeout = workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5.0)),
Behaviors = container.GetServices<IQueueBehavior<T>>().ToList(),
Serializer = container.GetRequiredService<ISerializer>(),
TimeProvider = container.GetRequiredService<TimeProvider>(),
ResiliencePolicyProvider = container.GetRequiredService<IResiliencePolicyProvider>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Foundatio.Messaging;
using Foundatio.Queues;
using Foundatio.Repositories;
using Foundatio.Resilience;
using Microsoft.Extensions.Logging;

namespace Exceptionless.Core.Jobs.WorkItemHandlers;
Expand Down Expand Up @@ -44,37 +43,101 @@ await _workItemQueue.EnqueueAsync(new OrganizationNotificationWorkItem
}
}

/// <summary>
/// Handles <see cref="OrganizationNotificationWorkItem"/> by sending a plan-overage email at most
/// once per <see cref="NotificationWindow"/> per organization.
///
/// <b>Root cause of duplicate emails (fixed here):</b>
/// Every web pod registers the same <see cref="EnqueueOrganizationNotificationOnPlanOverage"/>
/// startup action, so a single <c>PlanOverage</c> message fanned out to all pods and each pod
/// enqueued its own copy of the work item. The previous
/// <c>ThrottlingLockProvider(slotsPerPeriod: 1, period: 1 hour)</c> allowed exactly one item
/// through per calendar-hour bucket. Abandoned duplicates were re-queued and reprocessed once
/// each new bucket opened — producing one email per hour for as many duplicate items existed.
///
/// <b>Fix layers (both required):</b>
/// <list type="number">
/// <item><description>
/// Queue-level dedup: <see cref="OrganizationNotificationWorkItem.UniqueIdentifier"/> plus
/// <c>DuplicateDetectionQueueBehavior</c> collapses identical fanout enqueues to a single
/// queue entry, preventing duplicates from being enqueued in the first place.
/// </description></item>
/// <item><description>
/// Handler-level idempotency: a distributed lock serialises concurrent processing, and a
/// 24-hour sent marker (<see cref="GetNotificationSentKey"/>) ensures that any stale
/// duplicates already in the queue before the fix deployed cannot re-trigger an email.
/// </description></item>
/// </list>
///
/// Only monthly overages send email. Hourly items short-circuit at
/// <see cref="GetWorkItemLockAsync"/> so they can never block or suppress a later monthly
/// notification via the lock/sent-key path.
/// </summary>
public class OrganizationNotificationWorkItemHandler : WorkItemHandlerBase
{
private static readonly TimeSpan NotificationWindow = TimeSpan.FromHours(24);

private readonly IOrganizationRepository _organizationRepository;
private readonly IUserRepository _userRepository;
private readonly IMailer _mailer;
private readonly ICacheClient _cacheClient;

// ILockProvider is kept local rather than pushed to WorkItemHandlerBase because the
// lock/sent-key contract is specific to plan-limit email idempotency. Passing it through
// the base class would force every unrelated handler to acquire unnecessary plan-limit locks.
private readonly ILockProvider _lockProvider;

public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, ICacheClient cacheClient, TimeProvider timeProvider, IResiliencePolicyProvider resiliencePolicyProvider, ILoggerFactory loggerFactory) : base(loggerFactory)
Comment thread
niemyjski marked this conversation as resolved.
public OrganizationNotificationWorkItemHandler(IOrganizationRepository organizationRepository, IUserRepository userRepository, IMailer mailer, ICacheClient cacheClient, ILockProvider lockProvider, ILoggerFactory loggerFactory) : base(loggerFactory)
{
_organizationRepository = organizationRepository;
_userRepository = userRepository;
_mailer = mailer;
_lockProvider = new ThrottlingLockProvider(cacheClient, 1, TimeSpan.FromHours(1), timeProvider, resiliencePolicyProvider, loggerFactory);
_cacheClient = cacheClient;
_lockProvider = lockProvider;
}

/// <summary>
/// Supplies the lock under which a work item is processed. Work items that will send an
/// email get a per-organization, per-notification-type lock; all other work items fall back
/// to the base handler's lock.
/// </summary>
/// <param name="workItem">The work item; it is cast to <see cref="OrganizationNotificationWorkItem"/>.</param>
/// <param name="cancellationToken">Token forwarded to the lock acquisition.</param>
/// <returns>
/// The base handler's lock for items that do not send email; otherwise the result of
/// trying to acquire the notification lock.
/// </returns>
public override Task<ILock?> GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default)
{
    var wi = (OrganizationNotificationWorkItem)workItem;

    // Hourly overages do not send email, so they must never touch the notification lock and
    // can never hold a lock that would prevent a subsequent monthly overage from being
    // processed. Defer to the base implementation instead of returning null: a null lock is
    // the same value a failed TryAcquireAsync produces, so the job runner cannot tell
    // "no lock needed" apart from "lock not acquired".
    // NOTE(review): Foundatio's WorkItemJob is understood to abandon (and therefore retry)
    // items whose lock is null — confirm against the Foundatio version in use.
    if (!ShouldSendNotificationEmail(wi))
        return base.GetWorkItemLockAsync(workItem, cancellationToken);

    // The lease duration must exceed the maximum expected time for SendOverageNotificationsAsync
    // to complete. If the lease expires while email sending is in progress, a second worker
    // could acquire the lock, see no sent marker, and begin sending duplicates. 30 minutes
    // is conservative for any realistic organization size; the sent marker is the last-resort
    // dedup guard even if this window is somehow exceeded.
    string lockKey = GetNotificationLockKey(wi.OrganizationId, wi.NotificationType);
    return _lockProvider.TryAcquireAsync(lockKey, TimeSpan.FromMinutes(30), cancellationToken);
}
Comment on lines +99 to +114
Comment on lines +99 to +114
Comment on lines +99 to +114

public override Task HandleItemAsync(WorkItemContext context)
public override async Task HandleItemAsync(WorkItemContext context)
{
var wi = context.GetData<OrganizationNotificationWorkItem>()!;
if (!ShouldSendNotificationEmail(wi))
return;

Log.LogInformation("Received organization notification work item for: {OrganizationId} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit);

string cacheKey = $"{nameof(OrganizationNotificationWorkItemHandler)}:{wi.OrganizationId}";
var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache());
if (organization is null)
return;

return _lockProvider.TryUsingAsync(cacheKey, async () =>
Comment thread
niemyjski marked this conversation as resolved.
if (await WasNotificationSentAsync(wi.OrganizationId, wi.NotificationType))
{
Log.LogInformation("Received organization notification work item for: {Organization} IsOverHourlyLimit: {IsOverHourlyLimit} IsOverMonthlyLimit: {IsOverMonthlyLimit}", wi.OrganizationId, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit);
var organization = await _organizationRepository.GetByIdAsync(wi.OrganizationId, o => o.Cache());
if (organization is null)
return;

if (wi.IsOverMonthlyLimit)
await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit);
}, TimeSpan.FromMinutes(15), context.CancellationToken);
Log.LogInformation("Skipping duplicate monthly overage notification for organization: {OrganizationId}", wi.OrganizationId);
return;
}

await SendOverageNotificationsAsync(organization, wi.IsOverHourlyLimit, wi.IsOverMonthlyLimit);

// Write the sent marker after completing all sends. If the process crashes or
// SendOverageNotificationsAsync throws mid-loop, some recipients will already have
// received the email and will receive it again on the next retry. This is intentional:
// suppressing retries on partial failure would silently drop email for un-notified users.
await _cacheClient.SetAsync(GetNotificationSentKey(wi.OrganizationId, wi.NotificationType), true, NotificationWindow);
}

private async Task SendOverageNotificationsAsync(Organization organization, bool isOverHourlyLimit, bool isOverMonthlyLimit)
Expand All @@ -100,4 +163,29 @@ private async Task SendOverageNotificationsAsync(Organization organization, bool

Log.LogTrace("Done sending email");
}

// Email is reserved for monthly overages. Hourly overages still trigger real-time websocket
// budget updates in the UI, but they are not worth a separate notification email.
// Because hourly items are filtered out here, they never enter the lock/sent-key path, so a
// burst of hourly events cannot suppress the monthly notification that follows.
private static bool ShouldSendNotificationEmail(OrganizationNotificationWorkItem workItem)
    => workItem.IsOverMonthlyLimit;

/// <summary>
/// Reads the "sent" marker for the given organization and notification type from the cache.
/// </summary>
/// <returns><c>true</c> only when the marker exists and holds <c>true</c>.</returns>
private async Task<bool> WasNotificationSentAsync(string organizationId, string notificationType)
{
    string sentKey = GetNotificationSentKey(organizationId, notificationType);
    var marker = await _cacheClient.GetAsync<bool>(sentKey);

    // A missing marker means nothing has been recorded as sent.
    if (!marker.HasValue)
        return false;

    return marker.Value;
}

/// <summary>
/// Builds the lock resource name for an organization/notification-type pair: the shared
/// notification key with a <c>-lock</c> suffix.
/// </summary>
public static string GetNotificationLockKey(string organizationId, string notificationType)
    => OrganizationNotificationWorkItem.GetNotificationKey(organizationId, notificationType) + "-lock";

/// <summary>
/// Builds the cache key of the "sent" marker for an organization/notification-type pair:
/// the shared notification key with a <c>-sent</c> suffix.
/// </summary>
public static string GetNotificationSentKey(string organizationId, string notificationType)
    => OrganizationNotificationWorkItem.GetNotificationKey(organizationId, notificationType) + "-sent";
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
namespace Exceptionless.Core.Models.WorkItems;
using Foundatio.Queues;

public record OrganizationNotificationWorkItem
namespace Exceptionless.Core.Models.WorkItems;

/// <summary>
/// Work item describing a plan-overage notification for one organization. It exposes a
/// <see cref="UniqueIdentifier"/> derived from the organization id and the notification type.
/// </summary>
public record OrganizationNotificationWorkItem : IHaveUniqueIdentifier
{
    public const string HourlyNotificationType = "hourly";
    public const string MonthlyNotificationType = "monthly";

    public required string OrganizationId { get; init; }
    public required bool IsOverHourlyLimit { get; init; }
    public required bool IsOverMonthlyLimit { get; init; }

    /// <summary>
    /// "monthly" whenever the monthly limit is exceeded; "hourly" in every other case.
    /// </summary>
    public string NotificationType
    {
        get
        {
            if (IsOverMonthlyLimit)
                return MonthlyNotificationType;

            return HourlyNotificationType;
        }
    }

    /// <summary>
    /// The notification key for this item's organization and notification type.
    /// </summary>
    public string? UniqueIdentifier
    {
        get { return GetNotificationKey(OrganizationId, NotificationType); }
    }

    /// <summary>
    /// Formats the key shared by everything that identifies a notification for an
    /// organization: <c>Organization:{organizationId}:notification:{notificationType}</c>.
    /// </summary>
    public static string GetNotificationKey(string organizationId, string notificationType)
        => string.Concat("Organization:", organizationId, ":notification:", notificationType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
using Exceptionless.Core;
using Exceptionless.Core.Billing;
using Exceptionless.Core.Extensions;
using Exceptionless.Core.Jobs.WorkItemHandlers;
using Exceptionless.Core.Mail;
using Exceptionless.Core.Models;
using Exceptionless.Core.Models.WorkItems;
using Exceptionless.Core.Repositories;
using Exceptionless.Tests.Mail;
using Exceptionless.Tests.Utility;
using Foundatio.Caching;
using Foundatio.Jobs;
using Foundatio.Repositories;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

namespace Exceptionless.Tests.Jobs.WorkItemHandlers;

/// <summary>
/// Integration tests for <see cref="OrganizationNotificationWorkItemHandler"/>. Each test drives
/// the handler directly (lock acquisition followed by <c>HandleItemAsync</c>) and counts the
/// organization notices recorded by a <see cref="CountingMailer"/> that replaces the real mailer.
/// </summary>
public class OrganizationNotificationWorkItemHandlerIntegrationTests : IntegrationTestsBase
{
    private const string PrimaryOrganizationId = "664ec4c1f12e4f2b7a0d1001";
    private const string SecondaryOrganizationId = "664ec4c1f12e4f2b7a0d1002";
    private const string PrimaryUserId = "664ec4c1f12e4f2b7a0d2001";
    private const string SecondaryUserId = "664ec4c1f12e4f2b7a0d2002";

    public OrganizationNotificationWorkItemHandlerIntegrationTests(ITestOutputHelper output, AppWebHostFactory factory)
        : base(output, factory)
    {
    }

    // Services are resolved on every access rather than cached in fields.
    private CountingMailer Mailer => GetService<CountingMailer>();
    private ICacheClient CacheClient => GetService<ICacheClient>();
    private OrganizationNotificationWorkItemHandler Handler => GetService<OrganizationNotificationWorkItemHandler>();
    private IOrganizationRepository OrganizationRepository => GetService<IOrganizationRepository>();
    private IUserRepository UserRepository => GetService<IUserRepository>();
    private BillingManager BillingManager => GetService<BillingManager>();
    private BillingPlans BillingPlans => GetService<BillingPlans>();
    private OrganizationData OrganizationData => GetService<OrganizationData>();
    private UserData UserData => GetService<UserData>();

    /// <summary>
    /// Swaps the registered <see cref="IMailer"/> for a singleton <see cref="CountingMailer"/>
    /// so the tests can observe how many notices were sent.
    /// </summary>
    protected override void RegisterServices(IServiceCollection services)
    {
        base.RegisterServices(services);

        services.AddSingleton<CountingMailer>();
        services.ReplaceSingleton<IMailer>(provider => provider.GetRequiredService<CountingMailer>());
    }

    /// <summary>
    /// Resets the mailer counters and seeds two organizations, each with one verified user.
    /// </summary>
    protected override async Task ResetDataAsync()
    {
        await base.ResetDataAsync();
        Mailer.Reset();

        var primaryOrganization = OrganizationData.GenerateOrganization(BillingManager, BillingPlans, id: PrimaryOrganizationId, name: "Primary Organization");
        var secondaryOrganization = OrganizationData.GenerateOrganization(BillingManager, BillingPlans, id: SecondaryOrganizationId, name: "Secondary Organization");
        await OrganizationRepository.AddAsync(new[] { primaryOrganization, secondaryOrganization }, o => o.ImmediateConsistency());

        var primaryOwner = UserData.GenerateUser(id: PrimaryUserId, organizationId: PrimaryOrganizationId, emailAddress: "primary-owner@example.org");
        primaryOwner.FullName = "Primary Owner";
        primaryOwner.IsEmailAddressVerified = true;

        var secondaryOwner = UserData.GenerateUser(id: SecondaryUserId, organizationId: SecondaryOrganizationId, emailAddress: "secondary-owner@example.org");
        secondaryOwner.FullName = "Secondary Owner";
        secondaryOwner.IsEmailAddressVerified = true;

        await UserRepository.AddAsync([primaryOwner, secondaryOwner], o => o.ImmediateConsistency());
    }

    /// <summary>
    /// Three identical monthly items, each processed just over an hour apart, yield one email.
    /// </summary>
    [Fact]
    public async Task HandleItemAsync_WhenDuplicateMonthlyNotificationsAreProcessedAcrossHours_ShouldSendOneEmail()
    {
        // Arrange
        var justOverAnHour = TimeSpan.FromHours(1).Add(TimeSpan.FromMinutes(1));

        // Act
        await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId));

        TimeProvider.Advance(justOverAnHour);
        await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId));

        TimeProvider.Advance(justOverAnHour);
        await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId));

        // Assert
        Assert.Equal(1, Mailer.OrganizationNoticeCount);
    }

    /// <summary>
    /// Repeated monthly items for two organizations yield exactly one email per organization.
    /// </summary>
    [Fact]
    public async Task HandleItemAsync_WhenMonthlyNotificationsTargetDifferentOrganizations_ShouldSendOneEmailPerOrganization()
    {
        // Arrange
        const int rounds = 3;

        // Act
        int round = 0;
        while (round < rounds)
        {
            await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId));
            await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(SecondaryOrganizationId));
            round++;
        }

        // Assert
        var callsByOrganization =
            (from call in Mailer.OrganizationNoticeCalls
             group call by call.OrganizationId into callsForOrganization
             select callsForOrganization)
            .ToDictionary(g => g.Key, g => g.Count());

        Assert.Equal(2, callsByOrganization.Count);
        Assert.Equal(1, callsByOrganization[PrimaryOrganizationId]);
        Assert.Equal(1, callsByOrganization[SecondaryOrganizationId]);
    }

    /// <summary>
    /// A second monthly item processed 25 hours after the first sends a second email.
    /// </summary>
    [Fact]
    public async Task HandleItemAsync_WhenTwentyFourHoursPass_ShouldAllowANewMonthlyNotification()
    {
        // Arrange
        var firstItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId);

        // Act
        await HandleWorkItemAsync(firstItem);

        TimeProvider.Advance(TimeSpan.FromHours(25));
        await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId));

        // Assert
        Assert.Equal(2, Mailer.OrganizationNoticeCount);
    }

    /// <summary>
    /// An hourly-only overage never produces an email.
    /// </summary>
    [Fact]
    public async Task HandleItemAsync_WhenOnlyAnHourlyOverageIsProcessed_ShouldNotSendEmail()
    {
        // Arrange
        var hourlyItem = CreateHourlyNotificationWorkItem(PrimaryOrganizationId);

        // Act
        await HandleWorkItemAsync(hourlyItem);

        // Assert
        Assert.Equal(0, Mailer.OrganizationNoticeCount);
    }

    /// <summary>
    /// A pre-existing monthly sent marker suppresses the email.
    /// </summary>
    [Fact]
    public async Task HandleItemAsync_WhenTheMonthlySentMarkerAlreadyExists_ShouldNotSendEmail()
    {
        // Arrange
        string sentKey = OrganizationNotificationWorkItemHandler.GetNotificationSentKey(
            PrimaryOrganizationId,
            OrganizationNotificationWorkItem.MonthlyNotificationType);
        await CacheClient.SetAsync(sentKey, true, TimeSpan.FromHours(24));

        // Act
        await HandleWorkItemAsync(CreateMonthlyNotificationWorkItem(PrimaryOrganizationId));

        // Assert
        Assert.Equal(0, Mailer.OrganizationNoticeCount);
    }

    /// <summary>
    /// Processing an hourly item first does not prevent the following monthly email.
    /// </summary>
    [Fact]
    public async Task HandleItemAsync_WhenHourlyOveragePrecedesMonthlyOverage_ShouldSendTheMonthlyEmail()
    {
        // Arrange
        var hourlyItem = CreateHourlyNotificationWorkItem(PrimaryOrganizationId);
        var monthlyItem = CreateMonthlyNotificationWorkItem(PrimaryOrganizationId);

        // Act
        await HandleWorkItemAsync(hourlyItem);
        await HandleWorkItemAsync(monthlyItem);

        // Assert
        Assert.Equal(1, Mailer.OrganizationNoticeCount);
    }

    /// <summary>
    /// Runs one work item through the handler: asks the handler for its lock, then invokes
    /// <c>HandleItemAsync</c> with a context carrying that lock and a no-op progress callback.
    /// The lock is disposed when the method exits.
    /// </summary>
    private async Task HandleWorkItemAsync(OrganizationNotificationWorkItem workItem)
    {
        await using var handlerLock = await Handler.GetWorkItemLockAsync(workItem, TestContext.Current.CancellationToken);

        var workItemContext = new WorkItemContext(
            workItem,
            "test-job",
            handlerLock,
            TestContext.Current.CancellationToken,
            static (_, _) => Task.CompletedTask);

        await Handler.HandleItemAsync(workItemContext);
    }

    // Monthly overage only: the kind of work item that is expected to send email.
    private static OrganizationNotificationWorkItem CreateMonthlyNotificationWorkItem(string organizationId) => new()
    {
        OrganizationId = organizationId,
        IsOverHourlyLimit = false,
        IsOverMonthlyLimit = true
    };

    // Hourly overage only: the kind of work item that is expected to send no email.
    private static OrganizationNotificationWorkItem CreateHourlyNotificationWorkItem(string organizationId) => new()
    {
        OrganizationId = organizationId,
        IsOverHourlyLimit = true,
        IsOverMonthlyLimit = false
    };
}
Loading