Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Drop 4 unused activityRelations indexes (already dropped on prod 2026-03-27,
-- see ACTIVITYRELATIONS_INDEX_CLEANUP.md — IF EXISTS guards for idempotency)
alter table "activityRelations" drop constraint if exists "activityRelations_activityId_memberId_key";

drop index concurrently if exists "ix_activityRelations_memberId_segmentId_include";
drop index concurrently if exists "ix_activityRelations_organizationId_segmentId_include";
drop index concurrently if exists "ix_activityRelations_platform_username";

create index concurrently if not exists idx_osa_org_segment_membercount
on "organizationSegmentsAgg" ("organizationId", "segmentId")
include ("memberCount");
63 changes: 36 additions & 27 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {
} from '@crowd/data-access-layer'
import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types'
import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database'
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
import {
IActivityRelationCreateOrUpdateData,
IDbActivity,
Expand Down Expand Up @@ -56,7 +55,7 @@ import {
} from '@crowd/types'

import { IActivityUpdateData, ISentimentActivityInput } from './activity.data'
import MemberService from './member.service'
import MemberService, { mergeIfAllowed } from './member.service'
import { IProcessActivityResult } from './types'

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down Expand Up @@ -293,6 +292,24 @@ export default class ActivityService extends LoggerBase {
}
}

// When activity.username is set but differs from the member's platform identity value,
// override it so the member lookup and the identity insert use the same key.
// Example: git activities set activity.username to the author display name (e.g. "John Doe")
// while the identity stores the email (e.g. "john.doe@example.com"). Without this correction
// the lookup misses the existing member, creating an unnecessary orphan member.
if (username && member) {
const platformIdentity = member.identities.find(
(i) => i.platform === platform && i.type === MemberIdentityType.USERNAME && i.value,
)
if (platformIdentity && platformIdentity.value !== username) {
this.log.debug(
{ platform, originalUsername: username, correctedUsername: platformIdentity.value },
'Overriding activity.username with member platform identity value',
)
activity.username = platformIdentity.value
}
}

member.identities = member.identities.filter((i) => i.value)

