diff --git a/backend/src/database/migrations/U1774609007__data-sink-worker-optimizations.sql b/backend/src/database/migrations/U1774609007__data-sink-worker-optimizations.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1774609007__data-sink-worker-optimizations.sql b/backend/src/database/migrations/V1774609007__data-sink-worker-optimizations.sql new file mode 100644 index 0000000000..b3be1b5e71 --- /dev/null +++ b/backend/src/database/migrations/V1774609007__data-sink-worker-optimizations.sql @@ -0,0 +1,11 @@ +-- Drop 4 unused activityRelations indexes (already dropped on prod 2026-03-27, +-- see ACTIVITYRELATIONS_INDEX_CLEANUP.md — IF EXISTS guards for idempotency) +alter table "activityRelations" drop constraint if exists "activityRelations_activityId_memberId_key"; + +drop index concurrently if exists "ix_activityRelations_memberId_segmentId_include"; +drop index concurrently if exists "ix_activityRelations_organizationId_segmentId_include"; +drop index concurrently if exists "ix_activityRelations_platform_username"; + +create index concurrently if not exists idx_osa_org_segment_membercount + on "organizationSegmentsAgg" ("organizationId", "segmentId") + include ("memberCount"); \ No newline at end of file diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index c883cb9274..a553cdc6e7 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -28,7 +28,6 @@ import { } from '@crowd/data-access-layer' import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types' import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database' -import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge' import { IActivityRelationCreateOrUpdateData, IDbActivity, @@ -56,7 +55,7 @@ import { } from '@crowd/types' import { IActivityUpdateData, ISentimentActivityInput } from './activity.data' -import MemberService from './member.service' +import MemberService, { mergeIfAllowed } from './member.service' import { IProcessActivityResult } from './types' /* eslint-disable @typescript-eslint/no-explicit-any */ @@ -293,6 +292,24 @@ export default class ActivityService extends LoggerBase { } } + // When activity.username is set but differs from the member's platform identity value, + // override it so the member lookup and the identity insert use the same key. + // Example: git activities set activity.username to the author display name (e.g. "John Doe") + // while the identity stores the email (e.g. "john.doe@example.com"). Without this correction + // the lookup misses the existing member, creating an unnecessary orphan member. + if (username && member) { + const platformIdentity = member.identities.find( + (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME && i.value, + ) + if (platformIdentity && platformIdentity.value !== username) { + this.log.debug( + { platform, originalUsername: username, correctedUsername: platformIdentity.value }, + 'Overriding activity.username with member platform identity value', + ) + activity.username = platformIdentity.value + } + } + member.identities = member.identities.filter((i) => i.value) if (!username) { @@ -1721,32 +1738,24 @@ export default class ActivityService extends LoggerBase { const originalId = metadata.memberWithIdentity as string const targetId = metadata.memberIdToUpdate as string - // but first check memberNoMerge table - const noMergeMemberIds = await getMemberNoMerge(this.pgQx, [originalId, targetId]) - - const noMerge = singleOrDefault( - noMergeMemberIds, - (m) => - (m.memberId === originalId && m.noMergeId === targetId) || - (m.memberId === targetId && m.noMergeId === originalId), - ) - - if (noMerge) { - metadata.noMerge = true - } else { - try { - await this.pgQx.tx(async (txPgQx) => { - const service = new CommonMemberService(txPgQx, this.temporal, this.log) - await service.merge(originalId, targetId) - }) - + try { + const merged = await mergeIfAllowed( + this.pgQx, + this.temporal, + this.log, + originalId, + targetId, + ) + if (merged) { return originalId - } catch (err) { - metadata.mergeError = { - errorMessage: err?.message ?? '', - errorStack: err?.stack, - err, - } + } else { + metadata.noMerge = true + } + } catch (err) { + metadata.mergeError = { + errorMessage: err?.message ?? '', + errorStack: err?.stack, + err, } } } diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index aee3994bc4..239d515b8a 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -12,10 +12,16 @@ import { isObjectEmpty, singleOrDefault, } from '@crowd/common' -import { BotDetectionService } from '@crowd/common_services' +import { BotDetectionService, CommonMemberService } from '@crowd/common_services' import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer' -import { findIdentitiesForMembers, findMembersByVerifiedUsernames } from '@crowd/data-access-layer' +import { + findIdentitiesForMembers, + findMemberIdByVerifiedIdentity, + findMembersByVerifiedUsernames, + moveToNewMember, +} from '@crowd/data-access-layer' import { DbStore } from '@crowd/data-access-layer/src/database' +import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge' import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/integration.repo' import { IDbMember, @@ -60,6 +66,36 @@ function orgCacheKey(org: IOrganization): string | null { return null } +/** + * Checks the memberNoMerge table and, if allowed, merges secondaryId into primaryId + * using CommonMemberService. Returns true if the merge was performed, false if a noMerge + * record prevents it. Throws if the merge itself fails. + */ +export async function mergeIfAllowed( + pgQx: QueryExecutor, + temporal: TemporalClient, + log: Logger, + primaryId: string, + secondaryId: string, +): Promise { + const noMergeMemberIds = await getMemberNoMerge(pgQx, [primaryId, secondaryId]) + const noMerge = singleOrDefault( + noMergeMemberIds, + (m) => + (m.memberId === primaryId && m.noMergeId === secondaryId) || + (m.memberId === secondaryId && m.noMergeId === primaryId), + ) + if (noMerge) { + log.warn({ primaryId, secondaryId }, 'Members are marked as no-merge — skipping merge') + return false + } + await pgQx.tx(async (txPgQx) => { + const service = new CommonMemberService(txPgQx, temporal, log) + await service.merge(primaryId, secondaryId) + }) + return true +} + export default class MemberService extends LoggerBase { private readonly memberRepo: MemberRepository private readonly pgQx: QueryExecutor @@ -91,8 +127,18 @@ export default class MemberService extends LoggerBase { try { this.log.debug('Creating a new member!') - // prevent empty identity handles - data.identities = data.identities.filter((i) => i.value) + // filter empty handles and deduplicate by (platform, value, type, verified) + data.identities = data.identities.filter( + (identity, idx) => + !!identity.value && + data.identities.findIndex( + (j) => + j.platform === identity.platform && + j.value === identity.value && + j.type === identity.type && + j.verified === identity.verified, + ) === idx, + ) if (data.identities.length === 0) { throw new Error('Member must have at least one identity!') @@ -154,20 +200,163 @@ export default class MemberService extends LoggerBase { 'memberService -> create -> createMember', ) - try { - await logExecutionTimeV2( - () => this.memberRepo.insertIdentities(id, integrationId, data.identities), - this.log, - 'memberService -> create -> insertIdentities', + const insertedCount = await logExecutionTimeV2( + () => this.memberRepo.insertIdentities(id, integrationId, data.identities), + this.log, + 'memberService -> create -> insertIdentities', + ) + + if (insertedCount < data.identities.length) { + // At least one verified identity conflicted. Walk every verified identity to: + // (a) find the existing member(s) that own the conflicting ones, and + // (b) collect identities that were successfully inserted for the orphan. + let existingMemberId: string | null = null + const orphanVerifiedIdentities: IMemberIdentity[] = [] + + for (const identity of data.identities.filter((i) => i.verified)) { + const owner = await findMemberIdByVerifiedIdentity( + this.pgQx, + identity.platform, + identity.value, + identity.type, + ) + + if (!owner) { + // The identity disappeared between INSERT and SELECT — unusual race condition. + // Cannot safely resolve; schedule orphan deletion and throw. + this.log.error( + { orphanMemberId: id, identity }, + 'Verified identity not found after conflict detection — scheduling orphan deletion', + ) + await this.scheduleOrphanMemberDeletion(id) + throw new ApplicationError( + `Identity conflict during member creation: owner not found for identity (${identity.platform}, ${identity.value}, ${identity.type})`, + ) + } else if (owner === id) { + // Successfully inserted for the orphan — will be moved to the existing member below + orphanVerifiedIdentities.push(identity) + } else if (!existingMemberId) { + // First conflicting owner found + existingMemberId = owner + } else if (existingMemberId !== owner) { + // A second conflicting owner — two existing members each own a different verified + // identity of this incoming member, so the data source asserts they are the same + // person. Auto-merge the second into the first. + this.log.warn( + { + orphanMemberId: id, + primaryMemberId: existingMemberId, + secondaryMemberId: owner, + identity, + }, + 'Multiple conflicting verified identities belong to different existing members — merging automatically', + ) + let merged: boolean + try { + merged = await mergeIfAllowed( + this.pgQx, + this.temporal, + this.log, + existingMemberId, + owner, + ) + } catch (mergeErr) { + this.log.error( + mergeErr, + { + orphanMemberId: id, + primaryMemberId: existingMemberId, + secondaryMemberId: owner, + }, + 'Auto-merge of conflicting members failed — scheduling orphan deletion', + ) + await this.scheduleOrphanMemberDeletion(id) + throw new ApplicationError( + `Identity conflict during member creation: auto-merge of members ${existingMemberId} and ${owner} failed for identity (${identity.platform}, ${identity.value}, ${identity.type})`, + ) + } + if (!merged) { + this.log.error( + { + orphanMemberId: id, + primaryMemberId: existingMemberId, + secondaryMemberId: owner, + }, + 'Auto-merge prevented by noMerge record — scheduling orphan deletion', + ) + await this.scheduleOrphanMemberDeletion(id) + throw new ApplicationError( + `Identity conflict during member creation: members ${existingMemberId} and ${owner} are marked as no-merge but share identity (${identity.platform}, ${identity.value}, ${identity.type})`, + ) + } + // existingMemberId (primary) survives; owner (secondary) was absorbed + this.log.info( + { + orphanMemberId: id, + survivingMemberId: existingMemberId, + mergedMemberId: owner, + identity, + }, + 'Auto-merge of conflicting members succeeded', + ) + } + // else: owner === existingMemberId — same member owns this identity too, nothing to do + } + + if (existingMemberId) { + // Move any verified identities that were inserted for the orphan to the existing + // member so they are not lost when the orphan is cascade-deleted. + // UPDATE memberId rather than INSERT to avoid unique constraint violations. + for (const identity of orphanVerifiedIdentities) { + try { + await moveToNewMember(this.pgQx, { + oldMemberId: id, + newMemberId: existingMemberId, + platform: identity.platform, + value: identity.value, + type: identity.type, + }) + } catch (moveErr) { + this.log.error( + moveErr, + { orphanMemberId: id, existingMemberId, identity }, + 'Failed to move orphan verified identity to existing member — scheduling orphan deletion', + ) + await this.scheduleOrphanMemberDeletion(id) + throw new ApplicationError( + `Failed to move identity (${identity.platform}, ${identity.value}, ${identity.type}) from orphan ${id} to existing member ${existingMemberId}`, + ) + } + } + this.log.warn( + { + orphanMemberId: id, + existingMemberId, + transferredIdentities: orphanVerifiedIdentities.length, + }, + 'Identity conflict during member creation — reusing existing member, scheduling orphan deletion', + ) + await logExecutionTimeV2( + () => this.memberRepo.addToSegments(existingMemberId, segmentIds), + this.log, + 'memberService -> create -> addToSegments (conflict path)', + ) + if (releaseMemberLock) { + await releaseMemberLock() + } + await this.scheduleOrphanMemberDeletion(id) + return existingMemberId + } + + // insertedCount < data.identities.length but no conflicting owner found — unexpected + this.log.error( + { memberId: id }, + 'Identity conflict during member creation but existing member not found — scheduling orphan deletion', ) - } catch (err) { - this.log.error(err, { memberId: id }, 'Error while inserting identities!') - await logExecutionTimeV2( - async () => this.memberRepo.destroyMemberAfterError(id, false), - this.log, - 'memberService -> create -> destroyMemberAfterError', + await this.scheduleOrphanMemberDeletion(id) + throw new ApplicationError( + `Identity conflict during member creation for member ${id}: inserted ${insertedCount} of ${data.identities.length} identities but found no conflicting owner`, ) - throw new ApplicationError('Error while inserting identities for a new member!', err) } try { @@ -396,7 +585,7 @@ export default class MemberService extends LoggerBase { this.log.trace({ memberId: id }, 'Inserting new identities!') try { await logExecutionTimeV2( - () => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate), + () => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate, true), this.log, 'memberService -> update -> insertIdentities', ) @@ -822,6 +1011,18 @@ export default class MemberService extends LoggerBase { return out } + private async scheduleOrphanMemberDeletion(memberId: string): Promise { + try { + await this.temporal.workflow.start('deleteOrphanMember', { + taskQueue: 'entity-merging', + workflowId: `${TemporalWorkflowId.DELETE_ORPHAN_MEMBER}/${memberId}`, + args: [memberId], + }) + } catch (err) { + this.log.error(err, { memberId }, 'Failed to schedule orphan member deletion!') + } + } + private async startMemberBotAnalysisWithLLMWorkflow(memberId: string): Promise { await this.temporal.workflow.start('processMemberBotAnalysisWithLLM', { taskQueue: 'profiles', diff --git a/services/apps/entity_merging_worker/src/workflows.ts b/services/apps/entity_merging_worker/src/workflows.ts index 64b6b7ddfd..8fee9f483c 100644 --- a/services/apps/entity_merging_worker/src/workflows.ts +++ b/services/apps/entity_merging_worker/src/workflows.ts @@ -1,4 +1,5 @@ export { + deleteOrphanMember, finishMemberMerging, finishOrganizationMerging, finishMemberUnmerging, diff --git a/services/apps/entity_merging_worker/src/workflows/all.ts b/services/apps/entity_merging_worker/src/workflows/all.ts index d385f06a86..49853a3c18 100644 --- a/services/apps/entity_merging_worker/src/workflows/all.ts +++ b/services/apps/entity_merging_worker/src/workflows/all.ts @@ -24,6 +24,10 @@ const { startToCloseTimeout: '60 minutes', }) +export async function deleteOrphanMember(memberId: string): Promise { + await deleteMember(memberId) +} + export async function finishMemberMerging( primaryId: string, secondaryId: string, diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index cc802fce0a..005aa4b6a5 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -427,6 +427,7 @@ def create_activity( "platform": self._GIT_PLATFORM, "channel": remote, "body": "\n".join(commit["message"]), + "username": primary_email, "attributes": { "insertions": insertions, "timezone": dt.tzname(), @@ -539,7 +540,7 @@ def create_activities_from_commit( # Create author activity author = { - "username": author_name, + "username": author_email, "displayName": author_name, "emails": [author_email], } @@ -567,7 +568,7 @@ def create_activities_from_commit( committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest() committer = { - "username": committer_name, + "username": committer_email, "displayName": committer_name, "emails": [committer_email], } @@ -597,7 +598,6 @@ def create_activities_from_commit( activity_type = activity_type.lower().replace("-by", "") + "-commit" member = { - "username": member_data["email"], "displayName": member_data["name"], "emails": [member_data["email"]], } diff --git a/services/libs/data-access-layer/src/members/identities.ts b/services/libs/data-access-layer/src/members/identities.ts index 1c90f4c08a..94e019ebb4 100644 --- a/services/libs/data-access-layer/src/members/identities.ts +++ b/services/libs/data-access-layer/src/members/identities.ts @@ -194,6 +194,25 @@ export async function moveIdentitiesBetweenMembers( } } +export async function findMemberIdByVerifiedIdentity( + qx: QueryExecutor, + platform: string, + value: string, + type: MemberIdentityType, +): Promise { + const result = await qx.selectOneOrNone( + `SELECT "memberId" FROM "memberIdentities" + WHERE platform = $(platform) + AND value = $(value) + AND type = $(type) + AND verified = true + AND "deletedAt" IS NULL + LIMIT 1`, + { platform, value, type }, + ) + return result?.memberId ?? null +} + export async function insertManyMemberIdentities( qx: QueryExecutor, identities: NewMemberIdentity[], @@ -205,13 +224,13 @@ export async function insertManyMemberIdentities( identities: NewMemberIdentity[], failOnConflict?: boolean, returnRows?: false, -): Promise +): Promise export async function insertManyMemberIdentities( qx: QueryExecutor, identities: NewMemberIdentity[], failOnConflict = false, returnRows = false, -): Promise { +): Promise { const query = prepareBulkInsert( 'memberIdentities', [ @@ -240,7 +259,7 @@ export async function insertManyMemberIdentities( return qx.select(query) } - await qx.result(query) + return qx.result(query) } export async function createMemberIdentity( diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo.ts index b4e6266ef4..72e2ce285d 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo.ts @@ -86,7 +86,8 @@ export default class MemberRepository extends RepositoryBase { memberId: string, integrationId: string, identities: IMemberIdentity[], - ): Promise { + failOnConflict = false, + ): Promise { const objects = identities.map((i) => { return { memberId, @@ -100,7 +101,11 @@ export default class MemberRepository extends RepositoryBase { } }) - await insertManyMemberIdentities(new PgPromiseQueryExecutor(this.db()), objects, true) + return insertManyMemberIdentities( + new PgPromiseQueryExecutor(this.db()), + objects, + failOnConflict, + ) } public async addToSegments(memberId: string, segmentIds: string[]): Promise { diff --git a/services/libs/types/src/enums/temporal.ts b/services/libs/types/src/enums/temporal.ts index 7887886b99..27f5370212 100644 --- a/services/libs/types/src/enums/temporal.ts +++ b/services/libs/types/src/enums/temporal.ts @@ -9,4 +9,5 @@ export enum TemporalWorkflowId { ORGANIZATIONS_CSV_EXPORTS = 'organizations-csv-exports', MEMBER_BOT_ANALYSIS_WITH_LLM = 'member-bot-analysis-with-llm', + DELETE_ORPHAN_MEMBER = 'delete-orphan-member', }