-
-
Notifications
You must be signed in to change notification settings - Fork 508
fix: Optimize cleanup jobs, eliminate raw ES client usage, add test coverage #2267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| using Exceptionless.Core.Models; | ||
| using System.Linq.Expressions; | ||
| using Exceptionless.Core.Models; | ||
| using Exceptionless.Core.Repositories.Configuration; | ||
| using Exceptionless.Core.Repositories.Queries; | ||
| using Exceptionless.Core.Validation; | ||
|
|
@@ -11,11 +12,13 @@ namespace Exceptionless.Core.Repositories; | |
|
|
||
| public class EventRepository : RepositoryOwnedByOrganizationAndProject<PersistentEvent>, IEventRepository | ||
| { | ||
| private readonly ExceptionlessElasticConfiguration _configuration; | ||
| private readonly TimeProvider _timeProvider; | ||
|
|
||
| public EventRepository(ExceptionlessElasticConfiguration configuration, AppOptions options, MiniValidationValidator validator) | ||
| : base(configuration.Events, validator, options) | ||
| { | ||
| _configuration = configuration; | ||
| _timeProvider = configuration.TimeProvider; | ||
|
|
||
| DisableCache(); // NOTE: If cache is ever enabled, then fast paths for patching/deleting with scripts will be super slow! | ||
|
|
@@ -189,9 +192,111 @@ public override Task<FindResults<PersistentEvent>> GetByProjectIdAsync(string pr | |
| public Task<long> RemoveAllByStackIdsAsync(string[] stackIds) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(stackIds); | ||
| if (stackIds.Length == 0) | ||
| if (stackIds is []) | ||
| throw new ArgumentOutOfRangeException(nameof(stackIds)); | ||
|
|
||
| return RemoveAllAsync(q => q.Stack(stackIds)); | ||
| } | ||
|
|
||
| public Task<long> RemoveAllByProjectIdsAsync(string[] projectIds) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(projectIds); | ||
| if (projectIds is []) | ||
| throw new ArgumentOutOfRangeException(nameof(projectIds)); | ||
|
|
||
| return RemoveAllAsync(q => q.Project(projectIds)); | ||
| } | ||
|
|
||
| public Task<long> RemoveAllByOrganizationIdsAsync(string[] organizationIds) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(organizationIds); | ||
| if (organizationIds is []) | ||
| throw new ArgumentOutOfRangeException(nameof(organizationIds)); | ||
|
|
||
| return RemoveAllAsync(q => q.Organization(organizationIds)); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Reassigns all events from the source stacks to the target stack using a parameterized | ||
| /// Painless script (no string interpolation) to prevent script injection. | ||
| /// </summary> | ||
| public Task<long> ReassignStackAsync(IEnumerable<string> sourceStackIds, string targetStackId) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this feels dangerous where is it used? make sure we have integration tests for this and every method added in repos. |
||
| { | ||
| ArgumentNullException.ThrowIfNull(sourceStackIds); | ||
| ArgumentException.ThrowIfNullOrEmpty(targetStackId); | ||
|
|
||
| // Materialize to avoid multiple enumeration and guard against empty — an empty | ||
| // .Stack() filter would match ALL events and reassign them to the target stack. | ||
| var sourceIds = sourceStackIds.ToList(); | ||
| if (sourceIds.Count == 0) | ||
| return Task.FromResult(0L); | ||
|
|
||
| return PatchAllAsync( | ||
| q => q.Stack(sourceIds), | ||
| new ScriptPatch("ctx._source.stack_id = params.targetStackId") | ||
|
Comment on lines
+223
to
+236
|
||
| { | ||
| Params = new Dictionary<string, object> { ["targetStackId"] = targetStackId } | ||
| }); | ||
| } | ||
|
|
||
| public Task<IReadOnlyCollection<string>> GetDistinctStackIdsAsync(int batchSize, CompositeKeyResult? afterKey = null) | ||
| { | ||
| return GetDistinctFieldValuesAsync("stack_id", e => e.StackId, batchSize, afterKey); | ||
| } | ||
|
|
||
| public Task<IReadOnlyCollection<string>> GetDistinctProjectIdsAsync(int batchSize, CompositeKeyResult? afterKey = null) | ||
| { | ||
| return GetDistinctFieldValuesAsync("project_id", e => e.ProjectId, batchSize, afterKey); | ||
| } | ||
|
|
||
| public Task<IReadOnlyCollection<string>> GetDistinctOrganizationIdsAsync(int batchSize, CompositeKeyResult? afterKey = null) | ||
| { | ||
| return GetDistinctFieldValuesAsync("organization_id", e => e.OrganizationId, batchSize, afterKey); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Uses a composite aggregation to paginate through all distinct values of a field. | ||
| /// Composite aggregations are preferred over terms aggregations for high-cardinality fields | ||
| /// because terms aggregations can silently miss values when the unique count exceeds the | ||
| /// configured size parameter. Composite aggregations guarantee correct iteration via an | ||
| /// after_key cursor, at the cost of requiring sequential page fetches. | ||
| /// </summary> | ||
| private async Task<IReadOnlyCollection<string>> GetDistinctFieldValuesAsync( | ||
| string fieldName, Expression<Func<PersistentEvent, object>> fieldExpression, int batchSize, CompositeKeyResult? afterKey) | ||
| { | ||
| var afterKeyValues = afterKey?.AfterKey; | ||
| var search = await _configuration.Client.SearchAsync<PersistentEvent>(s => | ||
| { | ||
| s.Size(0).Aggregations(a => a | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should document here why we are using a composite agg. and performance / concerns. |
||
| .Composite($"composite_{fieldName}", c => | ||
| { | ||
| var composite = c.Size(batchSize) | ||
| .Sources(src => src.Terms(fieldName, t => t.Field(fieldExpression))); | ||
| if (afterKeyValues is { Count: > 0 }) | ||
| composite.After(new CompositeKey(afterKeyValues)); | ||
| return composite; | ||
| })); | ||
| return s; | ||
| }); | ||
|
|
||
| var composite = search.Aggregations.Composite($"composite_{fieldName}"); | ||
|
|
||
| // Always clear the cursor first; repopulate only when a next page exists. | ||
| // This ensures callers that check afterKey.AfterKey.Count > 0 correctly | ||
| // detect end-of-pagination without requiring a final empty-result fetch. | ||
| if (afterKey is not null) | ||
| { | ||
| afterKey.AfterKey.Clear(); | ||
| if (composite?.AfterKey is not null) | ||
| { | ||
| foreach (var kvp in composite.AfterKey) | ||
| afterKey.AfterKey[kvp.Key] = kvp.Value; | ||
| } | ||
| } | ||
|
|
||
| if (composite?.Buckets is not { Count: > 0 }) | ||
| return []; | ||
|
|
||
| return composite.Buckets.Select(b => b.Key[fieldName].ToString()!).ToArray(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -188,6 +188,22 @@ public Task<long> SoftDeleteByProjectIdAsync(string organizationId, string proje | |
| ); | ||
| } | ||
|
|
||
| public async Task<IReadOnlyCollection<string>> GetDuplicateSignaturesAsync(int maxResults = 10000) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure we have integration tests for this. |
||
| { | ||
| // ImmediateConsistency forces a segment refresh before the aggregation so that | ||
| // any stacks soft-deleted in a previous batch are excluded here. Cost: one refresh | ||
| // per batch (not per item), equivalent to the original explicit index refresh. | ||
| var result = await CountAsync( | ||
| q => q.AggregationsExpression($"terms:(duplicate_signature~{maxResults} @min:2)"), | ||
| o => o.ImmediateConsistency()); | ||
|
|
||
| var buckets = result.Aggregations.Terms("terms_duplicate_signature")?.Buckets; | ||
| if (buckets is not { Count: > 0 }) | ||
| return []; | ||
|
|
||
| return buckets.Select(b => b.Key).ToArray(); | ||
| } | ||
|
|
||
| protected override async Task AddDocumentsToCacheAsync(ICollection<FindHit<Stack>> findHits, ICommandOptions options, bool isDirtyRead) | ||
| { | ||
| await base.AddDocumentsToCacheAsync(findHits, options, isDirtyRead); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
during cleanup what's renewing the job lock?