From 352531410dcbc7cb8182e4fe9d04d5bf5b2c0ac5 Mon Sep 17 00:00:00 2001 From: king-aj-the-first Date: Sat, 27 Jun 2026 14:45:51 +0100 Subject: [PATCH] feat(backend): scheduled reconciliation, manifest persistence, resumable backfill, and SLO metrics --- backend/prisma/dev.db | Bin 389120 -> 454656 bytes .../migration.sql | 60 ++++ backend/prisma/schema.prisma | 54 ++++ backend/src/__tests__/exportManifest.test.ts | 99 ++++++ .../__tests__/ledgerReconciliationJob.test.ts | 163 ++++++++++ backend/src/__tests__/sloMetrics.test.ts | 68 +++++ .../transactionBackfill.persistence.test.ts | 126 ++++++++ backend/src/bulkExportJobs.ts | 10 + backend/src/diagnosticsBundle.ts | 10 + backend/src/exportManifest.ts | 206 ++++++++++++- backend/src/index.ts | 77 ++++- backend/src/latencyMonitoring.ts | 29 +- backend/src/metrics.ts | 67 +++++ backend/src/positionReconciliationJob.ts | 140 ++++++++- backend/src/reconciliationReport.ts | 167 ++++++++--- backend/src/transactionBackfill.ts | 281 ++++++++++++++---- docs/MONITORING_OBSERVABILITY.md | 31 ++ node_modules/.bin/husky | 16 - node_modules/.bin/husky.cmd | 17 -- node_modules/.bin/husky.ps1 | 28 -- node_modules/.package-lock.json | 23 -- node_modules/husky/LICENSE | 21 -- node_modules/husky/README.md | 1 - node_modules/husky/bin.js | 26 -- node_modules/husky/husky | 22 -- node_modules/husky/index.d.ts | 1 - node_modules/husky/index.js | 25 -- node_modules/husky/package.json | 25 -- package-lock.json | 30 -- 29 files changed, 1457 insertions(+), 366 deletions(-) create mode 100644 backend/prisma/migrations/20260627120000_add_manifest_reconciliation_backfill/migration.sql create mode 100644 backend/src/__tests__/exportManifest.test.ts create mode 100644 backend/src/__tests__/ledgerReconciliationJob.test.ts create mode 100644 backend/src/__tests__/sloMetrics.test.ts create mode 100644 backend/src/__tests__/transactionBackfill.persistence.test.ts delete mode 100644 node_modules/.bin/husky delete mode 100644 node_modules/.bin/husky.cmd delete mode 100644 node_modules/.bin/husky.ps1 delete mode 100644 node_modules/.package-lock.json delete mode 100644 node_modules/husky/LICENSE delete mode 100644 node_modules/husky/README.md delete mode 100644 node_modules/husky/bin.js delete mode 100644 node_modules/husky/husky delete mode 100644 node_modules/husky/index.d.ts delete mode 100644 node_modules/husky/index.js delete mode 100644 node_modules/husky/package.json diff --git a/backend/prisma/dev.db b/backend/prisma/dev.db index 8e3073b6faa7e8c2aca12258f9479f04c884ef75..0931ad02ed2f692552768a33be4b93ef9c3791d9 100644 GIT binary patch delta 3430 zcmds3U2qds6yBTN?53&tp;)#ps7tik%p^6NCfP(sv6P5~k}9N%0xtV=w+(HRcC(dM z8ADJ9bVex?${iek7#@7_1&47mjt{_t`smO2;Da-wqk{-K;sXybj^1q2CZSu0XLpjD z?A`C4^WAgL`OY4^$R517c7E-uUkHN8;qT%l9Sihz{zCM=aKw(MD9_c(1I2UGbKP^z zbM>QH!?PAo3_Ohoeghu50(j_H?ZTti{zr-N-bK;x+5b4pdq1}qib1b)r0BBe3BukI zjfBH-K?nsUSqTRtP>BZ(U{y9>f=L^uoUAXF;z+=rVWGf>L--OURTU?t zY(|55L)4*?%PN_PjAUeTSy7ghaV;}3VXoX*^m$Cedn$wlsK^l|1cUKtObSL+K?%xn zJg8wM5z{m^3ZWK@3o%WFN+g`%<)|bHyd*#v)k08H_-7#A8ZT=qpNJ*;R3V`X8YYP+ z;;I_f5?T}rreC4TI23LmoF9{PC{F1a1&aBsG?gFA8F>@UogH8=INH3fX~K2Y{irA6 zZg)+)=Us1~ZR2*ip6($>!0p7^TLIqA9z7Lc=}Gp!SFQxhSAw8ZCfmU$hnaa$X69{w z=jb+YiLjkpaFF*f2f!t;hIn%!MD9T^1*zA`qNg8y2*|sU0LU}wAshJ}{Zx=<;RbZg zPIjO#Tx1(M-%2{rd&j`Kg#I3;nnstV91S0DAAX#U$X9 zvr{?USiIkR=;h}a*6#TS{C^bF24 zsFzmR*4TB3JuUo6u0x3=aGmvM+m(1Ewz!ESpH6r-7} z3TK~z@>ni6z6qjF{nUo?m-k{j=s$TTw>+x0n z`TRw{%COI3cH91o-6+gLT{58BZ>&slpC!dYO;Hwoi?`TXB5KXE0gcHL3d0WgL2OT(dEG*`A6SPX8zMz7~0a~<%z0xwnZpL2yR$k3K zHqvubu-R|L%vf_o(C~3U*GV+5T!BU%go%(6!zMA2Zfhd7@>3unQE1bg;)#=cUA*haGz2a@1EikLr?;NsZC}TS^ zFP1{KE|03uZJ){dxQ)ZZL&^RWx_lA@NPS8{7fyms^vx{b(4{%x;;QR?++Zr5+??DV zaB}$aP3gH&9p>{}@@P6hb)a|dr{?LcBF%~kvlo=!ZEYraHEqUQgnmghBF9_}lAt*@YKz0_*d{UGHj|J6&m0h;EhHzIOC;?6Q zskabrj$h+`lxgfo*2yGRbGlN+)_`0?HRFC{4QjAbt!WG^Skx(&+gc6zi}#K(>-i@p=*%b&n>k5HV#{gTfUS8<{i%ziYb&cLe%<4Vd+UBGcrNXVEwXBI- z8*A+0>$NJo&Hoj%s<4UUaIVSVX!SiGtK~$E-biZjsgWdBDf3OE3A}_A>(-V|*7{(p QK%f8gsmD&A37H@6Umf^wn*aa+ delta 411 zcmZp8Al>jle1f##bp{58cp&xzVn!einW$qdd7VM8tb-NEW9I(|BpICffAfFi|H%4k zV?_zSbbM}RdJ$06&=E*T=>iD}VO}7~$L>6tT|joTpn*L5W@qt(9DIDdhZ%U^@Y(U3 z@QLzG*es?H%{$paAdE$WIh1j7fVfKYYu)Xybr~g-n8g^=+Sw;CZfBps^f#YHm{E*j zDi^an=XoY8#ubcWrM6QIm}94Z&|s09K7pAzd;42e78S>^iK>EGJk3n9no2GyLY<&1uL`$REvjh>scQ z{3f6em?t_)GB>jNOuw(fB0D`oo2_g61SOU$OxrWmS;Sdc_*fXcr^jfsNl#BuV&R?6 zr^CiC?B^Wh>KNjx;OXb$8lj*R-pC#{JzASh2Cii8_Ie$bM&_x#%$+PiYxYh5sLHZy dyOb829pf?smIZ7;F(7DXyTG!Y?E>pBc>oR2X{`VN diff --git a/backend/prisma/migrations/20260627120000_add_manifest_reconciliation_backfill/migration.sql b/backend/prisma/migrations/20260627120000_add_manifest_reconciliation_backfill/migration.sql new file mode 100644 index 00000000..d32d19cf --- /dev/null +++ b/backend/prisma/migrations/20260627120000_add_manifest_reconciliation_backfill/migration.sql @@ -0,0 +1,60 @@ +-- CreateTable +CREATE TABLE "ExportManifest" ( + "id" TEXT NOT NULL PRIMARY KEY, + "requester" TEXT NOT NULL, + "reportType" TEXT NOT NULL, + "filters" TEXT NOT NULL, + "checksum" TEXT NOT NULL, + "generatedAt" DATETIME NOT NULL, + "fileName" TEXT NOT NULL, + "rowCount" INTEGER NOT NULL, + "bulkExportJobId" TEXT, + "artifactId" TEXT +); + +-- CreateTable +CREATE TABLE "ReconciliationSnapshot" ( + "id" TEXT NOT NULL PRIMARY KEY, + "generatedAt" DATETIME NOT NULL, + "traceId" TEXT, + "status" TEXT NOT NULL, + "windowFrom" DATETIME NOT NULL, + "windowTo" DATETIME NOT NULL, + "summaryJson" TEXT NOT NULL, + "driftCount" INTEGER NOT NULL +); + +-- CreateTable +CREATE TABLE "TransactionBackfillJob" ( + "id" TEXT NOT NULL PRIMARY KEY, + "jobKey" TEXT NOT NULL, + "startLedger" INTEGER NOT NULL, + "endLedger" INTEGER NOT NULL, + "batchSize" INTEGER NOT NULL, + "dryRun" BOOLEAN NOT NULL, + "status" TEXT NOT NULL, + "rpcUrl" TEXT NOT NULL, + "contractId" TEXT NOT NULL, + "lastProcessedLedger" INTEGER, + "progressJson" TEXT NOT NULL, + "errorMessage" TEXT, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL, + "completedAt" DATETIME +); + +-- CreateIndex +CREATE INDEX "ExportManifest_generatedAt_idx" ON "ExportManifest"("generatedAt"); +CREATE INDEX "ExportManifest_requester_idx" ON "ExportManifest"("requester"); +CREATE INDEX "ExportManifest_reportType_idx" ON "ExportManifest"("reportType"); +CREATE INDEX "ExportManifest_checksum_idx" ON "ExportManifest"("checksum"); + +-- CreateIndex +CREATE INDEX "ReconciliationSnapshot_generatedAt_idx" ON "ReconciliationSnapshot"("generatedAt"); +CREATE INDEX "ReconciliationSnapshot_status_idx" ON "ReconciliationSnapshot"("status"); + +-- CreateIndex +CREATE UNIQUE INDEX "TransactionBackfillJob_jobKey_key" ON "TransactionBackfillJob"("jobKey"); +CREATE INDEX "TransactionBackfillJob_status_idx" ON "TransactionBackfillJob"("status"); +CREATE INDEX "TransactionBackfillJob_createdAt_idx" ON "TransactionBackfillJob"("createdAt"); +CREATE INDEX "TransactionBackfillJob_dryRun_idx" ON "TransactionBackfillJob"("dryRun"); diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index e1a1468f..1d7babfd 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -315,3 +315,57 @@ model FeatureFlagOverride { @@index([expiresAt]) @@index([actor]) } + +model ExportManifest { + id String @id + requester String + reportType String + filters String + checksum String + generatedAt DateTime + fileName String + rowCount Int + bulkExportJobId String? + artifactId String? + + @@index([generatedAt]) + @@index([requester]) + @@index([reportType]) + @@index([checksum]) +} + +model ReconciliationSnapshot { + id String @id + generatedAt DateTime + traceId String? + status String + windowFrom DateTime + windowTo DateTime + summaryJson String + driftCount Int + + @@index([generatedAt]) + @@index([status]) +} + +model TransactionBackfillJob { + id String @id + jobKey String @unique + startLedger Int + endLedger Int + batchSize Int + dryRun Boolean + status String + rpcUrl String + contractId String + lastProcessedLedger Int? + progressJson String + errorMessage String? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + completedAt DateTime? + + @@index([status]) + @@index([createdAt]) + @@index([dryRun]) +} diff --git a/backend/src/__tests__/exportManifest.test.ts b/backend/src/__tests__/exportManifest.test.ts new file mode 100644 index 00000000..251a7855 --- /dev/null +++ b/backend/src/__tests__/exportManifest.test.ts @@ -0,0 +1,99 @@ +import crypto from 'crypto'; +import { + createExportManifest, + verifyExportManifestChecksum, + listExportManifests, + pruneExportManifests, + resetExportManifestsForTests, +} from '../exportManifest'; + +jest.mock('../prismaClient', () => ({ + getPrismaClient: () => ({ + exportManifest: { + create: jest.fn().mockResolvedValue({}), + findMany: jest.fn().mockResolvedValue([]), + findUnique: jest.fn().mockResolvedValue(null), + count: jest.fn().mockResolvedValue(0), + deleteMany: jest.fn().mockResolvedValue({ count: 0 }), + }, + }), +})); + +jest.mock('../middleware/structuredLogging', () => ({ + logger: { log: jest.fn(), configure: jest.fn() }, +})); + +describe('exportManifest persistence', () => { + beforeEach(() => { + resetExportManifestsForTests(); + process.env.EXPORT_MANIFEST_STORAGE = 'memory'; + }); + + it('creates a manifest with deterministic checksum', async () => { + const manifest = await createExportManifest({ + requester: 'admin@test', + reportType: 'transactions', + filters: { status: 'completed' }, + rows: [{ id: '1', amount: '10' }], + }); + + expect(manifest.id).toMatch(/^exp-/); + expect(manifest.checksum).toHaveLength(64); + expect(manifest.rowCount).toBe(1); + }); + + it('verifies checksum match and mismatch without exposing rows', async () => { + const manifest = await createExportManifest({ + requester: 'admin@test', + reportType: 'transactions', + filters: {}, + rows: [{ id: '1' }], + }); + + const match = await verifyExportManifestChecksum(manifest.id, manifest.checksum); + expect(match.match).toBe(true); + expect(match.manifest?.rowCount).toBe(1); + expect((match.manifest as any).rows).toBeUndefined(); + + const mismatch = await verifyExportManifestChecksum(manifest.id, crypto.randomBytes(32).toString('hex')); + expect(mismatch.match).toBe(false); + }); + + it('lists manifests with pagination from memory store', async () => { + await createExportManifest({ + requester: 'a', + reportType: 'transactions', + filters: {}, + rows: [{ id: '1' }], + }); + await createExportManifest({ + requester: 'b', + reportType: 'transactions', + filters: {}, + rows: [{ id: '2' }], + }); + + const page = await listExportManifests(1, 0); + expect(page.total).toBe(2); + expect(page.data).toHaveLength(1); + }); + + it('prunes manifests beyond retention limit', async () => { + process.env.EXPORT_MANIFEST_RETENTION = '2'; + + for (let i = 0; i < 4; i += 1) { + await createExportManifest({ + requester: 'admin', + reportType: 'transactions', + filters: { i }, + rows: [{ id: String(i) }], + }); + } + + const pruned = await pruneExportManifests(); + expect(pruned).toBeGreaterThan(0); + + const remaining = await listExportManifests(10, 0); + expect(remaining.total).toBeLessThanOrEqual(2); + }); +}); diff --git a/backend/src/__tests__/ledgerReconciliationJob.test.ts b/backend/src/__tests__/ledgerReconciliationJob.test.ts new file mode 100644 index 00000000..f447da51 --- /dev/null +++ b/backend/src/__tests__/ledgerReconciliationJob.test.ts @@ -0,0 +1,163 @@ +/** + * Tests for scheduled ledger reconciliation drift detection. + */ + +import { + runReconciliationReport, + reconcile, + resetReconciliationStateForTests, + type LedgerRecord, +} from '../reconciliationReport'; +import { runLedgerReconciliationJob, resetLedgerReconciliationSchedulerForTests } from '../positionReconciliationJob'; +import { + reconciliationDriftTotal, + reconciliationStatus, + reconciliationLastRunTimestamp, + register, +} from '../metrics'; +import { resetJobGovernance } from '../jobGovernance'; + +const mockFindMany = jest.fn().mockResolvedValue([]); +const mockSnapshotCreate = jest.fn().mockResolvedValue({}); + +jest.mock('../prismaClient', () => ({ + getPrismaClient: () => ({ + transaction: { + findMany: (...args: unknown[]) => mockFindMany(...args), + }, + reconciliationSnapshot: { + create: (...args: unknown[]) => mockSnapshotCreate(...args), + }, + }), +})); + +jest.mock('../middleware/structuredLogging', () => ({ + logger: { log: jest.fn(), configure: jest.fn() }, +})); + +describe('runReconciliationReport', () => { + beforeEach(() => { + resetReconciliationStateForTests(); + resetLedgerReconciliationSchedulerForTests(); + resetJobGovernance(); + mockFindMany.mockReset(); + mockFindMany.mockResolvedValue([]); + }); + + it('reports CLEAN when ledger and database records match', async () => { + const records: LedgerRecord[] = [ + { + transactionHash: 'tx-1', + type: 'deposit', + amount: '100', + walletAddress: 'GABC', + timestamp: '2026-06-20T10:00:00Z', + }, + ]; + + mockFindMany.mockResolvedValueOnce([ + { + id: 'tx-1', + type: 'deposit', + amount: '100', + user: 'GABC', + timestamp: new Date('2026-06-20T10:00:00Z'), + }, + ]); + + const report = await runReconciliationReport({ + from: '2026-06-20T00:00:00Z', + to: '2026-06-21T00:00:00Z', + ledgerFetcher: async () => records, + storeAsAutomated: true, + persistSnapshot: false, + }); + + expect(report.status).toBe('CLEAN'); + expect(report.counts.matched).toBe(1); + expect(report.counts.drifted).toBe(0); + }); + + it('detects drift when ledger record is missing in DB', async () => { + const ledgerRecords: LedgerRecord[] = [ + { + transactionHash: 'tx-missing', + type: 'deposit', + amount: '50', + walletAddress: 'GXYZ', + timestamp: '2026-06-20T10:00:00Z', + }, + ]; + + const report = await runReconciliationReport({ + from: '2026-06-20T00:00:00Z', + to: '2026-06-21T00:00:00Z', + ledgerFetcher: async () => ledgerRecords, + storeAsAutomated: true, + persistSnapshot: false, + }); + + expect(report.status).toBe('DRIFT_DETECTED'); + expect(report.driftEntries[0].issue).toBe('MISSING_IN_DB'); + }); +}); + +describe('runLedgerReconciliationJob metrics', () => { + beforeEach(async () => { + resetReconciliationStateForTests(); + resetJobGovernance(); + reconciliationDriftTotal.reset(); + reconciliationStatus.reset(); + reconciliationLastRunTimestamp.reset(); + }); + + it('increments drift counters and sets status gauge on drift', async () => { + const ledgerRecords: LedgerRecord[] = [ + { + transactionHash: 'tx-drift', + type: 'withdrawal', + amount: '10', + walletAddress: 'GDRIFT', + timestamp: '2026-06-20T10:00:00Z', + }, + ]; + + jest.spyOn(require('../reconciliationReport'), 'runReconciliationReport').mockResolvedValue({ + generatedAt: new Date().toISOString(), + traceId: 'trace-1', + window: { from: '2026-06-20T00:00:00Z', to: '2026-06-21T00:00:00Z' }, + counts: { ledgerRecords: 1, databaseRecords: 0, matched: 0, drifted: 1 }, + driftEntries: [{ transactionHash: 'tx-drift', issue: 'MISSING_IN_DB', details: {} }], + status: 'DRIFT_DETECTED', + }); + + await runLedgerReconciliationJob(); + + const metrics = await register.metrics(); + expect(metrics).toContain('reconciliation_drift_total'); + expect(metrics).toContain('reconciliation_status'); + expect(metrics).toContain('reconciliation_last_run_timestamp'); + }); +}); + +describe('reconcile', () => { + it('flags amount mismatches', () => { + const ledger: LedgerRecord[] = [{ + transactionHash: 'tx-1', + type: 'deposit', + amount: '100', + walletAddress: 'GABC', + timestamp: '2026-06-20T10:00:00Z', + }]; + const db: LedgerRecord[] = [{ + transactionHash: 'tx-1', + type: 'deposit', + amount: '99', + walletAddress: 'GABC', + timestamp: '2026-06-20T10:00:00Z', + }]; + + const result = reconcile(ledger, db); + expect(result.driftEntries[0].issue).toBe('AMOUNT_MISMATCH'); + }); +}); diff --git a/backend/src/__tests__/sloMetrics.test.ts b/backend/src/__tests__/sloMetrics.test.ts new file mode 100644 index 00000000..89ea64ee --- /dev/null +++ b/backend/src/__tests__/sloMetrics.test.ts @@ -0,0 +1,68 @@ +import { latencyMonitoringService } from '../latencyMonitoring'; +import { + endpointSloBreachTotal, + register, + syncJobGovernanceMetrics, +} from '../metrics'; + +jest.mock('../middleware/structuredLogging', () => ({ + logger: { log: jest.fn(), configure: jest.fn() }, +})); + +describe('endpoint SLO Prometheus metrics', () => { + beforeEach(() => { + latencyMonitoringService.resetForTests(); + endpointSloBreachTotal.reset(); + process.env.SLO_READ_THRESHOLD_MS = '100'; + process.env.SLO_ALERT_COOLDOWN_MS = '60000'; + process.env.ALERT_TYPE = 'slack'; + delete process.env.SLACK_WEBHOOK_URL; + }); + + afterEach(() => { + latencyMonitoringService.stopMonitoring(); + }); + + it('exports breach gauges and increments counter when latency exceeds budget', async () => { + const endpoint = '/health'; + + for (let i = 0; i < 20; i += 1) { + latencyMonitoringService.recordLatency(endpoint, 250); + } + + latencyMonitoringService.syncSloMetrics(); + syncJobGovernanceMetrics(); + + let metrics = await register.metrics(); + expect(metrics).toContain('backend_slo_breach'); + expect(metrics).toContain('backend_slo_p95_latency_ms'); + expect(metrics).toContain('tier="critical"'); + + const before = (await endpointSloBreachTotal.get()).values.reduce((sum, v) => sum + v.value, 0); + + for (let i = 0; i < 5; i += 1) { + latencyMonitoringService.recordLatency(endpoint, 300); + } + + const after = (await endpointSloBreachTotal.get()).values.reduce((sum, v) => sum + v.value, 0); + expect(after).toBeGreaterThanOrEqual(before); + }); + + it('respects alert cooldown for breach counter increments', async () => { + process.env.SLO_ALERT_COOLDOWN_MS = '3600000'; + const endpoint = '/ready'; + + for (let i = 0; i < 25; i += 1) { + latencyMonitoringService.recordLatency(endpoint, 500); + } + + const first = (await endpointSloBreachTotal.get()).values.reduce((sum, v) => sum + v.value, 0); + + for (let i = 0; i < 25; i += 1) { + latencyMonitoringService.recordLatency(endpoint, 500); + } + + const second = (await endpointSloBreachTotal.get()).values.reduce((sum, v) => sum + v.value, 0); + expect(second).toBe(first); + }); +}); diff --git a/backend/src/__tests__/transactionBackfill.persistence.test.ts b/backend/src/__tests__/transactionBackfill.persistence.test.ts new file mode 100644 index 00000000..39b478e9 --- /dev/null +++ b/backend/src/__tests__/transactionBackfill.persistence.test.ts @@ -0,0 +1,126 @@ +import { + createOrResumeTransactionBackfill, + getTransactionBackfillJob, + resetTransactionBackfillJobsForTests, + MAX_LEDGER_RANGE, +} from '../transactionBackfill'; + +const mockFindMany = jest.fn(); +const mockFindUnique = jest.fn(); +const mockCreate = jest.fn(); +const mockUpsert = jest.fn(); +const mockBackfillFindMany = jest.fn(); + +jest.mock('../prismaClient', () => ({ + getPrismaClient: () => ({ + processedEvent: { + findMany: (...args: unknown[]) => mockFindMany(...args), + findUnique: (...args: unknown[]) => mockFindUnique(...args), + create: (...args: unknown[]) => mockCreate(...args), + upsert: jest.fn(), + }, + transactionBackfillJob: { + findMany: (...args: unknown[]) => mockBackfillFindMany(...args), + upsert: (...args: unknown[]) => mockUpsert(...args), + deleteMany: jest.fn().mockResolvedValue({ count: 0 }), + }, + }), +})); + +jest.mock('../middleware/structuredLogging', () => ({ + logger: { log: jest.fn(), configure: jest.fn() }, +})); + +const mockFetch = jest.fn(); +global.fetch = mockFetch as any; + +describe('transactionBackfill persistence', () => { + beforeEach(() => { + resetTransactionBackfillJobsForTests(); + jest.clearAllMocks(); + mockFindMany.mockResolvedValue([]); + mockBackfillFindMany.mockResolvedValue([]); + mockFindUnique.mockResolvedValue(null); + mockCreate.mockImplementation(async ({ data }: any) => data); + mockUpsert.mockResolvedValue({}); + mockFetch.mockResolvedValue({ + json: async () => ({ result: { events: [] } }), + }); + }); + + const baseInput = { + startLedger: 100, + endLedger: 102, + rpcUrl: 'https://rpc.test', + contractId: 'CONTRACT', + batchSize: 2, + dryRun: false, + }; + + it('persists job on start and completes successfully', async () => { + const job = await createOrResumeTransactionBackfill(baseInput); + + expect(job.status).toBe('completed'); + expect(mockUpsert).toHaveBeenCalled(); + expect(job.progress.scannedLedgers).toBe(3); + }); + + it('dry-run persists job without mutating processed events', async () => { + const job = await createOrResumeTransactionBackfill({ ...baseInput, dryRun: true }); + + expect(job.status).toBe('completed'); + expect(job.dryRun).toBe(true); + expect(mockCreate).not.toHaveBeenCalled(); + }); + + it('resumes from persisted checkpoint after simulated restart', async () => { + const first = await createOrResumeTransactionBackfill(baseInput); + expect(first.status).toBe('completed'); + + resetTransactionBackfillJobsForTests(); + + mockBackfillFindMany.mockResolvedValueOnce([ + { + id: first.id, + jobKey: first.key, + startLedger: first.startLedger, + endLedger: first.endLedger, + batchSize: first.batchSize, + dryRun: first.dryRun, + status: 'running', + rpcUrl: baseInput.rpcUrl, + contractId: baseInput.contractId, + progressJson: JSON.stringify({ + ...first.progress, + scannedLedgers: 1, + }), + errorMessage: null, + createdAt: new Date(first.createdAt), + updatedAt: new Date(first.updatedAt), + lastProcessedLedger: 100, + }, + ]); + + const resumed = await getTransactionBackfillJob(first.id); + expect(resumed?.lastProcessedLedger).toBe(100); + expect(resumed?.status).toBe('running'); + }); + + it('rejects ledger ranges above MAX_LEDGER_RANGE', async () => { + await expect( + createOrResumeTransactionBackfill({ + ...baseInput, + startLedger: 1, + endLedger: MAX_LEDGER_RANGE + 1, + }), + ).rejects.toThrow(`ledger range exceeds maximum of ${MAX_LEDGER_RANGE}`); + }); + + it('marks job failed when RPC fetch throws', async () => { + mockFetch.mockRejectedValueOnce(new Error('rpc unavailable')); + + const job = await createOrResumeTransactionBackfill(baseInput); + expect(job.status).toBe('failed'); + expect(job.error).toContain('rpc unavailable'); + }); +}); diff --git a/backend/src/bulkExportJobs.ts b/backend/src/bulkExportJobs.ts index 7efbdadc..9e7c48e3 100644 --- a/backend/src/bulkExportJobs.ts +++ b/backend/src/bulkExportJobs.ts @@ -1,6 +1,7 @@ import crypto from 'crypto'; import { Prisma } from '@prisma/client'; import { prisma } from './prisma'; +import { createExportManifest } from './exportManifest'; export type BulkExportStatus = 'pending' | 'processing' | 'completed' | 'failed' | 'cancelled'; export type ExportFormat = 'csv' | 'json'; @@ -206,6 +207,15 @@ export async function processBulkExportJob(jobId: string): Promise { const artifact = buildBulkExportArtifact(job.format, allRows); storeBulkExportArtifact(artifact.id, artifact); + await createExportManifest({ + requester: job.generatedBy, + reportType: 'bulk-transactions', + filters: job.filters, + rows: allRows, + bulkExportJobId: jobId, + artifactId: artifact.id, + }); + await updateBulkExportProgress(jobId, { status: 'completed', processedRows, diff --git a/backend/src/diagnosticsBundle.ts b/backend/src/diagnosticsBundle.ts index a6e7bb73..b2406a2d 100644 --- a/backend/src/diagnosticsBundle.ts +++ b/backend/src/diagnosticsBundle.ts @@ -14,6 +14,10 @@ import { sorobanCircuitBreaker } from './circuitBreaker'; import { db } from './database'; import { getPrismaRuntimeConfig } from './prisma'; import { getJobHealthStatus, getJobMetrics } from './jobGovernance'; +import { + getLastAutomatedReconciliationSummary, + getLastAutomatedReconciliationRunAt, +} from './reconciliationReport'; import { logger } from './middleware/structuredLogging'; import { getCurrentTraceId } from './tracing'; @@ -152,6 +156,12 @@ export async function diagnosticsBundleHandler( runtime: getRuntimeInfo(), config: getSanitizedEnvConfig(), dependencies: dependencyStatus, + lastReconciliation: getLastAutomatedReconciliationSummary() + ? { + summary: getLastAutomatedReconciliationSummary(), + lastRunAt: getLastAutomatedReconciliationRunAt(), + } + : null, }; res.status(200).json(bundle); diff --git a/backend/src/exportManifest.ts b/backend/src/exportManifest.ts index 7bd9b138..be7202d7 100644 --- a/backend/src/exportManifest.ts +++ b/backend/src/exportManifest.ts @@ -1,4 +1,6 @@ import crypto from 'crypto'; +import { getPrismaClient } from './prismaClient'; +import { logger } from './middleware/structuredLogging'; export interface ExportManifestRecord { id: string; @@ -9,6 +11,8 @@ export interface ExportManifestRecord { generatedAt: string; fileName: string; rowCount: number; + bulkExportJobId?: string; + artifactId?: string; } interface CreateExportManifestInput { @@ -16,9 +20,19 @@ interface CreateExportManifestInput { reportType: string; filters: Record; rows: unknown[]; + bulkExportJobId?: string; + artifactId?: string; } -const manifests: ExportManifestRecord[] = []; +const memoryManifests: ExportManifestRecord[] = []; + +function getRetentionLimit(): number { + return parseInt(process.env.EXPORT_MANIFEST_RETENTION || '500', 10); +} + +function shouldUseMemoryFallback(): boolean { + return process.env.EXPORT_MANIFEST_STORAGE === 'memory'; +} function stableStringify(value: unknown): string { if (value === null || typeof value !== 'object') { @@ -34,7 +48,40 @@ function stableStringify(value: unknown): string { return `{${keys.map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`).join(',')}}`; } -export function createExportManifest(input: CreateExportManifestInput): ExportManifestRecord { +function mapRow(row: { + id: string; + requester: string; + reportType: string; + filters: string; + checksum: string; + generatedAt: Date; + fileName: string; + rowCount: number; + bulkExportJobId: string | null; + artifactId: string | null; +}): ExportManifestRecord { + let filters: Record = {}; + try { + filters = JSON.parse(row.filters) as Record; + } catch { + filters = {}; + } + + return { + id: row.id, + requester: row.requester, + reportType: row.reportType, + filters, + checksum: row.checksum, + generatedAt: row.generatedAt.toISOString(), + fileName: row.fileName, + rowCount: row.rowCount, + bulkExportJobId: row.bulkExportJobId ?? undefined, + artifactId: row.artifactId ?? undefined, + }; +} + +export async function createExportManifest(input: CreateExportManifestInput): Promise { const generatedAt = new Date().toISOString(); const canonicalPayload = stableStringify({ reportType: input.reportType, @@ -45,30 +92,165 @@ export function createExportManifest(input: CreateExportManifestInput): ExportMa const checksum = crypto.createHash('sha256').update(canonicalPayload).digest('hex'); const id = `exp-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - const manifest: ExportManifestRecord = Object.freeze({ + const manifest: ExportManifestRecord = { id, requester: input.requester, reportType: input.reportType, - filters: Object.freeze({ ...input.filters }), + filters: { ...input.filters }, checksum, generatedAt, fileName: `${input.reportType}-${generatedAt.replace(/[:.]/g, '-')}.json`, rowCount: input.rows.length, - }) as ExportManifestRecord; + bulkExportJobId: input.bulkExportJobId, + artifactId: input.artifactId, + }; + + memoryManifests.unshift(manifest); - manifests.unshift(manifest); - return manifest; + if (!shouldUseMemoryFallback()) { + try { + const prisma = getPrismaClient(); + await prisma.exportManifest.create({ + data: { + id: manifest.id, + requester: manifest.requester, + reportType: manifest.reportType, + filters: JSON.stringify(manifest.filters), + checksum: manifest.checksum, + generatedAt: new Date(manifest.generatedAt), + fileName: manifest.fileName, + rowCount: manifest.rowCount, + bulkExportJobId: manifest.bulkExportJobId ?? null, + artifactId: manifest.artifactId ?? null, + }, + }); + void pruneExportManifests(); + } catch (error) { + logger.log('warn', 'Failed to persist export manifest', { + error: error instanceof Error ? error.message : String(error), + manifestId: manifest.id, + }); + } + } + + return Object.freeze(manifest) as ExportManifestRecord; } -export function listExportManifests(limit: number = 50): ExportManifestRecord[] { +export async function listExportManifests( + limit: number = 50, + offset: number = 0, +): Promise<{ data: ExportManifestRecord[]; total: number }> { const bounded = Math.max(1, Math.min(limit, 200)); - return manifests.slice(0, bounded); + const safeOffset = Math.max(0, offset); + + if (!shouldUseMemoryFallback()) { + try { + const prisma = getPrismaClient(); + const [rows, total] = await Promise.all([ + prisma.exportManifest.findMany({ + orderBy: { generatedAt: 'desc' }, + skip: safeOffset, + take: bounded, + }), + prisma.exportManifest.count(), + ]); + + return { + data: rows.map(mapRow), + total, + }; + } catch (error) { + logger.log('warn', 'Failed to list export manifests from database', { + error: error instanceof Error ? error.message : String(error), + }); + } + } + + const data = memoryManifests.slice(safeOffset, safeOffset + bounded); + return { data, total: memoryManifests.length }; +} + +export async function getExportManifestById(id: string): Promise { + if (!shouldUseMemoryFallback()) { + try { + const prisma = getPrismaClient(); + const row = await prisma.exportManifest.findUnique({ where: { id } }); + if (row) { + return mapRow(row); + } + } catch (error) { + logger.log('warn', 'Failed to fetch export manifest from database', { + error: error instanceof Error ? error.message : String(error), + manifestId: id, + }); + } + } + + return memoryManifests.find((manifest) => manifest.id === id) || null; +} + +export async function verifyExportManifestChecksum( + id: string, + checksum: string, +): Promise<{ match: boolean; manifest: ExportManifestRecord | null }> { + const manifest = await getExportManifestById(id); + if (!manifest) { + return { match: false, manifest: null }; + } + + return { + match: manifest.checksum === checksum, + manifest: { + id: manifest.id, + requester: manifest.requester, + reportType: manifest.reportType, + filters: manifest.filters, + checksum: manifest.checksum, + generatedAt: manifest.generatedAt, + fileName: manifest.fileName, + rowCount: manifest.rowCount, + bulkExportJobId: manifest.bulkExportJobId, + artifactId: manifest.artifactId, + }, + }; } -export function getExportManifestById(id: string): ExportManifestRecord | null { - return manifests.find((manifest) => manifest.id === id) || null; +export async function pruneExportManifests(): Promise { + const retention = getRetentionLimit(); + let pruned = 0; + + if (memoryManifests.length > retention) { + const removed = memoryManifests.splice(retention); + pruned += removed.length; + } + + if (shouldUseMemoryFallback()) { + return pruned; + } + + try { + const prisma = getPrismaClient(); + const rows = await prisma.exportManifest.findMany({ + orderBy: { generatedAt: 'desc' }, + skip: retention, + select: { id: true }, + }); + + if (rows.length > 0) { + const result = await prisma.exportManifest.deleteMany({ + where: { id: { in: rows.map((row) => row.id) } }, + }); + pruned += result.count; + } + } catch (error) { + logger.log('warn', 'Failed to prune export manifests', { + error: error instanceof Error ? error.message : String(error), + }); + } + + return pruned; } export function resetExportManifestsForTests(): void { - manifests.splice(0, manifests.length); + memoryManifests.splice(0, memoryManifests.length); } diff --git a/backend/src/index.ts b/backend/src/index.ts index c91298a7..6683bb39 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -35,7 +35,7 @@ import { import { generateAdminReceipt, getAdminReceipt, listAdminReceipts, verifyReceiptSignature } from './adminReceipt'; import { startApySnapshotScheduler } from './apySnapshot'; import { startDbBackupScheduler } from './dbBackupJob'; -import { startPositionReconciliationScheduler } from './positionReconciliationJob'; +import { startPositionReconciliationScheduler, startLedgerReconciliationScheduler } from './positionReconciliationJob'; import { setupSwagger } from './swagger'; import { sorobanCircuitBreaker } from './circuitBreaker'; import { correlationIdMiddleware, CorrelationIdRequest } from './middleware/correlationId'; @@ -163,7 +163,13 @@ import { createExportManifest, getExportManifestById, listExportManifests, + verifyExportManifestChecksum, } from './exportManifest'; +import { + reconciliationReportHandler, + automatedReconciliationSummaryHandler, +} from './reconciliationReport'; +import { diagnosticsBundleHandler } from './diagnosticsBundle'; declare global { namespace Express { @@ -559,6 +565,7 @@ app.use(maintenanceModeMiddleware); app.get('/metrics', async (_req: Request, res: Response) => { try { syncJobGovernanceMetrics(); + latencyMonitoringService.syncSloMetrics(); res.set('Content-Type', register.contentType); res.end(await register.metrics()); } catch (err) { @@ -3115,10 +3122,11 @@ app.post('/admin/transactions/backfill', validateApiKey, async (req: Request, re /** * GET /admin/transactions/backfill - list recent backfill jobs */ -app.get('/admin/transactions/backfill', validateApiKey, (req: Request, res: Response) => { +app.get('/admin/transactions/backfill', validateApiKey, async (req: Request, res: Response) => { const limit = parseInt(String(req.query.limit || '20'), 10); + const data = await listTransactionBackfillJobs(limit); res.status(200).json({ - data: listTransactionBackfillJobs(limit), + data, timestamp: new Date().toISOString(), }); }); @@ -3126,8 +3134,8 @@ app.get('/admin/transactions/backfill', validateApiKey, (req: Request, res: Resp /** * GET /admin/transactions/backfill/:jobId - fetch a specific backfill job */ -app.get('/admin/transactions/backfill/:jobId', validateApiKey, (req: Request, res: Response) => { - const job = getTransactionBackfillJob(String(req.params.jobId)); +app.get('/admin/transactions/backfill/:jobId', validateApiKey, async (req: Request, res: Response) => { + const job = await getTransactionBackfillJob(String(req.params.jobId)); if (!job) { res.status(404).json({ error: 'Not Found', @@ -3145,7 +3153,7 @@ app.get('/admin/transactions/backfill/:jobId', validateApiKey, (req: Request, re /** * POST /admin/reports/exports - generate a report export and immutable manifest record */ -app.post('/admin/reports/exports', validateApiKey, (req: Request, res: Response) => { +app.post('/admin/reports/exports', validateApiKey, async (req: Request, res: Response) => { const reportType = String(req.body?.reportType || 'transactions').trim(); const requester = resolveActingAdminAddress(req); const filters = @@ -3161,7 +3169,7 @@ app.post('/admin/reports/exports', validateApiKey, (req: Request, res: Response) }, ]; - const manifest = createExportManifest({ + const manifest = await createExportManifest({ requester, reportType, filters, @@ -3177,10 +3185,15 @@ app.post('/admin/reports/exports', validateApiKey, (req: Request, res: Response) /** * GET /admin/reports/exports/manifests - list immutable export manifests */ -app.get('/admin/reports/exports/manifests', validateApiKey, (req: Request, res: Response) => { +app.get('/admin/reports/exports/manifests', validateApiKey, async (req: Request, res: Response) => { const limit = parseInt(String(req.query.limit || '50'), 10); + const offset = parseInt(String(req.query.offset || '0'), 10); + const result = await listExportManifests(limit, offset); res.status(200).json({ - data: listExportManifests(limit), + data: result.data, + total: result.total, + limit, + offset, timestamp: new Date().toISOString(), }); }); @@ -3188,8 +3201,8 @@ app.get('/admin/reports/exports/manifests', validateApiKey, (req: Request, res: /** * GET /admin/reports/exports/manifests/:id - fetch a manifest by id */ -app.get('/admin/reports/exports/manifests/:id', validateApiKey, (req: Request, res: Response) => { - const manifest = getExportManifestById(String(req.params.id)); +app.get('/admin/reports/exports/manifests/:id', validateApiKey, async (req: Request, res: Response) => { + const manifest = await getExportManifestById(String(req.params.id)); if (!manifest) { res.status(404).json({ error: 'Not Found', @@ -3204,6 +3217,37 @@ app.get('/admin/reports/exports/manifests/:id', validateApiKey, (req: Request, r }); }); +/** + * POST /admin/reports/exports/manifests/:id/verify - verify manifest checksum + */ +app.post('/admin/reports/exports/manifests/:id/verify', validateApiKey, async (req: Request, res: Response) => { + const checksum = String(req.body?.checksum || '').trim(); + if (!checksum) { + res.status(400).json({ + error: 'Bad Request', + status: 400, + message: 'checksum is required', + }); + return; + } + + const result = await verifyExportManifestChecksum(String(req.params.id), checksum); + if (!result.manifest) { + res.status(404).json({ + error: 'Not Found', + status: 404, + message: 'Export manifest not found', + }); + return; + } + + res.status(200).json({ + match: result.match, + manifest: result.manifest, + timestamp: new Date().toISOString(), + }); +}); + /** * GET /admin/jobs/dashboard - lightweight HTML dashboard for operators */ @@ -4024,6 +4068,12 @@ app.get('/admin/diagnostics', validateApiKey, diagnosticsBundleHandler); */ app.get('/admin/reconciliation', validateApiKey, reconciliationReportHandler); +/** + * GET /admin/reconciliation/latest + * Returns the latest automated reconciliation summary without re-running Horizon queries. + */ +app.get('/admin/reconciliation/latest', validateApiKey, automatedReconciliationSummaryHandler); + // ─── Typed Error Boundary (Issue #708) ────────────────────────────────────── // Mounted before the generic error handler so upstream dependency failures // are mapped to typed API errors with stable codes and retry hints. @@ -4110,6 +4160,11 @@ if (process.env.NODE_ENV !== 'test') { stopPositionReconciliationScheduler(); }); + const stopLedgerReconciliationScheduler = startLedgerReconciliationScheduler(); + shutdownHandler.onShutdown(async () => { + stopLedgerReconciliationScheduler(); + }); + // Register event polling service shutdown shutdownHandler.onShutdown(async () => { stopEventPollingService(); diff --git a/backend/src/latencyMonitoring.ts b/backend/src/latencyMonitoring.ts index 4322b5b8..9e1a534d 100644 --- a/backend/src/latencyMonitoring.ts +++ b/backend/src/latencyMonitoring.ts @@ -1,5 +1,6 @@ import { logger } from './middleware/structuredLogging'; -import { ENDPOINT_SLA_REGISTRY, resolveLatencyBudgetMs, EndpointType } from './endpointSlaRegistry'; +import { ENDPOINT_SLA_REGISTRY, resolveLatencyBudgetMs, EndpointType, getEndpointSla } from './endpointSlaRegistry'; +import { recordSloBreachAlert, endpointSloP95LatencyMs, endpointSloBudgetMs, endpointSloBreach } from './metrics'; export { EndpointType } from './endpointSlaRegistry'; @@ -261,6 +262,7 @@ export class LatencyMonitoringService { }; tracker.recordAlert(); + this.recordSloBreachMetric(endpoint, tracker.endpointType); await this.sendAlerts([violation]); logger.log('info', 'Immediate SLO breach alert triggered', { @@ -284,6 +286,7 @@ export class LatencyMonitoringService { }); tracker.recordAlert(); + this.recordSloBreachMetric(endpoint, tracker.endpointType); } }); @@ -308,6 +311,30 @@ export class LatencyMonitoringService { } } + private recordSloBreachMetric(endpoint: string, type: EndpointType): void { + const sla = getEndpointSla(endpoint); + const tier = sla?.tier ?? 'unknown'; + recordSloBreachAlert(endpoint, tier, type); + } + + /** + * Syncs endpoint SLO state into Prometheus gauges. + * Call before scraping /metrics. + */ + syncSloMetrics(): void { + const detailed = this.getDetailedMetrics(); + + for (const metric of detailed) { + const sla = getEndpointSla(metric.endpoint); + const tier = sla?.tier ?? 'unknown'; + const labels = { path: metric.endpoint, tier, type: metric.type }; + + endpointSloP95LatencyMs.set(labels, metric.currentP95); + endpointSloBudgetMs.set(labels, metric.threshold); + endpointSloBreach.set(labels, metric.isBreaching ? 1 : 0); + } + } + resetForTests(): void { this.stopMonitoring(); this.trackers.clear(); diff --git a/backend/src/metrics.ts b/backend/src/metrics.ts index bb41d4fb..2c8efc07 100644 --- a/backend/src/metrics.ts +++ b/backend/src/metrics.ts @@ -130,3 +130,70 @@ export function syncJobGovernanceMetrics(): void { ); } } + +// --- Reconciliation Drift Metrics --- + +export const reconciliationDriftTotal = new Counter({ + name: 'reconciliation_drift_total', + help: 'Total reconciliation drift issues detected by type', + labelNames: ['issue'], + registers: [register], +}); + +export const reconciliationStatus = new Gauge({ + name: 'reconciliation_status', + help: 'Reconciliation status: 1 = clean, 0 = drift detected', + registers: [register], +}); + +export const reconciliationLastRunTimestamp = new Gauge({ + name: 'reconciliation_last_run_timestamp', + help: 'Unix timestamp of the last automated reconciliation run', + registers: [register], +}); + +export function recordReconciliationDrift(issue: string): void { + reconciliationDriftTotal.inc({ issue }); +} + +export function setReconciliationStatus(clean: number): void { + reconciliationStatus.set(clean); +} + +export function setReconciliationLastRunTimestamp(unixSeconds: number): void { + reconciliationLastRunTimestamp.set(unixSeconds); +} + +// --- Endpoint SLO Metrics --- + +export const endpointSloBreachTotal = new Counter({ + name: 'backend_slo_breach_total', + help: 'Total endpoint SLO breach alerts emitted', + labelNames: ['path', 'tier', 'type'], + registers: [register], +}); + +export const endpointSloP95LatencyMs = new Gauge({ + name: 'backend_slo_p95_latency_ms', + help: 'Current rolling P95 latency per endpoint in milliseconds', + labelNames: ['path', 'tier', 'type'], + registers: [register], +}); + +export const endpointSloBudgetMs = new Gauge({ + name: 'backend_slo_budget_ms', + help: 'Configured P95 latency budget per endpoint in milliseconds', + labelNames: ['path', 'tier', 'type'], + registers: [register], +}); + +export const endpointSloBreach = new Gauge({ + name: 'backend_slo_breach', + help: 'Endpoint SLO breach state: 1 = breaching, 0 = within budget', + labelNames: ['path', 'tier', 'type'], + registers: [register], +}); + +export function recordSloBreachAlert(path: string, tier: string, type: string): void { + endpointSloBreachTotal.inc({ path, tier, type }); +} diff --git a/backend/src/positionReconciliationJob.ts b/backend/src/positionReconciliationJob.ts index cb1b398f..ba16d305 100644 --- a/backend/src/positionReconciliationJob.ts +++ b/backend/src/positionReconciliationJob.ts @@ -4,6 +4,15 @@ import { runJobWithRetry, registerJob } from './jobGovernance'; import { startEventPollingService } from './eventPollingService'; import { updateVaultMetrics } from './metrics'; import Decimal from 'decimal.js'; +import { + runReconciliationReport, + type ReconciliationSummary, +} from './reconciliationReport'; +import { + recordReconciliationDrift, + setReconciliationLastRunTimestamp, + setReconciliationStatus, +} from './metrics'; const prisma = getPrismaClient(); @@ -15,7 +24,6 @@ export async function runPositionReconciliationJob(): Promise { logger.log('info', 'Position reconciliation job started'); - // Get/start the event polling service singleton const pollingService = startEventPollingService({ rpcUrl: process.env.STELLAR_RPC_URL || 'https://soroban-testnet.stellar.org', contractId, @@ -23,10 +31,8 @@ export async function runPositionReconciliationJob(): Promise { batchSize: parseInt(process.env.EVENT_REPLAY_BATCH_SIZE || '100', 10), }); - // Trigger poll cycle and propagate errors await pollingService.pollEvents(); - // Read the updated vault state from the database const state = await prisma.vaultState.findUnique({ where: { id: 1 } }); if (state) { const assets = new Decimal(state.totalAssets); @@ -47,7 +53,7 @@ export async function runPositionReconciliationJob(): Promise { logger.log('info', 'Position reconciliation job completed'); } -let reconciliationTimer: ReturnType | null = null; +let positionReconciliationTimer: ReturnType | null = null; export function startPositionReconciliationScheduler(): () => void { const enabled = process.env.POSITION_RECONCILIATION_ENABLED !== 'false'; @@ -59,7 +65,7 @@ export function startPositionReconciliationScheduler(): () => void { const intervalMs = parseInt(process.env.POSITION_RECONCILIATION_INTERVAL_MS || '30000', 10); registerJob('positionReconciliation'); - reconciliationTimer = setInterval(() => { + positionReconciliationTimer = setInterval(() => { void runJobWithRetry('positionReconciliation', runPositionReconciliationJob).catch((error) => { logger.log('error', 'Position reconciliation scheduler error', { error: error instanceof Error ? error.message : String(error), @@ -72,10 +78,128 @@ export function startPositionReconciliationScheduler(): () => void { }); return () => { - if (reconciliationTimer) { - clearInterval(reconciliationTimer); - reconciliationTimer = null; + if (positionReconciliationTimer) { + clearInterval(positionReconciliationTimer); + positionReconciliationTimer = null; logger.log('info', 'Position reconciliation scheduler stopped'); } }; } + +// ─── Scheduled ledger drift reconciliation (Issue #724) ───────────────────── + +let lastDriftAlertAt = 0; + +function getReconciliationIntervalMs(): number { + return parseInt(process.env.LEDGER_RECONCILIATION_INTERVAL_MS || '300000', 10); +} + +function getDriftAlertThreshold(): number { + return parseInt(process.env.RECONCILIATION_DRIFT_ALERT_THRESHOLD || '1', 10); +} + +function getDriftAlertCooldownMs(): number { + return parseInt(process.env.RECONCILIATION_ALERT_COOLDOWN_MS || '900000', 10); +} + +async function sendDriftAlert(report: ReconciliationSummary): Promise { + const now = Date.now(); + if (now - lastDriftAlertAt < getDriftAlertCooldownMs()) { + return; + } + + lastDriftAlertAt = now; + const webhookUrl = process.env.RECONCILIATION_ALERT_WEBHOOK_URL || process.env.SLACK_WEBHOOK_URL; + + logger.log('warn', 'Reconciliation drift detected', { + drifted: report.counts.drifted, + status: report.status, + window: report.window, + }); + + if (!webhookUrl) { + return; + } + + try { + await fetch(webhookUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + text: `Reconciliation drift detected: ${report.counts.drifted} issue(s) in window ${report.window.from} → ${report.window.to}`, + }), + }); + } catch (error) { + logger.log('error', 'Failed to send reconciliation drift alert', { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +export async function runLedgerReconciliationJob(): Promise { + const startedAt = Date.now(); + const report = await runReconciliationReport({ + storeAsAutomated: true, + persistSnapshot: true, + }); + + setReconciliationLastRunTimestamp(Math.floor(Date.parse(report.generatedAt) / 1000)); + setReconciliationStatus(report.status === 'CLEAN' ? 1 : 0); + + if (report.status === 'DRIFT_DETECTED') { + for (const entry of report.driftEntries) { + recordReconciliationDrift(entry.issue); + } + + if (report.counts.drifted >= getDriftAlertThreshold()) { + await sendDriftAlert(report); + } + } + + logger.log('info', 'Ledger reconciliation job completed', { + status: report.status, + drifted: report.counts.drifted, + durationMs: Date.now() - startedAt, + }); + + return report; +} + +let ledgerReconciliationTimer: ReturnType | null = null; + +export function startLedgerReconciliationScheduler(): () => void { + const enabled = process.env.LEDGER_RECONCILIATION_ENABLED !== 'false'; + if (!enabled) { + logger.log('info', 'Ledger reconciliation scheduler disabled'); + return () => {}; + } + + const intervalMs = getReconciliationIntervalMs(); + registerJob('reportGeneration'); + + ledgerReconciliationTimer = setInterval(() => { + void runJobWithRetry('reportGeneration', runLedgerReconciliationJob).catch((error) => { + logger.log('error', 'Ledger reconciliation scheduler error', { + error: error instanceof Error ? error.message : String(error), + }); + }); + }, intervalMs); + + logger.log('info', 'Ledger reconciliation scheduler started', { intervalMs }); + + return () => { + if (ledgerReconciliationTimer) { + clearInterval(ledgerReconciliationTimer); + ledgerReconciliationTimer = null; + logger.log('info', 'Ledger reconciliation scheduler stopped'); + } + }; +} + +export function resetLedgerReconciliationSchedulerForTests(): void { + lastDriftAlertAt = 0; + if (ledgerReconciliationTimer) { + clearInterval(ledgerReconciliationTimer); + ledgerReconciliationTimer = null; + } +} diff --git a/backend/src/reconciliationReport.ts b/backend/src/reconciliationReport.ts index 1018f3a5..a1be91d6 100644 --- a/backend/src/reconciliationReport.ts +++ b/backend/src/reconciliationReport.ts @@ -16,7 +16,7 @@ import { getCurrentTraceId } from './tracing'; // ─── Types ────────────────────────────────────────────────────────────────── -interface LedgerRecord { +export interface LedgerRecord { transactionHash: string; type: string; amount: string; @@ -24,13 +24,13 @@ interface LedgerRecord { timestamp: string; } -interface DriftEntry { +export interface DriftEntry { transactionHash: string; issue: 'MISSING_IN_DB' | 'MISSING_ON_LEDGER' | 'AMOUNT_MISMATCH' | 'TYPE_MISMATCH'; details: Record; } -interface ReconciliationSummary { +export interface ReconciliationSummary { generatedAt: string; traceId: string | undefined; window: { @@ -47,6 +47,24 @@ interface ReconciliationSummary { status: 'CLEAN' | 'DRIFT_DETECTED'; } +export type LedgerFetcher = (from: string, to: string) => Promise; + +let lastAutomatedSummary: ReconciliationSummary | null = null; +let lastAutomatedRunAt: string | null = null; + +export function getLastAutomatedReconciliationSummary(): ReconciliationSummary | null { + return lastAutomatedSummary; +} + +export function getLastAutomatedReconciliationRunAt(): string | null { + return lastAutomatedRunAt; +} + +export function resetReconciliationStateForTests(): void { + lastAutomatedSummary = null; + lastAutomatedRunAt = null; +} + // ─── Ledger Fetcher ───────────────────────────────────────────────────────── /** @@ -54,7 +72,7 @@ interface ReconciliationSummary { * In production, this queries the Stellar Horizon /transactions endpoint. * Falls back gracefully if Horizon is unreachable. */ -async function fetchLedgerRecords( +export async function fetchLedgerRecords( from: string, to: string, ): Promise { @@ -104,7 +122,7 @@ async function fetchLedgerRecords( // ─── Database Fetcher ─────────────────────────────────────────────────────── -async function fetchDatabaseRecords( +export async function fetchDatabaseRecords( from: string, to: string, ): Promise { @@ -123,8 +141,6 @@ async function fetchDatabaseRecords( }); return transactions.map((tx: any) => ({ - // Map from Prisma schema fields to our internal LedgerRecord shape. - // The Transaction model uses `id` as identifier and `user` for wallet. transactionHash: tx.id, type: tx.type, amount: String(tx.amount), @@ -141,7 +157,7 @@ async function fetchDatabaseRecords( // ─── Reconciliation Logic ─────────────────────────────────────────────────── -function reconcile( +export function reconcile( ledgerRecords: LedgerRecord[], dbRecords: LedgerRecord[], ): { matched: number; driftEntries: DriftEntry[] } { @@ -158,7 +174,6 @@ function reconcile( const driftEntries: DriftEntry[] = []; let matched = 0; - // Check ledger records against DB for (const ledgerRec of ledgerRecords) { const dbRec = dbByHash.get(ledgerRec.transactionHash); if (!dbRec) { @@ -175,7 +190,6 @@ function reconcile( continue; } - // Check for mismatches if (ledgerRec.amount !== dbRec.amount) { driftEntries.push({ transactionHash: ledgerRec.transactionHash, @@ -203,7 +217,6 @@ function reconcile( matched++; } - // Check DB records missing from ledger for (const dbRec of dbRecords) { if (!ledgerByHash.has(dbRec.transactionHash) && ledgerRecords.length > 0) { driftEntries.push({ @@ -222,38 +235,28 @@ function reconcile( return { matched, driftEntries }; } -// ─── Handler ──────────────────────────────────────────────────────────────── +export interface RunReconciliationOptions { + from?: string; + to?: string; + traceId?: string; + ledgerFetcher?: LedgerFetcher; + persistSnapshot?: boolean; + storeAsAutomated?: boolean; +} -/** - * GET /api/v1/admin/reconciliation - * - * Query params: - * - from: ISO 8601 start timestamp (defaults to 24h ago) - * - to: ISO 8601 end timestamp (defaults to now) - * - * Returns a reconciliation report comparing ledger events vs DB state. - * Requires admin API key with ADMIN_READ permission. - */ -export async function reconciliationReportHandler( - req: Request, - res: Response, -): Promise { - const traceId = getCurrentTraceId(); +export async function runReconciliationReport( + options: RunReconciliationOptions = {}, +): Promise { const now = new Date(); - const defaultFrom = new Date(now.getTime() - 24 * 60 * 60 * 1000); + const defaultFrom = new Date(now.getTime() - getReconciliationWindowMs()); - const from = (req.query.from as string) || defaultFrom.toISOString(); - const to = (req.query.to as string) || now.toISOString(); - - logger.log('info', 'Reconciliation report requested', { - traceId, - from, - to, - requestedBy: req.get('x-admin-address') || 'unknown', - }); + const from = options.from || defaultFrom.toISOString(); + const to = options.to || now.toISOString(); + const traceId = options.traceId; + const ledgerFetcher = options.ledgerFetcher || fetchLedgerRecords; const [ledgerRecords, dbRecords] = await Promise.all([ - fetchLedgerRecords(from, to), + ledgerFetcher(from, to), fetchDatabaseRecords(from, to), ]); @@ -269,10 +272,92 @@ export async function reconciliationReportHandler( matched, drifted: driftEntries.length, }, - driftEntries: driftEntries.slice(0, 100), // cap response size + driftEntries: driftEntries.slice(0, 100), status: driftEntries.length === 0 ? 'CLEAN' : 'DRIFT_DETECTED', }; - const statusCode = report.status === 'CLEAN' ? 200 : 200; - res.status(statusCode).json(report); + if (options.storeAsAutomated) { + lastAutomatedSummary = report; + lastAutomatedRunAt = report.generatedAt; + } + + if (options.persistSnapshot !== false && options.storeAsAutomated) { + await persistReconciliationSnapshot(report); + } + + return report; +} + +function getReconciliationWindowMs(): number { + const hours = parseInt(process.env.RECONCILIATION_WINDOW_HOURS || '24', 10); + return Math.max(1, hours) * 60 * 60 * 1000; +} + +async function persistReconciliationSnapshot(report: ReconciliationSummary): Promise { + try { + const prisma = getPrismaClient(); + await prisma.reconciliationSnapshot.create({ + data: { + id: `rec-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + generatedAt: new Date(report.generatedAt), + traceId: report.traceId ?? null, + status: report.status, + windowFrom: new Date(report.window.from), + windowTo: new Date(report.window.to), + summaryJson: JSON.stringify(report), + driftCount: report.counts.drifted, + }, + }); + } catch (error) { + logger.log('warn', 'Failed to persist reconciliation snapshot', { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +// ─── Handler ──────────────────────────────────────────────────────────────── + +export async function reconciliationReportHandler( + req: Request, + res: Response, +): Promise { + const traceId = getCurrentTraceId(); + + logger.log('info', 'Reconciliation report requested', { + traceId, + from: req.query.from, + to: req.query.to, + requestedBy: req.get('x-admin-address') || 'unknown', + }); + + const report = await runReconciliationReport({ + from: req.query.from as string | undefined, + to: req.query.to as string | undefined, + traceId, + storeAsAutomated: false, + persistSnapshot: false, + }); + + res.status(200).json(report); +} + +export async function automatedReconciliationSummaryHandler( + req: Request, + res: Response, +): Promise { + const summary = getLastAutomatedReconciliationSummary(); + if (!summary) { + res.status(404).json({ + error: 'Not Found', + status: 404, + message: 'No automated reconciliation summary available yet', + }); + return; + } + + res.status(200).json({ + summary, + lastRunAt: getLastAutomatedReconciliationRunAt(), + requestedBy: req.get('x-admin-address') || 'unknown', + }); } diff --git a/backend/src/transactionBackfill.ts b/backend/src/transactionBackfill.ts index cc05440f..cfc64955 100644 --- a/backend/src/transactionBackfill.ts +++ b/backend/src/transactionBackfill.ts @@ -1,8 +1,6 @@ import { getPrismaClient } from './prismaClient'; import { logger } from './middleware/structuredLogging'; -const prisma = getPrismaClient(); - export interface BackfillRequest { startLedger: number; endLedger: number; @@ -32,6 +30,7 @@ export interface BackfillJob { createdAt: string; updatedAt: string; error?: string; + lastProcessedLedger?: number; } interface StellarEvent { @@ -44,10 +43,15 @@ interface StellarEvent { const jobs = new Map(); const jobsByKey = new Map(); +let hydrationPromise: Promise | null = null; const DEFAULT_BATCH_SIZE = 50; -const MAX_BATCH_SIZE = 500; -const MAX_LEDGER_RANGE = 20000; +export const MAX_BATCH_SIZE = 500; +export const MAX_LEDGER_RANGE = 20000; + +function getRetentionDays(): number { + return parseInt(process.env.BACKFILL_JOB_RETENTION_DAYS || '30', 10); +} function nowIso(): string { return new Date().toISOString(); @@ -65,6 +69,118 @@ function jobKey(input: BackfillRequest): string { return `${input.contractId}:${input.startLedger}-${input.endLedger}:${input.dryRun ? 'dry' : 'live'}`; } +function mapDbJob(row: { + id: string; + jobKey: string; + startLedger: number; + endLedger: number; + batchSize: number; + dryRun: boolean; + status: string; + progressJson: string; + errorMessage: string | null; + createdAt: Date; + updatedAt: Date; + lastProcessedLedger: number | null; +}): BackfillJob { + let progress: BackfillProgress = { + totalLedgers: row.endLedger - row.startLedger + 1, + missingLedgers: 0, + scannedLedgers: 0, + insertedEvents: 0, + duplicateEvents: 0, + }; + + try { + progress = JSON.parse(row.progressJson) as BackfillProgress; + } catch { + // keep defaults + } + + return { + id: row.id, + key: row.jobKey, + startLedger: row.startLedger, + endLedger: row.endLedger, + batchSize: row.batchSize, + dryRun: row.dryRun, + status: row.status as BackfillJob['status'], + progress, + createdAt: row.createdAt.toISOString(), + updatedAt: row.updatedAt.toISOString(), + error: row.errorMessage ?? undefined, + lastProcessedLedger: row.lastProcessedLedger ?? undefined, + }; +} + +async function ensureJobsHydrated(): Promise { + if (!hydrationPromise) { + hydrationPromise = hydrateJobsFromDatabase(); + } + await hydrationPromise; +} + +async function hydrateJobsFromDatabase(): Promise { + try { + const rows = await getPrismaClient().transactionBackfillJob.findMany({ + orderBy: { createdAt: 'desc' }, + take: 200, + }); + + for (const row of rows) { + const job = mapDbJob(row); + jobs.set(job.id, job); + jobsByKey.set(job.key, job.id); + } + } catch (error) { + logger.log('warn', 'Failed to hydrate transaction backfill jobs', { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +async function persistJob(job: BackfillJob, rpcUrl: string, contractId: string): Promise { + try { + await getPrismaClient().transactionBackfillJob.upsert({ + where: { id: job.id }, + update: { + status: job.status, + progressJson: JSON.stringify(job.progress), + errorMessage: job.error ?? null, + lastProcessedLedger: job.lastProcessedLedger ?? null, + updatedAt: new Date(job.updatedAt), + completedAt: job.status === 'completed' || job.status === 'failed' + ? new Date(job.updatedAt) + : null, + }, + create: { + id: job.id, + jobKey: job.key, + startLedger: job.startLedger, + endLedger: job.endLedger, + batchSize: job.batchSize, + dryRun: job.dryRun, + status: job.status, + rpcUrl, + contractId, + progressJson: JSON.stringify(job.progress), + errorMessage: job.error ?? null, + lastProcessedLedger: job.lastProcessedLedger ?? null, + createdAt: new Date(job.createdAt), + updatedAt: new Date(job.updatedAt), + completedAt: job.status === 'completed' || job.status === 'failed' + ? new Date(job.updatedAt) + : null, + }, + }); + } catch (error) { + logger.log('warn', 'Failed to persist transaction backfill job', { + error: error instanceof Error ? error.message : String(error), + jobId: job.id, + }); + } +} + async function fetchEventsForRange( rpcUrl: string, contractId: string, @@ -108,7 +224,7 @@ async function fetchEventsForRange( } async function getMissingLedgers(startLedger: number, endLedger: number): Promise { - const rows = await prisma.processedEvent.findMany({ + const rows = await getPrismaClient().processedEvent.findMany({ where: { ledgerSeq: { gte: startLedger, @@ -152,46 +268,24 @@ function createJob(input: BackfillRequest, key: string, missingLedgers: number[] }; } -function touchJob(job: BackfillJob): void { +async function touchJob(job: BackfillJob, rpcUrl: string, contractId: string): Promise { job.updatedAt = nowIso(); jobs.set(job.id, job); + await persistJob(job, rpcUrl, contractId); } -export async function createOrResumeTransactionBackfill(input: BackfillRequest): Promise { - if (!Number.isInteger(input.startLedger) || !Number.isInteger(input.endLedger)) { - throw new Error('startLedger and endLedger must be integers'); - } - if (input.startLedger <= 0 || input.endLedger <= 0) { - throw new Error('startLedger and endLedger must be greater than 0'); - } - if (input.endLedger < input.startLedger) { - throw new Error('endLedger must be greater than or equal to startLedger'); - } - - const range = input.endLedger - input.startLedger + 1; - if (range > MAX_LEDGER_RANGE) { - throw new Error(`ledger range exceeds maximum of ${MAX_LEDGER_RANGE}`); - } - - const batchSize = Math.min(Math.max(input.batchSize || DEFAULT_BATCH_SIZE, 1), MAX_BATCH_SIZE); - const key = jobKey(input); - const previousId = jobsByKey.get(key); - - if (previousId) { - const existing = jobs.get(previousId); - if (existing && existing.status !== 'failed') { - return existing; - } - } - - const missingLedgers = await getMissingLedgers(input.startLedger, input.endLedger); - const job = createJob(input, key, missingLedgers, batchSize); - jobs.set(job.id, job); - jobsByKey.set(key, job.id); - +async function runBackfillProcessing( + job: BackfillJob, + input: BackfillRequest, + missingLedgers: number[], + batchSize: number, +): Promise { try { if (!input.dryRun && missingLedgers.length > 0) { - for (const ledgerChunk of chunk(missingLedgers, batchSize)) { + const resumeAfter = job.lastProcessedLedger ?? 0; + const ledgersToScan = missingLedgers.filter((ledger) => ledger > resumeAfter); + + for (const ledgerChunk of chunk(ledgersToScan, batchSize)) { const startLedger = ledgerChunk[0]; const endLedger = ledgerChunk[ledgerChunk.length - 1]; const events = await fetchEventsForRange( @@ -202,38 +296,39 @@ export async function createOrResumeTransactionBackfill(input: BackfillRequest): ); for (const event of events) { - const result = await prisma.processedEvent.upsert({ - where: { id: event.id }, - update: {}, - create: { - id: event.id, - ledgerSeq: event.ledger, - eventType: event.type, - contractId: event.contractId, - txHash: event.txHash, - }, - }); - - if (result.id) { + const existing = await getPrismaClient().processedEvent.findUnique({ where: { id: event.id } }); + if (existing) { + job.progress.duplicateEvents += 1; + } else { + await getPrismaClient().processedEvent.create({ + data: { + id: event.id, + ledgerSeq: event.ledger, + eventType: event.type, + contractId: event.contractId, + txHash: event.txHash, + }, + }); job.progress.insertedEvents += 1; } } job.progress.scannedLedgers += ledgerChunk.length; - touchJob(job); + job.lastProcessedLedger = endLedger; + await touchJob(job, input.rpcUrl, input.contractId); } } else { job.progress.scannedLedgers = job.progress.missingLedgers; - touchJob(job); + await touchJob(job, input.rpcUrl, input.contractId); } job.status = 'completed'; - touchJob(job); + await touchJob(job, input.rpcUrl, input.contractId); return job; } catch (error) { job.status = 'failed'; job.error = error instanceof Error ? error.message : String(error); - touchJob(job); + await touchJob(job, input.rpcUrl, input.contractId); logger.log('error', 'Transaction backfill failed', { jobId: job.id, @@ -246,17 +341,87 @@ export async function createOrResumeTransactionBackfill(input: BackfillRequest): } } -export function getTransactionBackfillJob(jobId: string): BackfillJob | null { +export async function createOrResumeTransactionBackfill(input: BackfillRequest): Promise { + await ensureJobsHydrated(); + + if (!Number.isInteger(input.startLedger) || !Number.isInteger(input.endLedger)) { + throw new Error('startLedger and endLedger must be integers'); + } + if (input.startLedger <= 0 || input.endLedger <= 0) { + throw new Error('startLedger and endLedger must be greater than 0'); + } + if (input.endLedger < input.startLedger) { + throw new Error('endLedger must be greater than or equal to startLedger'); + } + + const range = input.endLedger - input.startLedger + 1; + if (range > MAX_LEDGER_RANGE) { + throw new Error(`ledger range exceeds maximum of ${MAX_LEDGER_RANGE}`); + } + + const batchSize = Math.min(Math.max(input.batchSize || DEFAULT_BATCH_SIZE, 1), MAX_BATCH_SIZE); + const key = jobKey(input); + + const previousId = jobsByKey.get(key); + if (previousId) { + const existing = jobs.get(previousId); + if (existing && existing.status === 'running') { + return existing; + } + if (existing && existing.status === 'completed') { + return existing; + } + if (existing && existing.status === 'failed') { + existing.status = 'running'; + existing.error = undefined; + await touchJob(existing, input.rpcUrl, input.contractId); + return runBackfillProcessing(existing, input, await getMissingLedgers(input.startLedger, input.endLedger), batchSize); + } + } + + const missingLedgers = await getMissingLedgers(input.startLedger, input.endLedger); + const job = createJob(input, key, missingLedgers, batchSize); + jobs.set(job.id, job); + jobsByKey.set(key, job.id); + await persistJob(job, input.rpcUrl, input.contractId); + + return runBackfillProcessing(job, input, missingLedgers, batchSize); +} + +export async function getTransactionBackfillJob(jobId: string): Promise { + await ensureJobsHydrated(); return jobs.get(jobId) || null; } -export function listTransactionBackfillJobs(limit: number = 20): BackfillJob[] { +export async function listTransactionBackfillJobs(limit: number = 20): Promise { + await ensureJobsHydrated(); return Array.from(jobs.values()) .sort((a, b) => b.createdAt.localeCompare(a.createdAt)) .slice(0, Math.max(1, Math.min(limit, 200))); } +export async function pruneOldBackfillJobs(): Promise { + const cutoff = new Date(); + cutoff.setUTCDate(cutoff.getUTCDate() - getRetentionDays()); + + try { + const result = await getPrismaClient().transactionBackfillJob.deleteMany({ + where: { + createdAt: { lt: cutoff }, + status: { in: ['completed', 'failed'] }, + }, + }); + return result.count; + } catch (error) { + logger.log('warn', 'Failed to prune old backfill jobs', { + error: error instanceof Error ? error.message : String(error), + }); + return 0; + } +} + export function resetTransactionBackfillJobsForTests(): void { jobs.clear(); jobsByKey.clear(); + hydrationPromise = null; } diff --git a/docs/MONITORING_OBSERVABILITY.md b/docs/MONITORING_OBSERVABILITY.md index cf41a134..a9e0a319 100644 --- a/docs/MONITORING_OBSERVABILITY.md +++ b/docs/MONITORING_OBSERVABILITY.md @@ -83,6 +83,37 @@ These are updated by `updateVaultMetrics()` whenever vault state changes. A flat An event-loop lag above **100 ms** is a sign of CPU saturation. +### 1.6 Endpoint SLO Metrics + +Per-route SLO breach state is exported directly from `latencyMonitoring.ts` via `syncSloMetrics()` on each `/metrics` scrape. + +| Metric | Type | Labels | What it measures | +|---|---|---|---| +| `backend_slo_breach_total` | Counter | `path`, `tier`, `type` | SLO breach alerts emitted (respects alert cooldown) | +| `backend_slo_p95_latency_ms` | Gauge | `path`, `tier`, `type` | Current rolling P95 latency vs budget | +| `backend_slo_budget_ms` | Gauge | `path`, `tier`, `type` | Configured P95 latency budget | +| `backend_slo_breach` | Gauge | `path`, `tier`, `type` | `1` when breaching, `0` when within budget | + +Critical tier routes (`/health`, `/ready`) use `tier="critical"` labels from `ENDPOINT_SLA_REGISTRY`. + +**Key derived queries (PromQL):** + +```promql +# Endpoints currently breaching latency SLO +backend_slo_breach == 1 + +# Alert rate per endpoint (15m window) +rate(backend_slo_breach_total[15m]) +``` + +### 1.7 Reconciliation Drift Metrics + +| Metric | Type | Labels | What it measures | +|---|---|---|---| +| `reconciliation_drift_total` | Counter | `issue` | Drift issues detected by scheduled reconciliation | +| `reconciliation_status` | Gauge | — | `1` = clean, `0` = drift detected | +| `reconciliation_last_run_timestamp` | Gauge | — | Unix timestamp of last automated reconciliation run | + --- ## 2. Latency SLO Alerts diff --git a/node_modules/.bin/husky b/node_modules/.bin/husky deleted file mode 100644 index d1e0e884..00000000 --- a/node_modules/.bin/husky +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh -basedir=$(dirname "$(echo "$0" | sed -e 's,\\,/,g')") - -case `uname` in - *CYGWIN*|*MINGW*|*MSYS*) - if command -v cygpath > /dev/null 2>&1; then - basedir=`cygpath -w "$basedir"` - fi - ;; -esac - -if [ -x "$basedir/node" ]; then - exec "$basedir/node" "$basedir/../husky/bin.js" "$@" -else - exec node "$basedir/../husky/bin.js" "$@" -fi diff --git a/node_modules/.bin/husky.cmd b/node_modules/.bin/husky.cmd deleted file mode 100644 index a1047404..00000000 --- a/node_modules/.bin/husky.cmd +++ /dev/null @@ -1,17 +0,0 @@ -@ECHO off -GOTO start -:find_dp0 -SET dp0=%~dp0 -EXIT /b -:start -SETLOCAL -CALL :find_dp0 - -IF EXIST "%dp0%\node.exe" ( - SET "_prog=%dp0%\node.exe" -) ELSE ( - SET "_prog=node" - SET PATHEXT=%PATHEXT:;.JS;=;% -) - -endLocal & goto #_undefined_# 2>NUL || title %COMSPEC% & "%_prog%" "%dp0%\..\husky\bin.js" %* diff --git a/node_modules/.bin/husky.ps1 b/node_modules/.bin/husky.ps1 deleted file mode 100644 index 22039529..00000000 --- a/node_modules/.bin/husky.ps1 +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env pwsh -$basedir=Split-Path $MyInvocation.MyCommand.Definition -Parent - -$exe="" -if ($PSVersionTable.PSVersion -lt "6.0" -or $IsWindows) { - # Fix case when both the Windows and Linux builds of Node - # are installed in the same directory - $exe=".exe" -} -$ret=0 -if (Test-Path "$basedir/node$exe") { - # Support pipeline input - if ($MyInvocation.ExpectingInput) { - $input | & "$basedir/node$exe" "$basedir/../husky/bin.js" $args - } else { - & "$basedir/node$exe" "$basedir/../husky/bin.js" $args - } - $ret=$LASTEXITCODE -} else { - # Support pipeline input - if ($MyInvocation.ExpectingInput) { - $input | & "node$exe" "$basedir/../husky/bin.js" $args - } else { - & "node$exe" "$basedir/../husky/bin.js" $args - } - $ret=$LASTEXITCODE -} -exit $ret diff --git a/node_modules/.package-lock.json b/node_modules/.package-lock.json deleted file mode 100644 index 49e6dda3..00000000 --- a/node_modules/.package-lock.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "name": "YieldVault-RWA", - "lockfileVersion": 3, - "requires": true, - "packages": { - "node_modules/husky": { - "version": "9.1.7", - "resolved": "https://registry.npmjs.org/husky/-/husky-9.1.7.tgz", - "integrity": "sha512-5gs5ytaNjBrh5Ow3zrvdUUY+0VxIuWVL4i9irt6friV+BqdCfmV11CQTWMiBYWHbXhco+J1kHfTOUkePhCDvMA==", - "dev": true, - "license": "MIT", - "bin": { - "husky": "bin.js" - }, - "engines": { - "node": ">=18" - }, - "funding": { - "url": "https://github.com/sponsors/typicode" - } - } - } -} diff --git a/node_modules/husky/LICENSE b/node_modules/husky/LICENSE deleted file mode 100644 index 049c9dc4..00000000 --- a/node_modules/husky/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2021 typicode - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/node_modules/husky/README.md b/node_modules/husky/README.md deleted file mode 100644 index 405cda7d..00000000 --- a/node_modules/husky/README.md +++ /dev/null @@ -1 +0,0 @@ -https://typicode.github.io/husky diff --git a/node_modules/husky/bin.js b/node_modules/husky/bin.js deleted file mode 100644 index 244311ba..00000000 --- a/node_modules/husky/bin.js +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env node -import f, { writeFileSync as w } from 'fs' -import i from './index.js' - -let p, a, n, s, o, d - -p = process -a = p.argv[2] - -if (a == 'init') { - n = 'package.json' - s = f.readFileSync(n) - o = JSON.parse(s) - ;(o.scripts ||= {}).prepare = 'husky' - w(n, JSON.stringify(o, 0, /\t/.test(s) ? '\t' : 2) + '\n') - p.stdout.write(i()) - try { f.mkdirSync('.husky') } catch {} - w('.husky/pre-commit', (p.env.npm_config_user_agent?.split('/')[0] ?? 'npm') + ' test\n') - p.exit() -} - -d = c => console.error(`husky - ${c} command is DEPRECATED`) -if (['add', 'set', 'uninstall'].includes(a)) { d(a); p.exit(1) } -if (a == 'install') d(a) - -p.stdout.write(i(a == 'install' ? undefined : a)) diff --git a/node_modules/husky/husky b/node_modules/husky/husky deleted file mode 100644 index bf7c8964..00000000 --- a/node_modules/husky/husky +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env sh -[ "$HUSKY" = "2" ] && set -x -n=$(basename "$0") -s=$(dirname "$(dirname "$0")")/$n - -[ ! -f "$s" ] && exit 0 - -if [ -f "$HOME/.huskyrc" ]; then - echo "husky - '~/.huskyrc' is DEPRECATED, please move your code to ~/.config/husky/init.sh" -fi -i="${XDG_CONFIG_HOME:-$HOME/.config}/husky/init.sh" -[ -f "$i" ] && . "$i" - -[ "${HUSKY-}" = "0" ] && exit 0 - -export PATH="node_modules/.bin:$PATH" -sh -e "$s" "$@" -c=$? - -[ $c != 0 ] && echo "husky - $n script failed (code $c)" -[ $c = 127 ] && echo "husky - command not found in PATH=$PATH" -exit $c diff --git a/node_modules/husky/index.d.ts b/node_modules/husky/index.d.ts deleted file mode 100644 index 72a5495a..00000000 --- a/node_modules/husky/index.d.ts +++ /dev/null @@ -1 +0,0 @@ -export default function (dir?: string): string; \ No newline at end of file diff --git a/node_modules/husky/index.js b/node_modules/husky/index.js deleted file mode 100644 index bebf0d50..00000000 --- a/node_modules/husky/index.js +++ /dev/null @@ -1,25 +0,0 @@ -import c from 'child_process' -import f, { readdir, writeFileSync as w } from 'fs' -import p from 'path' - -let l = [ 'pre-commit', 'pre-merge-commit', 'prepare-commit-msg', 'commit-msg', 'post-commit', 'applypatch-msg', 'pre-applypatch', 'post-applypatch', 'pre-rebase', 'post-rewrite', 'post-checkout', 'post-merge', 'pre-push', 'pre-auto-gc' ], - msg = `echo "husky - DEPRECATED\n\nPlease remove the following two lines from $0:\n\n#!/usr/bin/env sh\n. \\"\\$(dirname -- \\"\\$0\\")/_/husky.sh\\"\n\nThey WILL FAIL in v10.0.0\n"` - -export default (d = '.husky') => { - if (process.env.HUSKY === '0') return 'HUSKY=0 skip install' - if (d.includes('..')) return '.. not allowed' - if (!f.existsSync('.git')) return `.git can't be found` - - let _ = (x = '') => p.join(d, '_', x) - let { status: s, stderr: e } = c.spawnSync('git', ['config', 'core.hooksPath', `${d}/_`]) - if (s == null) return 'git command not found' - if (s) return '' + e - - f.rmSync(_('husky.sh'), { force: true }) - f.mkdirSync(_(), { recursive: true }) - w(_('.gitignore'), '*') - f.copyFileSync(new URL('husky', import.meta.url), _('h')) - l.forEach(h => w(_(h), `#!/usr/bin/env sh\n. "\$(dirname "\$0")/h"`, { mode: 0o755 })) - w(_('husky.sh'), msg) - return '' -} diff --git a/node_modules/husky/package.json b/node_modules/husky/package.json deleted file mode 100644 index b9d3810b..00000000 --- a/node_modules/husky/package.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "name": "husky", - "version": "9.1.7", - "type": "module", - "description": "Modern native Git hooks", - "keywords": [ - "git", - "hooks", - "pre-commit" - ], - "repository": { - "type": "git", - "url": "git+https://github.com/typicode/husky.git" - }, - "funding": "https://github.com/sponsors/typicode", - "license": "MIT", - "author": "typicode", - "bin": { - "husky": "bin.js" - }, - "exports": "./index.js", - "engines": { - "node": ">=18" - } -} diff --git a/package-lock.json b/package-lock.json index c8489bc8..2aaf92d9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -615,9 +615,6 @@ "arm64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -635,9 +632,6 @@ "arm64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -655,9 +649,6 @@ "ppc64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -675,9 +666,6 @@ "s390x" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -695,9 +683,6 @@ "x64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -715,9 +700,6 @@ "x64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1258,9 +1240,6 @@ "arm64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MPL-2.0", "optional": true, "os": [ @@ -1282,9 +1261,6 @@ "arm64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MPL-2.0", "optional": true, "os": [ @@ -1306,9 +1282,6 @@ "x64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MPL-2.0", "optional": true, "os": [ @@ -1330,9 +1303,6 @@ "x64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MPL-2.0", "optional": true, "os": [