Skip to content

Commit e654f0a

Browse files
committed
chore: atomic update
1 parent 626c692 commit e654f0a

1 file changed

Lines changed: 83 additions & 123 deletions

File tree

infrastructure/evault-core/src/core/db/db.service.ts

Lines changed: 83 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -543,95 +543,76 @@ export class DbService {
543543
throw new Error("eName is required for updating meta-envelopes");
544544
}
545545

546+
// The whole read-modify-write cycle runs inside a single Neo4j write
547+
// transaction. The opening MERGE+SET acquires a write lock on the
548+
// MetaEnvelope node, so concurrent updates to the same id serialize
549+
// here — without this, request B's "delete stale envelopes" step
550+
// could clobber fields that request A just wrote.
551+
const session = this.driver.session();
546552
try {
547-
let existing = await timed(
548-
"db.updateMetaEnvelopeById.findExisting",
549-
() => this.findMetaEnvelopeById<T>(id, eName),
550-
);
551-
if (!existing) {
552-
await timed(
553-
"db.updateMetaEnvelopeById.createMissing",
554-
() =>
555-
this.runQueryInternal(
556-
`
557-
CREATE (m:MetaEnvelope {
558-
id: $id,
559-
ontology: $ontology,
560-
acl: $acl,
561-
eName: $eName
562-
})
553+
return await session.executeWrite(async (tx) => {
554+
const findResult = await tx.run(
555+
`
556+
MERGE (m:MetaEnvelope { id: $id, eName: $eName })
557+
ON CREATE SET m.ontology = $ontology, m.acl = $acl
558+
ON MATCH SET m.ontology = $ontology, m.acl = $acl
559+
WITH m
560+
OPTIONAL MATCH (m)-[:LINKS_TO]->(e:Envelope)
561+
RETURN collect(e) AS envelopes
563562
`,
564-
{ id, ontology: meta.ontology, acl, eName },
565-
),
563+
{ id, eName, ontology: meta.ontology, acl },
566564
);
567-
existing = {
568-
id,
569-
ontology: meta.ontology,
570-
acl,
571-
parsed: meta.payload,
572-
envelopes: [],
573-
};
574-
}
575565

576-
// Update the meta-envelope properties (ensure eName matches)
577-
await timed("db.updateMetaEnvelopeById.updateMetaProps", () =>
578-
this.runQueryInternal(
579-
`
580-
MATCH (m:MetaEnvelope { id: $id, eName: $eName })
581-
SET m.ontology = $ontology, m.acl = $acl
582-
`,
583-
{ id, ontology: meta.ontology, acl, eName },
584-
),
585-
);
566+
const envelopeNodes: any[] = (
567+
findResult.records[0]?.get("envelopes") ?? []
568+
).filter((n: any) => n !== null && n !== undefined);
586569

587-
// Deduplicate envelopes — if multiple Envelope nodes share the
588-
// same ontology (field name), keep the first and delete the rest.
589-
// This prevents non-deterministic reads where collect(e) returns
590-
// duplicates in undefined order and reduce picks the wrong one.
591-
const seen = new Map<string, string>(); // ontology → kept envelope id
592-
const dupsToDelete: string[] = [];
593-
for (const env of existing.envelopes) {
594-
if (seen.has(env.ontology)) {
595-
dupsToDelete.push(env.id);
596-
} else {
597-
seen.set(env.ontology, env.id);
570+
let workingEnvelopes: Envelope<T[keyof T]>[] =
571+
envelopeNodes.map((node: any) => ({
572+
id: node.properties.id,
573+
ontology: node.properties.ontology,
574+
value: deserializeValue(
575+
node.properties.value,
576+
node.properties.valueType,
577+
) as T[keyof T],
578+
valueType: node.properties.valueType,
579+
}));
580+
581+
// Deduplicate envelopes — if multiple Envelope nodes share the
582+
// same ontology, keep the first and delete the rest.
583+
const seen = new Map<string, string>();
584+
const dupsToDelete: string[] = [];
585+
for (const env of workingEnvelopes) {
586+
if (seen.has(env.ontology)) {
587+
dupsToDelete.push(env.id);
588+
} else {
589+
seen.set(env.ontology, env.id);
590+
}
598591
}
599-
}
600-
if (dupsToDelete.length > 0) {
601-
console.warn(
602-
`[eVault] Cleaning ${dupsToDelete.length} duplicate envelope(s) for MetaEnvelope ${id}`,
603-
);
604-
for (const dupId of dupsToDelete) {
605-
await this.runQueryInternal(
606-
`MATCH (e:Envelope { id: $envelopeId }) DETACH DELETE e`,
607-
{ envelopeId: dupId },
592+
if (dupsToDelete.length > 0) {
593+
console.warn(
594+
`[eVault] Cleaning ${dupsToDelete.length} duplicate envelope(s) for MetaEnvelope ${id}`,
595+
);
596+
await tx.run(
597+
`MATCH (e:Envelope) WHERE e.id IN $ids DETACH DELETE e`,
598+
{ ids: dupsToDelete },
599+
);
600+
workingEnvelopes = workingEnvelopes.filter(
601+
(e) => !dupsToDelete.includes(e.id),
608602
);
609603
}
610-
// Remove deleted dupes from the existing list so the update
611-
// loop below doesn't try to reference them.
612-
existing.envelopes = existing.envelopes.filter(
613-
(e) => !dupsToDelete.includes(e.id),
614-
);
615-
}
616604

617-
const createdEnvelopes: Envelope<T[keyof T]>[] = [];
618-
let counter = 0;
605+
const createdEnvelopes: Envelope<T[keyof T]>[] = [];
619606

620-
// For each field in the new payload
621-
for (const [key, value] of Object.entries(meta.payload)) {
622-
try {
607+
for (const [key, value] of Object.entries(meta.payload)) {
623608
const { value: storedValue, type: valueType } =
624609
serializeValue(value);
625-
const alias = `e${counter}`;
626-
627-
// Check if an envelope with this ontology already exists
628-
const existingEnvelope = existing.envelopes.find(
610+
const existingEnvelope = workingEnvelopes.find(
629611
(e) => e.ontology === key,
630612
);
631613

632614
if (existingEnvelope) {
633-
// Update existing envelope
634-
await this.runQueryInternal(
615+
await tx.run(
635616
`
636617
MATCH (e:Envelope { id: $envelopeId })
637618
SET e.value = $newValue, e.valueType = $valueType
@@ -642,87 +623,66 @@ export class DbService {
642623
valueType,
643624
},
644625
);
645-
646626
createdEnvelopes.push({
647627
id: existingEnvelope.id,
648628
ontology: key,
649629
value: value as T[keyof T],
650630
valueType,
651631
});
652632
} else {
653-
// Create new envelope — use MERGE on the relationship
654-
// + ontology to prevent duplicate Envelopes if two
655-
// concurrent updates race.
656633
const envW3id = await new W3IDBuilder().build();
657634
const envelopeId = envW3id.id;
658-
659-
await this.runQueryInternal(
635+
await tx.run(
660636
`
661637
MATCH (m:MetaEnvelope { id: $metaId, eName: $eName })
662-
MERGE (m)-[:LINKS_TO]->(${alias}:Envelope { ontology: $${alias}_ontology })
663-
ON CREATE SET ${alias}.id = $${alias}_id, ${alias}.value = $${alias}_value, ${alias}.valueType = $${alias}_type
664-
ON MATCH SET ${alias}.value = $${alias}_value, ${alias}.valueType = $${alias}_type
638+
MERGE (m)-[:LINKS_TO]->(e:Envelope { ontology: $ontology })
639+
ON CREATE SET e.id = $envelopeId, e.value = $newValue, e.valueType = $valueType
640+
ON MATCH SET e.value = $newValue, e.valueType = $valueType
665641
`,
666642
{
667643
metaId: id,
668-
eName: eName,
669-
[`${alias}_id`]: envelopeId,
670-
[`${alias}_ontology`]: key,
671-
[`${alias}_value`]: storedValue,
672-
[`${alias}_type`]: valueType,
644+
eName,
645+
envelopeId,
646+
ontology: key,
647+
newValue: storedValue,
648+
valueType,
673649
},
674650
);
675-
676651
createdEnvelopes.push({
677652
id: envelopeId,
678653
ontology: key,
679654
value: value as T[keyof T],
680655
valueType,
681656
});
682657
}
683-
684-
counter++;
685-
} catch (error) {
686-
console.error(`Error processing field ${key}:`, error);
687-
throw error;
688658
}
689-
}
690659

691-
// Delete envelopes that are no longer in the payload
692-
const existingOntologies = new Set(Object.keys(meta.payload));
693-
const envelopesToDelete = existing.envelopes.filter(
694-
(e) => !existingOntologies.has(e.ontology),
695-
);
696-
697-
for (const envelope of envelopesToDelete) {
698-
try {
699-
await this.runQueryInternal(
700-
`
701-
MATCH (e:Envelope { id: $envelopeId })
702-
DETACH DELETE e
703-
`,
704-
{ envelopeId: envelope.id },
705-
);
706-
} catch (error) {
707-
console.error(
708-
`Error deleting envelope ${envelope.id}:`,
709-
error,
660+
// Delete envelopes that are no longer in the payload
661+
const newOntologies = new Set(Object.keys(meta.payload));
662+
const idsToDelete = workingEnvelopes
663+
.filter((e) => !newOntologies.has(e.ontology))
664+
.map((e) => e.id);
665+
if (idsToDelete.length > 0) {
666+
await tx.run(
667+
`MATCH (e:Envelope) WHERE e.id IN $ids DETACH DELETE e`,
668+
{ ids: idsToDelete },
710669
);
711-
throw error;
712670
}
713-
}
714671

715-
return {
716-
metaEnvelope: {
717-
id,
718-
ontology: meta.ontology,
719-
acl,
720-
},
721-
envelopes: createdEnvelopes,
722-
};
672+
return {
673+
metaEnvelope: {
674+
id,
675+
ontology: meta.ontology,
676+
acl,
677+
},
678+
envelopes: createdEnvelopes,
679+
};
680+
});
723681
} catch (error) {
724682
console.error("Error in updateMetaEnvelopeById:", error);
725683
throw error;
684+
} finally {
685+
await session.close();
726686
}
727687
});
728688
}

0 commit comments

Comments
 (0)