Skip to content

Commit 34c1acb

Browse files
committed
fix: offload delete member and try to handle member identities conflicts better
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 003b27c commit 34c1acb

10 files changed

Lines changed: 280 additions & 47 deletions

File tree

backend/src/database/migrations/U1774609007__data-sink-worker-optimizations.sql

Whitespace-only changes.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Drop 4 unused activityRelations indexes (already dropped on prod 2026-03-27,
2+
-- see ACTIVITYRELATIONS_INDEX_CLEANUP.md — IF EXISTS guards for idempotency)
3+
alter table "activityRelations" drop constraint if exists "activityRelations_activityId_memberId_key";
4+
5+
drop index concurrently if exists "ix_activityRelations_memberId_segmentId_include";
6+
drop index concurrently if exists "ix_activityRelations_organizationId_segmentId_include";
7+
drop index concurrently if exists "ix_activityRelations_platform_username";
8+
9+
create index concurrently if not exists idx_osa_org_segment_membercount
10+
on "organizationSegmentsAgg" ("organizationId", "segmentId")
11+
include ("memberCount");

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import {
2828
} from '@crowd/data-access-layer'
2929
import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types'
3030
import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database'
31-
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
3231
import {
3332
IActivityRelationCreateOrUpdateData,
3433
IDbActivity,
@@ -56,7 +55,7 @@ import {
5655
} from '@crowd/types'
5756

5857
import { IActivityUpdateData, ISentimentActivityInput } from './activity.data'
59-
import MemberService from './member.service'
58+
import MemberService, { mergeIfAllowed } from './member.service'
6059
import { IProcessActivityResult } from './types'
6160

6261
/* eslint-disable @typescript-eslint/no-explicit-any */
@@ -293,6 +292,24 @@ export default class ActivityService extends LoggerBase {
293292
}
294293
}
295294

295+
// When activity.username is set but differs from the member's platform identity value,
296+
// override it so the member lookup and the identity insert use the same key.
297+
// Example: git activities set activity.username to the author display name (e.g. "Doug Hellmann")
298+
// while the identity stores the email (e.g. "doug@doughellmann.com"). Without this correction
299+
// the lookup misses the existing member, creating an unnecessary orphan member.
300+
if (username && member) {
301+
const platformIdentity = member.identities.find(
302+
(i) => i.platform === platform && i.type === MemberIdentityType.USERNAME && i.value,
303+
)
304+
if (platformIdentity && platformIdentity.value !== username) {
305+
this.log.debug(
306+
{ platform, originalUsername: username, correctedUsername: platformIdentity.value },
307+
'Overriding activity.username with member platform identity value',
308+
)
309+
activity.username = platformIdentity.value
310+
}
311+
}
312+
296313
member.identities = member.identities.filter((i) => i.value)
297314

