diff --git a/infrastructure/evault-core/src/core/db/db.service.ts b/infrastructure/evault-core/src/core/db/db.service.ts index 4524147b4..8f97cf00a 100644 --- a/infrastructure/evault-core/src/core/db/db.service.ts +++ b/infrastructure/evault-core/src/core/db/db.service.ts @@ -1,5 +1,6 @@ import neo4j, { type Driver } from "neo4j-driver"; import { W3IDBuilder } from "w3id"; +import { timed } from "../utils/timing"; import { deserializeValue, serializeValue } from "./schema"; import type { AppendEnvelopeOperationLogParams, @@ -41,12 +42,15 @@ export class DbService { * @returns The result of the query execution */ private async runQueryInternal(query: string, params: Record) { - const session = this.driver.session(); - try { - return await session.run(query, params); - } finally { - await session.close(); - } + const firstLine = query.trim().split("\n")[0].slice(0, 80); + return timed(`db.query "${firstLine}"`, async () => { + const session = this.driver.session(); + try { + return await session.run(query, params); + } finally { + await session.close(); + } + }); } /** @@ -74,11 +78,14 @@ export class DbService { acl: string[], eName: string, ): Promise> { + return timed("db.storeMetaEnvelope", async () => { if (!eName) { throw new Error("eName is required for storing meta-envelopes"); } - const w3id = await new W3IDBuilder().build(); + const w3id = await timed("db.storeMetaEnvelope.buildMetaId", () => + new W3IDBuilder().build(), + ); const cypher: string[] = [ `CREATE (m:MetaEnvelope { id: $metaId, ontology: $ontology, acl: $acl, eName: $eName })`, @@ -128,7 +135,9 @@ export class DbService { counter++; } - await this.runQueryInternal(cypher.join("\n"), envelopeParams); + await timed("db.storeMetaEnvelope.runQuery", () => + this.runQueryInternal(cypher.join("\n"), envelopeParams), + ); return { metaEnvelope: { @@ -138,6 +147,7 @@ export class DbService { }, envelopes: createdEnvelopes, }; + }); } /** @@ -528,91 +538,81 @@ export class DbService { acl: string[], eName: string, ): Promise> { + return timed("db.updateMetaEnvelopeById", async () => { if (!eName) { throw new Error("eName is required for updating meta-envelopes"); } + // The whole read-modify-write cycle runs inside a single Neo4j write + // transaction. The opening MERGE+SET acquires a write lock on the + // MetaEnvelope node, so concurrent updates to the same id serialize + // here — without this, request B's "delete stale envelopes" step + // could clobber fields that request A just wrote. + const session = this.driver.session(); try { - let existing = await this.findMetaEnvelopeById(id, eName); - if (!existing) { - const metaW3id = await new W3IDBuilder().build(); - await this.runQueryInternal( + return await session.executeWrite(async (tx) => { + const findResult = await tx.run( ` - CREATE (m:MetaEnvelope { - id: $id, - ontology: $ontology, - acl: $acl, - eName: $eName - }) + MERGE (m:MetaEnvelope { id: $id, eName: $eName }) + ON CREATE SET m.ontology = $ontology, m.acl = $acl + ON MATCH SET m.ontology = $ontology, m.acl = $acl + WITH m + OPTIONAL MATCH (m)-[:LINKS_TO]->(e:Envelope) + RETURN collect(e) AS envelopes `, - { id, ontology: meta.ontology, acl, eName }, + { id, eName, ontology: meta.ontology, acl }, ); - existing = { - id, - ontology: meta.ontology, - acl, - parsed: meta.payload, - envelopes: [], - }; - } - // Update the meta-envelope properties (ensure eName matches) - await this.runQueryInternal( - ` - MATCH (m:MetaEnvelope { id: $id, eName: $eName }) - SET m.ontology = $ontology, m.acl = $acl - `, - { id, ontology: meta.ontology, acl, eName }, - ); + const envelopeNodes: any[] = ( + findResult.records[0]?.get("envelopes") ?? [] + ).filter((n: any) => n !== null && n !== undefined); - // Deduplicate envelopes — if multiple Envelope nodes share the - // same ontology (field name), keep the first and delete the rest. - // This prevents non-deterministic reads where collect(e) returns - // duplicates in undefined order and reduce picks the wrong one. - const seen = new Map(); // ontology → kept envelope id - const dupsToDelete: string[] = []; - for (const env of existing.envelopes) { - if (seen.has(env.ontology)) { - dupsToDelete.push(env.id); - } else { - seen.set(env.ontology, env.id); + let workingEnvelopes: Envelope[] = + envelopeNodes.map((node: any) => ({ + id: node.properties.id, + ontology: node.properties.ontology, + value: deserializeValue( + node.properties.value, + node.properties.valueType, + ) as T[keyof T], + valueType: node.properties.valueType, + })); + + // Deduplicate envelopes — if multiple Envelope nodes share the + // same ontology, keep the first and delete the rest. + const seen = new Map(); + const dupsToDelete: string[] = []; + for (const env of workingEnvelopes) { + if (seen.has(env.ontology)) { + dupsToDelete.push(env.id); + } else { + seen.set(env.ontology, env.id); + } } - } - if (dupsToDelete.length > 0) { - console.warn( - `[eVault] Cleaning ${dupsToDelete.length} duplicate envelope(s) for MetaEnvelope ${id}`, - ); - for (const dupId of dupsToDelete) { - await this.runQueryInternal( - `MATCH (e:Envelope { id: $envelopeId }) DETACH DELETE e`, - { envelopeId: dupId }, + if (dupsToDelete.length > 0) { + console.warn( + `[eVault] Cleaning ${dupsToDelete.length} duplicate envelope(s) for MetaEnvelope ${id}`, + ); + await tx.run( + `MATCH (e:Envelope) WHERE e.id IN $ids DETACH DELETE e`, + { ids: dupsToDelete }, + ); + workingEnvelopes = workingEnvelopes.filter( + (e) => !dupsToDelete.includes(e.id), ); } - // Remove deleted dupes from the existing list so the update - // loop below doesn't try to reference them. - existing.envelopes = existing.envelopes.filter( - (e) => !dupsToDelete.includes(e.id), - ); - } - const createdEnvelopes: Envelope[] = []; - let counter = 0; + const createdEnvelopes: Envelope[] = []; - // For each field in the new payload - for (const [key, value] of Object.entries(meta.payload)) { - try { + for (const [key, value] of Object.entries(meta.payload)) { const { value: storedValue, type: valueType } = serializeValue(value); - const alias = `e${counter}`; - - // Check if an envelope with this ontology already exists - const existingEnvelope = existing.envelopes.find( + const existingEnvelope = workingEnvelopes.find( (e) => e.ontology === key, ); if (existingEnvelope) { - // Update existing envelope - await this.runQueryInternal( + await tx.run( ` MATCH (e:Envelope { id: $envelopeId }) SET e.value = $newValue, e.valueType = $valueType @@ -623,7 +623,6 @@ export class DbService { valueType, }, ); - createdEnvelopes.push({ id: existingEnvelope.id, ontology: key, @@ -631,29 +630,24 @@ export class DbService { valueType, }); } else { - // Create new envelope — use MERGE on the relationship - // + ontology to prevent duplicate Envelopes if two - // concurrent updates race. const envW3id = await new W3IDBuilder().build(); const envelopeId = envW3id.id; - - await this.runQueryInternal( + await tx.run( ` MATCH (m:MetaEnvelope { id: $metaId, eName: $eName }) - MERGE (m)-[:LINKS_TO]->(${alias}:Envelope { ontology: $${alias}_ontology }) - ON CREATE SET ${alias}.id = $${alias}_id, ${alias}.value = $${alias}_value, ${alias}.valueType = $${alias}_type - ON MATCH SET ${alias}.value = $${alias}_value, ${alias}.valueType = $${alias}_type + MERGE (m)-[:LINKS_TO]->(e:Envelope { ontology: $ontology }) + ON CREATE SET e.id = $envelopeId, e.value = $newValue, e.valueType = $valueType + ON MATCH SET e.value = $newValue, e.valueType = $valueType `, { metaId: id, - eName: eName, - [`${alias}_id`]: envelopeId, - [`${alias}_ontology`]: key, - [`${alias}_value`]: storedValue, - [`${alias}_type`]: valueType, + eName, + envelopeId, + ontology: key, + newValue: storedValue, + valueType, }, ); - createdEnvelopes.push({ id: envelopeId, ontology: key, @@ -661,50 +655,47 @@ export class DbService { valueType, }); } - - counter++; - } catch (error) { - console.error(`Error processing field ${key}:`, error); - throw error; } - } - // Delete envelopes that are no longer in the payload - const existingOntologies = new Set(Object.keys(meta.payload)); - const envelopesToDelete = existing.envelopes.filter( - (e) => !existingOntologies.has(e.ontology), - ); - - for (const envelope of envelopesToDelete) { - try { - await this.runQueryInternal( - ` - MATCH (e:Envelope { id: $envelopeId }) - DETACH DELETE e - `, - { envelopeId: envelope.id }, - ); - } catch (error) { - console.error( - `Error deleting envelope ${envelope.id}:`, - error, - ); - throw error; + // PATCH semantics: fields absent from the new payload are + // left alone. Callers (notably web3-adapter) project partial + // platform updates through toGlobal — if the platform only + // touched one column, only one ontology reaches us, and + // deleting "stale" envelopes here would clobber every other + // field on the meta-envelope (e.g. wiping participantIds when + // a read-receipt update arrives). + + // Build the full post-write state by merging the pre-write + // envelope set with everything we just wrote. Used by + // resolvers to fan out webhooks containing the complete + // merged state — receivers overwrite their local row with + // whatever the webhook carries, so a partial diff would + // make them lose every untouched field. + const mergedPayload: Record = {}; + for (const env of workingEnvelopes) { + mergedPayload[env.ontology] = env.value; + } + for (const env of createdEnvelopes) { + mergedPayload[env.ontology] = env.value; } - } - return { - metaEnvelope: { - id, - ontology: meta.ontology, - acl, - }, - envelopes: createdEnvelopes, - }; + return { + metaEnvelope: { + id, + ontology: meta.ontology, + acl, + }, + envelopes: createdEnvelopes, + mergedPayload, + }; + }); } catch (error) { console.error("Error in updateMetaEnvelopeById:", error); throw error; + } finally { + await session.close(); } + }); } /** diff --git a/infrastructure/evault-core/src/core/db/migrations/add-id-indexes.ts b/infrastructure/evault-core/src/core/db/migrations/add-id-indexes.ts new file mode 100644 index 000000000..a76a6a209 --- /dev/null +++ b/infrastructure/evault-core/src/core/db/migrations/add-id-indexes.ts @@ -0,0 +1,50 @@ +/** + * Neo4j Migration: Add point-lookup indexes on Envelope.id and MetaEnvelope.id + * + * Without these, every `MATCH (e:Envelope { id: $id })` and + * `MATCH (m:MetaEnvelope { id: $id })` does a NodeByLabelScan over the entire + * Envelope / MetaEnvelope population, which scales linearly with total stored + * data. Update operations call these queries once per payload field, so a + * 5-field chat update can take 10+ seconds. + * + * `id` is generated by W3IDBuilder and is unique by construction, so a plain + * range index on the property is sufficient — no need for a composite index + * with eName, since the id alone is selective. + */ + +import type { Driver } from "neo4j-driver"; + +const STATEMENTS: { name: string; cypher: string }[] = [ + { + name: "envelope_id_index", + cypher: `CREATE INDEX envelope_id_index IF NOT EXISTS FOR (e:Envelope) ON (e.id)`, + }, + { + name: "meta_envelope_id_index", + cypher: `CREATE INDEX meta_envelope_id_index IF NOT EXISTS FOR (m:MetaEnvelope) ON (m.id)`, + }, +]; + +export async function createIdIndexes(driver: Driver): Promise { + const session = driver.session(); + try { + for (const { name, cypher } of STATEMENTS) { + try { + await session.run(cypher); + console.log(`Ensured ${name}`); + } catch (error) { + if ( + error instanceof Error && + error.message.includes("already exists") + ) { + console.log(`${name} already exists`); + } else { + console.error(`Error creating ${name}:`, error); + throw error; + } + } + } + } finally { + await session.close(); + } +} diff --git a/infrastructure/evault-core/src/core/db/types.ts b/infrastructure/evault-core/src/core/db/types.ts index e7f3b2a9d..98475e717 100644 --- a/infrastructure/evault-core/src/core/db/types.ts +++ b/infrastructure/evault-core/src/core/db/types.ts @@ -37,6 +37,11 @@ export type MetaEnvelopeResult< /** * Result type for storing a new meta-envelope. + * + * `envelopes` are the envelopes touched by this operation. `mergedPayload`, + * when present, is the FULL post-write state of the meta-envelope (every + * field, not just the touched ones) — used by callers that need to fan out + * a webhook with the complete merged state rather than the partial diff. */ export type StoreMetaEnvelopeResult< T extends Record = Record @@ -47,6 +52,7 @@ export type StoreMetaEnvelopeResult< acl: string[]; }; envelopes: Envelope[]; + mergedPayload?: Record; }; /** diff --git a/infrastructure/evault-core/src/core/protocol/graphql-server.ts b/infrastructure/evault-core/src/core/protocol/graphql-server.ts index 0ecfe4827..06067a027 100644 --- a/infrastructure/evault-core/src/core/protocol/graphql-server.ts +++ b/infrastructure/evault-core/src/core/protocol/graphql-server.ts @@ -92,6 +92,21 @@ export class GraphQLServer { requestingPlatform: string | null, webhookPayload: any, ): Promise { + // One log line per dispatch — the same payload goes to every + // target platform, so we log the body once here instead of per + // target. This is the source of truth for "what eVault claims it + // sent"; correlate against receiver logs to find divergence. + try { + const payloadJson = JSON.stringify(webhookPayload); + console.log( + `[webhook] id=${webhookPayload?.id} schemaId=${webhookPayload?.schemaId} w3id=${webhookPayload?.w3id} from=${requestingPlatform ?? ""} payload=${payloadJson}`, + ); + } catch { + console.log( + `[webhook] id=${webhookPayload?.id} schemaId=${webhookPayload?.schemaId} payload=`, + ); + } + try { const activePlatforms = await this.getActivePlatforms(); @@ -530,14 +545,21 @@ export class GraphQLServer { parsed: parsedFromEnvelopes, }; - // Deliver webhooks for update operation + // Deliver webhooks for update operation. + // Use the FULL post-write state, not input.payload — + // input.payload is the partial diff the caller sent, + // and receivers overwrite their local row with + // whatever the webhook carries. Sending the diff + // would make the receiver lose every untouched + // field (e.g. a read-receipt update would wipe + // participantIds on the receiver side). const requestingPlatform = context.tokenPayload?.platform || null; const webhookPayload = { id, w3id: context.eName, evaultPublicKey: this.evaultPublicKey, - data: input.payload, + data: result.mergedPayload ?? input.payload, schemaId: input.ontology, }; @@ -1222,14 +1244,18 @@ export class GraphQLServer { context.eName, ); - // Deliver webhooks for update operation + // Deliver webhooks with the FULL post-write state. + // See the long comment on the new updateMetaEnvelope + // resolver above — sending input.payload (the + // partial diff) would make receivers clobber their + // own untouched fields. const requestingPlatform = context.tokenPayload?.platform || null; const webhookPayload = { id: id, w3id: context.eName, evaultPublicKey: this.evaultPublicKey, - data: input.payload, + data: result.mergedPayload ?? input.payload, schemaId: input.ontology, }; diff --git a/infrastructure/evault-core/src/core/protocol/vault-access-guard.ts b/infrastructure/evault-core/src/core/protocol/vault-access-guard.ts index a851a71de..203a80a68 100644 --- a/infrastructure/evault-core/src/core/protocol/vault-access-guard.ts +++ b/infrastructure/evault-core/src/core/protocol/vault-access-guard.ts @@ -3,6 +3,7 @@ import type { YogaInitialContext } from "graphql-yoga"; import * as jose from "jose"; import type { DbService } from "../db/db.service"; import type { MetaEnvelope } from "../db/types"; +import { timed } from "../utils/timing"; export type VaultContext = YogaInitialContext & { currentUser: string | null; @@ -10,6 +11,14 @@ export type VaultContext = YogaInitialContext & { eName: string | null; }; +type CachedJWKS = { + jwks: ReturnType; + expiresAt: number; +}; +const jwksCache = new Map(); +const JWKS_TTL_MS = 24 * 60 * 60 * 1000; +const JWKS_FETCH_TIMEOUT_MS = 5000; + export class VaultAccessGuard { constructor(private db: DbService) {} @@ -35,12 +44,29 @@ export class VaultAccessGuard { return null; } - const jwksResponse = await axios.get( - new URL(`/.well-known/jwks.json`, registryUrl).toString(), - ); + const jwksUrl = new URL( + `/.well-known/jwks.json`, + registryUrl, + ).toString(); + + const now = Date.now(); + let cached = jwksCache.get(jwksUrl); + if (!cached || cached.expiresAt <= now) { + const jwksResponse = await timed( + "guard.validateToken.fetchJWKS", + () => + axios.get(jwksUrl, { + timeout: JWKS_FETCH_TIMEOUT_MS, + }), + ); + cached = { + jwks: jose.createLocalJWKSet(jwksResponse.data), + expiresAt: now + JWKS_TTL_MS, + }; + jwksCache.set(jwksUrl, cached); + } - const JWKS = jose.createLocalJWKSet(jwksResponse.data); - const { payload } = await jose.jwtVerify(token, JWKS); + const { payload } = await jose.jwtVerify(token, cached.jwks); return payload; } catch (error) { @@ -115,11 +141,16 @@ export class VaultAccessGuard { metaEnvelopeId: string, context: VaultContext, ): Promise<{ hasAccess: boolean; exists: boolean }> { - // Validate token if present - const authHeader = - context.request?.headers?.get("authorization") ?? - context.request?.headers?.get("Authorization"); - const tokenPayload = await this.validateToken(authHeader); + // Reuse token payload already validated by validateAuthentication() earlier + // in the middleware; only re-validate as a fallback (e.g. store operations + // where an upstream pass may not have populated it). + let tokenPayload = context.tokenPayload ?? null; + if (!tokenPayload) { + const authHeader = + context.request?.headers?.get("authorization") ?? + context.request?.headers?.get("Authorization"); + tokenPayload = await this.validateToken(authHeader); + } if (tokenPayload) { // Token is valid, set platform context and allow access @@ -226,7 +257,9 @@ export class VaultAccessGuard { !args.id; // storeMetaEnvelope doesn't have id, updateMetaEnvelopeById does // CRITICAL: Validate authentication BEFORE executing any resolver - await this.validateAuthentication(context, isStoreOperation); + await timed("guard.validateAuthentication", () => + this.validateAuthentication(context, isStoreOperation), + ); // For operations that don't require a specific meta envelope ID (bulk queries) if (!args.id && !args.envelopeId) { @@ -261,9 +294,8 @@ export class VaultAccessGuard { } // Check if envelope exists and user has access - const { hasAccess, exists } = await this.checkAccess( - metaEnvelopeId, - context, + const { hasAccess, exists } = await timed("guard.checkAccess", () => + this.checkAccess(metaEnvelopeId, context), ); // For update operations with input, allow in-place creation if envelope doesn't exist diff --git a/infrastructure/evault-core/src/core/utils/timing.ts b/infrastructure/evault-core/src/core/utils/timing.ts new file mode 100644 index 000000000..ac419a313 --- /dev/null +++ b/infrastructure/evault-core/src/core/utils/timing.ts @@ -0,0 +1,40 @@ +const ENABLED = process.env.EVAULT_TIMING !== "0"; +const SLOW_MS = + process.env.EVAULT_TIMING_SLOW_MS !== undefined + ? Number(process.env.EVAULT_TIMING_SLOW_MS) + : 50; + +let counter = 0; +export function newTraceId(prefix = "t"): string { + counter = (counter + 1) % 1_000_000; + return `${prefix}${Date.now().toString(36)}${counter.toString(36)}`; +} + +export async function timed( + label: string, + fn: () => Promise, + traceId?: string, +): Promise { + if (!ENABLED) return fn(); + const start = performance.now(); + try { + const result = await fn(); + const ms = performance.now() - start; + if (ms >= SLOW_MS) { + console.log( + `[timing]${traceId ? ` ${traceId}` : ""} ${label} ${ms.toFixed(1)}ms`, + ); + } + return result; + } catch (err) { + const ms = performance.now() - start; + console.log( + `[timing]${traceId ? ` ${traceId}` : ""} ${label} ${ms.toFixed(1)}ms (failed)`, + ); + throw err; + } +} + +export function timingEnabled(): boolean { + return ENABLED; +} diff --git a/infrastructure/evault-core/src/index.ts b/infrastructure/evault-core/src/index.ts index 4ffcd9f88..9cb085d9a 100644 --- a/infrastructure/evault-core/src/index.ts +++ b/infrastructure/evault-core/src/index.ts @@ -115,6 +115,17 @@ const initializeEVault = async ( console.warn("Failed to create User index:", error); } + // Create id indexes on Envelope and MetaEnvelope so per-field point + // lookups inside updateMetaEnvelopeById don't scan the whole label. + try { + const { createIdIndexes } = await import( + "./core/db/migrations/add-id-indexes" + ); + await createIdIndexes(driver); + } catch (error) { + console.warn("Failed to create id indexes:", error); + } + // Migrate publicKey (string) to publicKeys (array) try { const { migratePublicKeyToArray } = await import( @@ -186,7 +197,11 @@ const initializeEVault = async ( const ip = request.ip; const { allowed, retryAfterSeconds } = checkGlobalRateLimit(token, ip); if (!allowed) { - reply + // In an async onRequest hook, Fastify only short-circuits the + // handler chain if you return the reply object. Without the + // return, the 429 is queued but the downstream handler (GraphQL) + // still runs — turning the rate limiter into a silent counter. + return reply .code(429) .header("Retry-After", String(retryAfterSeconds)) .send({