Skip to content

Commit 06d9c3e

Browse files
authored
Memory optimization (#1120)
* Don't duplicate the chronik transactions to announce
* Free memory as soon as possible
* Splice when possible — it's O(n) instead of O(2n)
* Process the chronik transactions on the fly

This changes the logic in fetchLatestTxsForAddresses to yield the chronik transactions periodically instead of waiting for the full set of address transactions to be fetched. With this implementation, there is a race between the download and the processing, which drains the transactions every 500 ms until there are not enough remaining txs to fill a batch. This means the tx buffer memory can grow up to the batch size (TX_EMIT_BATCH_SIZE, 200) plus however many txs can be downloaded during the polling delay. This is not a hard limit, but it is a clear win when processing addresses with thousands of transactions.
1 parent 7f67067 commit 06d9c3e

2 files changed

Lines changed: 62 additions & 13 deletions

File tree

constants/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200
280280
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 16 // presumably the number of addresses fetched concurrently during the initial sync — confirm against usage
281281
export const TX_EMIT_BATCH_SIZE = 200 // for our generator, not chronik
282282
export const DB_COMMIT_BATCH_SIZE = 200 // size of the batches committed to the DB
283+
export const TX_BATCH_POLLING_DELAY = 500 // delay (ms) between polling for new batches of txs to commit to the DB
283284

284285
export const TRIGGER_POST_CONCURRENCY = 100 // presumably max concurrent POST-trigger executions — confirm against trigger code
285286
export const TRIGGER_EMAIL_CONCURRENCY = 100 // presumably max concurrent email-trigger executions — confirm against trigger code

services/chronikService.ts

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
22
import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
33
import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
4-
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS } from 'constants/index'
4+
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS, TX_BATCH_POLLING_DELAY } from 'constants/index'
55
import { productionAddresses } from 'prisma-local/seeds/addresses'
66
import prisma from 'prisma-local/clientInstance'
77
import {
@@ -344,6 +344,9 @@ export class ChronikBlockchainClient {
344344

345345
console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... (${syncedAlready}/${totalCount} synced)`)
346346

347+
// Track completed addresses
348+
const completedAddresses: string[] = []
349+
347350
const perAddressWorkers = addressBatchSlice.map(async (address) => {
348351
const addrLogPrefix = `${logPrefix} > ${address.address}:`
349352
const lastSyncedTimestampSeconds = this.getLastSyncTs(address)
@@ -389,6 +392,7 @@ export class ChronikBlockchainClient {
389392
const newTxsInThisPage = pageTxs.length
390393
if (newTxsInThisPage > 0) {
391394
chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
395+
pageTxs = []
392396
}
393397

394398
if (oldestTs < lastSyncedTimestampSeconds) {
@@ -404,22 +408,60 @@ export class ChronikBlockchainClient {
404408
if (newTxs > 0) {
405409
console.log(`${addrLogPrefix} ${newTxs} new txs.`)
406410
}
411+
completedAddresses.push(address.address)
407412
})
408413
syncedAlready += addressBatchSlice.length
409414

410-
await Promise.all(
415+
// Start workers but don't wait - yield batches while they're running
416+
const workersPromise = Promise.all(
411417
perAddressWorkers.map(async worker =>
412418
await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`))
413419
)
414420
)
415421

416-
// Yield full TX batches when buffer reaches TX_EMIT_BATCH_SIZE
417-
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
418-
const chronikTxsSlice = chronikTxs.slice(0, TX_EMIT_BATCH_SIZE)
419-
chronikTxs = chronikTxs.slice(TX_EMIT_BATCH_SIZE)
420-
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
422+
// Race between worker completion and periodic checks to yield batches incrementally
423+
let allWorkersDone = false
424+
425+
while (!allWorkersDone || chronikTxs.length > 0) {
426+
// Yield batches if buffer is large enough
427+
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
428+
const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
429+
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
430+
}
431+
432+
// If workers are done, yield any remaining transactions (even if < batch size)
433+
if (allWorkersDone && chronikTxs.length > 0) {
434+
// This clears chronikTxs so the below check for length === 0 is true
435+
const remaining = chronikTxs.splice(0)
436+
yield { chronikTxs: remaining, addressesSynced: [] }
437+
}
438+
439+
// Yield completed addresses if any
440+
if (completedAddresses.length > 0) {
441+
const completed = completedAddresses.splice(0)
442+
yield { chronikTxs: [], addressesSynced: completed }
443+
}
444+
445+
// If workers are done and no more transactions, break
446+
if (allWorkersDone && chronikTxs.length === 0) {
447+
break
448+
}
449+
450+
// Wait a bit or until workers complete
451+
const raceResult = await Promise.race([
452+
workersPromise.then(() => true),
453+
new Promise<boolean>(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY))
454+
])
455+
456+
// Update flag if workers completed
457+
if (raceResult) {
458+
allWorkersDone = true
459+
}
421460
}
422461

462+
// Ensure all workers are finished
463+
await workersPromise
464+
423465
// Yield batch marker for completed address group
424466
yield { chronikTxs: [], addressesSynced: lastBatchAddresses }
425467
}
@@ -836,19 +878,22 @@ export class ChronikBlockchainClient {
836878
}
837879

838880
if (createdTxs.length > 0) {
839-
const rawByHash = new Map(commitTuples.map(p => [p.raw.txid, p.raw]))
840881
const triggerBatch: BroadcastTxData[] = []
841882
for (const createdTx of createdTxs) {
842-
const raw = rawByHash.get(createdTx.hash)
843-
if (raw == null) {
883+
const tuple = commitTuples.find(t => t.row.hash === createdTx.hash)
884+
if (tuple == null) {
844885
continue
845886
}
846-
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
887+
const bd = this.broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx)
847888
triggerBatch.push(bd)
848889
}
849890
if (runTriggers && triggerBatch.length > 0) {
850891
await executeTriggersBatch(triggerBatch, this.networkId)
851892
}
893+
894+
// Release memory
895+
createdTxs.length = 0
896+
triggerBatch.length = 0
852897
}
853898

854899
// Get the latest timestamp of all committed transactions (including pre-existent) for each address.
@@ -933,11 +978,14 @@ export class ChronikBlockchainClient {
933978
}
934979

935980
toCommit.push(...tupleFromBatch)
981+
// Release memory
982+
tupleFromBatch.length = 0
936983

937984
if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
938-
const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
939-
toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)
985+
const commitPairs = toCommit.splice(0, DB_COMMIT_BATCH_SIZE)
940986
await this.commitTransactionsBatch(commitPairs, productionAddressesIds, runTriggers)
987+
// Clear commitPairs
988+
commitPairs.length = 0
941989
}
942990
} catch (err: any) {
943991
console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR in batch (scoped): ${err.message as string}`)

0 commit comments

Comments
 (0)