298315
if (!username) {
@@ -1721,32 +1738,24 @@ export default class ActivityService extends LoggerBase {
17211738
const originalId = metadata.memberWithIdentity as string
17221739
const targetId = metadata.memberIdToUpdate as string
17231740

1724-
// but first check memberNoMerge table
1725-
const noMergeMemberIds = await getMemberNoMerge(this.pgQx, [originalId, targetId])
1726-
1727-
const noMerge = singleOrDefault(
1728-
noMergeMemberIds,
1729-
(m) =>
1730-
(m.memberId === originalId && m.noMergeId === targetId) ||
1731-
(m.memberId === targetId && m.noMergeId === originalId),
1732-
)
1733-
1734-
if (noMerge) {
1735-
metadata.noMerge = true
1736-
} else {
1737-
try {
1738-
await this.pgQx.tx(async (txPgQx) => {
1739-
const service = new CommonMemberService(txPgQx, this.temporal, this.log)
1740-
await service.merge(originalId, targetId)
1741-
})
1742-
1741+
try {
1742+
const merged = await mergeIfAllowed(
1743+
this.pgQx,
1744+
this.temporal,
1745+
this.log,
1746+
originalId,
1747+
targetId,
1748+
)
1749+
if (merged) {
17431750
return originalId
1744-
} catch (err) {
1745-
metadata.mergeError = {
1746-
errorMessage: err?.message ?? '<no error message>',
1747-
errorStack: err?.stack,
1748-
err,
1749-
}
1751+
} else {
1752+
metadata.noMerge = true
1753+
}
1754+
} catch (err) {
1755+
metadata.mergeError = {
1756+
errorMessage: err?.message ?? '<no error message>',
1757+
errorStack: err?.stack,
1758+
err,
17501759
}
17511760
}
17521761
}

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 198 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,16 @@ import {
1212
isObjectEmpty,
1313
singleOrDefault,
1414
} from '@crowd/common'
15-
import { BotDetectionService } from '@crowd/common_services'
15+
import { BotDetectionService, CommonMemberService } from '@crowd/common_services'
1616
import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer'
17-
import { findIdentitiesForMembers, findMembersByVerifiedUsernames } from '@crowd/data-access-layer'
17+
import {
18+
findIdentitiesForMembers,
19+
findMemberIdByVerifiedIdentity,
20+
findMembersByVerifiedUsernames,
21+
moveToNewMember,
22+
} from '@crowd/data-access-layer'
1823
import { DbStore } from '@crowd/data-access-layer/src/database'
24+
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
1925
import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/integration.repo'
2026
import {
2127
IDbMember,
@@ -60,6 +66,36 @@ function orgCacheKey(org: IOrganization): string | null {
6066
return null
6167
}
6268

69+
/**
70+
* Checks the memberNoMerge table and, if allowed, merges secondaryId into primaryId
71+
* using CommonMemberService. Returns true if the merge was performed, false if a noMerge
72+
* record prevents it. Throws if the merge itself fails.
73+
*/
74+
export async function mergeIfAllowed(
75+
pgQx: QueryExecutor,
76+
temporal: TemporalClient,
77+
log: Logger,
78+
primaryId: string,
79+
secondaryId: string,
80+
): Promise<boolean> {
81+
const noMergeMemberIds = await getMemberNoMerge(pgQx, [primaryId, secondaryId])
82+
const noMerge = singleOrDefault(
83+
noMergeMemberIds,
84+
(m) =>
85+
(m.memberId === primaryId && m.noMergeId === secondaryId) ||
86+
(m.memberId === secondaryId && m.noMergeId === primaryId),
87+
)
88+
if (noMerge) {
89+
log.warn({ primaryId, secondaryId }, 'Members are marked as no-merge — skipping merge')
90+
return false
91+
}
92+
await pgQx.tx(async (txPgQx) => {
93+
const service = new CommonMemberService(txPgQx, temporal, log)
94+
await service.merge(primaryId, secondaryId)
95+
})
96+
return true
97+
}
98+
6399
export default class MemberService extends LoggerBase {
64100
private readonly memberRepo: MemberRepository
65101
private readonly pgQx: QueryExecutor
@@ -154,20 +190,155 @@ export default class MemberService extends LoggerBase {
154190
'memberService -> create -> createMember',
155191
)
156192

157-
try {
158-
await logExecutionTimeV2(
159-
() => this.memberRepo.insertIdentities(id, integrationId, data.identities),
160-
this.log,
161-
'memberService -> create -> insertIdentities',
193+
const insertedCount = await logExecutionTimeV2(
194+
() => this.memberRepo.insertIdentities(id, integrationId, data.identities),
195+
this.log,
196+
'memberService -> create -> insertIdentities',
197+
)
198+
199+
if (insertedCount < data.identities.length) {
200+
// At least one verified identity conflicted. Walk every verified identity to:
201+
// (a) find the existing member(s) that own the conflicting ones, and
202+
// (b) collect identities that were successfully inserted for the orphan.
203+
let existingMemberId: string | null = null
204+
const orphanVerifiedIdentities: IMemberIdentity[] = []
205+
206+
for (const identity of data.identities.filter((i) => i.verified)) {
207+
const owner = await findMemberIdByVerifiedIdentity(
208+
this.pgQx,
209+
identity.platform,
210+
identity.value,
211+
identity.type,
212+
)
213+
214+
if (!owner) {
215+
// The identity disappeared between INSERT and SELECT — unusual race condition.
216+
// Cannot safely resolve; schedule orphan deletion and throw.
217+
this.log.error(
218+
{ orphanMemberId: id, identity },
219+
'Verified identity not found after conflict detection — scheduling orphan deletion',
220+
)
221+
await this.scheduleOrphanMemberDeletion(id)
222+
throw new ApplicationError(
223+
`Identity conflict during member creation: owner not found for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
224+
)
225+
} else if (owner === id) {
226+
// Successfully inserted for the orphan — will be moved to the existing member below
227+
orphanVerifiedIdentities.push(identity)
228+
} else if (!existingMemberId) {
229+
// First conflicting owner found
230+
existingMemberId = owner
231+
} else if (existingMemberId !== owner) {
232+
// A second conflicting owner — two existing members each own a different verified
233+
// identity of this incoming member, so the data source asserts they are the same
234+
// person. Auto-merge the second into the first.
235+
this.log.warn(
236+
{
237+
orphanMemberId: id,
238+
primaryMemberId: existingMemberId,
239+
secondaryMemberId: owner,
240+
identity,
241+
},
242+
'Multiple conflicting verified identities belong to different existing members — merging automatically',
243+
)
244+
let merged: boolean
245+
try {
246+
merged = await mergeIfAllowed(
247+
this.pgQx,
248+
this.temporal,
249+
this.log,
250+
existingMemberId,
251+
owner,
252+
)
253+
} catch (mergeErr) {
254+
this.log.error(
255+
mergeErr,
256+
{
257+
orphanMemberId: id,
258+
primaryMemberId: existingMemberId,
259+
secondaryMemberId: owner,
260+
},
261+
'Auto-merge of conflicting members failed — scheduling orphan deletion',
262+
)
263+
await this.scheduleOrphanMemberDeletion(id)
264+
throw new ApplicationError(
265+
`Identity conflict during member creation: auto-merge of members ${existingMemberId} and ${owner} failed for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
266+
)
267+
}
268+
if (!merged) {
269+
this.log.error(
270+
{
271+
orphanMemberId: id,
272+
primaryMemberId: existingMemberId,
273+
secondaryMemberId: owner,
274+
},
275+
'Auto-merge prevented by noMerge record — scheduling orphan deletion',
276+
)
277+
await this.scheduleOrphanMemberDeletion(id)
278+
throw new ApplicationError(
279+
`Identity conflict during member creation: members ${existingMemberId} and ${owner} are marked as no-merge but share identity (${identity.platform}, ${identity.value}, ${identity.type})`,
280+
)
281+
}
282+
// existingMemberId (primary) survives; owner (secondary) was absorbed
283+
this.log.info(
284+
{
285+
orphanMemberId: id,
286+
survivingMemberId: existingMemberId,
287+
mergedMemberId: owner,
288+
identity,
289+
},
290+
'Auto-merge of conflicting members succeeded',
291+
)
292+
}
293+
// else: owner === existingMemberId — same member owns this identity too, nothing to do
294+
}
295+
296+
if (existingMemberId) {
297+
// Move any verified identities that were inserted for the orphan to the existing
298+
// member so they are not lost when the orphan is cascade-deleted.
299+
// UPDATE memberId rather than INSERT to avoid unique constraint violations.
300+
for (const identity of orphanVerifiedIdentities) {
301+
try {
302+
await moveToNewMember(this.pgQx, {
303+
oldMemberId: id,
304+
newMemberId: existingMemberId,
305+
platform: identity.platform,
306+
value: identity.value,
307+
type: identity.type,
308+
})
309+
} catch (moveErr) {
310+
this.log.error(
311+
moveErr,
312+
{ orphanMemberId: id, existingMemberId, identity },
313+
'Failed to move orphan verified identity to existing member — scheduling orphan deletion',
314+
)
315+
await this.scheduleOrphanMemberDeletion(id)
316+
throw new ApplicationError(
317+
`Failed to move identity (${identity.platform}, ${identity.value}, ${identity.type}) from orphan ${id} to existing member ${existingMemberId}`,
318+
)
319+
}
320+
}
321+
this.log.warn(
322+
{
323+
orphanMemberId: id,
324+
existingMemberId,
325+
transferredIdentities: orphanVerifiedIdentities.length,
326+
},
327+
'Identity conflict during member creation — reusing existing member, scheduling orphan deletion',
328+
)
329+
await this.scheduleOrphanMemberDeletion(id)
330+
return existingMemberId
331+
}
332+
333+
// insertedCount < data.identities.length but no conflicting owner found — unexpected
334+
this.log.error(
335+
{ memberId: id },
336+
'Identity conflict during member creation but existing member not found — scheduling orphan deletion',
162337
)
163-
} catch (err) {
164-
this.log.error(err, { memberId: id }, 'Error while inserting identities!')
165-
await logExecutionTimeV2(
166-
async () => this.memberRepo.destroyMemberAfterError(id, false),
167-
this.log,
168-
'memberService -> create -> destroyMemberAfterError',
338+
await this.scheduleOrphanMemberDeletion(id)
339+
throw new ApplicationError(
340+
`Identity conflict during member creation for member ${id}: inserted ${insertedCount} of ${data.identities.length} identities but found no conflicting owner`,
169341
)
170-
throw new ApplicationError('Error while inserting identities for a new member!', err)
171342
}
172343

173344
try {
@@ -396,7 +567,7 @@ export default class MemberService extends LoggerBase {
396567
this.log.trace({ memberId: id }, 'Inserting new identities!')
397568
try {
398569
await logExecutionTimeV2(
399-
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate),
570+
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate, true),
400571
this.log,
401572
'memberService -> update -> insertIdentities',
402573
)
@@ -822,6 +993,18 @@ export default class MemberService extends LoggerBase {
822993
return out
823994
}
824995

996+
private async scheduleOrphanMemberDeletion(memberId: string): Promise<void> {
997+
try {
998+
await this.temporal.workflow.start('deleteOrphanMember', {
999+
taskQueue: 'entity-merging',
1000+
workflowId: `${TemporalWorkflowId.DELETE_ORPHAN_MEMBER}/${memberId}`,
1001+
args: [memberId],
1002+
})
1003+
} catch (err) {
1004+
this.log.error(err, { memberId }, 'Failed to schedule orphan member deletion!')
1005+
}
1006+
}
1007+
8251008
private async startMemberBotAnalysisWithLLMWorkflow(memberId: string): Promise<void> {
8261009
await this.temporal.workflow.start('processMemberBotAnalysisWithLLM', {
8271010
taskQueue: 'profiles',

services/apps/entity_merging_worker/src/workflows.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export {
2+
deleteOrphanMember,
23
finishMemberMerging,
34
finishOrganizationMerging,
45
finishMemberUnmerging,

services/apps/entity_merging_worker/src/workflows/all.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ const {
2424
startToCloseTimeout: '60 minutes',
2525
})
2626

27+
export async function deleteOrphanMember(memberId: string): Promise<void> {
28+
await deleteMember(memberId)
29+
}
30+
2731
export async function finishMemberMerging(
2832
primaryId: string,
2933
secondaryId: string,

0 commit comments

Comments
 (0)