Skip to content

Commit 6e74291

Browse files
eabdelmoneimclaude
andcommitted
feat: add transaction backfill fallback for queue ID lookups
- Add ENABLE_TX_BACKFILL_FALLBACK env var (default: false) - Add backfill methods to TransactionDB (getBackfillHash, setBackfill, bulkSetBackfill) - Add fallback lookup in /transaction/logs endpoint - Add POST /admin/backfill endpoint for loading backfill data When enabled, the /transaction/logs endpoint will check a fallback Redis table (backfill:<queueId>) when the primary transaction cache misses. This allows recovering transaction logs for queue IDs that were pruned from the main cache. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c50bc44 commit 6e74291

5 files changed

Lines changed: 122 additions & 0 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { type Static, Type } from "@sinclair/typebox";
2+
import type { FastifyInstance } from "fastify";
3+
import { StatusCodes } from "http-status-codes";
4+
import { TransactionDB } from "../../../shared/db/transactions/db";
5+
import { standardResponseSchema } from "../../schemas/shared-api-schemas";
6+
7+
const requestBodySchema = Type.Object({
8+
entries: Type.Array(
9+
Type.Object({
10+
queueId: Type.String({ description: "Queue ID (UUID)" }),
11+
transactionHash: Type.String({ description: "Transaction hash (0x...)" }),
12+
}),
13+
{ description: "Array of queueId to transactionHash mappings", maxItems: 10000 },
14+
),
15+
});
16+
17+
const responseBodySchema = Type.Object({
18+
result: Type.Object({
19+
inserted: Type.Integer({ description: "Number of entries inserted" }),
20+
skipped: Type.Integer({
21+
description: "Number of entries skipped (already exist)",
22+
}),
23+
}),
24+
});
25+
26+
export async function loadBackfillRoute(fastify: FastifyInstance) {
27+
fastify.route<{
28+
Body: Static<typeof requestBodySchema>;
29+
Reply: Static<typeof responseBodySchema>;
30+
}>({
31+
method: "POST",
32+
url: "/admin/backfill",
33+
schema: {
34+
summary: "Load backfill entries",
35+
description:
36+
"Load queueId to transactionHash mappings into the backfill table. Uses SETNX to never overwrite existing entries.",
37+
tags: ["Admin"],
38+
operationId: "loadBackfill",
39+
body: requestBodySchema,
40+
response: {
41+
...standardResponseSchema,
42+
[StatusCodes.OK]: responseBodySchema,
43+
},
44+
hide: true,
45+
},
46+
handler: async (request, reply) => {
47+
const { entries } = request.body;
48+
49+
const { inserted, skipped } =
50+
await TransactionDB.bulkSetBackfill(entries);
51+
52+
reply.status(StatusCodes.OK).send({
53+
result: { inserted, skipped },
54+
});
55+
},
56+
});
57+
}

src/server/routes/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { FastifyInstance } from "fastify";
2+
import { loadBackfillRoute } from "./admin/backfill";
23
import { getNonceDetailsRoute } from "./admin/nonces";
34
import { getTransactionDetails } from "./admin/transaction";
45
import { createAccessToken } from "./auth/access-tokens/create";
@@ -297,4 +298,5 @@ export async function withRoutes(fastify: FastifyInstance) {
297298
// Admin
298299
await fastify.register(getTransactionDetails);
299300
await fastify.register(getNonceDetailsRoute);
301+
await fastify.register(loadBackfillRoute);
300302
}

