Skip to content

Commit 3089793

Browse files
Fabcienchedieck
andauthored
Store transaction inputs/outputs address as string (#1116)
* Store transaction inputs/outputs address as string This release some constraints on the database and improve the performances. * Reduce concurrency For the chronik calls, and the db batch sizes. This avoids too long db transactions from impacting the performances and allow to start working faster on the chronik transactions. * Update the logic in createManyTransactions This avoid initiating a DB transaction for each tx, and favor bulk operations instead. Since it also removes the batch processing this removes the connection limits to the db issue entirely when an address has too many txs. * Optimize getTransactionFromChronikTransaction This is a 2-step optimization: - remove unneeded async in satoshisToUnit() callsites, this reduce locks - optimize the inputs and outputs processing by avoiding looping several times, avoid copying arrays and doing redundant formatting/checks. This also fixes a logging error and a loop exit condition that could potentially cause an infinite loop. Inspired by 6298e84 and c4cb339. This is a ~500x improvement on my machine. * fix: one network at a time * fix: current prices after initial sync * Fix the chronik service tests * Remove TransactionOutput from the database It's unclear what the use case is and it was added in #1110 to avoid having to do it later. It appears that this table is costly as it contains a lot of outputs. Let's remove it for now to favor disk size and performance. We can add it back later if we need it. --------- Co-authored-by: Estevão <estevao@chedieck.com>
1 parent beb6a65 commit 3089793

12 files changed

Lines changed: 315 additions & 169 deletions

File tree

constants/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,9 +277,9 @@ export const MEMPOOL_PROCESS_DELAY = 100
277277
// On chronik, the max allowed is 200
278278
export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200
279279

280-
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 128
281-
export const TX_EMIT_BATCH_SIZE = 2_000 // for our generator, not chronik
282-
export const DB_COMMIT_BATCH_SIZE = 2_000 // tamanho dos lotes para commit no DB
280+
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 16
281+
export const TX_EMIT_BATCH_SIZE = 200 // for our generator, not chronik
282+
export const DB_COMMIT_BATCH_SIZE = 200 // tamanho dos lotes para commit no DB
283283

284284
export const TRIGGER_POST_CONCURRENCY = 100
285285
export const TRIGGER_EMAIL_CONCURRENCY = 100

jobs/initJobs.ts

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,7 @@ const main = async (): Promise<void> => {
1616
await blockchainQueue.obliterate({ force: true })
1717
await cleanupQueue.obliterate({ force: true })
1818

19-
await pricesQueue.add('syncCurrentPrices',
20-
{},
21-
{
22-
jobId: 'syncCurrentPrices',
23-
removeOnFail: false,
24-
repeat: {
25-
every: CURRENT_PRICE_REPEAT_DELAY
26-
}
27-
}
28-
)
29-
30-
await syncCurrentPricesWorker(pricesQueue.name)
31-
19+
// Start blockchain sync first; current prices job starts only after it completes
3220
await blockchainQueue.add('syncBlockchainAndPrices',
3321
{},
3422
{
@@ -37,7 +25,20 @@ const main = async (): Promise<void> => {
3725
removeOnFail: true
3826
}
3927
)
40-
await syncBlockchainAndPricesWorker(blockchainQueue.name)
28+
void await syncBlockchainAndPricesWorker(blockchainQueue.name, async () => {
29+
await pricesQueue.add('syncCurrentPrices',
30+
{},
31+
{
32+
jobId: 'syncCurrentPrices',
33+
removeOnFail: false,
34+
repeat: {
35+
every: CURRENT_PRICE_REPEAT_DELAY
36+
}
37+
}
38+
)
39+
await syncCurrentPricesWorker(pricesQueue.name)
40+
console.log('Current prices sync job started after blockchain sync completion.')
41+
})
4142

4243
await cleanupQueue.add(
4344
'cleanupClientPayments',

jobs/workers.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise<void>
3131
})
3232
}
3333

34-
export const syncBlockchainAndPricesWorker = async (queueName: string): Promise<void> => {
34+
export const syncBlockchainAndPricesWorker = async (queueName: string, onComplete?: () => Promise<void> | void): Promise<void> => {
3535
const worker = new Worker(
3636
queueName,
3737
async (job) => {
@@ -53,6 +53,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise<
5353
await multiBlockchainClient.destroy()
5454
console.log('Done.')
5555
console.log(`job ${job.id as string}: blockchain + prices sync finished`)
56+
await onComplete?.()
5657
})()
5758
})
5859

pages/api/address/balance/[address].ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export default async (req: NextApiRequest, res: NextApiResponse): Promise<void>
1717
try {
1818
const address = parseAddress(req.query.address as string)
1919
const response = await multiBlockchainClient.getBalance(address)
20-
const balance = await satoshisToUnit(response, xecaddr.detectAddressFormat(address))
20+
const balance = satoshisToUnit(response, xecaddr.detectAddressFormat(address))
2121
res.status(200).send(balance)
2222
} catch (err: any) {
2323
switch (err.message) {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
-- Drop foreign key constraints
2+
ALTER TABLE `TransactionInput` DROP FOREIGN KEY `TransactionInput_addressId_fkey`;
3+
ALTER TABLE `TransactionOutput` DROP FOREIGN KEY `TransactionOutput_addressId_fkey`;
4+
5+
-- Drop old indexes
6+
ALTER TABLE `TransactionInput` DROP INDEX `TransactionInput_addressId_idx`;
7+
ALTER TABLE `TransactionOutput` DROP INDEX `TransactionOutput_addressId_idx`;
8+
9+
-- Add new address column (nullable temporarily)
10+
ALTER TABLE `TransactionInput` ADD COLUMN `address` VARCHAR(255) NULL;
11+
ALTER TABLE `TransactionOutput` ADD COLUMN `address` VARCHAR(255) NULL;
12+
13+
-- Populate address column from Address table
14+
UPDATE `TransactionInput` ti
15+
INNER JOIN `Address` a ON ti.`addressId` = a.`id`
16+
SET ti.`address` = a.`address`;
17+
18+
UPDATE `TransactionOutput` tout
19+
INNER JOIN `Address` a ON tout.`addressId` = a.`id`
20+
SET tout.`address` = a.`address`;
21+
22+
-- Make address column NOT NULL
23+
ALTER TABLE `TransactionInput` MODIFY COLUMN `address` VARCHAR(255) NOT NULL;
24+
ALTER TABLE `TransactionOutput` MODIFY COLUMN `address` VARCHAR(255) NOT NULL;
25+
26+
-- Delete Address entries that were only used by TransactionInput/TransactionOutput
27+
-- These addresses are no longer needed since we store addresses as strings
28+
DELETE a FROM `Address` a
29+
WHERE (
30+
-- Address is referenced in TransactionInput or TransactionOutput
31+
EXISTS (SELECT 1 FROM `TransactionInput` ti WHERE ti.`addressId` = a.`id`)
32+
OR EXISTS (SELECT 1 FROM `TransactionOutput` tout WHERE tout.`addressId` = a.`id`)
33+
)
34+
AND NOT (
35+
-- But exclude addresses that are still used elsewhere
36+
EXISTS (SELECT 1 FROM `Transaction` t WHERE t.`addressId` = a.`id`)
37+
OR EXISTS (SELECT 1 FROM `AddressesOnUserProfiles` aup WHERE aup.`addressId` = a.`id`)
38+
OR EXISTS (SELECT 1 FROM `AddressesOnButtons` ab WHERE ab.`addressId` = a.`id`)
39+
OR EXISTS (SELECT 1 FROM `ClientPayment` cp WHERE cp.`addressString` = a.`address`)
40+
);
41+
42+
-- Drop old addressId column
43+
ALTER TABLE `TransactionInput` DROP COLUMN `addressId`;
44+
ALTER TABLE `TransactionOutput` DROP COLUMN `addressId`;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Remove TransactionOutput table
2+
-- This table is being removed as its use case is unclear and it impacts performance
3+
4+
DROP TABLE IF EXISTS `TransactionOutput`;

prisma-local/schema.prisma

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ model Address {
2323
paybuttons AddressesOnButtons[]
2424
transactions Transaction[]
2525
clientPayments ClientPayment[]
26-
transactionInputs TransactionInput[]
27-
transactionOutputs TransactionOutput[]
2826
2927
@@index([networkId], map: "Address_networkId_fkey")
3028
}
@@ -81,7 +79,6 @@ model Transaction {
8179
prices PricesOnTransactions[]
8280
invoices Invoice[]
8381
inputs TransactionInput[]
84-
outputs TransactionOutput[]
8582
8683
createdAt DateTime @default(now())
8784
updatedAt DateTime @updatedAt
@@ -93,28 +90,14 @@ model Transaction {
9390
model TransactionInput {
9491
id String @id @default(dbgenerated("(uuid())"))
9592
transactionId String
96-
addressId String
93+
address String @db.VarChar(255)
9794
index Int
9895
transaction Transaction @relation(fields: [transactionId], references: [id], onDelete: Cascade)
99-
address Address @relation(fields: [addressId], references: [id], onDelete: Cascade)
10096
amount Decimal @db.Decimal(24, 8)
10197
10298
@@index([transactionId])
103-
@@index([addressId])
10499
}
105100

106-
model TransactionOutput {
107-
id String @id @default(dbgenerated("(uuid())"))
108-
transactionId String
109-
addressId String
110-
index Int
111-
transaction Transaction @relation(fields: [transactionId], references: [id], onDelete: Cascade)
112-
address Address @relation(fields: [addressId], references: [id], onDelete: Cascade)
113-
amount Decimal @db.Decimal(24, 8)
114-
115-
@@index([transactionId])
116-
@@index([addressId])
117-
}
118101

119102
model Wallet {
120103
id String @id @default(dbgenerated("(uuid())"))

0 commit comments

Comments
 (0)