if (!username) {
Expand Down Expand Up @@ -1721,32 +1738,24 @@ export default class ActivityService extends LoggerBase {
const originalId = metadata.memberWithIdentity as string
const targetId = metadata.memberIdToUpdate as string

// but first check memberNoMerge table
const noMergeMemberIds = await getMemberNoMerge(this.pgQx, [originalId, targetId])

const noMerge = singleOrDefault(
noMergeMemberIds,
(m) =>
(m.memberId === originalId && m.noMergeId === targetId) ||
(m.memberId === targetId && m.noMergeId === originalId),
)

if (noMerge) {
metadata.noMerge = true
} else {
try {
await this.pgQx.tx(async (txPgQx) => {
const service = new CommonMemberService(txPgQx, this.temporal, this.log)
await service.merge(originalId, targetId)
})

try {
const merged = await mergeIfAllowed(
this.pgQx,
this.temporal,
this.log,
originalId,
targetId,
)
if (merged) {
return originalId
} catch (err) {
metadata.mergeError = {
errorMessage: err?.message ?? '<no error message>',
errorStack: err?.stack,
err,
}
} else {
metadata.noMerge = true
}
} catch (err) {
metadata.mergeError = {
errorMessage: err?.message ?? '<no error message>',
errorStack: err?.stack,
err,
}
}
}
Expand Down
235 changes: 218 additions & 17 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ import {
isObjectEmpty,
singleOrDefault,
} from '@crowd/common'
import { BotDetectionService } from '@crowd/common_services'
import { BotDetectionService, CommonMemberService } from '@crowd/common_services'
import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer'
import { findIdentitiesForMembers, findMembersByVerifiedUsernames } from '@crowd/data-access-layer'
import {
findIdentitiesForMembers,
findMemberIdByVerifiedIdentity,
findMembersByVerifiedUsernames,
moveToNewMember,
} from '@crowd/data-access-layer'
import { DbStore } from '@crowd/data-access-layer/src/database'
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/integration.repo'
import {
IDbMember,
Expand Down Expand Up @@ -60,6 +66,36 @@ function orgCacheKey(org: IOrganization): string | null {
return null
}

/**
* Checks the memberNoMerge table and, if allowed, merges secondaryId into primaryId
* using CommonMemberService. Returns true if the merge was performed, false if a noMerge
* record prevents it. Throws if the merge itself fails.
*/
export async function mergeIfAllowed(
pgQx: QueryExecutor,
temporal: TemporalClient,
log: Logger,
primaryId: string,
secondaryId: string,
): Promise<boolean> {
const noMergeMemberIds = await getMemberNoMerge(pgQx, [primaryId, secondaryId])
const noMerge = singleOrDefault(
noMergeMemberIds,
(m) =>
(m.memberId === primaryId && m.noMergeId === secondaryId) ||
(m.memberId === secondaryId && m.noMergeId === primaryId),
)
if (noMerge) {
log.warn({ primaryId, secondaryId }, 'Members are marked as no-merge — skipping merge')
return false
}
await pgQx.tx(async (txPgQx) => {
const service = new CommonMemberService(txPgQx, temporal, log)
await service.merge(primaryId, secondaryId)
})
return true
}

export default class MemberService extends LoggerBase {
private readonly memberRepo: MemberRepository
private readonly pgQx: QueryExecutor
Expand Down Expand Up @@ -91,8 +127,18 @@ export default class MemberService extends LoggerBase {
try {
this.log.debug('Creating a new member!')

// prevent empty identity handles
data.identities = data.identities.filter((i) => i.value)
// filter empty handles and deduplicate by (platform, value, type, verified)
data.identities = data.identities.filter(
(identity, idx) =>
!!identity.value &&
data.identities.findIndex(
(j) =>
j.platform === identity.platform &&
j.value === identity.value &&
j.type === identity.type &&
j.verified === identity.verified,
) === idx,
)

if (data.identities.length === 0) {
throw new Error('Member must have at least one identity!')
Expand Down Expand Up @@ -154,20 +200,163 @@ export default class MemberService extends LoggerBase {
'memberService -> create -> createMember',
)

try {
await logExecutionTimeV2(
() => this.memberRepo.insertIdentities(id, integrationId, data.identities),
this.log,
'memberService -> create -> insertIdentities',
const insertedCount = await logExecutionTimeV2(
() => this.memberRepo.insertIdentities(id, integrationId, data.identities),
this.log,
'memberService -> create -> insertIdentities',
)

if (insertedCount < data.identities.length) {
// At least one verified identity conflicted. Walk every verified identity to:
// (a) find the existing member(s) that own the conflicting ones, and
// (b) collect identities that were successfully inserted for the orphan.
let existingMemberId: string | null = null
const orphanVerifiedIdentities: IMemberIdentity[] = []

for (const identity of data.identities.filter((i) => i.verified)) {
const owner = await findMemberIdByVerifiedIdentity(
this.pgQx,
identity.platform,
identity.value,
identity.type,
)

if (!owner) {
// The identity disappeared between INSERT and SELECT — unusual race condition.
// Cannot safely resolve; schedule orphan deletion and throw.
this.log.error(
{ orphanMemberId: id, identity },
'Verified identity not found after conflict detection — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation: owner not found for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
)
} else if (owner === id) {
// Successfully inserted for the orphan — will be moved to the existing member below
orphanVerifiedIdentities.push(identity)
} else if (!existingMemberId) {
// First conflicting owner found
existingMemberId = owner
} else if (existingMemberId !== owner) {
// A second conflicting owner — two existing members each own a different verified
// identity of this incoming member, so the data source asserts they are the same
// person. Auto-merge the second into the first.
this.log.warn(
{
orphanMemberId: id,
primaryMemberId: existingMemberId,
secondaryMemberId: owner,
identity,
},
'Multiple conflicting verified identities belong to different existing members — merging automatically',
)
let merged: boolean
try {
merged = await mergeIfAllowed(
this.pgQx,
this.temporal,
this.log,
existingMemberId,
owner,
)
} catch (mergeErr) {
this.log.error(
mergeErr,
{
orphanMemberId: id,
primaryMemberId: existingMemberId,
secondaryMemberId: owner,
},
'Auto-merge of conflicting members failed — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation: auto-merge of members ${existingMemberId} and ${owner} failed for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
)
}
if (!merged) {
this.log.error(
{
orphanMemberId: id,
primaryMemberId: existingMemberId,
secondaryMemberId: owner,
},
'Auto-merge prevented by noMerge record — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation: members ${existingMemberId} and ${owner} are marked as no-merge but share identity (${identity.platform}, ${identity.value}, ${identity.type})`,
)
}
// existingMemberId (primary) survives; owner (secondary) was absorbed
this.log.info(
{
orphanMemberId: id,
survivingMemberId: existingMemberId,
mergedMemberId: owner,
identity,
},
'Auto-merge of conflicting members succeeded',
)
}
// else: owner === existingMemberId — same member owns this identity too, nothing to do
}

if (existingMemberId) {
// Move any verified identities that were inserted for the orphan to the existing
// member so they are not lost when the orphan is cascade-deleted.
// UPDATE memberId rather than INSERT to avoid unique constraint violations.
for (const identity of orphanVerifiedIdentities) {
try {
await moveToNewMember(this.pgQx, {
oldMemberId: id,
newMemberId: existingMemberId,
platform: identity.platform,
value: identity.value,
type: identity.type,
})
} catch (moveErr) {
this.log.error(
moveErr,
{ orphanMemberId: id, existingMemberId, identity },
'Failed to move orphan verified identity to existing member — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Failed to move identity (${identity.platform}, ${identity.value}, ${identity.type}) from orphan ${id} to existing member ${existingMemberId}`,
)
}
}
this.log.warn(
{
orphanMemberId: id,
existingMemberId,
transferredIdentities: orphanVerifiedIdentities.length,
},
'Identity conflict during member creation — reusing existing member, scheduling orphan deletion',
)
await logExecutionTimeV2(
() => this.memberRepo.addToSegments(existingMemberId, segmentIds),
this.log,
'memberService -> create -> addToSegments (conflict path)',
)
if (releaseMemberLock) {
await releaseMemberLock()
}
await this.scheduleOrphanMemberDeletion(id)
return existingMemberId
}

// insertedCount < data.identities.length but no conflicting owner found — unexpected
this.log.error(
{ memberId: id },
'Identity conflict during member creation but existing member not found — scheduling orphan deletion',
)
} catch (err) {
this.log.error(err, { memberId: id }, 'Error while inserting identities!')
await logExecutionTimeV2(
async () => this.memberRepo.destroyMemberAfterError(id, false),
this.log,
'memberService -> create -> destroyMemberAfterError',
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation for member ${id}: inserted ${insertedCount} of ${data.identities.length} identities but found no conflicting owner`,
)
throw new ApplicationError('Error while inserting identities for a new member!', err)
}

try {
Expand Down Expand Up @@ -396,7 +585,7 @@ export default class MemberService extends LoggerBase {
this.log.trace({ memberId: id }, 'Inserting new identities!')
try {
await logExecutionTimeV2(
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate),
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate, true),
this.log,
'memberService -> update -> insertIdentities',
)
Expand Down Expand Up @@ -822,6 +1011,18 @@ export default class MemberService extends LoggerBase {
return out
}

private async scheduleOrphanMemberDeletion(memberId: string): Promise<void> {
try {
await this.temporal.workflow.start('deleteOrphanMember', {
taskQueue: 'entity-merging',
workflowId: `${TemporalWorkflowId.DELETE_ORPHAN_MEMBER}/${memberId}`,
args: [memberId],
})
} catch (err) {
this.log.error(err, { memberId }, 'Failed to schedule orphan member deletion!')
}
}

private async startMemberBotAnalysisWithLLMWorkflow(memberId: string): Promise<void> {
await this.temporal.workflow.start('processMemberBotAnalysisWithLLM', {
taskQueue: 'profiles',
Expand Down
1 change: 1 addition & 0 deletions services/apps/entity_merging_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export {
deleteOrphanMember,
finishMemberMerging,
finishOrganizationMerging,
finishMemberUnmerging,
Expand Down
Loading
Loading