src/server/routes/transaction/blockchain/get-logs.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { resolveContractAbi } from "thirdweb/contract";
1515
import type { TransactionReceipt } from "thirdweb/transaction";
1616
import { TransactionDB } from "../../../../shared/db/transactions/db";
1717
import { getChain } from "../../../../shared/utils/chain";
18+
import { env } from "../../../../shared/utils/env";
1819
import { thirdwebClient } from "../../../../shared/utils/sdk";
1920
import { createCustomError } from "../../../middleware/error";
2021
import { AddressSchema, TransactionHashSchema } from "../../../schemas/address";
@@ -153,10 +154,19 @@ export async function getTransactionLogs(fastify: FastifyInstance) {
153154
// Get the transaction hash from the provided input.
154155
let hash: Hex | undefined;
155156
if (queueId) {
157+
// Primary lookup
156158
const transaction = await TransactionDB.get(queueId);
157159
if (transaction?.status === "mined") {
158160
hash = transaction.transactionHash;
159161
}
162+
163+
// Fallback to backfill table if enabled and not found
164+
if (!hash && env.ENABLE_TX_BACKFILL_FALLBACK) {
165+
const backfillHash = await TransactionDB.getBackfillHash(queueId);
166+
if (backfillHash) {
167+
hash = backfillHash as Hex;
168+
}
169+
}
160170
} else if (transactionHash) {
161171
hash = transactionHash as Hex;
162172
}

src/shared/db/transactions/db.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export class TransactionDB {
3737
private static minedTransactionsKey = "transaction:mined";
3838
private static cancelledTransactionsKey = "transaction:cancelled";
3939
private static erroredTransactionsKey = "transaction:errored";
40+
private static backfillKey = (queueId: string) => `backfill:${queueId}`;
4041

4142
/**
4243
* Inserts or replaces a transaction details.
@@ -208,6 +209,55 @@ export class TransactionDB {
208209

209210
return numPruned;
210211
};
212+
213+
/**
214+
* Gets transaction hash from backfill table.
215+
*/
216+
static getBackfillHash = async (queueId: string): Promise<string | null> => {
217+
return redis.get(this.backfillKey(queueId));
218+
};
219+
220+
/**
221+
* Sets a backfill entry. Uses SETNX to never overwrite.
222+
* @returns true if set, false if already exists
223+
*/
224+
static setBackfill = async (
225+
queueId: string,
226+
transactionHash: string,
227+
): Promise<boolean> => {
228+
const result = await redis.setnx(
229+
this.backfillKey(queueId),
230+
transactionHash,
231+
);
232+
return result === 1;
233+
};
234+
235+
/**
236+
* Bulk set backfill entries.
237+
* @returns { inserted: number, skipped: number }
238+
*/
239+
static bulkSetBackfill = async (
240+
entries: Array<{ queueId: string; transactionHash: string }>,
241+
): Promise<{ inserted: number; skipped: number }> => {
242+
let inserted = 0;
243+
let skipped = 0;
244+
245+
const pipeline = redis.pipeline();
246+
for (const { queueId, transactionHash } of entries) {
247+
pipeline.setnx(this.backfillKey(queueId), transactionHash);
248+
}
249+
250+
const results = await pipeline.exec();
251+
for (const [err, result] of results ?? []) {
252+
if (!err && result === 1) {
253+
inserted++;
254+
} else {
255+
skipped++;
256+
}
257+
}
258+
259+
return { inserted, skipped };
260+
};
211261
}
212262

213263
const toSeconds = (timestamp: Date) => timestamp.getTime() / 1000;

src/shared/utils/env.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ export const env = createEnv({
9898

9999
SEND_WEBHOOK_QUEUE_CONCURRENCY: z.coerce.number().default(10),
100100

101+
ENABLE_TX_BACKFILL_FALLBACK: boolEnvSchema(false),
102+
101103
/**
102104
* Experimental env vars. These may be renamed or removed in future non-major releases.
103105
*/
@@ -177,6 +179,7 @@ export const env = createEnv({
177179
EXPERIMENTAL__RETRY_PREPARE_USEROP_ERRORS:
178180
process.env.EXPERIMENTAL__RETRY_PREPARE_USEROP_ERRORS,
179181
SEND_WEBHOOK_QUEUE_CONCURRENCY: process.env.SEND_WEBHOOK_QUEUE_CONCURRENCY,
182+
ENABLE_TX_BACKFILL_FALLBACK: process.env.ENABLE_TX_BACKFILL_FALLBACK,
180183
},
181184
onValidationError: (error: ZodError) => {
182185
console.error(

0 commit comments

Comments
 (0)