Skip to content

Commit 009070e

Browse files
authored
Add transaction locking for nonce management (#132)
* Update to make transaction processing a prisma. * Update queries to support pgtx * Use transaction with timeout * Add locking on getWalletNonce * Update nodemon to watch src
1 parent 671a175 commit 009070e

22 files changed

Lines changed: 463 additions & 262 deletions

core/env.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export const env = createEnv({
6161
MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY: z.string().default("55000000000"),
6262
MAX_RETRIES_FOR_TX: z.coerce.number().default(3),
6363
RETRY_TX_CRON_SCHEDULE: z.string().default("*/30 * * * * *"),
64-
MAX_BLOCKS_ELAPSED_BEFORE_RETRY: z.coerce.number().default(50),
64+
MAX_BLOCKS_ELAPSED_BEFORE_RETRY: z.coerce.number().default(10),
6565
MAX_WAIT_TIME_BEFORE_RETRY: z.coerce.number().default(600),
6666
},
6767
clientPrefix: "NEVER_USED",

core/sdk/sdk.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
walletTableSchema,
1212
} from "../../server/schemas/wallet";
1313
import { getWalletDetails } from "../../src/db/wallets/getWalletDetails";
14+
import { PrismaTransaction } from "../../src/schema/prisma";
1415
import { env } from "../env";
1516
import { networkResponseSchema } from "../schema";
1617

@@ -95,7 +96,11 @@ const cacheSdk = (
9596
};
9697

9798
const walletDataMap: Map<string, string> = new Map();
98-
const getCachedWallet = async (walletAddress: string, chainId: number) => {
99+
const getCachedWallet = async (
100+
walletAddress: string,
101+
chainId: number,
102+
pgtx?: PrismaTransaction,
103+
) => {
99104
walletAddress = walletAddress.toLowerCase();
100105
let walletData;
101106
const cachedWallet = walletDataMap.get(walletAddress);
@@ -104,7 +109,11 @@ const getCachedWallet = async (walletAddress: string, chainId: number) => {
104109
} else {
105110
console.log("Checking details for address", walletAddress);
106111
// TODO: This needs to be changed...
107-
walletData = await getWalletDetails({ address: walletAddress, chainId });
112+
walletData = await getWalletDetails({
113+
pgtx,
114+
address: walletAddress,
115+
chainId,
116+
});
108117
console.log("Received wallet data:", walletData);
109118
if (walletData) {
110119
walletDataMap.set(walletAddress, JSON.stringify(walletData));
@@ -121,6 +130,7 @@ const THIRDWEB_API_SECRET_KEY = env.THIRDWEB_API_SECRET_KEY;
121130
export const getSDK = async (
122131
chainName: ChainOrRpc,
123132
walletAddress?: string,
133+
pgtx?: PrismaTransaction,
124134
): Promise<ThirdwebSDK> => {
125135
let walletData: Static<typeof walletTableSchema> | undefined;
126136

@@ -158,7 +168,7 @@ export const getSDK = async (
158168
return sdk;
159169
}
160170

161-
walletData = await getCachedWallet(walletAddress, chain.chainId);
171+
walletData = await getCachedWallet(walletAddress, chain.chainId, pgtx);
162172

163173
if (!walletData) {
164174
throw new Error(`Wallet not found for address: ${walletAddress}`);

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
"docker": "docker compose --env-file ./.env up --remove-orphans",
1111
"docker:build": "docker compose build --no-cache",
1212
"dev": "yarn dev:infra && yarn prisma:init && yarn dev:server & sleep 10 && yarn dev:worker",
13-
"dev:server": "yarn prisma:init && nodemon --watch 'server/**/*.ts' --watch 'core/**/*.ts' --exec 'npx tsx ./server/index.ts' --files server/index.ts",
14-
"dev:worker": "yarn prisma:init && nodemon --watch 'worker/**/*.ts' --watch 'core/**/*.ts' --exec 'npx tsx ./worker/index.ts' --files worker/index.ts",
13+
"dev:server": "yarn prisma:init && nodemon --watch 'server/**/*.ts' --watch 'core/**/*.ts' --watch 'src/**/*.ts' --exec 'npx tsx ./server/index.ts' --files server/index.ts",
14+
"dev:worker": "yarn prisma:init && nodemon --watch 'worker/**/*.ts' --watch 'core/**/*.ts' --watch 'src/**/*.ts' --exec 'npx tsx ./worker/index.ts' --files worker/index.ts",
1515
"dev:infra": "docker compose -f ./docker-compose-infra.yml up -d",
1616
"build": "yarn && rm -rf dist && tsc -p ./tsconfig.json --outDir dist",
1717
"prisma:reset": "prisma migrate reset --force && prisma generate",

src/db/client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
import { PrismaClient } from "@prisma/client";
2+
import { PrismaTransaction } from "../schema/prisma";
23

34
export const prisma = new PrismaClient();
5+
6+
export const getPrismaWithPostgresTx = (pgtx?: PrismaTransaction) => {
7+
return pgtx || prisma;
8+
};

src/db/transactions/getAllTxs.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ import {
44
transactionResponseSchema,
55
} from "../../../server/schemas/transaction";
66
import { ContractExtension } from "../../schema/extension";
7-
import { prisma } from "../client";
7+
import { PrismaTransaction } from "../../schema/prisma";
8+
import { getPrismaWithPostgresTx } from "../client";
89
import { cleanTxs } from "./cleanTxs";
910

1011
interface GetAllTxsParams {
12+
pgtx?: PrismaTransaction;
1113
page: number;
1214
limit: number;
1315
filter?: TransactionStatusEnum;
1416
extensions?: ContractExtension[];
1517
}
1618

1719
export const getAllTxs = async ({
20+
pgtx,
1821
page,
1922
limit,
2023
filter,
@@ -40,6 +43,8 @@ export const getAllTxs = async ({
4043
filterBy = "errorMessage";
4144
}
4245

46+
const prisma = getPrismaWithPostgresTx(pgtx);
47+
4348
// TODO: Cleaning should be handled by zod
4449
const txs = await prisma.transactions.findMany({
4550
where: {

src/db/transactions/getQueuedTxs.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import { Static } from "@sinclair/typebox";
22
import { env } from "../../../core/env";
33
import { transactionResponseSchema } from "../../../server/schemas/transaction";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56
import { cleanTxs } from "./cleanTxs";
67

7-
export const getQueuedTxs = async (): Promise<
8+
interface GetQueuedTxsParams {
9+
pgtx?: PrismaTransaction;
10+
}
11+
12+
export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise<
813
Static<typeof transactionResponseSchema>[]
914
> => {
15+
const prisma = getPrismaWithPostgresTx(pgtx);
16+
1017
// TODO: Don't use env var for transactions to batch
1118
const txs = await prisma.$queryRaw`
1219
SELECT

src/db/transactions/getSentTxs.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import { Static } from "@sinclair/typebox";
22
import { env } from "../../../core/env";
33
import { transactionResponseSchema } from "../../../server/schemas/transaction";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56
import { cleanTxs } from "./cleanTxs";
67

7-
export const getSentTxs = async (): Promise<
8+
interface GetSentTxsParams {
9+
pgtx?: PrismaTransaction;
10+
}
11+
12+
export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise<
813
Static<typeof transactionResponseSchema>[]
914
> => {
15+
const prisma = getPrismaWithPostgresTx(pgtx);
16+
1017
const txs = await prisma.transactions.findMany({
1118
where: {
1219
processedAt: {

src/db/transactions/getTxById.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
import { prisma } from "../client";
1+
import { PrismaTransaction } from "../../schema/prisma";
2+
import { getPrismaWithPostgresTx } from "../client";
23
import { cleanTxs } from "./cleanTxs";
34

45
interface GetTxByIdParams {
6+
pgtx?: PrismaTransaction;
57
queueId: string;
68
}
79

8-
export const getTxById = async ({ queueId }: GetTxByIdParams) => {
10+
export const getTxById = async ({ pgtx, queueId }: GetTxByIdParams) => {
11+
const prisma = getPrismaWithPostgresTx(pgtx);
12+
913
const tx = await prisma.transactions.findUnique({
1014
where: {
1115
id: queueId,

src/db/transactions/getTxToRetry.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
import { Static } from "@sinclair/typebox";
22
import { transactionResponseSchema } from "../../../server/schemas/transaction";
3-
import { prisma } from "../client";
3+
import type { PrismaTransaction } from "../../schema/prisma";
4+
import { getPrismaWithPostgresTx } from "../client";
45
import { cleanTxs } from "./cleanTxs";
56

6-
export const getTxToRetry = async (): Promise<
7+
interface GetTxToRetryParams {
8+
pgtx?: PrismaTransaction;
9+
}
10+
11+
export const getTxToRetry = async ({ pgtx }: GetTxToRetryParams = {}): Promise<
712
Static<typeof transactionResponseSchema>[]
813
> => {
14+
const prisma = getPrismaWithPostgresTx(pgtx);
15+
916
// TODO: Why is this checking that transaction hash is not null
1017
const txs = await prisma.$queryRaw`
1118
SELECT

src/db/transactions/queueTx.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import type { DeployTransaction, Transaction } from "@thirdweb-dev/sdk";
22
import { BigNumber } from "ethers";
33
import type { ContractExtension } from "../../schema/extension";
4-
import { prisma } from "../client";
4+
import { PrismaTransaction } from "../../schema/prisma";
5+
import { getPrismaWithPostgresTx } from "../client";
56

67
interface QueueTxParams {
8+
pgtx?: PrismaTransaction;
79
tx: Transaction<any> | DeployTransaction;
810
chainId: number;
911
extension: ContractExtension;
@@ -14,12 +16,15 @@ interface QueueTxParams {
1416

1517
// TODO: Simulation should be done before this function...
1618
export const queueTx = async ({
19+
pgtx,
1720
tx,
1821
chainId,
1922
extension,
2023
deployedContractAddress,
2124
deployedContractType,
2225
}: QueueTxParams) => {
26+
const prisma = getPrismaWithPostgresTx(pgtx);
27+
2328
// TODO: SDK should have a JSON.stringify() method.
2429
const fromAddress = (await tx.getSignerAddress()).toLowerCase();
2530
const toAddress = tx.getTarget().toLowerCase();

0 commit comments

Comments
 (0)