diff --git a/prisma/migrations/20260510120000_snapshot_cache_body_gzip_rename/migration.sql b/prisma/migrations/20260510120000_snapshot_cache_body_gzip_rename/migration.sql new file mode 100644 index 0000000..7b53d10 --- /dev/null +++ b/prisma/migrations/20260510120000_snapshot_cache_body_gzip_rename/migration.sql @@ -0,0 +1,12 @@ +-- snapshot_cache: rename `body` -> `body_gzip` and add a `content_encoding` +-- column so the encoding contract is explicit at the schema layer and a +-- future migration can add e.g. "zstd" without another rename. +-- +-- Both operations are PostgreSQL metadata-only: +-- - RENAME COLUMN is instant (no table rewrite). +-- - ADD COLUMN with a constant DEFAULT in PG 11+ is also instant. +-- Safe under concurrent reads/writes; no maintenance window required. + +-- AlterTable +ALTER TABLE "snapshot_cache" RENAME COLUMN "body" TO "body_gzip"; +ALTER TABLE "snapshot_cache" ADD COLUMN "content_encoding" TEXT NOT NULL DEFAULT 'gzip'; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index c648848..7277603 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -33,35 +33,35 @@ model CrowdfundingCampaign { } model Drep { - drepId String @id @map("drep_id") - userId String? @unique @map("user_id") - name String? - paymentAddr String? @map("payment_addr") - iconUrl String? @map("icon_url") - doNotList Boolean? @map("do_not_list") - votingPower BigInt @default(0) @map("voting_power") - delegatorCount Int? @map("delegator_count") + drepId String @id @map("drep_id") + userId String? @unique @map("user_id") + name String? + paymentAddr String? @map("payment_addr") + iconUrl String? @map("icon_url") + doNotList Boolean? @map("do_not_list") + votingPower BigInt @default(0) @map("voting_power") + delegatorCount Int? @map("delegator_count") // Denormalised — populated by drep-denorm.service after each epoch sync. // Read by /dreps LIST + /snapshot/dreps to avoid expensive groupBys at request time. - firstSeenEpoch Int? @map("first_seen_epoch") - proposalParticipationPercent Float? @map("proposal_participation_percent") - registered Boolean? @map("registered") - active Boolean? @map("active") - expiresEpoch Int? @map("expires_epoch") - metaUrl String? @map("meta_url") - metaHash String? @map("meta_hash") + firstSeenEpoch Int? @map("first_seen_epoch") + proposalParticipationPercent Float? @map("proposal_participation_percent") + registered Boolean? @map("registered") + active Boolean? @map("active") + expiresEpoch Int? @map("expires_epoch") + metaUrl String? @map("meta_url") + metaHash String? @map("meta_hash") // CIP-119 metadata fields (from /drep_updates meta_json.body) - bio String? @map("bio") - motivations String? @map("motivations") - objectives String? @map("objectives") - qualifications String? @map("qualifications") - references String? @map("references") // JSON string of references array - createdAt DateTime? @default(now()) @map("created_at") - updatedAt DateTime? @map("updated_at") - user User? @relation(fields: [userId], references: [id]) - onchainVotes OnchainVote[] - stakeDelegationStates StakeDelegationState[] - epochSnapshots DrepEpochSnapshot[] + bio String? @map("bio") + motivations String? @map("motivations") + objectives String? @map("objectives") + qualifications String? @map("qualifications") + references String? @map("references") // JSON string of references array + createdAt DateTime? @default(now()) @map("created_at") + updatedAt DateTime? @map("updated_at") + user User? @relation(fields: [userId], references: [id]) + onchainVotes OnchainVote[] + stakeDelegationStates StakeDelegationState[] + epochSnapshots DrepEpochSnapshot[] @@map("drep") } @@ -150,18 +150,22 @@ model StakeDelegationChange { } // Pre-rendered snapshot bodies for /snapshot endpoints. -// Each row holds one composed JSON payload (gzipped) keyed by cacheKey. +// Each row holds one composed JSON payload (encoded per `contentEncoding`, +// currently always "gzip") keyed by cacheKey. // Final chunks are immutable — written once when the chunk's last epoch finalises. // Manifest + dreps + current chunk are rewritten by snapshotBuilder.rebuildAfterEpoch. model SnapshotCache { - cacheKey String @id @map("cache_key") - body Bytes - generatedAt DateTime @map("generated_at") - schemaVersion String @map("schema_version") - isFinal Boolean @default(false) @map("is_final") - byteSize Int @map("byte_size") - etag String - lastAccessedAt DateTime @updatedAt @map("last_accessed_at") + cacheKey String @id @map("cache_key") + bodyGzip Bytes @map("body_gzip") + /// IANA Content-Encoding token. Currently "gzip" for every row; the column + /// exists so a future migration can add e.g. "zstd" without another rename. + contentEncoding String @default("gzip") @map("content_encoding") + generatedAt DateTime @map("generated_at") + schemaVersion String @map("schema_version") + isFinal Boolean @default(false) @map("is_final") + byteSize Int @map("byte_size") + etag String + lastAccessedAt DateTime @updatedAt @map("last_accessed_at") @@index([lastAccessedAt]) @@map("snapshot_cache") @@ -227,15 +231,15 @@ model StakeDelegationStaging { // Tracks per-epoch completion state for the governance analytics sync job. // This lets the job run frequently but only do heavy work once per epoch. model EpochAnalyticsSync { - epoch Int @id @map("epoch_no") - drepsSyncedAt DateTime? @map("dreps_synced_at") - drepInfoSyncedAt DateTime? @map("drep_info_synced_at") - drepSnapshotSyncedAt DateTime? @map("drep_snapshot_synced_at") - totalsSyncedAt DateTime? @map("totals_synced_at") - drepLifecycleSyncedAt DateTime? @map("drep_lifecycle_synced_at") - poolGroupsSyncedAt DateTime? @map("pool_groups_synced_at") - createdAt DateTime @default(now()) @map("created_at") - updatedAt DateTime @updatedAt @map("updated_at") + epoch Int @id @map("epoch_no") + drepsSyncedAt DateTime? @map("dreps_synced_at") + drepInfoSyncedAt DateTime? @map("drep_info_synced_at") + drepSnapshotSyncedAt DateTime? @map("drep_snapshot_synced_at") + totalsSyncedAt DateTime? @map("totals_synced_at") + drepLifecycleSyncedAt DateTime? @map("drep_lifecycle_synced_at") + poolGroupsSyncedAt DateTime? @map("pool_groups_synced_at") + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") @@map("epoch_analytics_sync") } @@ -348,25 +352,25 @@ model PoolGroup { } model OnchainVote { - id String @id - txHash String @map("tx_hash") - proposalId String @map("proposal_id") - vote VoteType? - voterType VoterType @map("voter_type") - votingPower BigInt? @map("voting_power") - responseEpoch Int? @map("response_epoch") - anchorUrl String? @map("anchor_url") - anchorHash String? @map("anchor_hash") - rationale String? + id String @id + txHash String @map("tx_hash") + proposalId String @map("proposal_id") + vote VoteType? + voterType VoterType @map("voter_type") + votingPower BigInt? @map("voting_power") + responseEpoch Int? @map("response_epoch") + anchorUrl String? @map("anchor_url") + anchorHash String? @map("anchor_hash") + rationale String? surveyResponse String? @map("survey_response") surveyResponseSurveyTxId String? @map("survey_response_survey_tx_id") surveyResponseResponderRole String? @map("survey_response_responder_role") - votedAt DateTime? @default(now()) @map("voted_at") - createdAt DateTime? @default(now()) @map("created_at") - updatedAt DateTime? @map("updated_at") - drepId String? @map("drep_id") - spoId String? @map("spo_id") - ccId String? @map("cc_id") + votedAt DateTime? @default(now()) @map("voted_at") + createdAt DateTime? @default(now()) @map("created_at") + updatedAt DateTime? @map("updated_at") + drepId String? @map("drep_id") + spoId String? @map("spo_id") + ccId String? @map("cc_id") cc CC? @relation(fields: [ccId], references: [ccId]) drep Drep? @relation(fields: [drepId], references: [drepId]) @@ -524,22 +528,22 @@ model ActivityRecent { } model ActivityHistorical { - id String @id @default(cuid()) - repoId String @map("repo_id") - date DateTime @db.Date - commitCount Int @default(0) @map("commit_count") - prOpened Int @default(0) @map("pr_opened") - prMerged Int @default(0) @map("pr_merged") - prClosed Int @default(0) @map("pr_closed") - issuesOpened Int @default(0) @map("issues_opened") - issuesClosed Int @default(0) @map("issues_closed") - additions Int @default(0) - deletions Int @default(0) - uniqueContributors Int @default(0) @map("unique_contributors") - avgPrMergeHours Float? @map("avg_pr_merge_hours") - avgIssueResolutionHours Float? @map("avg_issue_resolution_hours") - releasesPublished Int @default(0) @map("releases_published") - createdAt DateTime @default(now()) @map("created_at") + id String @id @default(cuid()) + repoId String @map("repo_id") + date DateTime @db.Date + commitCount Int @default(0) @map("commit_count") + prOpened Int @default(0) @map("pr_opened") + prMerged Int @default(0) @map("pr_merged") + prClosed Int @default(0) @map("pr_closed") + issuesOpened Int @default(0) @map("issues_opened") + issuesClosed Int @default(0) @map("issues_closed") + additions Int @default(0) + deletions Int @default(0) + uniqueContributors Int @default(0) @map("unique_contributors") + avgPrMergeHours Float? @map("avg_pr_merge_hours") + avgIssueResolutionHours Float? @map("avg_issue_resolution_hours") + releasesPublished Int @default(0) @map("releases_published") + createdAt DateTime @default(now()) @map("created_at") repository GithubRepository @relation(fields: [repoId], references: [id]) diff --git a/src/controllers/data/triggerEpochTotalsSync.ts b/src/controllers/data/triggerEpochTotalsSync.ts index 29bd766..ca61547 100644 --- a/src/controllers/data/triggerEpochTotalsSync.ts +++ b/src/controllers/data/triggerEpochTotalsSync.ts @@ -36,14 +36,34 @@ export const postTriggerEpochTotalsSync = async ( try { const result = await syncEpochTotalsStep(prisma); + // Mirror the cron path: a downstream best-effort hook failure (denorm + // refresh, snapshot rebuild) should propagate to SyncStatus.lastResult + // as "partial" so monitoring can distinguish a green run from one that + // missed an invalidation. + const partialFailure = + Boolean(result.denormRefreshError) + || Boolean(result.snapshotRebuildError); + const partialError = [ + result.denormRefreshError && `denorm: ${result.denormRefreshError}`, + result.snapshotRebuildError + && `snapshot-rebuild: ${result.snapshotRebuildError}`, + ] + .filter(Boolean) + .join("; "); + await releaseJobLock( JOB_NAME, - "success", - result.skippedPrevious ? 1 : 2 + partialFailure ? "partial" : "success", + result.skippedPrevious ? 1 : 2, + partialFailure ? partialError : null ); - console.log("[Epoch Totals Sync] Completed successfully:", { - currentEpoch: result.currentEpoch, epochToSync: result.epochToSync, skippedPrevious: result.skippedPrevious, + console.log("[Epoch Totals Sync] Completed:", { + currentEpoch: result.currentEpoch, + epochToSync: result.epochToSync, + skippedPrevious: result.skippedPrevious, + denormRefreshError: result.denormRefreshError ?? null, + snapshotRebuildError: result.snapshotRebuildError ?? null, }); } catch (error) { console.error("[Epoch Totals Sync] Async processing error:", formatAxiosLikeError(error)); diff --git a/src/controllers/drep/getDReps.ts b/src/controllers/drep/getDReps.ts index 2a8f4b2..fc3d69b 100644 --- a/src/controllers/drep/getDReps.ts +++ b/src/controllers/drep/getDReps.ts @@ -1,16 +1,15 @@ import { Request, Response } from "express"; import { VoterType } from "@prisma/client"; import { prisma } from "../../services"; -import { GetDRepsResponse, DRepSummary } from "../../responses"; +import { + DRepSummary, + GetDRepsResponse, + toAdaString, + toLovelaceString, +} from "../../responses"; import { formatAxiosLikeError } from "../../utils/format-http-client-error"; +import { parseIntegerQuery } from "../../utils/query-params"; -/** - * Converts lovelace (BigInt) to ADA string with 6 decimal places - */ -function lovelaceToAda(lovelace: bigint): string { - const ada = Number(lovelace) / 1_000_000; - return ada.toFixed(6); -} /** * GET /dreps @@ -25,8 +24,21 @@ function lovelaceToAda(lovelace: bigint): string { */ export const getDReps = async (req: Request, res: Response) => { try { - const page = Math.max(1, parseInt(req.query.page as string) || 1); - const pageSize = Math.min(1000, Math.max(1, parseInt(req.query.pageSize as string) || 20)); + const pageR = parseIntegerQuery(req.query.page, "page", { + min: 1, + default: 1, + }); + if (!pageR.ok) return res.status(pageR.status).json(pageR); + const page = pageR.value; + + const pageSizeR = parseIntegerQuery(req.query.pageSize, "pageSize", { + min: 1, + max: 1000, + default: 20, + }); + if (!pageSizeR.ok) return res.status(pageSizeR.status).json(pageSizeR); + const pageSize = pageSizeR.value; + const sortBy = (req.query.sortBy as string) || "votingPower"; const sortOrder = (req.query.sortOrder as string) === "asc" ? "asc" : "desc"; const search = (req.query.search as string) || ""; @@ -150,8 +162,8 @@ export const getDReps = async (req: Request, res: Response) => { drepId: drep.drepId, name: drep.name, iconUrl: drep.iconUrl, - votingPower: drep.votingPower.toString(), - votingPowerAda: lovelaceToAda(drep.votingPower), + votingPower: toLovelaceString(drep.votingPower), + votingPowerAda: toAdaString(drep.votingPower), totalVotesCast: voteCountMap.get(drep.drepId) || 0, delegatorCount: drep.delegatorCount, firstSeenEpoch: diff --git a/src/controllers/healthz/getHealthz.ts b/src/controllers/healthz/getHealthz.ts new file mode 100644 index 0000000..3343753 --- /dev/null +++ b/src/controllers/healthz/getHealthz.ts @@ -0,0 +1,52 @@ +import { Request, Response } from "express"; +import { getSnapshotBootRecoveryStatus } from "../../services/ingestion/snapshot-builder.service"; + +/** + * GET /healthz — liveness + snapshot boot readiness signal. + * + * Public, unauthenticated. Designed for cloud-deploy readiness probes. + * + * Status field: + * - "ok" — process is up, snapshot boot recovery completed (or was + * a no-op because L2 was already fresh). + * - "starting" — boot recover is still running or hasn't started yet. + * - "skipped" — boot recover skipped (no epoch data, or another replica + * is rebuilding). The API will still serve requests but + * the snapshot may be stale until a successful sync. + * - "degraded" — boot recover failed. The API is up but `/snapshot/*` may + * be empty or stale. Probes should mark NOT READY so the + * load balancer drains traffic. + * + * The HTTP status mirrors the readiness signal: 200 for ok/starting/skipped, + * 503 for degraded. Liveness check (process responsive) always returns 200 + * when this controller runs at all. + */ +export const getHealthz = (_req: Request, res: Response) => { + const boot = getSnapshotBootRecoveryStatus(); + + let status: "ok" | "starting" | "skipped" | "degraded"; + switch (boot.state) { + case "ok": + case "fresh": + status = "ok"; + break; + case "running": + case "not-started": + status = "starting"; + break; + case "skipped": + status = "skipped"; + break; + case "failed": + status = "degraded"; + break; + } + + const httpStatus = status === "degraded" ? 503 : 200; + + res.status(httpStatus).json({ + status, + snapshotBootRecovery: boot, + serverTime: new Date().toISOString(), + }); +}; diff --git a/src/controllers/healthz/index.ts b/src/controllers/healthz/index.ts new file mode 100644 index 0000000..f508090 --- /dev/null +++ b/src/controllers/healthz/index.ts @@ -0,0 +1 @@ +export * from "./getHealthz"; diff --git a/src/controllers/migrations/getMigrations.ts b/src/controllers/migrations/getMigrations.ts index a62518b..b7d5ee0 100644 --- a/src/controllers/migrations/getMigrations.ts +++ b/src/controllers/migrations/getMigrations.ts @@ -3,44 +3,82 @@ import { prisma } from "../../services"; import { getMigrationAggregateAccuracy, } from "../../services/ingestion/migration-aggregate.service"; -import { GetMigrationsResponse, MigrationRow } from "../../responses"; +import { + GetMigrationsResponse, + MigrationRow, + toAdaString, + toLovelaceString, +} from "../../responses"; import { formatAxiosLikeError } from "../../utils/format-http-client-error"; +import { parseIntegerQuery } from "../../utils/query-params"; +import { SENTINEL_DREP_IDS } from "../../libs/sentinels"; -const ALWAYS_ABSTAIN = "drep_always_abstain"; -const ALWAYS_NO_CONFIDENCE = "drep_always_no_confidence"; - -function lovelaceToAda(lovelace: bigint): string { - return (Number(lovelace) / 1_000_000).toFixed(6); -} - -function parseIntOpt(value: unknown, fallback: number): number { - if (typeof value !== "string" || !value) return fallback; - const n = parseInt(value, 10); - return Number.isFinite(n) ? n : fallback; -} +/** Default page size for /migrations. Bounded to keep the response under + * ~1MB even with maximum-cardinality DRep churn per epoch. Operators can + * raise via the `limit` query param up to MIGRATIONS_MAX_LIMIT. */ +const MIGRATIONS_DEFAULT_LIMIT = 500; +const MIGRATIONS_MAX_LIMIT = 5000; /** * GET /migrations * * Query params: - * epochStart — inclusive lower bound on delegated epoch (default 0) - * epochEnd — inclusive upper bound (default Number.MAX_SAFE_INTEGER) - * fromDrepId — optional source-DRep filter - * toDrepId — optional target-DRep filter + * epochStart — inclusive lower bound on delegated epoch (default 0) + * epochEnd — inclusive upper bound (default = current largest epoch in the aggregate) + * fromDrepId — optional source-DRep filter + * toDrepId — optional target-DRep filter * excludeSentinels — drop rows touching drep_always_* (default true) - * topNByPower — optional cap: only return rows whose source AND target - * DReps are among the top-N DReps by current voting_power. + * topNByPower — optional cap: only return rows whose source AND target + * DReps are among the top-N DReps by current voting_power. + * limit — page size (default 500, max 5000) + * offset — pagination offset (default 0) */ export const getMigrations = async (req: Request, res: Response) => { try { - const epochStart = parseIntOpt(req.query.epochStart, 0); // Postgres INT max — Prisma rejects MAX_SAFE_INTEGER as out-of-range for an Int column. const POSTGRES_INT_MAX = 2_147_483_647; - const epochEnd = parseIntOpt(req.query.epochEnd, POSTGRES_INT_MAX); + + const epochStartR = parseIntegerQuery(req.query.epochStart, "epochStart", { + min: 0, + max: POSTGRES_INT_MAX, + default: 0, + }); + if (!epochStartR.ok) return res.status(epochStartR.status).json(epochStartR); + const epochStart = epochStartR.value; + + const epochEndR = parseIntegerQuery(req.query.epochEnd, "epochEnd", { + min: 0, + max: POSTGRES_INT_MAX, + default: POSTGRES_INT_MAX, + }); + if (!epochEndR.ok) return res.status(epochEndR.status).json(epochEndR); + const epochEnd = epochEndR.value; + + const topNR = parseIntegerQuery(req.query.topNByPower, "topNByPower", { + min: 0, + default: 0, + }); + if (!topNR.ok) return res.status(topNR.status).json(topNR); + const topNByPower = topNR.value; + + const limitR = parseIntegerQuery(req.query.limit, "limit", { + min: 1, + max: MIGRATIONS_MAX_LIMIT, + default: MIGRATIONS_DEFAULT_LIMIT, + }); + if (!limitR.ok) return res.status(limitR.status).json(limitR); + const limit = limitR.value; + + const offsetR = parseIntegerQuery(req.query.offset, "offset", { + min: 0, + default: 0, + }); + if (!offsetR.ok) return res.status(offsetR.status).json(offsetR); + const offset = offsetR.value; + const fromDrepId = typeof req.query.fromDrepId === "string" ? req.query.fromDrepId : undefined; const toDrepId = typeof req.query.toDrepId === "string" ? req.query.toDrepId : undefined; const excludeSentinels = req.query.excludeSentinels !== "false"; - const topNByPower = parseIntOpt(req.query.topNByPower, 0); let topNDrepIds: string[] | null = null; if (topNByPower > 0) { @@ -56,7 +94,7 @@ export const getMigrations = async (req: Request, res: Response) => { const fromDrepIdFilter: Record = {}; if (fromDrepId) fromDrepIdFilter.equals = fromDrepId; if (excludeSentinels) { - fromDrepIdFilter.notIn = [ALWAYS_ABSTAIN, ALWAYS_NO_CONFIDENCE]; + fromDrepIdFilter.notIn = [...SENTINEL_DREP_IDS]; } if (topNDrepIds) { fromDrepIdFilter.in = topNDrepIds; @@ -65,27 +103,34 @@ export const getMigrations = async (req: Request, res: Response) => { const toDrepIdFilter: Record = {}; if (toDrepId) toDrepIdFilter.equals = toDrepId; if (excludeSentinels) { - toDrepIdFilter.notIn = [ALWAYS_ABSTAIN, ALWAYS_NO_CONFIDENCE]; + toDrepIdFilter.notIn = [...SENTINEL_DREP_IDS]; } if (topNDrepIds) { toDrepIdFilter.in = topNDrepIds; } - const rows = await prisma.migrationAggregate.findMany({ - where: { - epoch: { gte: epochStart, lte: epochEnd }, - ...(Object.keys(fromDrepIdFilter).length ? { fromDrepId: fromDrepIdFilter as any } : {}), - ...(Object.keys(toDrepIdFilter).length ? { toDrepId: toDrepIdFilter as any } : {}), - }, - orderBy: [{ epoch: "asc" }, { fromDrepId: "asc" }, { toDrepId: "asc" }], - }); + const where = { + epoch: { gte: epochStart, lte: epochEnd }, + ...(Object.keys(fromDrepIdFilter).length ? { fromDrepId: fromDrepIdFilter as any } : {}), + ...(Object.keys(toDrepIdFilter).length ? { toDrepId: toDrepIdFilter as any } : {}), + }; + + const [rows, total] = await Promise.all([ + prisma.migrationAggregate.findMany({ + where, + orderBy: [{ epoch: "asc" }, { fromDrepId: "asc" }, { toDrepId: "asc" }], + skip: offset, + take: limit, + }), + prisma.migrationAggregate.count({ where }), + ]); const migrations: MigrationRow[] = rows.map((r) => ({ epoch: r.epoch, fromDrepId: r.fromDrepId, toDrepId: r.toDrepId, - lovelace: r.adaLovelace.toString(), - ada: lovelaceToAda(r.adaLovelace), + lovelace: toLovelaceString(r.adaLovelace), + ada: toAdaString(r.adaLovelace), delegators: r.delegators, })); @@ -109,12 +154,19 @@ export const getMigrations = async (req: Request, res: Response) => { epochStart, epochEnd: epochEnd === POSTGRES_INT_MAX ? -1 : epochEnd, rowCount: migrations.length, + pagination: { + limit, + offset, + total, + hasMore: offset + migrations.length < total, + }, accuracy: accuracy.level, sourceDistribution: { total: accuracy.totalChangeRows, koiosHistory: accuracy.rowsWithKoiosHistory, currentProxy: accuracy.rowsWithCurrentProxy, unknown: accuracy.rowsUnknown, + malformed: accuracy.rowsMalformed, }, lastComputedAt: lastComputedAt ? lastComputedAt.toISOString() : null, }, diff --git a/src/controllers/snapshot/getChunk.ts b/src/controllers/snapshot/getChunk.ts index 9c4a8f4..15af42d 100644 --- a/src/controllers/snapshot/getChunk.ts +++ b/src/controllers/snapshot/getChunk.ts @@ -7,6 +7,8 @@ import { prisma } from "../../services"; import { sendCachedSnapshot } from "./sendCachedSnapshot"; import { formatAxiosLikeError } from "../../utils/format-http-client-error"; +// Strict integer-only — `parseInt("12abc", 10)` returns 12 silently which would +// let "/snapshot/chunks/12abc-49xyz" pass the chunk-boundary check below. const CHUNK_RANGE_RE = /^(\d+)-(\d+)$/; /** diff --git a/src/controllers/snapshot/getDreps.ts b/src/controllers/snapshot/getDreps.ts index c59d611..0fdd8c0 100644 --- a/src/controllers/snapshot/getDreps.ts +++ b/src/controllers/snapshot/getDreps.ts @@ -2,21 +2,17 @@ import { Request, Response } from "express"; import { snapshotService } from "../../services/snapshot.service"; import { sendCachedSnapshot } from "./sendCachedSnapshot"; import { formatAxiosLikeError } from "../../utils/format-http-client-error"; - -function parseIntOpt(value: unknown): number | undefined { - if (typeof value !== "string" || !value) return undefined; - const n = parseInt(value, 10); - return Number.isFinite(n) ? n : undefined; -} +import { parseIntegerQueryOpt } from "../../utils/query-params"; /** * GET /snapshot/dreps */ export const getSnapshotDreps = async (req: Request, res: Response) => { try { - const topN = parseIntOpt(req.query.topN); + const topNR = parseIntegerQueryOpt(req.query.topN, "topN", { min: 1, max: 10_000 }); + if (!topNR.ok) return res.status(topNR.status).json(topNR); const includeHistory = req.query.includeHistory === "true"; - const cached = await snapshotService.getDreps({ topN, includeHistory }); + const cached = await snapshotService.getDreps({ topN: topNR.value, includeHistory }); sendCachedSnapshot(req, res, cached); } catch (error) { console.error("Error fetching snapshot dreps", formatAxiosLikeError(error)); diff --git a/src/controllers/snapshot/getFull.ts b/src/controllers/snapshot/getFull.ts index 793bd88..4faf749 100644 --- a/src/controllers/snapshot/getFull.ts +++ b/src/controllers/snapshot/getFull.ts @@ -11,12 +11,7 @@ import { SnapshotVote, } from "../../responses"; import { formatAxiosLikeError } from "../../utils/format-http-client-error"; - -function parseIntOpt(value: unknown): number | undefined { - if (typeof value !== "string" || !value) return undefined; - const n = parseInt(value, 10); - return Number.isFinite(n) ? n : undefined; -} +import { parseIntegerQueryOpt } from "../../utils/query-params"; /** * GET /snapshot/full @@ -29,8 +24,14 @@ function parseIntOpt(value: unknown): number | undefined { */ export const getSnapshotFull = async (req: Request, res: Response) => { try { - const epochStart = parseIntOpt(req.query.epochStart); - const epochEnd = parseIntOpt(req.query.epochEnd); + const epochStartR = parseIntegerQueryOpt(req.query.epochStart, "epochStart", { min: 0 }); + if (!epochStartR.ok) return res.status(epochStartR.status).json(epochStartR); + const epochStart = epochStartR.value; + + const epochEndR = parseIntegerQueryOpt(req.query.epochEnd, "epochEnd", { min: 0 }); + if (!epochEndR.ok) return res.status(epochEndR.status).json(epochEndR); + const epochEnd = epochEndR.value; + const includeHistory = req.query.includeHistory === "true"; const [manifestCached, drepsCached] = await Promise.all([ diff --git a/src/controllers/snapshot/getManifest.ts b/src/controllers/snapshot/getManifest.ts index 2e16953..251f3e1 100644 --- a/src/controllers/snapshot/getManifest.ts +++ b/src/controllers/snapshot/getManifest.ts @@ -2,21 +2,22 @@ import { Request, Response } from "express"; import { snapshotService } from "../../services/snapshot.service"; import { sendCachedSnapshot } from "./sendCachedSnapshot"; import { formatAxiosLikeError } from "../../utils/format-http-client-error"; - -function parseIntOpt(value: unknown): number | undefined { - if (typeof value !== "string" || !value) return undefined; - const n = parseInt(value, 10); - return Number.isFinite(n) ? n : undefined; -} +import { parseIntegerQueryOpt } from "../../utils/query-params"; /** * GET /snapshot/manifest */ export const getSnapshotManifest = async (req: Request, res: Response) => { try { - const epochStart = parseIntOpt(req.query.epochStart); - const epochEnd = parseIntOpt(req.query.epochEnd); - const cached = await snapshotService.getManifest({ epochStart, epochEnd }); + const epochStartR = parseIntegerQueryOpt(req.query.epochStart, "epochStart", { min: 0 }); + if (!epochStartR.ok) return res.status(epochStartR.status).json(epochStartR); + const epochEndR = parseIntegerQueryOpt(req.query.epochEnd, "epochEnd", { min: 0 }); + if (!epochEndR.ok) return res.status(epochEndR.status).json(epochEndR); + + const cached = await snapshotService.getManifest({ + epochStart: epochStartR.value, + epochEnd: epochEndR.value, + }); sendCachedSnapshot(req, res, cached); } catch (error) { console.error("Error fetching snapshot manifest", formatAxiosLikeError(error)); diff --git a/src/index.ts b/src/index.ts index 50fc9cb..0ffe5d2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,6 +19,7 @@ import epochsRouter from "./routes/epochs.route"; import actionsRouter from "./routes/actions.route"; import migrationsRouter from "./routes/migrations.route"; import snapshotRouter from "./routes/snapshot.route"; +import healthzRouter from "./routes/healthz.route"; import { apiKeyAuth } from "./middleware/auth.middleware"; import { requestLog } from "./middleware/request-log.middleware"; import { startAllJobs } from "./jobs"; @@ -61,6 +62,7 @@ if (fs.existsSync(swaggerPath)) { } // Public read-only endpoints intended for browser clients (drep-lens, etc.) +app.use("/healthz", healthzRouter); app.use("/epochs", epochsRouter); app.use("/actions", actionsRouter); app.use("/migrations", migrationsRouter); diff --git a/src/jobs/sync-epoch-totals.job.ts b/src/jobs/sync-epoch-totals.job.ts index c48bc53..0264075 100644 --- a/src/jobs/sync-epoch-totals.job.ts +++ b/src/jobs/sync-epoch-totals.job.ts @@ -41,6 +41,27 @@ export const startEpochTotalsSyncJob = () => ` Totals (current epoch=${result.currentEpoch}): upserted=${cur.upserted}, circulation=${cur.circulation?.toString() ?? "null"}, treasury=${cur.treasury?.toString() ?? "null"}, delegatedDrepPower=${cur.delegatedDrepPower?.toString() ?? "null"}, totalPoolVotePower=${cur.totalPoolVotePower?.toString() ?? "null"}` ); - return { itemsProcessed: result.skippedPrevious ? 1 : 2 }; + // Surface partial degradation so log aggregators can alert separately + // from "step failed entirely". Without these the structured result would + // report success even when a downstream hook silently broke. + if (result.denormRefreshError) { + console.error( + `[Epoch Totals] partial: drep-denorm refresh failed — ${result.denormRefreshError}` + ); + } + if (result.snapshotRebuildError) { + console.error( + `[Epoch Totals] partial: snapshot rebuild failed — ${result.snapshotRebuildError}` + ); + } + const partialFailure = + Boolean(result.denormRefreshError) || Boolean(result.snapshotRebuildError); + + return { + itemsProcessed: result.skippedPrevious ? 1 : 2, + // Mark the run as "partial" so SyncStatus.lastResult carries the signal + // upward to ops dashboards rather than reporting a clean success. + lockResult: partialFailure ? ("partial" as const) : undefined, + }; }, }); diff --git a/src/libs/sentinels.ts b/src/libs/sentinels.ts new file mode 100644 index 0000000..208589e --- /dev/null +++ b/src/libs/sentinels.ts @@ -0,0 +1,76 @@ +/** + * Cardano governance sentinel DRep identifiers — and branded types that make + * accidental misuse a compile error. + * + * The chain emits two protocol-defined "vote target" sentinels: + * - `drep_always_abstain` — stake delegated here always abstains. + * - `drep_always_no_confidence` — stake delegated here always votes "no". + * + * They are not real registered DReps; they appear in stake-delegation rows, + * Koios voting summaries, and aggregate tables but should usually be filtered + * out of "DRep listing" UIs and migration analyses. + * + * Branding strategy: + * `SentinelDrepId` — narrow union of the two sentinel literals. + * `NonSentinelDrepId` — opaque brand for "id that has been checked NOT to be a sentinel". + * Use `requireNonSentinelDrepId()` at the point where you've ruled + * sentinels out, then pass the branded value forward; downstream + * functions can declare the parameter as `NonSentinelDrepId` and + * the compiler refuses to accept a raw string. + */ + +declare const __nonSentinelDrepIdBrand: unique symbol; + +export const DREP_ALWAYS_ABSTAIN = "drep_always_abstain" as const; +export const DREP_ALWAYS_NO_CONFIDENCE = "drep_always_no_confidence" as const; + +/** All sentinel DRep ids — useful for `notIn` / `IN` filters. */ +export const SENTINEL_DREP_IDS: ReadonlyArray = [ + DREP_ALWAYS_ABSTAIN, + DREP_ALWAYS_NO_CONFIDENCE, +]; + +export type SentinelDrepId = + | typeof DREP_ALWAYS_ABSTAIN + | typeof DREP_ALWAYS_NO_CONFIDENCE; + +/** + * Branded "DRep id, definitely not a sentinel". Construct via + * `requireNonSentinelDrepId()` at the boundary where sentinels are filtered; + * downstream code declares `NonSentinelDrepId` parameters and the compiler + * rejects raw strings, catching a missed filter at type-check time. + */ +export type NonSentinelDrepId = string & { + readonly [__nonSentinelDrepIdBrand]: true; +}; + +export function isSentinelDrepId( + id: string | null | undefined +): id is SentinelDrepId { + return id === DREP_ALWAYS_ABSTAIN || id === DREP_ALWAYS_NO_CONFIDENCE; +} + +/** + * Narrow a raw string to `NonSentinelDrepId`. Returns null when the input is + * a sentinel; throw at the call site only where a non-sentinel is required. + */ +export function asNonSentinelDrepId( + id: string +): NonSentinelDrepId | null { + return isSentinelDrepId(id) ? null : (id as NonSentinelDrepId); +} + +/** + * Same as {@link asNonSentinelDrepId} but throws when the id is a sentinel. + * Use at call sites that have already documented "sentinel rejected" — the + * runtime check enforces what the type system claims. + */ +export function requireNonSentinelDrepId(id: string): NonSentinelDrepId { + const branded = asNonSentinelDrepId(id); + if (branded == null) { + throw new Error( + `requireNonSentinelDrepId: sentinel DRep id passed where a real DRep id was expected: ${id}` + ); + } + return branded; +} diff --git a/src/middleware/request-log.middleware.ts b/src/middleware/request-log.middleware.ts index 22e3179..101307a 100644 --- a/src/middleware/request-log.middleware.ts +++ b/src/middleware/request-log.middleware.ts @@ -28,11 +28,16 @@ export function requestLog(req: Request, res: Response, next: NextFunction): voi const bytesOut = res.getHeader("Content-Length") ?? "?"; const etag = (res.getHeader("ETag") as string | undefined) ?? null; const encoding = res.getHeader("Content-Encoding") ?? ""; - console.log( - `[req] ${req.method} ${req.originalUrl} ${res.statusCode} ${durationMs}ms ${bytesOut}B${ - encoding ? ` enc=${encoding}` : "" - }${etag ? ` etag=${etag}` : ""}` - ); + const message = `[req] ${req.method} ${req.originalUrl} ${res.statusCode} ${durationMs}ms ${bytesOut}B${ + encoding ? ` enc=${encoding}` : "" + }${etag ? ` etag=${etag}` : ""}`; + // 5xx responses ride out at error severity so log aggregators can alert + // separately from the high-volume 2xx/3xx traffic on the same paths. + if (res.statusCode >= 500) { + console.error(message); + } else { + console.log(message); + } }; res.on("finish", onFinish); res.on("close", onFinish); diff --git a/src/responses/drep.response.ts b/src/responses/drep.response.ts index 0bee8ad..4b8ef8a 100644 --- a/src/responses/drep.response.ts +++ b/src/responses/drep.response.ts @@ -1,7 +1,13 @@ /** - * DRep Dashboard Response Types + * DRep Dashboard Response Types. + * + * Convention: this set of authenticated endpoints serialises lovelace as + * `LovelaceString` (precision-preserving) and ADA as `AdaString` (6dp string). + * See ./lovelace.ts for the project-wide rationale. */ +import type { LovelaceString, AdaString } from "./lovelace"; + /** * DRep summary for listing */ @@ -9,10 +15,8 @@ export interface DRepSummary { drepId: string; name: string | null; iconUrl: string | null; - /** Voting power in lovelace (as string for BigInt serialization) */ - votingPower: string; - /** Voting power in ADA (converted from lovelace) */ - votingPowerAda: string; + votingPower: LovelaceString; + votingPowerAda: AdaString; /** Total number of votes cast by this DRep */ totalVotesCast: number; /** Number of delegators to this DRep */ diff --git a/src/responses/index.ts b/src/responses/index.ts index c01e6d9..80d26f9 100644 --- a/src/responses/index.ts +++ b/src/responses/index.ts @@ -1,3 +1,4 @@ +export * from "./lovelace"; export * from "./user.response"; export * from "./overview.response"; export * from "./proposal.response"; diff --git a/src/responses/lovelace.ts b/src/responses/lovelace.ts new file mode 100644 index 0000000..f35005e --- /dev/null +++ b/src/responses/lovelace.ts @@ -0,0 +1,68 @@ +/** + * Lovelace serialization conventions. + * + * Cardano stake amounts are integer lovelace (1 ADA = 1e6 lovelace, 1 kADA = 1e9 + * lovelace). Mainnet whale stakes routinely exceed `Number.MAX_SAFE_INTEGER` + * (2^53 ≈ 9 PADA), so naive `Number(bigint)` is lossy. Different endpoints in + * this API made different precision/ergonomics trade-offs; this file pins the + * contract so callers (and reviewers) can tell which is which from the type. + * + * The branding is purely TypeScript — over the wire each branded type + * serialises as `string` or `number`. The brand exists so internal code that + * accepts a "Lovelace amount" can refuse a raw `string`/`number` of the wrong + * shape at compile time, and so the response files self-document. + * + * ─── Conventions ─────────────────────────────────────────────────────── + * + * `LovelaceString` precision: exact wire: string used in: /migrations, /dreps (auth'd) + * `Kada` precision: float64 wire: number used in: /snapshot/dreps (`power`) + * `Ada` precision: float64 wire: number used in: /snapshot/chunks (`MIGRATIONS[].ada`) + * `AdaString` precision: 6dp str wire: string used in: /migrations (`ada`), /dreps (`votingPowerAda`) + * + * Drep-lens consumes the kADA/ADA numbers because it does its own clustering + * arithmetic in client-side JS where BigInt is awkward; the precision loss is + * accepted (visualisation, not accounting). Authenticated endpoints keep + * `LovelaceString` because callers there are typically off-line pipelines + * that need exact totals. + */ + +declare const __lovelaceStringBrand: unique symbol; +declare const __kadaBrand: unique symbol; +declare const __adaBrand: unique symbol; +declare const __adaStringBrand: unique symbol; + +/** Integer lovelace as a base-10 string. Precision-preserving for whale stakes. */ +export type LovelaceString = string & { readonly [__lovelaceStringBrand]: true }; + +/** Lovelace converted to kADA (× 1e-9) as a JS number. Lossy ≥ 9 PADA. */ +export type Kada = number & { readonly [__kadaBrand]: true }; + +/** Lovelace converted to ADA (× 1e-6) as a JS number. Lossy ≥ 9 EADA (basically never in practice). */ +export type Ada = number & { readonly [__adaBrand]: true }; + +/** Lovelace converted to ADA as a fixed-6dp string ("123.456789"). Lossy at the 7th decimal. */ +export type AdaString = string & { readonly [__adaStringBrand]: true }; + +export function toLovelaceString(value: bigint): LovelaceString { + return value.toString() as LovelaceString; +} + +export function toKada(lovelace: bigint): Kada { + return (Number(lovelace) / 1_000_000_000) as Kada; +} + +export function toAda(lovelace: bigint): Ada { + return (Number(lovelace) / 1_000_000) as Ada; +} + +export function toAdaString(lovelace: bigint): AdaString { + // Compute the 6-decimal ADA string directly from the BigInt — going through + // `Number(lovelace) / 1e6` rounds for amounts above 2^53 (whale stakes), which + // would silently lose precision before `.toFixed(6)`. + const negative = lovelace < 0n; + const abs = negative ? -lovelace : lovelace; + const integerPart = abs / 1_000_000n; + const fractionalPart = abs % 1_000_000n; + const sixDp = fractionalPart.toString().padStart(6, "0"); + return ((negative ? "-" : "") + integerPart.toString() + "." + sixDp) as AdaString; +} diff --git a/src/responses/migrations.response.ts b/src/responses/migrations.response.ts index 715edbc..1f0e5e7 100644 --- a/src/responses/migrations.response.ts +++ b/src/responses/migrations.response.ts @@ -1,8 +1,14 @@ /** * Migration response types — per-epoch DRep migrations aggregated from * the StakeDelegationChange changelog. + * + * Convention: this endpoint is consumed primarily by off-line pipelines that + * need exact totals, so lovelace is serialised as `LovelaceString` (precision + * preserving) and ADA as `AdaString` (6dp string). See ./lovelace.ts. */ +import type { LovelaceString, AdaString } from "./lovelace"; + export interface MigrationRow { /** Delegation switch epoch (delegated_epoch_no) */ epoch: number; @@ -10,10 +16,10 @@ export interface MigrationRow { fromDrepId: string; /** Target DRep bech32 id */ toDrepId: string; - /** Lovelace moved (BigInt as string) — sum of amount_at_switch (or current-proxy fallback) */ - lovelace: string; - /** Lovelace converted to ADA, rounded to 6 dp */ - ada: string; + /** Lovelace moved — sum of amount_at_switch (or current-proxy fallback). */ + lovelace: LovelaceString; + /** Lovelace converted to ADA, rounded to 6 dp. */ + ada: AdaString; /** Distinct stake addresses moved between (fromDrepId, toDrepId) in this epoch */ delegators: number; } @@ -23,8 +29,18 @@ export interface GetMigrationsResponse { meta: { epochStart: number; epochEnd: number; - /** Aggregated row count returned */ + /** Aggregated row count returned in this page */ rowCount: number; + /** + * Pagination state. `total` is the count of rows matching the filter + * BEFORE limit/offset, so callers can compute total pages. + */ + pagination: { + limit: number; + offset: number; + total: number; + hasMore: boolean; + }; /** * Provenance of `lovelace`: * - "koios-history": ≥99% of changelog rows have `amount_at_switch` from Koios /account_history. @@ -38,6 +54,13 @@ export interface GetMigrationsResponse { koiosHistory: number; currentProxy: number; unknown: number; + /** + * Rows whose Koios /account_history value couldn't be validated as a + * non-negative integer. Excluded from automatic retries to avoid pinning + * the queue head; an operator can clear the tag and re-run the backfill + * once the upstream source is fixed. + */ + malformed: number; }; /** Most recent computedAt for any row in [epochStart, epochEnd], if any */ lastComputedAt: string | null; diff --git a/src/responses/snapshot.response.ts b/src/responses/snapshot.response.ts index fd461c7..db09053 100644 --- a/src/responses/snapshot.response.ts +++ b/src/responses/snapshot.response.ts @@ -4,25 +4,66 @@ * IMPORTANT: field names match drep-lens's RawDataset / DRep / GovAction shape * (snake_case / camelCase mix is intentional — this is the bridge layer): * - DRep.id (NOT drepId) - * - power (kADA = lovelace / 1e9) + * - power (kADA = lovelace / 1e9, JS number — see ./lovelace.ts) * - delegators (NOT delegatorCount) * - vote enum is lowercase: "yes" | "no" | "abstain" * - GovAction.type is the display label, NOT the DB enum + * + * Convention: this endpoint deliberately serialises lovelace as `Kada` / + * `Ada` (lossy JS numbers) because drep-lens does its clustering arithmetic + * in client-side JS where BigInt is awkward. The /migrations and /dreps + * endpoints keep the precision-preserving `LovelaceString` form. See + * ./lovelace.ts for the project-wide rationale. */ +import type { Kada, Ada } from "./lovelace"; + export type SnapshotVote = "yes" | "no" | "abstain"; -export interface SnapshotManifestChunk { +/** + * Chunk finality state. Exposed to consumers as two parallel booleans so + * existing JSON readers don't need to change, but constructed via the + * `chunkFinality()` helper so the illegal `{isFinal:false, isStable:true}` + * combination is unrepresentable in the producer code path. + * + * Invariant: `isStable` ⇒ `isFinal`. Encoded via a discriminated union over + * a `state` discriminator that the wire format strips. + */ +export type ChunkFinality = + | { state: "live"; isFinal: false; isStable: false } + | { state: "final-unstable"; isFinal: true; isStable: false } + | { state: "stable"; isFinal: true; isStable: true }; + +/** + * Build a {@link ChunkFinality} value from the underlying signals. Centralised + * so every producer (composeChunk, composeManifest, snapshot-builder) gets the + * same truth table. + * + * isFinal := chunkEnd < currentEpoch + * isStable := isFinal AND allAmountsHistorical AND fullScanCompleted + */ +export function chunkFinality(input: { + isFinal: boolean; + allAmountsHistorical: boolean; + fullScanCompleted: boolean; +}): ChunkFinality { + if (!input.isFinal) { + return { state: "live", isFinal: false, isStable: false }; + } + if (input.allAmountsHistorical && input.fullScanCompleted) { + return { state: "stable", isFinal: true, isStable: true }; + } + return { state: "final-unstable", isFinal: true, isStable: false }; +} + +export type SnapshotManifestChunk = { startEpoch: number; endEpoch: number; url: string; - isFinal: boolean; - /** Final + amounts fully backfilled — safe for long CDN immutable cache */ - isStable: boolean; actionCount: number; voteCount: number; migrationCount: number; -} +} & ChunkFinality; export interface SnapshotManifest { schemaVersion: "v1"; @@ -39,13 +80,15 @@ export interface SnapshotDrep { id: string; name: string; handle: string; - power: number; + /** kADA (= lovelace / 1e9). Lossy ≥ 9 PADA — see ./lovelace.ts. */ + power: Kada; delegators: number; + /** Fraction in [0, 1], 4-decimal precision. */ participation: number; joined: number; cluster: number; iconUrl: string | null; - powerSeries?: Array<{ epoch: number; power: number; delegators: number }>; + powerSeries?: Array<{ epoch: number; power: Kada; delegators: number }>; } export interface SnapshotDreps { @@ -71,26 +114,27 @@ export interface SnapshotChunkMigration { fromCluster: number; /** Always 0 — drep-lens analysis remaps clusters per-pass; this is the fallback */ toCluster: number; - ada: number; + /** ADA (= lovelace / 1e6). Lossy ≥ 9 EADA — see ./lovelace.ts. */ + ada: Ada; delegators: number; } -export interface SnapshotChunk { +/** + * Cross product of static fields + the {@link ChunkFinality} discriminated + * union. Producers MUST go through `chunkFinality()` so the illegal + * `{isFinal:false, isStable:true}` combination cannot be constructed. + * + * On the wire each variant serialises as the same `{...isFinal, isStable, state}` + * shape; the optional `state` discriminator is informational and the boolean + * pair stays as the source of truth for legacy clients (drep-lens etc.). + */ +export type SnapshotChunk = { schemaVersion: "v1"; generatedAt: string; epochStart: number; epochEnd: number; - /** Chunk's epoch range is in the past (no new proposals/votes/migrations) */ - isFinal: boolean; - /** - * Additionally, every migration row in this chunk has `amount_source = "koios-history"`. - * Only when both `isFinal && isStable` is the chunk safe to serve with - * `Cache-Control: immutable, max-age=30d`. Otherwise migration lovelace values - * may still change as the amount-at-switch backfill drains. - */ - isStable: boolean; ACTIONS: SnapshotChunkAction[]; /** Sparse: outer key drep_id, inner key gov_action_id */ votes: Record>; MIGRATIONS: SnapshotChunkMigration[]; -} +} & ChunkFinality; diff --git a/src/routes/healthz.route.ts b/src/routes/healthz.route.ts new file mode 100644 index 0000000..442b7c8 --- /dev/null +++ b/src/routes/healthz.route.ts @@ -0,0 +1,22 @@ +import { Router } from "express"; +import * as healthzController from "../controllers/healthz"; + +const router = Router(); + +/** + * @openapi + * /healthz: + * get: + * summary: Liveness + snapshot boot readiness probe + * description: | + * Returns 200 with `status: "ok"`/`"starting"`/`"skipped"` while the API + * is healthy. Returns 503 with `status: "degraded"` when snapshot boot + * recovery has failed and `/snapshot/*` may be empty or stale. Public, + * unauthenticated; safe for Cloud Run / K8s readiness probes. + * responses: + * 200: { description: API is up; snapshot boot recovery progressed normally. } + * 503: { description: API is up but snapshot is degraded. } + */ +router.get("/", healthzController.getHealthz); + +export default router; diff --git a/src/scripts/backfill-amount-at-switch.ts b/src/scripts/backfill-amount-at-switch.ts index 1248dc4..0daf2e5 100644 --- a/src/scripts/backfill-amount-at-switch.ts +++ b/src/scripts/backfill-amount-at-switch.ts @@ -11,8 +11,10 @@ * - Koios pressure budget tolerates ~1000 calls (full mainnet drain ~30 min). * * Environment (optional): - * - AMOUNT_AT_SWITCH_BATCH_ROWS=2000 // rows per pass (default 2000) - * - AMOUNT_AT_SWITCH_MAX_PASSES=200 // safety cap (default 200; ~400k rows) + * - AMOUNT_AT_SWITCH_BATCH_ROWS=2000 // rows per pass (default 2000) + * - AMOUNT_AT_SWITCH_MAX_PASSES=200 // safety cap (default 200; ~400k rows) + * - AMOUNT_AT_SWITCH_PASS_RETRIES=4 // per-pass retries before giving up (default 4) + * - AMOUNT_AT_SWITCH_BACKOFF_BASE_MS=1500 // base backoff (default 1500ms; doubles each retry, capped 60s) * * Usage: * npx ts-node src/scripts/backfill-amount-at-switch.ts @@ -21,7 +23,10 @@ import "dotenv/config"; import { formatAxiosLikeError } from "../utils/format-http-client-error"; import { prisma } from "../services/prisma"; -import { backfillAmountAtSwitch } from "../services/ingestion/migration-amount-backfill.service"; +import { + backfillAmountAtSwitch, + type MigrationAmountBackfillResult, +} from "../services/ingestion/migration-amount-backfill.service"; import { refreshMigrationAggregate } from "../services/ingestion/migration-aggregate.service"; function parseIntEnv(name: string, fallback: number): number { @@ -31,33 +36,77 @@ function parseIntEnv(name: string, fallback: number): number { return Number.isNaN(n) ? fallback : n; } +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Run one backfill pass with bounded retry + exponential backoff. A single + * Koios 5xx or timeout used to abort the whole drain (and the trailing + * MigrationAggregate refresh + cache flush never ran). Now: per-pass retries + * absorb transient failures; only a sustained outage propagates. + */ +async function runPassWithRetry( + passIndex: number, + batchSize: number, + maxRetries: number, + baseBackoffMs: number +): Promise { + let attempt = 0; + // Inclusive: attempt 0 is the initial try, 1..maxRetries are retries. + while (true) { + try { + return await backfillAmountAtSwitch({ + maxRows: batchSize, + source: "scripts.backfill-amount-at-switch", + }); + } catch (e) { + attempt += 1; + if (attempt > maxRetries) { + console.error( + `[backfill-amount-at-switch] pass ${passIndex + 1} exhausted ${maxRetries} retries — propagating`, + formatAxiosLikeError(e) + ); + throw e; + } + const sleepMs = Math.min(baseBackoffMs * 2 ** (attempt - 1), 60_000); + console.warn( + `[backfill-amount-at-switch] pass ${passIndex + 1} attempt ${attempt}/${maxRetries} failed — retrying in ${sleepMs}ms`, + formatAxiosLikeError(e) + ); + await sleep(sleepMs); + } + } +} + async function main() { const batchSize = parseIntEnv("AMOUNT_AT_SWITCH_BATCH_ROWS", 2000); const maxPasses = parseIntEnv("AMOUNT_AT_SWITCH_MAX_PASSES", 200); + const passRetries = parseIntEnv("AMOUNT_AT_SWITCH_PASS_RETRIES", 4); + const backoffBaseMs = parseIntEnv("AMOUNT_AT_SWITCH_BACKOFF_BASE_MS", 1500); console.log( - `[backfill-amount-at-switch] starting — batchSize=${batchSize} maxPasses=${maxPasses}` + `[backfill-amount-at-switch] starting — batchSize=${batchSize} maxPasses=${maxPasses} passRetries=${passRetries} backoffBaseMs=${backoffBaseMs}` ); let totalScanned = 0; let totalUpdated = 0; let totalUnknown = 0; + let totalMalformed = 0; let totalEpochs = 0; let pass = 0; for (pass = 0; pass < maxPasses; pass++) { - const result = await backfillAmountAtSwitch({ - maxRows: batchSize, - source: "scripts.backfill-amount-at-switch", - }); + const result = await runPassWithRetry(pass, batchSize, passRetries, backoffBaseMs); totalScanned += result.rowsScanned; totalUpdated += result.rowsUpdated; totalUnknown += result.rowsUnknown; + totalMalformed += result.rowsMalformed; totalEpochs += result.epochsProcessed; console.log( - `[backfill-amount-at-switch] pass ${pass + 1}: scanned=${result.rowsScanned} updated=${result.rowsUpdated} unknown=${result.rowsUnknown} epochs=${result.epochsProcessed} span=${ + `[backfill-amount-at-switch] pass ${pass + 1}: scanned=${result.rowsScanned} updated=${result.rowsUpdated} unknown=${result.rowsUnknown} malformed=${result.rowsMalformed} epochs=${result.epochsProcessed} span=${ result.epochSpan ? `[${result.epochSpan.min}, ${result.epochSpan.max}]` : "none" } in ${result.durationMs}ms` ); @@ -68,8 +117,18 @@ async function main() { } } + // Distinguish "drained the queue" from "hit the safety cap with rows remaining". + // We still run the trailing refresh + cache flush so the partial work is + // observable through /migrations + /snapshot/*; the operator can re-run the + // script (or raise AMOUNT_AT_SWITCH_MAX_PASSES) to drain the rest. + if (pass >= maxPasses) { + console.warn( + `[backfill-amount-at-switch] reached maxPasses=${maxPasses} with rows still pending — re-run or raise AMOUNT_AT_SWITCH_MAX_PASSES to drain the rest` + ); + } + console.log( - `[backfill-amount-at-switch] FINISHED in ${pass} pass(es): scanned=${totalScanned} updated=${totalUpdated} unknown=${totalUnknown} epochs=${totalEpochs}` + `[backfill-amount-at-switch] FINISHED in ${pass} pass(es): scanned=${totalScanned} updated=${totalUpdated} unknown=${totalUnknown} malformed=${totalMalformed} epochs=${totalEpochs}` ); console.log("[backfill-amount-at-switch] refreshing MigrationAggregate so it picks up the new amounts..."); diff --git a/src/services/governanceProvider.ts b/src/services/governanceProvider.ts index fbd2b11..d72ae64 100644 --- a/src/services/governanceProvider.ts +++ b/src/services/governanceProvider.ts @@ -308,38 +308,76 @@ export async function getProposalVotingSummary( return summaries?.[0] ?? null; } +/** + * PostgREST/Koios returns 400 when an endpoint doesn't accept a particular + * filter shape — the legitimate case `getEpochScopedFirstRow` is built to + * fall back from. Anything else (5xx, network/timeout, parse) is a real + * failure and must propagate so callers don't treat it as "no data". + */ +function isPostgrestFilterRejection(error: unknown): boolean { + const status = (error as { response?: { status?: number } })?.response?.status; + return status === 400; +} + async function getEpochScopedFirstRow( endpoint: string, epochNo: number, options?: GovernanceProviderOptions ): Promise { - const attempts: Array<() => Promise> = [ - () => - koiosGet( - endpoint, - { _epoch_no: epochNo }, - toKoiosContext(options) - ), - () => - koiosGet( - endpoint, - { epoch_no: `eq.${epochNo}` }, - toKoiosContext(options) - ), + const attempts: Array<{ label: string; fn: () => Promise }> = [ + { + label: "_epoch_no", + fn: () => + koiosGet( + endpoint, + { _epoch_no: epochNo }, + toKoiosContext(options) + ), + }, + { + label: "epoch_no=eq.", + fn: () => + koiosGet( + endpoint, + { epoch_no: `eq.${epochNo}` }, + toKoiosContext(options) + ), + }, ]; + let rejected = false; for (const attempt of attempts) { try { - const rows = await attempt(); + const rows = await attempt.fn(); const row = rows?.find((entry) => entry?.epoch_no === epochNo) ?? rows?.[0]; if (row?.epoch_no === epochNo) { return row; } - } catch { - // Try the next filtering style when Koios rejects one form. + // Endpoint accepted the filter shape but returned no matching row. + // Treat as legitimate "no data" without trying the alternate shape. + return null; + } catch (e) { + if (isPostgrestFilterRejection(e)) { + rejected = true; + console.warn( + `[governanceProvider] ${endpoint} rejected filter style "${attempt.label}" — trying next form` + ); + continue; + } + // Real failure (5xx, timeout, network, parse): propagate so the caller + // doesn't silently persist NULL fields as if Koios said "no data". + throw e; } } + // Both shapes were rejected by PostgREST — surface that distinctly so the + // operator notices the endpoint contract has shifted, rather than silently + // returning null forever. + if (rejected) { + throw new Error( + `getEpochScopedFirstRow: all filter shapes rejected by ${endpoint} for epoch ${epochNo}` + ); + } return null; } @@ -644,7 +682,16 @@ export async function getCurrentEpochFromKoios( options?: GovernanceProviderOptions ): Promise { const tip = await koiosGet("/tip", undefined, toKoiosContext(options)); - return tip?.[0]?.epoch_no ?? 0; + // A missing tip is a Koios failure (outage / contract drift), NOT epoch 0. + // Throw rather than disguise an outage as a fresh chain — `bootRecover` and + // similar callers gate on `currentEpoch === 0` and would silently no-op. + const epochNo = tip?.[0]?.epoch_no; + if (typeof epochNo !== "number" || !Number.isFinite(epochNo)) { + throw new Error( + `getCurrentEpochFromKoios: /tip returned no epoch_no (response=${JSON.stringify(tip)})` + ); + } + return epochNo; } function buildAccountInfoRequestBody( diff --git a/src/services/ingestion/drep-delegator-sync-run.ts b/src/services/ingestion/drep-delegator-sync-run.ts index 74509b2..cadc73b 100644 --- a/src/services/ingestion/drep-delegator-sync-run.ts +++ b/src/services/ingestion/drep-delegator-sync-run.ts @@ -32,12 +32,7 @@ function sumProcessed(r: SyncDrepDelegationChangesResult): number { export async function runDrepDelegatorSyncWithDailyRetry( db: PrismaClient ): Promise { - let r1: SyncDrepDelegationChangesResult; - try { - r1 = await syncDrepDelegationChanges(db); - } catch (e) { - throw e; - } + const r1 = await syncDrepDelegationChanges(db); if (r1.skipped) { return { kind: "skipped", result: r1 }; @@ -46,15 +41,9 @@ export async function runDrepDelegatorSyncWithDailyRetry( let last = r1; let completedSyncCalls = 1; - const allowSecondPass = r1.failed.length > 0; - - try { - if (allowSecondPass) { - completedSyncCalls = 2; - last = await syncDrepDelegationChanges(db); - } - } catch (e) { - throw e; + if (r1.failed.length > 0) { + completedSyncCalls = 2; + last = await syncDrepDelegationChanges(db); } const finalFullSuccess = last.failed.length === 0; diff --git a/src/services/ingestion/drep-sync.service.ts b/src/services/ingestion/drep-sync.service.ts index 14cd533..1fb6c63 100644 --- a/src/services/ingestion/drep-sync.service.ts +++ b/src/services/ingestion/drep-sync.service.ts @@ -172,8 +172,10 @@ async function fetchDrepMetadata(drepId: string): Promise { // Public API // ============================================================ -/** Built-in vote-delegation targets from Koios `account_info.delegated_drep` (not fetched via /drep_delegators in this job). */ +/** Built-in vote-delegation targets from Koios `account_info.delegated_drep` (not fetched via /drep_delegators in this job). Sourced from the centralised sentinel module. */ export const CARDANO_ALWAYS_DELEGATION_DREP_IDS = [ + // Imported indirectly so this module's existing public API stays a const tuple + // (callers that destructure expect a specific tuple shape). "drep_always_abstain", "drep_always_no_confidence", ] as const; diff --git a/src/services/ingestion/epoch-analytics.service.ts b/src/services/ingestion/epoch-analytics.service.ts index ce1a825..042ae3a 100644 --- a/src/services/ingestion/epoch-analytics.service.ts +++ b/src/services/ingestion/epoch-analytics.service.ts @@ -128,6 +128,17 @@ export interface StepEpochTotalsResult { previousEpochTotals?: SyncEpochTotalsResult; currentEpochTotals: SyncEpochTotalsResult; skippedPrevious: boolean; + /** + * Best-effort downstream hooks that ran after the totals sync. Each is + * present only when the corresponding hook failed; absent fields mean the + * hook either succeeded or was skipped (e.g. snapshot rebuild contention). + * Surfaced so monitoring sees partial degradation rather than treating the + * step's success status as a green light for everything downstream. + */ + denormRefreshError?: string; + snapshotRebuildError?: string; + /** True when the snapshot rebuild was skipped due to lock contention (not a failure). */ + snapshotRebuildSkipped?: boolean; } export interface StepDrepLifecycleResult { @@ -527,19 +538,26 @@ export async function syncEpochTotalsStep( } } catch (e) { console.error("[drep-denorm] refresh failed", e); + result.denormRefreshError = e instanceof Error ? e.message : String(e); } // Snapshot rebuild — best-effort, never fail the totals sync for a snapshot hiccup. // Triggers on every successful run (cheap when nothing changed: dreps + current chunk only). try { const rebuild = await rebuildAfterEpoch(currentEpoch); - if (rebuild.finalisedChunk) { + if (rebuild.skipped) { + result.snapshotRebuildSkipped = true; + console.log( + "[snapshot-builder] rebuildAfterEpoch skipped this tick — another replica holds the lock" + ); + } else if (rebuild.finalisedChunk) { console.log( `[snapshot-builder] finalised chunk ${rebuild.finalisedChunk.start}-${rebuild.finalisedChunk.end} (${rebuild.finalisedChunk.byteSize}B); current epoch ${rebuild.currentEpoch}; total ${rebuild.durationMs}ms` ); } } catch (e) { console.error("[snapshot-builder] rebuildAfterEpoch failed", e); + result.snapshotRebuildError = e instanceof Error ? e.message : String(e); } return result; diff --git a/src/services/ingestion/inactiveDrepPower.service.ts b/src/services/ingestion/inactiveDrepPower.service.ts index ad65318..89fbb44 100644 --- a/src/services/ingestion/inactiveDrepPower.service.ts +++ b/src/services/ingestion/inactiveDrepPower.service.ts @@ -5,6 +5,7 @@ import { getDrepUpdates, listDrepVotingPowerHistory, } from "../governanceProvider"; +import { SENTINEL_DREP_IDS } from "../../libs/sentinels"; type InactivePowerMode = "active" | "completed"; @@ -33,7 +34,7 @@ export const DREP_INACTIVITY_START_EPOCH = 527; // Special predefined voting options are tracked in voting summaries, but they are // not real DRep identities and should never count as inactive DReps. -const SPECIAL_DREP_IDS = ["drep_always_abstain", "drep_always_no_confidence"]; +const SPECIAL_DREP_IDS: readonly string[] = SENTINEL_DREP_IDS; const inactivePowerProcessCache = new Map(); diff --git a/src/services/ingestion/migration-aggregate.service.ts b/src/services/ingestion/migration-aggregate.service.ts index 606fe66..e1523c9 100644 --- a/src/services/ingestion/migration-aggregate.service.ts +++ b/src/services/ingestion/migration-aggregate.service.ts @@ -18,6 +18,10 @@ import type { PrismaClient } from "@prisma/client"; import { prisma as defaultPrisma } from "../prisma"; import { withDbRead, withDbWrite } from "../prisma"; +import { + DREP_ALWAYS_ABSTAIN, + DREP_ALWAYS_NO_CONFIDENCE, +} from "../../libs/sentinels"; export interface MigrationAggregateRefreshResult { rowsWritten: number; @@ -32,6 +36,8 @@ export interface MigrationAggregateAccuracy { rowsWithKoiosHistory: number; rowsWithCurrentProxy: number; rowsUnknown: number; + /** Rows that hit a Koios parse failure on a previous backfill pass. amount_at_switch is NULL → MigrationAggregate falls back to current proxy until the next pass retries. */ + rowsMalformed: number; } /** @@ -113,8 +119,6 @@ export async function getMigrationAggregateAccuracy( const POSTGRES_INT_MAX = 2_147_483_647; const epochStart = filter.epochStart ?? POSTGRES_INT_MIN; const epochEnd = filter.epochEnd ?? POSTGRES_INT_MAX; - const ALWAYS_ABSTAIN = "drep_always_abstain"; - const ALWAYS_NO_CONFIDENCE = "drep_always_no_confidence"; const excludeSentinels = filter.excludeSentinels !== false; const fromFilter = filter.fromDrepId ?? null; const toFilter = filter.toDrepId ?? null; @@ -136,9 +140,9 @@ export async function getMigrationAggregateAccuracy( wheres.push(`"delegated_epoch_no" <= $${params.length}`); if (excludeSentinels) { - params.push(ALWAYS_ABSTAIN, ALWAYS_NO_CONFIDENCE); + params.push(DREP_ALWAYS_ABSTAIN, DREP_ALWAYS_NO_CONFIDENCE); wheres.push(`"from_drep_id" NOT IN ($${params.length - 1}, $${params.length})`); - params.push(ALWAYS_ABSTAIN, ALWAYS_NO_CONFIDENCE); + params.push(DREP_ALWAYS_ABSTAIN, DREP_ALWAYS_NO_CONFIDENCE); wheres.push(`"to_drep_id" NOT IN ($${params.length - 1}, $${params.length})`); } if (fromFilter) { @@ -174,12 +178,14 @@ export async function getMigrationAggregateAccuracy( let koios = 0; let proxy = 0; let unknown = 0; + let malformed = 0; for (const r of rows) { const n = Number(r.n); total += n; if (r.amount_source === "koios-history") koios += n; else if (r.amount_source === "current-proxy") proxy += n; else if (r.amount_source === "unknown") unknown += n; + else if (r.amount_source === "koios-malformed") malformed += n; else proxy += n; } @@ -195,6 +201,7 @@ export async function getMigrationAggregateAccuracy( rowsWithKoiosHistory: koios, rowsWithCurrentProxy: proxy, rowsUnknown: unknown, + rowsMalformed: malformed, }; } diff --git a/src/services/ingestion/migration-amount-backfill.service.ts b/src/services/ingestion/migration-amount-backfill.service.ts index fa56f6c..84e11c8 100644 --- a/src/services/ingestion/migration-amount-backfill.service.ts +++ b/src/services/ingestion/migration-amount-backfill.service.ts @@ -4,9 +4,16 @@ * weight, not the current-state proxy via stake_delegation_state.amount). * * Idempotent: only operates on rows where amount_at_switch IS NULL. - * Rows that come back without a /account_history entry are stamped - * amount_source='unknown' (active_stake was zero or stake was unbonded - * at that epoch start) — they contribute 0 lovelace to MigrationAggregate. + * + * Outcomes per (stake_address, epoch): + * - "koios-history" + amountAtSwitch=value — Koios returned a parsed integer. + * - "unknown" + amountAtSwitch=0n — Koios returned no entry (active_stake + * was zero / unbonded at that epoch start). Stable; not retried. + * - "koios-malformed" + amountAtSwitch=NULL — Koios returned a value we couldn't + * validate as a non-negative integer. Tagged for observability; the + * {addr, epoch, raw} tuple is logged. Excluded from subsequent candidate + * queries by default so it doesn't pin the queue head; operators can clear + * the tag and re-run with `excludeMalformed=false` to retry. * * Steady-state: small batch every cron tick fills new rows as they land. * One-off: run scripts/backfill-amount-at-switch.ts to drain the backlog. @@ -20,6 +27,7 @@ export interface MigrationAmountBackfillResult { rowsScanned: number; rowsUpdated: number; rowsUnknown: number; + rowsMalformed: number; epochsProcessed: number; epochSpan: { min: number; max: number } | null; } @@ -31,6 +39,31 @@ export interface MigrationAmountBackfillOptions { batchSize?: number; /** Source tag for Koios telemetry / pressure-guard logs. */ source?: string; + /** + * When true (default) the candidate query skips rows already tagged + * `koios-malformed`, so chronic-malformed rows can't pin the head of the + * `amount_at_switch IS NULL` queue and stall progress on later rows. + * Operator path: clear `amount_source='koios-malformed'` via SQL then re-run + * with this flag set to false to retry them. + */ + excludeMalformed?: boolean; +} + +/** + * Koios `/account_history.active_stake` is documented as a non-negative integer + * lovelace amount serialized as a *string*. JavaScript's `BigInt(s)` is permissive + * — `BigInt("")` returns `0n`, `BigInt("0x1f")` returns `31n`, `BigInt("-1")` + * returns `-1n` — so we validate the shape before parsing. + * + * Numbers are rejected: mainnet whale stakes routinely exceed `Number.MAX_SAFE_INTEGER`, + * so a `number` here would already have been rounded by the JSON parser and is no + * longer safe to widen to BigInt. + */ +function parseActiveStake(raw: unknown): bigint | null { + if (typeof raw !== "string") return null; + const trimmed = raw.trim(); + if (!/^\d+$/.test(trimmed)) return null; + return BigInt(trimmed); } /** @@ -44,12 +77,16 @@ export async function backfillAmountAtSwitch( const startedAt = Date.now(); const maxRows = opts.maxRows ?? 1000; const source = opts.source ?? "ingestion.migration-amount-backfill"; + const excludeMalformed = opts.excludeMalformed !== false; const candidates = await prisma.stakeDelegationChange.findMany({ where: { amountAtSwitch: null, delegatedEpoch: { not: -1 }, stakeAddress: { not: "" }, + ...(excludeMalformed + ? { OR: [{ amountSource: null }, { amountSource: { not: "koios-malformed" } }] } + : {}), }, orderBy: [{ delegatedEpoch: "asc" }, { id: "asc" }], take: maxRows, @@ -62,6 +99,7 @@ export async function backfillAmountAtSwitch( rowsScanned: 0, rowsUpdated: 0, rowsUnknown: 0, + rowsMalformed: 0, epochsProcessed: 0, epochSpan: null, }; @@ -76,6 +114,7 @@ export async function backfillAmountAtSwitch( let rowsUpdated = 0; let rowsUnknown = 0; + let rowsMalformed = 0; const epochs = [...byEpoch.keys()].sort((a, b) => a - b); for (const epoch of epochs) { @@ -89,34 +128,87 @@ export async function backfillAmountAtSwitch( }); const amountByAddr = new Map(); + const malformedByAddr = new Map(); for (const e of entries) { for (const h of e.history ?? []) { if (h.epoch_no !== epoch) continue; - try { - amountByAddr.set(e.stake_address, BigInt(h.active_stake)); - } catch { - // Skip malformed rows + const parsed = parseActiveStake(h.active_stake); + if (parsed != null) { + amountByAddr.set(e.stake_address, parsed); + } else { + malformedByAddr.set(e.stake_address, h.active_stake); } break; // one record per stake/epoch when filtered } } - // Bulk update — one update per row. Cheap when ~50 addrs/epoch. - // Unknown rows get `amount_at_switch = 0` (Koios reports no active_stake at - // epoch start → contributed nothing) so they don't re-appear on the next - // pass's `WHERE amount_at_switch IS NULL` candidate query. + // Bucket rows per outcome so we can flush each bucket as a single SQL + // statement instead of paying a round-trip per row (was 200 sequential + // updates per cron tick). + // - history: per-row amount differs → one UPDATE...FROM(VALUES) join. + // - unknown: same data for every row → one updateMany. + // - malformed: same data for every row → one updateMany. The (addr, epoch, raw) + // tuple is logged BEFORE the batch flush so ops still gets per-row + // diagnostics. + const historyBucket: Array<{ id: number; amount: bigint }> = []; + const unknownIds: number[] = []; + const malformedIds: number[] = []; + for (const stake of stakes) { const amt = amountByAddr.get(stake.stakeAddress); - const ok = amt != null; - await prisma.stakeDelegationChange.update({ - where: { id: stake.id }, - data: { - amountAtSwitch: ok ? amt! : 0n, - amountSource: ok ? "koios-history" : "unknown", - }, + if (amt != null) { + historyBucket.push({ id: stake.id, amount: amt }); + continue; + } + if (malformedByAddr.has(stake.stakeAddress)) { + const raw = malformedByAddr.get(stake.stakeAddress); + console.warn( + `[migration-amount-backfill] malformed active_stake stake=${stake.stakeAddress} epoch=${epoch} raw=${JSON.stringify(raw)}` + ); + malformedIds.push(stake.id); + continue; + } + unknownIds.push(stake.id); + } + + if (historyBucket.length > 0) { + // Build one positional-bound UPDATE...FROM(VALUES) statement. A CASE WHEN + // would also work but blows up the SQL text past Postgres' parser limits + // for ~1000+ row batches. The VALUES form scales linearly with row count. + const params: Array = []; + const valuesSql = historyBucket + .map((b) => { + params.push(b.id, b.amount.toString()); + // Postgres parses the second placeholder as text; the cast below + // turns it back into bigint inside the join row source. + return `($${params.length - 1}::int, $${params.length}::bigint)`; + }) + .join(", "); + await prisma.$executeRawUnsafe( + `UPDATE "stake_delegation_change" AS s + SET "amount_at_switch" = u.amt, + "amount_source" = 'koios-history' + FROM (VALUES ${valuesSql}) AS u(id, amt) + WHERE s."id" = u.id`, + ...params + ); + rowsUpdated += historyBucket.length; + } + + if (unknownIds.length > 0) { + await prisma.stakeDelegationChange.updateMany({ + where: { id: { in: unknownIds } }, + data: { amountAtSwitch: 0n, amountSource: "unknown" }, + }); + rowsUnknown += unknownIds.length; + } + + if (malformedIds.length > 0) { + await prisma.stakeDelegationChange.updateMany({ + where: { id: { in: malformedIds } }, + data: { amountSource: "koios-malformed" }, }); - if (ok) rowsUpdated++; - else rowsUnknown++; + rowsMalformed += malformedIds.length; } } @@ -125,6 +217,7 @@ export async function backfillAmountAtSwitch( rowsScanned: candidates.length, rowsUpdated, rowsUnknown, + rowsMalformed, epochsProcessed: epochs.length, epochSpan: epochs.length > 0 ? { min: epochs[0], max: epochs[epochs.length - 1] } : null, }; diff --git a/src/services/ingestion/snapshot-builder.service.ts b/src/services/ingestion/snapshot-builder.service.ts index 54d922b..9adcc64 100644 --- a/src/services/ingestion/snapshot-builder.service.ts +++ b/src/services/ingestion/snapshot-builder.service.ts @@ -25,6 +25,30 @@ import { import { refreshDrepDenormColumnsWithResilience } from "./drep-denorm.service"; import { backfillAmountAtSwitch } from "./migration-amount-backfill.service"; import { refreshMigrationAggregateWithResilience } from "./migration-aggregate.service"; +import { + acquireJobLock, + releaseJobLock, + type JobLockReleaseResult, +} from "./syncLock"; + +const SNAPSHOT_REBUILD_JOB_NAME = "snapshot-rebuild"; +/** + * Lock TTL for snapshot rebuild. Headroom for the worst case: the cron path + * runs `backfillAmountAtSwitch` with up to 200 candidate rows, and each row + * can fan out to a Koios `/account_history` call subject to the in-process + * pressure limiter + retry/backoff on 503/timeout. Allow up to ~30 min before + * an apparently-stuck lease is considered expired (which permits another + * replica to take over after a crash). Tune via env if a deployment routinely + * exceeds this. + */ +const SNAPSHOT_REBUILD_LOCK_TTL_MS = (() => { + const raw = process.env.SNAPSHOT_REBUILD_LOCK_TTL_MS; + const parsed = raw ? Number.parseInt(raw, 10) : NaN; + if (Number.isFinite(parsed) && parsed >= 60_000 && parsed <= 60 * 60 * 1000) { + return parsed; + } + return 30 * 60 * 1000; +})(); export interface SnapshotRebuildResult { durationMs: number; @@ -33,6 +57,12 @@ export interface SnapshotRebuildResult { manifestByteSize: number; finalisedChunk: { start: number; end: number; byteSize: number } | null; currentEpoch: number; + /** + * True when another replica/process held the snapshot-rebuild lock and + * this call returned without doing work. Callers should treat the other + * fields as zero/null in this case. + */ + skipped: boolean; } async function readCurrentEpoch(): Promise { @@ -43,9 +73,59 @@ async function readCurrentEpoch(): Promise { export async function rebuildAfterEpoch( epochNo?: number ): Promise { - const startedAt = Date.now(); + // Resolve the epoch BEFORE acquiring any lock. If readCurrentEpoch() throws + // (e.g. DB outage), we propagate without leaving an unreleased lock behind. const currentEpoch = epochNo ?? (await readCurrentEpoch()); + // Distributed lock: prevents two replicas (or the boot path racing the cron + // tick) from simultaneously running denorm refresh + amount backfill + + // recompose. Without this, an N-replica scale-up means N parallel Koios + // floods (~200 calls each from backfillAmountAtSwitch). The acquireJobLock + // helper expires stale locks via TTL so a crashed replica can't pin work. + const acquired = await acquireJobLock( + SNAPSHOT_REBUILD_JOB_NAME, + "Snapshot Rebuild", + { + ttlMs: SNAPSHOT_REBUILD_LOCK_TTL_MS, + source: process.env.HOSTNAME ?? "snapshot-builder", + } + ); + + if (!acquired) { + console.log( + "[snapshot-builder] rebuildAfterEpoch skipped — another replica holds the snapshot-rebuild lock" + ); + return { + durationMs: 0, + drepsByteSize: 0, + chunkByteSize: 0, + manifestByteSize: 0, + finalisedChunk: null, + currentEpoch, + skipped: true, + }; + } + + let releaseResult: JobLockReleaseResult = "success"; + let releaseError: string | null = null; + try { + return await runRebuild(currentEpoch); + } catch (e) { + releaseResult = "failed"; + releaseError = e instanceof Error ? e.message : String(e); + throw e; + } finally { + try { + await releaseJobLock(SNAPSHOT_REBUILD_JOB_NAME, releaseResult, undefined, releaseError); + } catch (lockErr) { + console.error("[snapshot-builder] failed to release snapshot-rebuild lock", lockErr); + } + } +} + +async function runRebuild(currentEpoch: number): Promise { + const startedAt = Date.now(); + const currentChunkStart = Math.floor(currentEpoch / SNAPSHOT_CHUNK_SIZE) * SNAPSHOT_CHUNK_SIZE; @@ -74,10 +154,14 @@ export async function rebuildAfterEpoch( }); if (result.rowsScanned > 0) { console.log( - `[migration-amount-backfill] scanned=${result.rowsScanned} updated=${result.rowsUpdated} unknown=${result.rowsUnknown} epochs=${result.epochsProcessed} in ${result.durationMs}ms` + `[migration-amount-backfill] scanned=${result.rowsScanned} updated=${result.rowsUpdated} unknown=${result.rowsUnknown} malformed=${result.rowsMalformed} epochs=${result.epochsProcessed} in ${result.durationMs}ms` ); - backfillTouchedRows = result.rowsScanned; - if (result.epochSpan) backfillMinEpoch = result.epochSpan.min; + // Re-aggregation only matters when amount_at_switch actually changed. + // Malformed rows leave amountAtSwitch=NULL → MigrationAggregate's COALESCE + // already handles them via the current-state proxy, so re-aggregating on + // a malformed-only pass is wasted work. + backfillTouchedRows = result.rowsUpdated + result.rowsUnknown; + if (result.epochSpan && backfillTouchedRows > 0) backfillMinEpoch = result.epochSpan.min; } } catch (e) { console.error("[migration-amount-backfill] tick failed (continuing)", e); @@ -189,19 +273,89 @@ export async function rebuildAfterEpoch( manifestByteSize: manifestWritten.byteSize, finalisedChunk, currentEpoch, + skipped: false, }; } +/** + * Boot recovery state — exposed via `getSnapshotBootRecoveryStatus()` so + * deployments / readiness probes can detect a sustained Koios failure that + * would otherwise leave the API silently empty until the next epoch tick. + * + * State machine: + * not-started → running → ok (artifacts present and fresh, or rebuild succeeded) + * → skipped (DB had no epoch data yet, or another replica was rebuilding) + * → fresh (manifest+dreps already present and < 1h old) + * → failed (boot recover threw — readiness probes should mark NOT READY) + */ +export type SnapshotBootRecoveryState = + | "not-started" + | "running" + | "ok" + | "skipped" + | "fresh" + | "failed"; + +export interface SnapshotBootRecoveryStatus { + state: SnapshotBootRecoveryState; + startedAt: string | null; + finishedAt: string | null; + durationMs: number | null; + /** Last error message if state==="failed". */ + errorMessage: string | null; + /** Whether L2 manifest+dreps are present and within the staleness threshold at the time of this read. */ + l2Fresh: boolean | null; +} + +let bootRecoveryStatus: SnapshotBootRecoveryStatus = { + state: "not-started", + startedAt: null, + finishedAt: null, + durationMs: null, + errorMessage: null, + l2Fresh: null, +}; + +export function getSnapshotBootRecoveryStatus(): SnapshotBootRecoveryStatus { + return { ...bootRecoveryStatus }; +} + /** * Boot recovery — at API startup, ensure SnapshotCache has at least the * minimum required artifacts. If anything is missing or stale, trigger a * one-shot rebuild. Runs in background; never blocks startup. */ export async function bootRecover(): Promise { + const startedAt = new Date(); + bootRecoveryStatus = { + state: "running", + startedAt: startedAt.toISOString(), + finishedAt: null, + durationMs: null, + errorMessage: null, + l2Fresh: null, + }; + const finalize = ( + state: SnapshotBootRecoveryState, + extra: Partial = {} + ) => { + const finishedAt = new Date(); + bootRecoveryStatus = { + state, + startedAt: bootRecoveryStatus.startedAt, + finishedAt: finishedAt.toISOString(), + durationMs: finishedAt.getTime() - startedAt.getTime(), + errorMessage: null, + l2Fresh: null, + ...extra, + }; + }; + try { const currentEpoch = await readCurrentEpoch(); if (currentEpoch === 0) { console.log("[snapshot-builder] boot-recover skipped — no epoch data yet"); + finalize("skipped"); return; } @@ -225,15 +379,27 @@ export async function bootRecover(): Promise { (now - manifest.generatedAt.getTime()) / 1000 ).toFixed(0)}s` ); + finalize("fresh", { l2Fresh: true }); return; } console.log("[snapshot-builder] boot-recover: rebuilding snapshot"); const result = await rebuildAfterEpoch(currentEpoch); + if (result.skipped) { + console.log( + "[snapshot-builder] boot-recover deferred — another replica is already rebuilding the snapshot" + ); + finalize("skipped"); + return; + } console.log( `[snapshot-builder] boot-recover complete in ${result.durationMs}ms — dreps ${result.drepsByteSize}B chunk ${result.chunkByteSize}B manifest ${result.manifestByteSize}B` ); + finalize("ok", { l2Fresh: true }); } catch (e) { console.error("[snapshot-builder] boot-recover failed", e); + finalize("failed", { + errorMessage: e instanceof Error ? e.message : String(e), + }); } } diff --git a/src/services/ingestion/sync-utils.ts b/src/services/ingestion/sync-utils.ts index 6f6f489..db952c6 100644 --- a/src/services/ingestion/sync-utils.ts +++ b/src/services/ingestion/sync-utils.ts @@ -117,7 +117,16 @@ export async function getKoiosCurrentEpoch(): Promise { source: "ingestion.sync-utils.current-epoch", }) .then((tip) => { - const epoch = tip?.[0]?.epoch_no ?? 0; + const epoch = tip?.[0]?.epoch_no; + // A missing tip is a Koios failure (outage / contract drift), NOT epoch 0. + // Disguising it as 0 silently corrupts every caller that gates on the + // current epoch (sync windows, isFinal checks, boot recovery). Throw so + // the cron tick fails loudly and retries on the next schedule. + if (typeof epoch !== "number" || !Number.isFinite(epoch)) { + throw new Error( + `getKoiosCurrentEpoch: /tip returned no epoch_no (response=${JSON.stringify(tip)})` + ); + } cachedKoiosCurrentEpoch = epoch; cachedKoiosCurrentEpochExpiresAt = Date.now() + KOIOS_CURRENT_EPOCH_CACHE_TTL_MS; diff --git a/src/services/snapshot.service.ts b/src/services/snapshot.service.ts index 8fb4fe1..dc3816c 100644 --- a/src/services/snapshot.service.ts +++ b/src/services/snapshot.service.ts @@ -21,6 +21,7 @@ import { prisma } from "./prisma"; import { cacheGet, cacheSet, cacheInvalidatePrefix } from "./cache"; import { governanceTypeLabelMap } from "../libs/proposalMapper"; import { + chunkFinality, SnapshotChunk, SnapshotChunkAction, SnapshotChunkMigration, @@ -29,7 +30,10 @@ import { SnapshotManifest, SnapshotManifestChunk, SnapshotVote, + toAda, + toKada, } from "../responses"; +import { SENTINEL_DREP_IDS } from "../libs/sentinels"; export const SNAPSHOT_SCHEMA_VERSION = "v1" as const; export const SNAPSHOT_CHUNK_SIZE = 50; @@ -50,13 +54,11 @@ async function singleFlight(key: string, loader: () => Promise): Promise 0 ? distinctVoted / totalProposals : 0); + participationMap.set( + id, + totalProposals > 0 + ? roundParticipation(distinctVoted / totalProposals) + : 0 + ); } } @@ -185,7 +203,7 @@ export async function composeDreps(opts: { }) : []; - const historyByDrep = new Map>(); + const historyByDrep = new Map; delegators: number }>>(); for (const r of historyRows) { let arr = historyByDrep.get(r.drepId); if (!arr) { @@ -201,10 +219,13 @@ export async function composeDreps(opts: { const DREPS: SnapshotDrep[] = drepRows.map((d) => { const denormParticipation = d.proposalParticipationPercent; - // The denorm column stores percent (0..100); the wire shape is fraction (0..1). + // The denorm column stores percent (0..100, 2-decimal precision via SQL ROUND); + // dividing by 100 yields a fraction with 4 decimals. Pass through roundParticipation + // anyway so floating-point noise from the divide-by-100 doesn't smuggle in an + // extra decimal. const participation = denormParticipation != null - ? denormParticipation / 100 + ? roundParticipation(denormParticipation / 100) : participationMap.get(d.drepId) ?? 0; const joined = d.firstSeenEpoch ?? firstSeenEpochMap.get(d.drepId) ?? 0; @@ -277,15 +298,19 @@ export async function composeChunk(startEpoch: number): Promise { prisma.migrationAggregate.findMany({ where: { epoch: { gte: chunkStart, lte: chunkEnd }, - fromDrepId: { notIn: ["drep_always_abstain", "drep_always_no_confidence"] }, - toDrepId: { notIn: ["drep_always_abstain", "drep_always_no_confidence"] }, + fromDrepId: { notIn: [...SENTINEL_DREP_IDS] }, + toDrepId: { notIn: [...SENTINEL_DREP_IDS] }, }, orderBy: [{ epoch: "asc" }, { fromDrepId: "asc" }, { toDrepId: "asc" }], }), readCurrentEpoch(), - // Stability check: are ALL changelog rows in this chunk's epoch range - // backfilled with amount_at_switch from /account_history? If yes, the + // Stability check: are ALL non-sentinel changelog rows in this chunk's epoch + // range backfilled with amount_at_switch from /account_history? If yes, the // chunk's MIGRATIONS values will never change → safe to serve as immutable. + // Sentinels (drep_always_*) are filtered out of the chunk MIGRATIONS payload + // (lines 280-282) so they must also be filtered here, otherwise an unbackfilled + // sentinel row would pin isStable=false forever despite never appearing in the + // payload. prisma.$queryRaw>` SELECT COUNT(*)::bigint AS unstable FROM "stake_delegation_change" @@ -294,11 +319,13 @@ export async function composeChunk(startEpoch: number): Promise { AND "from_drep_id" <> '' AND "to_drep_id" <> '' AND "from_drep_id" <> "to_drep_id" - AND "amount_source" IS NULL - -- 'unknown' rows are stable: Koios /account_history confirmed no active_stake at - -- that epoch start. /account_history responses don't change retroactively, so the - -- 0-lovelace contribution is permanent. Only rows still awaiting backfill (NULL) - -- block isStable=true. + AND "from_drep_id" NOT IN ('drep_always_abstain', 'drep_always_no_confidence') + AND "to_drep_id" NOT IN ('drep_always_abstain', 'drep_always_no_confidence') + AND "amount_at_switch" IS NULL + -- Stability is keyed off amount_at_switch, not amount_source: 'unknown' + -- rows are stable (Koios confirmed zero active_stake; deterministic), + -- 'koios-malformed' rows are NOT (amountAtSwitch=NULL until an operator + -- clears the tag and re-runs the backfill). `, // Delegation full-scan watermark: until the first full inventory pass // completes, the changelog itself may be missing historical rows for @@ -309,14 +336,17 @@ export async function composeChunk(startEpoch: number): Promise { }), ]); - const isFinal = chunkEnd < currentEpoch; - const allAmountsHistorical = Number(amountStability[0]?.unstable ?? 0) === 0; - const fullScanCompleted = !!delegationWatermark?.lastFullAllDrepsScanAt; // isStable requires BOTH: every existing row has historical amount AND the // changelog itself has been fully scanned at least once. Without the second // condition, sync-drep-delegators can still append historical rows after a - // chunk has already been served as immutable. - const isStable = isFinal && allAmountsHistorical && fullScanCompleted; + // chunk has already been served as immutable. The `chunkFinality()` helper + // collapses these signals into a discriminated union so the illegal + // `{isFinal:false, isStable:true}` cannot be constructed. + const finality = chunkFinality({ + isFinal: chunkEnd < currentEpoch, + allAmountsHistorical: Number(amountStability[0]?.unstable ?? 0) === 0, + fullScanCompleted: !!delegationWatermark?.lastFullAllDrepsScanAt, + }); const ACTIONS: SnapshotChunkAction[] = proposals.map((p) => ({ id: p.proposalId, @@ -357,11 +387,10 @@ export async function composeChunk(startEpoch: number): Promise { generatedAt: new Date().toISOString(), epochStart: chunkStart, epochEnd: chunkEnd, - isFinal, - isStable, ACTIONS, votes, MIGRATIONS, + ...finality, }; } @@ -409,8 +438,8 @@ export async function composeManifest(opts?: { _count: { _all: true }, where: { epoch: { gte: firstChunkStart, lte: lastChunkStart + SNAPSHOT_CHUNK_SIZE - 1 }, - fromDrepId: { notIn: ["drep_always_abstain", "drep_always_no_confidence"] }, - toDrepId: { notIn: ["drep_always_abstain", "drep_always_no_confidence"] }, + fromDrepId: { notIn: [...SENTINEL_DREP_IDS] }, + toDrepId: { notIn: [...SENTINEL_DREP_IDS] }, }, }); @@ -453,11 +482,13 @@ export async function composeManifest(opts?: { AND "from_drep_id" <> '' AND "to_drep_id" <> '' AND "from_drep_id" <> "to_drep_id" - AND "amount_source" IS NULL - -- 'unknown' rows are stable: Koios /account_history confirmed no active_stake at - -- that epoch start. /account_history responses don't change retroactively, so the - -- 0-lovelace contribution is permanent. Only rows still awaiting backfill (NULL) - -- block isStable=true. + AND "from_drep_id" NOT IN ('drep_always_abstain', 'drep_always_no_confidence') + AND "to_drep_id" NOT IN ('drep_always_abstain', 'drep_always_no_confidence') + AND "amount_at_switch" IS NULL + -- See composeChunk above: stability is keyed off amount_at_switch so + -- 'koios-malformed' (NULL value, retry pending) blocks isStable while + -- 'unknown' (zero confirmed by Koios) does not. Sentinels are excluded to + -- match the chunk MIGRATIONS payload — see migrationCountsByEpoch above. GROUP BY "delegated_epoch_no" `; const unstableCountByEpoch = new Map(); @@ -485,16 +516,19 @@ export async function composeManifest(opts?: { migrationCount += migByEpoch.get(ep) ?? 0; unstableInChunk += unstableCountByEpoch.get(ep) ?? 0; } - const isFinal = e < currentEpoch; + const finality = chunkFinality({ + isFinal: e < currentEpoch, + allAmountsHistorical: unstableInChunk === 0, + fullScanCompleted, + }); chunks.push({ startEpoch: s, endEpoch: e, url: `/snapshot/chunks/${s}-${e}`, - isFinal, - isStable: isFinal && unstableInChunk === 0 && fullScanCompleted, actionCount, voteCount, migrationCount, + ...finality, }); } @@ -532,6 +566,7 @@ async function readCachedRow(cacheKey: string): Promise<{ generatedAt: Date; isFinal: boolean; byteSize: number; + contentEncoding: string; } | null> { const row = await prisma.snapshotCache.findUnique({ where: { cacheKey } }); if (!row) return null; @@ -543,11 +578,12 @@ async function readCachedRow(cacheKey: string): Promise<{ }) .catch(() => undefined); return { - body: Buffer.from(row.body), + body: Buffer.from(row.bodyGzip), etag: row.etag, generatedAt: row.generatedAt, isFinal: row.isFinal, byteSize: row.byteSize, + contentEncoding: row.contentEncoding, }; } @@ -563,7 +599,8 @@ export async function writeCachedSnapshot( await prisma.snapshotCache.upsert({ where: { cacheKey }, update: { - body: gzippedBody, + bodyGzip: gzippedBody, + contentEncoding: "gzip", generatedAt, schemaVersion: SNAPSHOT_SCHEMA_VERSION, isFinal: opts.isFinal, @@ -572,7 +609,8 @@ export async function writeCachedSnapshot( }, create: { cacheKey, - body: gzippedBody, + bodyGzip: gzippedBody, + contentEncoding: "gzip", generatedAt, schemaVersion: SNAPSHOT_SCHEMA_VERSION, isFinal: opts.isFinal, @@ -607,17 +645,38 @@ async function readOrCompose( // means the row was written before the epoch boundary and serving it as // final would lock in a partial mutable payload. if (dbHit && dbHit.isFinal === isFinal) { - const data = JSON.parse(gunzipSync(dbHit.body).toString("utf-8")) as T; - const cached: CachedSnapshot = { - data, - etag: dbHit.etag, - generatedAt: dbHit.generatedAt, - isFinal: dbHit.isFinal, - byteSize: dbHit.byteSize, - gzippedBody: dbHit.body, - }; - cacheSet(cacheKey, cached, isFinal ? CACHE_TTL_FINAL_MS : CACHE_TTL_FRESH_MS); - return cached; + try { + const data = JSON.parse(gunzipSync(dbHit.body).toString("utf-8")) as T; + const cached: CachedSnapshot = { + data, + etag: dbHit.etag, + generatedAt: dbHit.generatedAt, + isFinal: dbHit.isFinal, + byteSize: dbHit.byteSize, + gzippedBody: dbHit.body, + }; + cacheSet(cacheKey, cached, isFinal ? CACHE_TTL_FINAL_MS : CACHE_TTL_FRESH_MS); + return cached; + } catch (e) { + // Self-heal a poisoned L2 row: corrupted gzip / non-JSON body would + // otherwise 500 every reader for this key forever. Conditional-delete + // by etag so we don't wipe a fresh row that another instance just + // wrote between our read and our delete; if the row was already + // replaced, deleteMany matches zero rows and we proceed to recompose + // (an extra recompose is harmless idempotent work). + console.error( + `[snapshot.service] L2 cache poison for ${cacheKey} — dropping row and recomposing`, + e + ); + await prisma.snapshotCache + .deleteMany({ where: { cacheKey, etag: dbHit.etag } }) + .catch((delErr) => { + console.error( + `[snapshot.service] failed to evict poisoned L2 row ${cacheKey}`, + delErr + ); + }); + } } } diff --git a/src/utils/query-params.ts b/src/utils/query-params.ts new file mode 100644 index 0000000..bb49b98 --- /dev/null +++ b/src/utils/query-params.ts @@ -0,0 +1,136 @@ +/** + * Shared query-param parsers for public API endpoints. + * + * The original `parseInt(req.query.x)` pattern returned NaN for bad input + * and let the caller silently fall through to a default — meaning a typo + * (`?epochStart=abc`) would silently return the default-window response + * instead of telling the client they sent garbage. These helpers reject + * non-integer / out-of-range values so the controller can return 400. + */ + +export type ParseError = { + ok: false; + status: 400; + error: string; + message: string; +}; + +export type ParseOk = { ok: true; value: T }; + +export type ParseResult = ParseOk | ParseError; + +export interface ParseIntegerOptions { + /** Lower bound (inclusive). */ + min?: number; + /** Upper bound (inclusive). */ + max?: number; + /** Returned when the param is omitted or empty. If absent, omitted params are an error. */ + default?: number; +} + +/** + * Parse a query-param value as an integer with strict validation. + * Returns a discriminated result so the caller can short-circuit on + * `result.ok === false` with `res.status(result.status).json(...)`. + * + * Behaviour: + * - Missing / empty: returns `default` if provided, else error. + * - Non-integer string ("abc", "3.14", "1e5"): error. + * - Out of [min, max] range: error. + * - Otherwise: parsed integer. + */ +export function parseIntegerQuery( + raw: unknown, + name: string, + opts: ParseIntegerOptions = {} +): ParseResult { + if (raw === undefined || raw === null || raw === "") { + if (opts.default !== undefined) { + return { ok: true, value: opts.default }; + } + return { + ok: false, + status: 400, + error: "Missing query parameter", + message: `Required query parameter '${name}' was not provided`, + }; + } + + if (typeof raw !== "string") { + return { + ok: false, + status: 400, + error: "Invalid query parameter", + message: `Query parameter '${name}' must be a single value, got ${typeof raw}`, + }; + } + + // Strict integer-shape check: optional sign + digits, no decimals/exponents. + if (!/^-?\d+$/.test(raw.trim())) { + return { + ok: false, + status: 400, + error: "Invalid query parameter", + message: `Query parameter '${name}' must be an integer, got '${raw}'`, + }; + } + + const value = Number.parseInt(raw, 10); + if (!Number.isFinite(value)) { + return { + ok: false, + status: 400, + error: "Invalid query parameter", + message: `Query parameter '${name}' could not be parsed as an integer`, + }; + } + + // Always reject values outside the JS safe-integer window — `parseInt` will + // happily return imprecise Numbers above 2^53, and downstream Prisma calls + // would either throw or silently truncate. Callers that legitimately need + // string-typed BigInts should use a different helper. + if (!Number.isSafeInteger(value)) { + return { + ok: false, + status: 400, + error: "Invalid query parameter", + message: `Query parameter '${name}' is outside the safe integer range`, + }; + } + + if (opts.min !== undefined && value < opts.min) { + return { + ok: false, + status: 400, + error: "Invalid query parameter", + message: `Query parameter '${name}' must be ≥ ${opts.min}, got ${value}`, + }; + } + if (opts.max !== undefined && value > opts.max) { + return { + ok: false, + status: 400, + error: "Invalid query parameter", + message: `Query parameter '${name}' must be ≤ ${opts.max}, got ${value}`, + }; + } + + return { ok: true, value }; +} + +/** + * Parse an optional integer query param. Returns `undefined` for omitted + * values; rejects malformed input the same way as `parseIntegerQuery`. + */ +export function parseIntegerQueryOpt( + raw: unknown, + name: string, + opts: Omit = {} +): ParseResult { + if (raw === undefined || raw === null || raw === "") { + return { ok: true, value: undefined }; + } + const result = parseIntegerQuery(raw, name, opts); + if (!result.ok) return result; + return { ok: true, value: result.value }; +} diff --git a/test/get-dreps-controller.test.ts b/test/get-dreps-controller.test.ts new file mode 100644 index 0000000..bed7879 --- /dev/null +++ b/test/get-dreps-controller.test.ts @@ -0,0 +1,200 @@ +/** + * GET /dreps controller — guards the I11 invariants: + * - Hot path (all DReps have denormalized firstSeenEpoch + proposalParticipationPercent + * columns populated): no on-fly groupBy fallback, response uses the column values. + * - Cold-start fallback (at least one DRep missing a denorm column): the on-fly + * groupBy path runs and the response merges denorm + computed values. + * - Response shape carries the two new optional fields with correct precision. + * - Strict query-param validation (I4) returns 400 on garbage input. + */ + +async function loadHarness() { + jest.resetModules(); + + const drepCountMock = jest.fn(); + const drepFindManyMock = jest.fn(); + const onchainVoteGroupByMock = jest.fn(); + const drepLifecycleGroupByMock = jest.fn(); + const proposalCountMock = jest.fn(); + + jest.doMock("../src/services", () => ({ + prisma: { + drep: { + count: drepCountMock, + findMany: drepFindManyMock, + }, + onchainVote: { groupBy: onchainVoteGroupByMock }, + drepLifecycleEvent: { groupBy: drepLifecycleGroupByMock }, + proposal: { count: proposalCountMock }, + }, + })); + + const { getDReps } = await import("../src/controllers/drep/getDReps"); + return { + getDReps, + drepCountMock, + drepFindManyMock, + onchainVoteGroupByMock, + drepLifecycleGroupByMock, + proposalCountMock, + }; +} + +function makeRes() { + const res: any = { + statusCode: 200, + body: null as unknown, + status(code: number) { + this.statusCode = code; + return this; + }, + json(payload: unknown) { + this.body = payload; + return this; + }, + }; + return res; +} + +describe("GET /dreps", () => { + it("hot path: when ALL DReps have denorm columns populated, skips the on-fly groupBy fallback", async () => { + const h = await loadHarness(); + h.drepCountMock.mockResolvedValue(2); + h.drepFindManyMock.mockResolvedValue([ + { + drepId: "drep1aaa", + name: "Alpha", + iconUrl: null, + votingPower: 5_000_000_000n, + delegatorCount: 10, + firstSeenEpoch: 600, + proposalParticipationPercent: 42.5, + }, + { + drepId: "drep1bbb", + name: "Beta", + iconUrl: null, + votingPower: 1_000_000_000n, + delegatorCount: 3, + firstSeenEpoch: 605, + proposalParticipationPercent: 12.34, + }, + ]); + // Vote counts always queried, regardless of denorm state. + h.onchainVoteGroupByMock.mockResolvedValue([ + { drepId: "drep1aaa", _count: { id: 5 } }, + { drepId: "drep1bbb", _count: { id: 1 } }, + ]); + + const res = makeRes(); + await h.getDReps({ query: {} } as any, res); + + expect(res.statusCode).toBe(200); + // Cold-start groupBy paths must NOT have been called. + expect(h.drepLifecycleGroupByMock).not.toHaveBeenCalled(); + expect(h.proposalCountMock).not.toHaveBeenCalled(); + // The denorm columns are echoed straight through. + expect(res.body.dreps[0]).toMatchObject({ + drepId: "drep1aaa", + firstSeenEpoch: 600, + proposalParticipationPercent: 42.5, + votingPower: "5000000000", + votingPowerAda: "5000.000000", + totalVotesCast: 5, + }); + expect(res.body.dreps[1]).toMatchObject({ + drepId: "drep1bbb", + firstSeenEpoch: 605, + proposalParticipationPercent: 12.34, + }); + }); + + it("cold-start fallback: when at least one DRep is missing a denorm column, the on-fly groupBy fills the gap", async () => { + const h = await loadHarness(); + h.drepCountMock.mockResolvedValue(2); + h.drepFindManyMock.mockResolvedValue([ + { + drepId: "drep1aaa", + name: null, + iconUrl: null, + votingPower: 1n, + delegatorCount: 0, + firstSeenEpoch: null, // ← missing + proposalParticipationPercent: null, // ← missing + }, + { + drepId: "drep1bbb", + name: null, + iconUrl: null, + votingPower: 2n, + delegatorCount: 0, + firstSeenEpoch: 700, + proposalParticipationPercent: 50, + }, + ]); + h.onchainVoteGroupByMock + // First call: total vote counts (always) + .mockResolvedValueOnce([ + { drepId: "drep1aaa", _count: { id: 0 } }, + { drepId: "drep1bbb", _count: { id: 7 } }, + ]) + // Second call: distinct (drepId, proposalId) pairs (cold-start branch) + .mockResolvedValueOnce([ + { drepId: "drep1bbb", proposalId: "p1" }, + { drepId: "drep1bbb", proposalId: "p2" }, + ]); + h.drepLifecycleGroupByMock.mockResolvedValue([ + { drepId: "drep1aaa", _min: { epochNo: 690 } }, + ]); + h.proposalCountMock.mockResolvedValue(4); + + const warnSpy = jest.spyOn(console, "warn").mockImplementation(() => {}); + try { + const res = makeRes(); + await h.getDReps({ query: {} } as any, res); + + expect(res.statusCode).toBe(200); + // Cold-start groupBy paths must HAVE been called. + expect(h.drepLifecycleGroupByMock).toHaveBeenCalledTimes(1); + expect(h.proposalCountMock).toHaveBeenCalledTimes(1); + // For drep1aaa (no denorm): firstSeenEpoch came from drepLifecycleEvent groupBy + // and proposalParticipationPercent came from on-fly compute (0/4 = 0). + const aaa = res.body.dreps.find((d: any) => d.drepId === "drep1aaa"); + expect(aaa.firstSeenEpoch).toBe(690); + expect(aaa.proposalParticipationPercent).toBe(0); + // drep1bbb keeps its denorm values even though we ran the fallback. + const bbb = res.body.dreps.find((d: any) => d.drepId === "drep1bbb"); + expect(bbb.firstSeenEpoch).toBe(700); + expect(bbb.proposalParticipationPercent).toBe(50); + expect(warnSpy).toHaveBeenCalled(); + } finally { + warnSpy.mockRestore(); + } + }); + + it("rejects garbage page query param with 400 (was a NaN→default landmine before I4)", async () => { + const h = await loadHarness(); + const res = makeRes(); + await h.getDReps({ query: { page: "abc" } } as any, res); + expect(res.statusCode).toBe(400); + expect(h.drepFindManyMock).not.toHaveBeenCalled(); + }); + + it("rejects out-of-range pageSize with 400", async () => { + const h = await loadHarness(); + const res = makeRes(); + await h.getDReps({ query: { pageSize: "99999" } } as any, res); + expect(res.statusCode).toBe(400); + }); + + it("clamps and accepts a valid pageSize within bounds", async () => { + const h = await loadHarness(); + h.drepCountMock.mockResolvedValue(0); + h.drepFindManyMock.mockResolvedValue([]); + h.onchainVoteGroupByMock.mockResolvedValue([]); + const res = makeRes(); + await h.getDReps({ query: { pageSize: "50" } } as any, res); + expect(res.statusCode).toBe(200); + expect(res.body.pagination.pageSize).toBe(50); + }); +}); diff --git a/test/healthz.test.ts b/test/healthz.test.ts new file mode 100644 index 0000000..a79c0f0 --- /dev/null +++ b/test/healthz.test.ts @@ -0,0 +1,79 @@ +/** + * /healthz readiness probe — pins the I10 contract: + * - 200 on ok/fresh/skipped/starting (boot in progress, no failure) + * - 503 on degraded (boot recovery failed) + * - status field reflects the underlying boot state machine + */ + +async function loadHarness() { + jest.resetModules(); + const getStatusMock = jest.fn(); + jest.doMock("../src/services/ingestion/snapshot-builder.service", () => ({ + getSnapshotBootRecoveryStatus: () => getStatusMock(), + })); + const { getHealthz } = await import("../src/controllers/healthz/getHealthz"); + return { getHealthz, getStatusMock }; +} + +function makeRes() { + const res: any = { + statusCode: 200, + body: null as unknown, + status(code: number) { + this.statusCode = code; + return this; + }, + json(payload: unknown) { + this.body = payload; + return this; + }, + }; + return res; +} + +describe("GET /healthz", () => { + it.each([ + ["ok", "ok", 200], + ["fresh", "ok", 200], + ["running", "starting", 200], + ["not-started", "starting", 200], + ["skipped", "skipped", 200], + ["failed", "degraded", 503], + ])( + "boot state '%s' → status '%s' with HTTP %d", + async (bootState, expectedStatus, expectedHttp) => { + const h = await loadHarness(); + h.getStatusMock.mockReturnValue({ + state: bootState, + startedAt: "2026-05-09T00:00:00.000Z", + finishedAt: bootState === "running" || bootState === "not-started" ? null : "2026-05-09T00:00:01.000Z", + durationMs: bootState === "running" || bootState === "not-started" ? null : 1000, + errorMessage: bootState === "failed" ? "Koios outage" : null, + l2Fresh: bootState === "fresh" || bootState === "ok" ? true : null, + }); + + const res = makeRes(); + h.getHealthz({} as any, res); + + expect(res.statusCode).toBe(expectedHttp); + expect(res.body.status).toBe(expectedStatus); + expect(res.body.snapshotBootRecovery.state).toBe(bootState); + } + ); + + it("includes errorMessage in the body when degraded", async () => { + const h = await loadHarness(); + h.getStatusMock.mockReturnValue({ + state: "failed", + startedAt: "2026-05-09T00:00:00.000Z", + finishedAt: "2026-05-09T00:00:01.000Z", + durationMs: 1000, + errorMessage: "Koios /tip 503", + l2Fresh: null, + }); + const res = makeRes(); + h.getHealthz({} as any, res); + expect(res.statusCode).toBe(503); + expect(res.body.snapshotBootRecovery.errorMessage).toBe("Koios /tip 503"); + }); +}); diff --git a/test/migration-aggregate-accuracy.test.ts b/test/migration-aggregate-accuracy.test.ts new file mode 100644 index 0000000..4dd7c0d --- /dev/null +++ b/test/migration-aggregate-accuracy.test.ts @@ -0,0 +1,149 @@ +/** + * Tests for getMigrationAggregateAccuracy — guards the C6 invariants: + * - Filter scoping: epochStart/epochEnd, fromDrepId/toDrepId, excludeSentinels, + * drepIdAllowlist must each appear in the SQL parameter list so the source + * distribution reflects the filtered window, NOT the global changelog. + * - SQL is parameterized via $queryRawUnsafe positional params (no string + * interpolation of caller input). + * - "koios-malformed" rows are reported in their own bucket (introduced in C2). + * - Default behaviour: excludeSentinels=true unless explicitly turned off. + */ + +import type { PrismaClient } from "@prisma/client"; + +import { + getMigrationAggregateAccuracy, + type MigrationAggregateAccuracy, +} from "../src/services/ingestion/migration-aggregate.service"; + +function makePrismaWithRows(rows: Array<{ amount_source: string | null; n: bigint }>) { + const queryRawUnsafe = jest.fn().mockResolvedValue(rows); + const prisma = { $queryRawUnsafe: queryRawUnsafe } as unknown as PrismaClient; + return { prisma, queryRawUnsafe }; +} + +describe("getMigrationAggregateAccuracy", () => { + it("buckets each amount_source into its own field, including koios-malformed", async () => { + const { prisma } = makePrismaWithRows([ + { amount_source: "koios-history", n: 100n }, + { amount_source: "current-proxy", n: 7n }, + { amount_source: "unknown", n: 3n }, + { amount_source: "koios-malformed", n: 2n }, + { amount_source: null, n: 1n }, // never visited by backfill — fall-through bucket + ]); + + const result: MigrationAggregateAccuracy = + await getMigrationAggregateAccuracy(prisma); + + expect(result.totalChangeRows).toBe(113); + expect(result.rowsWithKoiosHistory).toBe(100); + expect(result.rowsWithCurrentProxy).toBe(8); // 7 explicit + 1 NULL fall-through + expect(result.rowsUnknown).toBe(3); + expect(result.rowsMalformed).toBe(2); + }); + + it("classifies level=koios-history when ≥99% rows are koios-history", async () => { + const { prisma } = makePrismaWithRows([ + { amount_source: "koios-history", n: 99n }, + { amount_source: "current-proxy", n: 1n }, + ]); + const result = await getMigrationAggregateAccuracy(prisma); + expect(result.level).toBe("koios-history"); + }); + + it("classifies level=mixed when partial koios-history coverage", async () => { + const { prisma } = makePrismaWithRows([ + { amount_source: "koios-history", n: 50n }, + { amount_source: "current-proxy", n: 50n }, + ]); + const result = await getMigrationAggregateAccuracy(prisma); + expect(result.level).toBe("mixed"); + }); + + it("classifies level=current when no koios-history rows exist", async () => { + const { prisma } = makePrismaWithRows([ + { amount_source: "current-proxy", n: 100n }, + ]); + const result = await getMigrationAggregateAccuracy(prisma); + expect(result.level).toBe("current"); + }); + + it("classifies level=current when there are zero rows at all", async () => { + const { prisma } = makePrismaWithRows([]); + const result = await getMigrationAggregateAccuracy(prisma); + expect(result.level).toBe("current"); + expect(result.totalChangeRows).toBe(0); + }); + + it("scopes the query to epochStart/epochEnd via positional params (filtered, not global)", async () => { + const { prisma, queryRawUnsafe } = makePrismaWithRows([]); + + await getMigrationAggregateAccuracy(prisma, { + epochStart: 600, + epochEnd: 700, + }); + + expect(queryRawUnsafe).toHaveBeenCalledTimes(1); + const [sql, ...params] = queryRawUnsafe.mock.calls[0]; + expect(typeof sql).toBe("string"); + // Both bounds must appear as positional params (no string interpolation). + expect(params).toEqual(expect.arrayContaining([600, 700])); + // The SQL must reference the bounds via $N placeholders so they reach the DB + // through binding, not the SQL text. Without these clauses the bounds would + // be passed but never narrow the scan. + expect(sql).toMatch(/"delegated_epoch_no"\s*>=\s*\$\d+/); + expect(sql).toMatch(/"delegated_epoch_no"\s*<=\s*\$\d+/); + }); + + it("excludes drep_always_* sentinels by default — both as params AND as SQL predicates", async () => { + const { prisma, queryRawUnsafe } = makePrismaWithRows([]); + await getMigrationAggregateAccuracy(prisma); + const [sql, ...params] = queryRawUnsafe.mock.calls[0]; + // Both sentinel ids must appear, twice each (from + to columns). + const abstainCount = params.filter((p) => p === "drep_always_abstain").length; + const nocCount = params.filter((p) => p === "drep_always_no_confidence").length; + expect(abstainCount).toBeGreaterThanOrEqual(2); + expect(nocCount).toBeGreaterThanOrEqual(2); + // The SQL must actually USE the sentinel params via NOT IN clauses on both + // columns — otherwise a regression that pushes the params but drops the + // predicate would silently let sentinels skew the accuracy denominator. + expect(sql).toMatch(/"from_drep_id"\s+NOT\s+IN/i); + expect(sql).toMatch(/"to_drep_id"\s+NOT\s+IN/i); + }); + + it("includes sentinels when excludeSentinels=false (sentinel literals must not appear)", async () => { + const { prisma, queryRawUnsafe } = makePrismaWithRows([]); + await getMigrationAggregateAccuracy(prisma, { excludeSentinels: false }); + const [, ...params] = queryRawUnsafe.mock.calls[0]; + expect(params).not.toContain("drep_always_abstain"); + expect(params).not.toContain("drep_always_no_confidence"); + }); + + it("scopes to fromDrepId / toDrepId / drepIdAllowlist via positional params AND matching SQL predicates", async () => { + const { prisma, queryRawUnsafe } = makePrismaWithRows([]); + + await getMigrationAggregateAccuracy(prisma, { + fromDrepId: "drep-from", + toDrepId: "drep-to", + drepIdAllowlist: ["drep-a", "drep-b", "drep-c"], + }); + + const [sql, ...params] = queryRawUnsafe.mock.calls[0]; + expect(params).toEqual( + expect.arrayContaining([ + "drep-from", + "drep-to", + "drep-a", + "drep-b", + "drep-c", + ]) + ); + // The SQL must actually narrow on these inputs — equality predicate on + // from_drep_id and to_drep_id, plus the allowlist IN-clause on both + // columns. Otherwise pushed params would not gate the scan. + expect(sql).toMatch(/"from_drep_id"\s*=\s*\$\d+/); + expect(sql).toMatch(/"to_drep_id"\s*=\s*\$\d+/); + expect(sql).toMatch(/"from_drep_id"\s+IN\s*\(/i); + expect(sql).toMatch(/"to_drep_id"\s+IN\s*\(/i); + }); +}); diff --git a/test/migration-amount-backfill.test.ts b/test/migration-amount-backfill.test.ts new file mode 100644 index 0000000..51162cf --- /dev/null +++ b/test/migration-amount-backfill.test.ts @@ -0,0 +1,228 @@ +/** + * Tests for backfillAmountAtSwitch — idempotency + Koios-malformed handling. + * + * Specifically guards the C2 invariants: + * - Malformed `active_stake` values (not parseable as a non-negative integer) + * are tagged "koios-malformed" with amount_at_switch left NULL, NOT silently + * coerced to 0n + "unknown". + * - Numbers are rejected (mainnet whales > MAX_SAFE_INTEGER). + * - Hex / negative / blank strings are rejected. + * - "unknown" rows (Koios returned no entry for the (addr, epoch)) get 0n + "unknown". + * - The candidate query excludes already-tagged "koios-malformed" rows by default + * (so chronic-malformed rows can't pin the head of the queue). + */ + +interface MockChange { + id: number; + stakeAddress: string; + delegatedEpoch: number; + amountAtSwitch: bigint | null; + amountSource: string | null; +} + +async function loadHarness() { + jest.resetModules(); + + const findManyMock = jest.fn(); + const updateMock = jest.fn(); + const updateManyMock = jest.fn().mockResolvedValue({ count: 0 }); + const executeRawUnsafeMock = jest.fn().mockResolvedValue(0); + const getAccountHistoryBatchMock = jest.fn(); + + jest.doMock("../src/services/prisma", () => ({ + prisma: { + stakeDelegationChange: { + findMany: findManyMock, + update: updateMock, + updateMany: updateManyMock, + }, + $executeRawUnsafe: executeRawUnsafeMock, + }, + })); + jest.doMock("../src/services/governanceProvider", () => ({ + getAccountHistoryBatch: (...args: unknown[]) => + getAccountHistoryBatchMock(...args), + })); + + const mod = await import( + "../src/services/ingestion/migration-amount-backfill.service" + ); + return { + ...mod, + findManyMock, + updateMock, + updateManyMock, + executeRawUnsafeMock, + getAccountHistoryBatchMock, + }; +} + +describe("backfillAmountAtSwitch", () => { + it("tags rows as 'koios-history' via a single batched UPDATE...FROM(VALUES) when Koios returns valid integers", async () => { + const h = await loadHarness(); + h.findManyMock.mockResolvedValue([ + { id: 1, stakeAddress: "stake1abc", delegatedEpoch: 600 }, + { id: 2, stakeAddress: "stake1def", delegatedEpoch: 600 }, + ]); + h.getAccountHistoryBatchMock.mockResolvedValue([ + { stake_address: "stake1abc", history: [{ epoch_no: 600, active_stake: "1234567890" }] }, + { stake_address: "stake1def", history: [{ epoch_no: 600, active_stake: "9999999999" }] }, + ]); + + const result = await h.backfillAmountAtSwitch(); + + expect(result.rowsUpdated).toBe(2); + expect(result.rowsUnknown).toBe(0); + expect(result.rowsMalformed).toBe(0); + // The history bucket flushes through a single $executeRawUnsafe call carrying + // every (id, amount) pair; the per-row prisma.update path is no longer used. + expect(h.executeRawUnsafeMock).toHaveBeenCalledTimes(1); + expect(h.updateMock).not.toHaveBeenCalled(); + const [sql, ...params] = h.executeRawUnsafeMock.mock.calls[0]; + expect(sql).toMatch(/UPDATE\s+"stake_delegation_change"/i); + expect(sql).toMatch(/'koios-history'/); + expect(params).toEqual([1, "1234567890", 2, "9999999999"]); + }); + + it("tags rows as 'unknown' with amount_at_switch=0n via a single updateMany when Koios returns no /account_history entry", async () => { + const h = await loadHarness(); + h.findManyMock.mockResolvedValue([ + { id: 5, stakeAddress: "stake1aaa", delegatedEpoch: 601 }, + { id: 6, stakeAddress: "stake1bbb", delegatedEpoch: 601 }, + ]); + h.getAccountHistoryBatchMock.mockResolvedValue([ + { stake_address: "stake1aaa", history: [] }, + { stake_address: "stake1bbb", history: [] }, + ]); + + const result = await h.backfillAmountAtSwitch(); + + expect(result.rowsUnknown).toBe(2); + expect(result.rowsUpdated).toBe(0); + expect(result.rowsMalformed).toBe(0); + expect(h.updateManyMock).toHaveBeenCalledWith({ + where: { id: { in: [5, 6] } }, + data: { amountAtSwitch: 0n, amountSource: "unknown" }, + }); + expect(h.updateMock).not.toHaveBeenCalled(); + }); + + it("tags rows as 'koios-malformed' via a single updateMany on parse failures (and never as 'koios-history')", async () => { + const h = await loadHarness(); + h.findManyMock.mockResolvedValue([ + { id: 3, stakeAddress: "stake1ghi", delegatedEpoch: 602 }, // empty string + { id: 4, stakeAddress: "stake1jkl", delegatedEpoch: 602 }, // hex + { id: 5, stakeAddress: "stake1mno", delegatedEpoch: 602 }, // negative + { id: 6, stakeAddress: "stake1pqr", delegatedEpoch: 602 }, // number type (rejected for whale safety) + { id: 7, stakeAddress: "stake1stu", delegatedEpoch: 602 }, // null + ]); + h.getAccountHistoryBatchMock.mockResolvedValue([ + { stake_address: "stake1ghi", history: [{ epoch_no: 602, active_stake: "" }] }, + { stake_address: "stake1jkl", history: [{ epoch_no: 602, active_stake: "0xff" }] }, + { stake_address: "stake1mno", history: [{ epoch_no: 602, active_stake: "-5" }] }, + { stake_address: "stake1pqr", history: [{ epoch_no: 602, active_stake: 12345 as unknown as string }] }, + { stake_address: "stake1stu", history: [{ epoch_no: 602, active_stake: null as unknown as string }] }, + ]); + + const warnSpy = jest.spyOn(console, "warn").mockImplementation(() => {}); + try { + const result = await h.backfillAmountAtSwitch(); + + expect(result.rowsMalformed).toBe(5); + expect(result.rowsUpdated).toBe(0); + expect(result.rowsUnknown).toBe(0); + // Single batched updateMany tags every malformed row in one round-trip; + // amount_at_switch is NOT touched (stays NULL) — verify by ensuring data + // omits it. + expect(h.updateManyMock).toHaveBeenCalledWith({ + where: { id: { in: [3, 4, 5, 6, 7] } }, + data: { amountSource: "koios-malformed" }, + }); + // Per-row warn-log still fires so ops gets the (addr, epoch, raw) tuples. + expect(warnSpy).toHaveBeenCalledTimes(5); + expect(h.updateMock).not.toHaveBeenCalled(); + } finally { + warnSpy.mockRestore(); + } + }); + + it("issues at most three SQL statements per epoch regardless of row count (no N+1)", async () => { + const h = await loadHarness(); + // 60 candidates in a single epoch — mix of buckets. + const candidates = Array.from({ length: 60 }, (_, i) => ({ + id: i + 1, + stakeAddress: `stake${i}`, + delegatedEpoch: 600, + })); + h.findManyMock.mockResolvedValue(candidates); + // 20 with valid amounts, 20 unknown (no entry), 20 malformed. + const entries = candidates.map((c, i) => ({ + stake_address: c.stakeAddress, + history: + i < 20 + ? [{ epoch_no: 600, active_stake: `${(i + 1) * 1000}` }] + : i < 40 + ? [] + : [{ epoch_no: 600, active_stake: "garbage" }], + })); + h.getAccountHistoryBatchMock.mockResolvedValue(entries); + + const warnSpy = jest.spyOn(console, "warn").mockImplementation(() => {}); + try { + const result = await h.backfillAmountAtSwitch(); + expect(result.rowsUpdated).toBe(20); + expect(result.rowsUnknown).toBe(20); + expect(result.rowsMalformed).toBe(20); + + // Exactly one $executeRawUnsafe (history bucket) + two updateMany + // (unknown + malformed), regardless of 60 input rows. + expect(h.executeRawUnsafeMock).toHaveBeenCalledTimes(1); + expect(h.updateManyMock).toHaveBeenCalledTimes(2); + expect(h.updateMock).not.toHaveBeenCalled(); + } finally { + warnSpy.mockRestore(); + } + }); + + it("default candidate query excludes rows already tagged 'koios-malformed' (no head-stuck)", async () => { + const h = await loadHarness(); + h.findManyMock.mockResolvedValue([]); + + await h.backfillAmountAtSwitch(); + + expect(h.findManyMock).toHaveBeenCalledTimes(1); + const args = h.findManyMock.mock.calls[0][0]; + // Default excludeMalformed=true should add a clause that filters out koios-malformed. + expect(JSON.stringify(args.where)).toContain("koios-malformed"); + }); + + it("excludeMalformed=false includes koios-malformed rows so an operator-driven retry path can drain them", async () => { + const h = await loadHarness(); + h.findManyMock.mockResolvedValue([]); + + await h.backfillAmountAtSwitch({ excludeMalformed: false }); + + const args = h.findManyMock.mock.calls[0][0]; + // The exclusion clause should NOT be present. + expect(JSON.stringify(args.where)).not.toContain("koios-malformed"); + }); + + it("returns an empty/zero result without any updates when there are no candidates", async () => { + const h = await loadHarness(); + h.findManyMock.mockResolvedValue([]); + + const result = await h.backfillAmountAtSwitch(); + + expect(result).toEqual({ + durationMs: expect.any(Number), + rowsScanned: 0, + rowsUpdated: 0, + rowsUnknown: 0, + rowsMalformed: 0, + epochsProcessed: 0, + epochSpan: null, + }); + expect(h.updateMock).not.toHaveBeenCalled(); + expect(h.getAccountHistoryBatchMock).not.toHaveBeenCalled(); + }); +}); diff --git a/test/query-params.test.ts b/test/query-params.test.ts new file mode 100644 index 0000000..324307d --- /dev/null +++ b/test/query-params.test.ts @@ -0,0 +1,136 @@ +/** + * Tests for the strict integer query-param parser introduced for I4. + * Guards: NaN/silent-default avoidance, range bounds, opt vs required + * presence, and clear 400 messages. + */ + +import { + parseIntegerQuery, + parseIntegerQueryOpt, +} from "../src/utils/query-params"; + +describe("parseIntegerQuery", () => { + it("accepts a clean integer string within range", () => { + expect(parseIntegerQuery("42", "x", { min: 0, max: 100 })).toEqual({ + ok: true, + value: 42, + }); + }); + + it("accepts a leading-sign integer string", () => { + expect(parseIntegerQuery("-5", "x", { min: -10 })).toEqual({ + ok: true, + value: -5, + }); + }); + + it("returns the default when the value is omitted", () => { + expect(parseIntegerQuery(undefined, "x", { default: 7 })).toEqual({ + ok: true, + value: 7, + }); + }); + + it("returns the default when the value is empty string", () => { + expect(parseIntegerQuery("", "x", { default: 9 })).toEqual({ + ok: true, + value: 9, + }); + }); + + it("rejects a non-integer string with 400 (was a NaN→default landmine before)", () => { + const r = parseIntegerQuery("abc", "epochStart", { default: 0 }); + expect(r.ok).toBe(false); + if (!r.ok) { + expect(r.status).toBe(400); + expect(r.message).toContain("epochStart"); + } + }); + + it("rejects floats — '3.14' would silently become 3 with parseInt", () => { + const r = parseIntegerQuery("3.14", "x", { default: 0 }); + expect(r.ok).toBe(false); + }); + + it("rejects exponential notation — '1e5' would silently become 1 with parseInt", () => { + const r = parseIntegerQuery("1e5", "x", { default: 0 }); + expect(r.ok).toBe(false); + }); + + it("rejects whitespace-padded values that lead with non-digits", () => { + const r = parseIntegerQuery(" abc12 ", "x", { default: 0 }); + expect(r.ok).toBe(false); + }); + + it("rejects values below min", () => { + const r = parseIntegerQuery("-1", "x", { min: 0, default: 0 }); + expect(r.ok).toBe(false); + if (!r.ok) expect(r.message).toContain("≥ 0"); + }); + + it("rejects values above max", () => { + const r = parseIntegerQuery("9999", "x", { max: 100, default: 0 }); + expect(r.ok).toBe(false); + if (!r.ok) expect(r.message).toContain("≤ 100"); + }); + + it("rejects a missing param when no default is provided", () => { + const r = parseIntegerQuery(undefined, "x"); + expect(r.ok).toBe(false); + }); + + it("rejects array-shaped values (Express duplicate query keys)", () => { + const r = parseIntegerQuery(["1", "2"], "x", { default: 0 }); + expect(r.ok).toBe(false); + }); +}); + +describe("parseIntegerQueryOpt", () => { + it("returns undefined for missing values", () => { + expect(parseIntegerQueryOpt(undefined, "x")).toEqual({ + ok: true, + value: undefined, + }); + }); + + it("rejects garbage even when optional", () => { + const r = parseIntegerQueryOpt("abc", "x"); + expect(r.ok).toBe(false); + }); + + it("validates ranges when value is provided", () => { + const r = parseIntegerQueryOpt("-1", "x", { min: 0 }); + expect(r.ok).toBe(false); + }); + + it("returns parsed value for valid input", () => { + expect(parseIntegerQueryOpt("42", "x", { min: 0 })).toEqual({ + ok: true, + value: 42, + }); + }); +}); + +describe("toAdaString (precision-preserving conversion)", () => { + it("formats small values with 6 decimals", async () => { + const { toAdaString } = await import("../src/responses/lovelace"); + expect(toAdaString(1_234_567_890n)).toBe("1234.567890"); + expect(toAdaString(1n)).toBe("0.000001"); + expect(toAdaString(0n)).toBe("0.000000"); + }); + + it("preserves precision above Number.MAX_SAFE_INTEGER (≥ 9 PADA)", async () => { + const { toAdaString } = await import("../src/responses/lovelace"); + // 100 PADA = 100 * 1e15 lovelace = 1e17. Number(1e17) loses the ones digit; + // the BigInt path must keep it intact. + const oneHundredPada = 100_000_000_000_000_000n; // 1e17 lovelace + expect(toAdaString(oneHundredPada)).toBe("100000000000.000000"); + // Add one lovelace: 1e17 + 1 — the ".000001" tail must still appear. + expect(toAdaString(oneHundredPada + 1n)).toBe("100000000000.000001"); + }); + + it("handles negative values", async () => { + const { toAdaString } = await import("../src/responses/lovelace"); + expect(toAdaString(-1_234_567n)).toBe("-1.234567"); + }); +}); diff --git a/test/snapshot-cache-etag.test.ts b/test/snapshot-cache-etag.test.ts new file mode 100644 index 0000000..973827b --- /dev/null +++ b/test/snapshot-cache-etag.test.ts @@ -0,0 +1,148 @@ +/** + * Tests for writeCachedSnapshot — guards the C6 invariants: + * - Identical payloads produce identical ETags (deterministic gzip + sha1). + * - Different payloads produce different ETags. + * - The persisted body is gzipped and round-trips back to the source JSON. + * - The isFinal flag is mirrored into the cached row so readers can detect + * the "wrote-as-mutable, now-actually-final" mismatch and recompose. + */ + +import { gunzipSync } from "node:zlib"; + +interface SnapshotCacheRow { + cacheKey: string; + bodyGzip: Buffer; + contentEncoding: string; + generatedAt: Date; + schemaVersion: string; + isFinal: boolean; + byteSize: number; + etag: string; +} + +function setupHarness() { + jest.resetModules(); + + // Tiny in-memory SnapshotCache table backed by a Map. + const store = new Map(); + + const upsert = jest.fn(async ({ where, update, create }: { + where: { cacheKey: string }; + update: Partial; + create: SnapshotCacheRow; + }) => { + const existing = store.get(where.cacheKey); + if (existing) { + const merged = { ...existing, ...update }; + store.set(where.cacheKey, merged); + return merged; + } + store.set(create.cacheKey, create); + return create; + }); + + jest.doMock("../src/services/prisma", () => ({ + prisma: { + snapshotCache: { upsert }, + epochTotals: { aggregate: jest.fn() }, + proposal: { aggregate: jest.fn() }, + drep: { count: jest.fn() }, + }, + })); + + // No L1 cache invalidation noise in tests. + jest.doMock("../src/services/cache", () => ({ + cacheGet: jest.fn(), + cacheSet: jest.fn(), + cacheInvalidatePrefix: jest.fn(() => 0), + })); + + return { store, upsert }; +} + +describe("writeCachedSnapshot ETag determinism", () => { + it("produces identical etag for identical payloads", async () => { + const { upsert } = setupHarness(); + const { writeCachedSnapshot } = await import( + "../src/services/snapshot.service" + ); + + const payload = { schemaVersion: "v1", n: 42, items: [1, 2, 3] }; + const a = await writeCachedSnapshot("k:a", payload, { isFinal: false }); + const b = await writeCachedSnapshot("k:b", payload, { isFinal: false }); + + expect(a.etag).toBe(b.etag); + expect(upsert).toHaveBeenCalledTimes(2); + }); + + it("produces different etag for different payloads", async () => { + setupHarness(); + const { writeCachedSnapshot } = await import( + "../src/services/snapshot.service" + ); + + const a = await writeCachedSnapshot("k:a", { x: 1 }, { isFinal: false }); + const b = await writeCachedSnapshot("k:b", { x: 2 }, { isFinal: false }); + + expect(a.etag).not.toBe(b.etag); + }); + + it("persists a gzipped body that round-trips back to the source JSON", async () => { + const { store } = setupHarness(); + const { writeCachedSnapshot } = await import( + "../src/services/snapshot.service" + ); + + const payload = { tag: "round-trip", arr: [{ a: 1 }, { a: 2 }] }; + await writeCachedSnapshot("k:roundtrip", payload, { isFinal: true }); + + const row = store.get("k:roundtrip"); + expect(row).toBeDefined(); + expect(row!.byteSize).toBe(row!.bodyGzip.byteLength); + const decoded = JSON.parse(gunzipSync(row!.bodyGzip).toString("utf-8")); + expect(decoded).toEqual(payload); + }); + + it("propagates isFinal into the persisted row so the reader can detect mutable-vs-final mismatch", async () => { + const { store } = setupHarness(); + const { writeCachedSnapshot } = await import( + "../src/services/snapshot.service" + ); + + await writeCachedSnapshot("k:mut", { v: 1 }, { isFinal: false }); + await writeCachedSnapshot("k:fin", { v: 1 }, { isFinal: true }); + + expect(store.get("k:mut")?.isFinal).toBe(false); + expect(store.get("k:fin")?.isFinal).toBe(true); + // ETags are still equal across the two writes — finality does not feed the body. + expect(store.get("k:mut")?.etag).toBe(store.get("k:fin")?.etag); + }); + + it("re-upserting the same key updates body, etag, byteSize, and isFinal", async () => { + const { store } = setupHarness(); + const { writeCachedSnapshot } = await import( + "../src/services/snapshot.service" + ); + + const first = await writeCachedSnapshot("k:overwrite", { v: 1 }, { isFinal: false }); + const second = await writeCachedSnapshot("k:overwrite", { v: 2 }, { isFinal: true }); + + expect(first.etag).not.toBe(second.etag); + const row = store.get("k:overwrite"); + expect(row?.etag).toBe(second.etag); + expect(row?.isFinal).toBe(true); + const decoded = JSON.parse(gunzipSync(row!.bodyGzip).toString("utf-8")); + expect(decoded).toEqual({ v: 2 }); + }); + + it("persists contentEncoding='gzip' so future encodings can be added without another column rename", async () => { + const { store } = setupHarness(); + const { writeCachedSnapshot } = await import( + "../src/services/snapshot.service" + ); + + await writeCachedSnapshot("k:enc", { v: 3 }, { isFinal: false }); + const row = store.get("k:enc"); + expect(row?.contentEncoding).toBe("gzip"); + }); +}); diff --git a/test/snapshot-compose.test.ts b/test/snapshot-compose.test.ts new file mode 100644 index 0000000..14cbea9 --- /dev/null +++ b/test/snapshot-compose.test.ts @@ -0,0 +1,362 @@ +/** + * Tests for composeChunk — guards the C6 invariants: + * - chunkStartFor math: arbitrary epoch in the chunk maps to its [chunkStart, chunkStart+49] range. + * - "Latest vote wins": when a DRep voted multiple times on the same proposal, + * the LAST entry (votedAt asc, id asc) overwrites earlier ones. + * - MIGRATIONS sentinel filter (drep_always_*) is applied via the Prisma `notIn` clauses + * passed to migrationAggregate.findMany. + * - isStable requires BOTH "no NULL amount_at_switch rows" AND + * "delegation full-scan watermark set"; either alone leaves isStable=false. + */ + +const SENTINELS = ["drep_always_abstain", "drep_always_no_confidence"]; + +function setupHarness(opts: { + proposals?: Array<{ proposalId: string; title: string; governanceActionType: string | null; submissionEpoch: number | null }>; + votes?: Array<{ drepId: string | null; proposalId: string; vote: string | null }>; + migrations?: Array<{ epoch: number; fromDrepId: string; toDrepId: string; adaLovelace: bigint; delegators: number }>; + currentEpoch?: number; + unstableCount?: number; + fullScanCompleted?: boolean; +}) { + jest.resetModules(); + + const proposalFindMany = jest.fn().mockResolvedValue(opts.proposals ?? []); + const voteFindMany = jest.fn().mockResolvedValue(opts.votes ?? []); + const migrationFindMany = jest.fn().mockResolvedValue(opts.migrations ?? []); + const aggregateMock = jest + .fn() + .mockResolvedValue({ _max: { epoch: opts.currentEpoch ?? 700 } }); + const queryRaw = jest + .fn() + .mockResolvedValue([{ unstable: BigInt(opts.unstableCount ?? 0) }]); + const findUniqueCheckpoint = jest.fn().mockResolvedValue({ + lastFullAllDrepsScanAt: opts.fullScanCompleted === false ? null : new Date(), + }); + + jest.doMock("../src/services/prisma", () => ({ + prisma: { + proposal: { findMany: proposalFindMany, aggregate: jest.fn() }, + onchainVote: { findMany: voteFindMany }, + migrationAggregate: { findMany: migrationFindMany }, + epochTotals: { aggregate: aggregateMock }, + delegationSyncCheckpoint: { findUnique: findUniqueCheckpoint }, + $queryRaw: queryRaw, + }, + })); + jest.doMock("../src/services/cache", () => ({ + cacheGet: jest.fn(), + cacheSet: jest.fn(), + cacheInvalidatePrefix: jest.fn(() => 0), + })); + + return { + proposalFindMany, + voteFindMany, + migrationFindMany, + aggregateMock, + queryRaw, + findUniqueCheckpoint, + }; +} + +describe("composeChunk", () => { + it("maps an arbitrary epoch to its 50-wide chunk window", async () => { + setupHarness({}); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(627); + expect(chunk.epochStart).toBe(600); + expect(chunk.epochEnd).toBe(649); + }); + + it("aligns chunk window when called on a chunk-boundary epoch", async () => { + setupHarness({}); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(650); + expect(chunk.epochStart).toBe(650); + expect(chunk.epochEnd).toBe(699); + }); + + it("LAST vote (by votedAt asc, id asc) wins when a DRep voted twice on the same proposal", async () => { + // The mock returns rows in the order the loop will iterate. Because the + // composer overwrites bucket[proposalId] = v, the LAST entry should win. + const h = setupHarness({ + votes: [ + { drepId: "drep-a", proposalId: "p1", vote: "no" }, // earlier + { drepId: "drep-a", proposalId: "p1", vote: "yes" }, // later — wins + { drepId: "drep-b", proposalId: "p1", vote: "abstain" }, + ], + }); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(600); + expect(chunk.votes["drep-a"]).toEqual({ p1: "yes" }); + expect(chunk.votes["drep-b"]).toEqual({ p1: "abstain" }); + + // The "last wins" guarantee is only meaningful if the production query + // imposes the deterministic [votedAt asc, id asc] ordering on the iterated + // rows. Pin that contract here so a regression that drops the orderBy is + // caught by the test rather than only at integration time. + expect(h.voteFindMany).toHaveBeenCalledTimes(1); + const orderBy = h.voteFindMany.mock.calls[0][0].orderBy; + expect(orderBy).toEqual([{ votedAt: "asc" }, { id: "asc" }]); + }); + + it("drops votes with missing drepId or vote", async () => { + setupHarness({ + votes: [ + { drepId: null, proposalId: "p1", vote: "yes" }, + { drepId: "drep-x", proposalId: "p1", vote: null }, + { drepId: "drep-y", proposalId: "p1", vote: "yes" }, + ], + }); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(600); + expect(chunk.votes).toEqual({ "drep-y": { p1: "yes" } }); + }); + + it("requests MIGRATIONS with a sentinel exclusion filter on both sides", async () => { + const h = setupHarness({}); + const { composeChunk } = await import("../src/services/snapshot.service"); + await composeChunk(600); + + expect(h.migrationFindMany).toHaveBeenCalledTimes(1); + const where = h.migrationFindMany.mock.calls[0][0].where; + expect(where.fromDrepId.notIn).toEqual(SENTINELS); + expect(where.toDrepId.notIn).toEqual(SENTINELS); + }); + + it("isStable=true only when isFinal AND no unstable rows AND full-scan watermark is set", async () => { + const h = setupHarness({ + currentEpoch: 700, // chunk 600..649 is final + unstableCount: 0, + fullScanCompleted: true, + }); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(600); + expect(chunk.isFinal).toBe(true); + expect(chunk.isStable).toBe(true); + + // The unstable-count SQL must: + // (i) key off `amount_at_switch IS NULL` (post-C2 invariant — `amount_source IS NULL` + // would silently mark koios-malformed rows as stable) + // (ii) exclude both `drep_always_*` sentinels on both from/to columns + // (C3 — without this, a sentinel-targeted unbackfilled row would pin + // isStable=false forever despite never appearing in the chunk payload). + // The Prisma tagged-template form passes a TemplateStringsArray as args[0] + // — concatenating it gives us the SQL skeleton independent of param values. + expect(h.queryRaw).toHaveBeenCalledTimes(1); + const templateParts = h.queryRaw.mock.calls[0][0] as ReadonlyArray; + const sql = templateParts.join(" ? "); + expect(sql).toContain("amount_at_switch"); + expect(sql).toContain("IS NULL"); + expect(sql).toContain("drep_always_abstain"); + expect(sql).toContain("drep_always_no_confidence"); + // Sanity guard against accidental regression to the old amount_source predicate + // (which would mark malformed rows as stable). + expect(sql).not.toMatch(/"amount_source"\s+IS\s+NULL/); + }); + + it("isStable=false when delegation full-scan watermark is missing", async () => { + setupHarness({ + currentEpoch: 700, + unstableCount: 0, + fullScanCompleted: false, + }); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(600); + expect(chunk.isFinal).toBe(true); + expect(chunk.isStable).toBe(false); + }); + + it("isStable=false when there are still rows with NULL amount_at_switch", async () => { + setupHarness({ + currentEpoch: 700, + unstableCount: 5, + fullScanCompleted: true, + }); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(600); + expect(chunk.isFinal).toBe(true); + expect(chunk.isStable).toBe(false); + }); + + it("isStable=false (and isFinal=false) for the current chunk", async () => { + setupHarness({ + currentEpoch: 620, // chunk 600..649 is the CURRENT chunk + unstableCount: 0, + fullScanCompleted: true, + }); + const { composeChunk } = await import("../src/services/snapshot.service"); + const chunk = await composeChunk(600); + expect(chunk.isFinal).toBe(false); + expect(chunk.isStable).toBe(false); + }); +}); + +/** + * snapshotService.getChunk — verifies the L2 finality-mismatch invariant: + * if the SnapshotCache row was written when the chunk was still mutable + * (isFinal=false) but the current epoch has since advanced past the chunk's + * end (so isFinal should now be true), readOrCompose MUST treat the L2 row + * as a miss and recompose, writing back a row with the corrected flag. + * + * Without this re-check, the cached body would stay pinned under final-TTL + * semantics across an epoch boundary, serving partial mutable data with + * immutable cache headers. + */ +describe("snapshotService.getChunk — L2 finality-mismatch round-trip", () => { + function setupRoundTripHarness() { + jest.resetModules(); + + const findUniqueSnapshotCache = jest.fn(); + const upsertSnapshotCache = jest.fn(async (args: { create: unknown; update: unknown }) => args.create as unknown); + // Fire-and-forget update of `lastAccessedAt` returns a promise the production + // code chains `.catch` on; mock must return a thenable so the chain doesn't + // throw on .catch of undefined. + const updateSnapshotCache = jest.fn().mockResolvedValue(undefined); + const aggregateMock = jest.fn().mockResolvedValue({ _max: { epoch: 700 } }); + const proposalFindMany = jest.fn().mockResolvedValue([]); + const voteFindMany = jest.fn().mockResolvedValue([]); + const migrationFindMany = jest.fn().mockResolvedValue([]); + const queryRaw = jest.fn().mockResolvedValue([{ unstable: 0n }]); + const findUniqueCheckpoint = jest + .fn() + .mockResolvedValue({ lastFullAllDrepsScanAt: new Date() }); + + jest.doMock("../src/services/prisma", () => ({ + prisma: { + proposal: { findMany: proposalFindMany, aggregate: jest.fn() }, + onchainVote: { findMany: voteFindMany }, + migrationAggregate: { findMany: migrationFindMany }, + epochTotals: { aggregate: aggregateMock }, + delegationSyncCheckpoint: { findUnique: findUniqueCheckpoint }, + snapshotCache: { + findUnique: findUniqueSnapshotCache, + upsert: upsertSnapshotCache, + update: updateSnapshotCache, + }, + $queryRaw: queryRaw, + }, + })); + jest.doMock("../src/services/cache", () => ({ + cacheGet: jest.fn().mockReturnValue(undefined), + cacheSet: jest.fn(), + cacheInvalidatePrefix: jest.fn(() => 0), + })); + + return { + findUniqueSnapshotCache, + upsertSnapshotCache, + updateSnapshotCache, + proposalFindMany, + }; + } + + it("treats an L2 row whose isFinal disagrees with the current tip as a miss and recomposes", async () => { + const h = setupRoundTripHarness(); + + // Pretend a stale row exists for chunk 600..649 with isFinal=false even + // though current epoch is 700 (so the chunk is in fact final). + const { gzipSync } = await import("node:zlib"); + const staleBody = gzipSync(Buffer.from("{}")); + h.findUniqueSnapshotCache.mockResolvedValue({ + cacheKey: "snapshot:v1:chunk:600", + bodyGzip: staleBody, + contentEncoding: "gzip", + generatedAt: new Date(Date.now() - 60_000), + schemaVersion: "v1", + isFinal: false, // ← mismatch + byteSize: staleBody.byteLength, + etag: "stale-etag", + }); + + const { snapshotService } = await import("../src/services/snapshot.service"); + const result = await snapshotService.getChunk(600); + + // The reader must NOT serve the stale body. It must recompose (which + // hits the proposal/vote/migration mocks) and upsert a fresh row. + expect(result.etag).not.toBe("stale-etag"); + expect(result.isFinal).toBe(true); + expect(h.proposalFindMany).toHaveBeenCalled(); + expect(h.upsertSnapshotCache).toHaveBeenCalled(); + }); + + it("self-heals a poisoned L2 row (corrupted gzip body) by deleting it and recomposing", async () => { + const h = setupRoundTripHarness(); + const deleteSnapshotCache = jest.fn().mockResolvedValue({ count: 1 }); + const deleteManySnapshotCache = jest.fn().mockResolvedValue({ count: 1 }); + // Wire deleteSnapshotCache into the mocked prisma — needs re-doMock since + // the harness already mocked it without delete. + jest.resetModules(); + const findUniqueSnapshotCache = jest.fn(); + const upsertSnapshotCache = jest.fn(async (args: { create: unknown }) => args.create); + const updateSnapshotCache = jest.fn().mockResolvedValue(undefined); + const aggregateMock = jest.fn().mockResolvedValue({ _max: { epoch: 700 } }); + const proposalFindMany = jest.fn().mockResolvedValue([]); + const voteFindMany = jest.fn().mockResolvedValue([]); + const migrationFindMany = jest.fn().mockResolvedValue([]); + const queryRaw = jest.fn().mockResolvedValue([{ unstable: 0n }]); + const findUniqueCheckpoint = jest + .fn() + .mockResolvedValue({ lastFullAllDrepsScanAt: new Date() }); + + jest.doMock("../src/services/prisma", () => ({ + prisma: { + proposal: { findMany: proposalFindMany, aggregate: jest.fn() }, + onchainVote: { findMany: voteFindMany }, + migrationAggregate: { findMany: migrationFindMany }, + epochTotals: { aggregate: aggregateMock }, + delegationSyncCheckpoint: { findUnique: findUniqueCheckpoint }, + snapshotCache: { + findUnique: findUniqueSnapshotCache, + upsert: upsertSnapshotCache, + update: updateSnapshotCache, + delete: deleteSnapshotCache, + deleteMany: deleteManySnapshotCache, + }, + $queryRaw: queryRaw, + }, + })); + jest.doMock("../src/services/cache", () => ({ + cacheGet: jest.fn().mockReturnValue(undefined), + cacheSet: jest.fn(), + cacheInvalidatePrefix: jest.fn(() => 0), + })); + + // Body is corrupted (not valid gzip). The reader must NOT 500 — it must + // catch the gunzip error, delete the poisoned row, and recompose. + const corruptBody = Buffer.from("not-actually-gzip"); + findUniqueSnapshotCache.mockResolvedValue({ + cacheKey: "snapshot:v1:chunk:600", + bodyGzip: corruptBody, + contentEncoding: "gzip", + generatedAt: new Date(), + schemaVersion: "v1", + isFinal: true, // matches current tip — so self-heal must trigger via the gunzip throw, not the finality check + byteSize: corruptBody.byteLength, + etag: "poisoned-etag", + }); + + const errorSpy = jest.spyOn(console, "error").mockImplementation(() => {}); + try { + const { snapshotService } = await import("../src/services/snapshot.service"); + const result = await snapshotService.getChunk(600); + + expect(result.etag).not.toBe("poisoned-etag"); + // Conditional delete: scope to (cacheKey, etag) so we don't wipe a fresh + // row another instance just wrote between our read and our self-heal. + expect(deleteManySnapshotCache).toHaveBeenCalledWith({ + where: { + cacheKey: "snapshot:v1:chunk:600", + etag: "poisoned-etag", + }, + }); + expect(upsertSnapshotCache).toHaveBeenCalled(); + expect(errorSpy).toHaveBeenCalled(); + } finally { + errorSpy.mockRestore(); + } + // Silence the lint about unused harness for this isolated test: + void h; + }); +});