Skip to content

Commit 50bd730

Browse files
committed
fix: rewrite initial sync logic & reduce parallelism
wip
1 parent 45175e7 commit 50bd730

2 files changed

Lines changed: 71 additions & 65 deletions

File tree

constants/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,10 +276,10 @@ export const MEMPOOL_PROCESS_DELAY = 100
276276
// When fetching some address transactions, number of transactions to fetch at a time.
277277
// On chronik, the max allowed is 200
278278
export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200
279-
280-
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 64
281-
export const TX_EMIT_BATCH_SIZE = 1_000 // for our generator, not chronik
282-
export const DB_COMMIT_BATCH_SIZE = 1_000 // tamanho dos lotes para commit no DB
279+
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 10
280+
export const TX_EMIT_BATCH_SIZE = 500
281+
export const TRANSFORM_TX_FROM_CHRONIK_BATCH_SIZE = 50
282+
export const DB_COMMIT_BATCH_SIZE = 500
283283

284284
export const TRIGGER_POST_CONCURRENCY = 100
285285
export const TRIGGER_EMAIL_CONCURRENCY = 100

services/chronikService.ts

Lines changed: 67 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
22
import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
33
import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
4-
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE } from 'constants/index'
4+
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, TRANSFORM_TX_FROM_CHRONIK_BATCH_SIZE } from 'constants/index'
55
import { productionAddresses } from 'prisma-local/seeds/addresses'
66
import {
77
TransactionWithAddressAndPrices,
@@ -338,10 +338,20 @@ export class ChronikBlockchainClient {
338338
return (await this.chronik.script(type, hash160).history(page, pageSize)).txs
339339
}
340340

341+
private async fetchPage (addressString: string, pageIndex: number, logPrefix: string): Promise<Tx[]> {
342+
try {
343+
return await this.getPaginatedTxs(addressString, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
344+
} catch (err: any) {
345+
console.warn(`${logPrefix} page=${pageIndex} failed: ${err.message as string}`)
346+
return []
347+
}
348+
}
349+
341350
/*
342-
* For each address, fetch pages in parallel (“burst”),
343-
* then use the burst’s newest/oldest timestamps to decide whether to continue.
344-
* Yields happen only in the generator body (after each slice finishes, and at final flush).
351+
* Fetch pages sequentially for each address,
352+
* processing addresses in small concurrent batches.
353+
* Yields happen eagerly: after each address completes and whenever the buffer
354+
* reaches TX_EMIT_BATCH_SIZE, keeping memory footprint low.
345355
*/
346356
private async * fetchLatestTxsForAddresses (
347357
addresses: Address[]
@@ -353,44 +363,29 @@ export class ChronikBlockchainClient {
353363
`(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).`
354364
)
355365

366+
// Buffer that accumulates across address batches; drained via splice to avoid copies
356367
let chronikTxs: ChronikTxWithAddress[] = []
357-
let lastBatchAddresses: string[] = []
358368

359369
const totalCount = addresses.length
360370
let syncedAlready = 0
361371
for (let i = 0; i < addresses.length; i += INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) {
362372
const addressBatchSlice = addresses.slice(i, i + INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY)
363-
lastBatchAddresses = addressBatchSlice.map(a => a.address)
373+
const lastBatchAddresses = addressBatchSlice.map(a => a.address)
364374

365375
console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... (${syncedAlready}/${totalCount} synced)`)
366376

367-
const perAddressWorkers = addressBatchSlice.map(async (address) => {
377+
// Each worker returns its own collected txs — no shared mutable push during async execution
378+
const perAddressWorkers = addressBatchSlice.map(async (address): Promise<ChronikTxWithAddress[]> => {
368379
const addrLogPrefix = `${logPrefix} > ${address.address}:`
369380
const lastSyncedTimestampSeconds = this.getLastSyncTs(address)
370381
const txThresholdFilter = this.txThesholdFilter(address)
371382

372-
let nextBurstBasePageIndex = 0
373-
let hasReachedStoppingCondition = false
374-
375-
let newTxs = 0
376-
while (!hasReachedStoppingCondition) {
377-
const pageIndex = nextBurstBasePageIndex
378-
let pageTxs: Tx[] = []
379-
380-
try {
381-
pageTxs = await this.getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
382-
} catch (err: any) {
383-
console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err.message as string}`)
384-
pageTxs = []
385-
}
386-
387-
if (pageTxs.length === 0) {
388-
console.log(`${addrLogPrefix} EMPTY ADDRESS`)
389-
break
390-
}
383+
const collected: ChronikTxWithAddress[] = []
384+
let pageIndex = 0
385+
let pageTxs = await this.fetchPage(address.address, pageIndex, addrLogPrefix)
391386

387+
while (pageTxs.length > 0) {
392388
const newestTs = Number(pageTxs[0].block?.timestamp ?? pageTxs[0].timeFirstSeen)
393-
394389
if (newestTs < lastSyncedTimestampSeconds) {
395390
console.log(`${addrLogPrefix} NO NEW TXS`)
396391
break
@@ -402,38 +397,47 @@ export class ChronikBlockchainClient {
402397
.filter(txThresholdFilter)
403398
.filter(t => t.block === undefined || t.block.timestamp >= lastSyncedTimestampSeconds)
404399

405-
const newTxsInThisPage = pageTxs.length
406-
if (newTxsInThisPage > 0) {
407-
chronikTxs.push(...pageTxs.map(tx => ({ tx, address })))
400+
for (const tx of pageTxs) {
401+
collected.push({ tx, address })
408402
}
409403

410-
if (oldestTs < lastSyncedTimestampSeconds) {
411-
hasReachedStoppingCondition = true
412-
}
404+
if (oldestTs < lastSyncedTimestampSeconds) break
413405

414-
nextBurstBasePageIndex += 1
415-
if (newTxsInThisPage === 0 && oldestTs < lastSyncedTimestampSeconds) {
416-
hasReachedStoppingCondition = true
417-
}
418-
newTxs += newTxsInThisPage
406+
pageIndex += 1
407+
pageTxs = await this.fetchPage(address.address, pageIndex, addrLogPrefix)
408+
}
409+
410+
if (pageIndex === 0 && collected.length === 0) {
411+
console.log(`${addrLogPrefix} EMPTY ADDRESS`)
419412
}
420-
if (newTxs > 0) {
421-
console.log(`${addrLogPrefix} ${newTxs} new txs.`)
413+
414+
if (collected.length > 0) {
415+
console.log(`${addrLogPrefix} ${collected.length} new txs.`)
422416
}
417+
return collected
423418
})
424-
syncedAlready += addressBatchSlice.length
425419

426-
await Promise.all(
420+
// Await all address workers and merge results
421+
const results = await Promise.all(
427422
perAddressWorkers.map(async worker =>
428-
await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`))
423+
await worker.catch((err: any) => {
424+
console.error(`${logPrefix}: address job failed: ${err.message as string}`)
425+
return [] as ChronikTxWithAddress[]
426+
})
429427
)
430428
)
429+
for (const workerTxs of results) {
430+
if (workerTxs.length > 0) {
431+
chronikTxs.push(...workerTxs)
432+
}
433+
}
434+
435+
syncedAlready += addressBatchSlice.length
431436

432-
// Yield full TX batches when buffer reaches TX_EMIT_BATCH_SIZE
437+
// Yield full TX batches when buffer reaches TX_EMIT_BATCH_SIZE — use splice to drain in-place
433438
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
434-
const chronikTxsSlice = chronikTxs.slice(0, TX_EMIT_BATCH_SIZE)
435-
chronikTxs = chronikTxs.slice(TX_EMIT_BATCH_SIZE)
436-
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
439+
const drained = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
440+
yield { chronikTxs: drained, addressesSynced: [] }
437441
}
438442

439443
// Yield batch marker for completed address group
@@ -442,9 +446,8 @@ export class ChronikBlockchainClient {
442446

443447
// Final TX flush after all addresses processed
444448
if (chronikTxs.length > 0) {
445-
const remaining = chronikTxs
449+
yield { chronikTxs, addressesSynced: [] }
446450
chronikTxs = []
447-
yield { chronikTxs: remaining, addressesSynced: [] }
448451
}
449452
}
450453

@@ -836,28 +839,33 @@ export class ChronikBlockchainClient {
836839
addresses.forEach(a => perAddrCount.set(a.id, 0))
837840

838841
interface RowWithRaw { row: Prisma.TransactionUncheckedCreateInput, raw: Tx }
839-
let toCommit: RowWithRaw[] = []
842+
const toCommit: RowWithRaw[] = []
840843

841844
try {
842845
const pfx = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]`
843846
console.log(`${pfx} will fetch batches of ${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY} addresses from chronik`)
844847

845848
for await (const batch of this.fetchLatestTxsForAddresses(addresses)) {
846849
if (batch.addressesSynced.length > 0) {
847-
// marcador de slice => desmarca syncing
848850
await setSyncingBatch(batch.addressesSynced, false)
849851
continue
850852
}
851853

852854
const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id))
853855

854856
try {
855-
const pairsFromBatch: RowWithRaw[] = await Promise.all(
856-
batch.chronikTxs.map(async ({ tx, address }) => {
857-
const row = await this.getTransactionFromChronikTransaction(tx, address)
858-
return { row, raw: tx }
859-
})
860-
)
857+
// Process tx-to-row conversions in chunks to limit concurrent DB calls
858+
const pairsFromBatch: RowWithRaw[] = []
859+
for (let i = 0; i < batch.chronikTxs.length; i += TRANSFORM_TX_FROM_CHRONIK_BATCH_SIZE) {
860+
const chunk = batch.chronikTxs.slice(i, i + TRANSFORM_TX_FROM_CHRONIK_BATCH_SIZE)
861+
const chunkResults = await Promise.all(
862+
chunk.map(async ({ tx, address }) => {
863+
const row = await this.getTransactionFromChronikTransaction(tx, address)
864+
return { row, raw: tx }
865+
})
866+
)
867+
pairsFromBatch.push(...chunkResults)
868+
}
861869

862870
for (const { row } of pairsFromBatch) {
863871
perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 0) + 1)
@@ -866,8 +874,7 @@ export class ChronikBlockchainClient {
866874
toCommit.push(...pairsFromBatch)
867875

868876
if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
869-
const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
870-
toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)
877+
const commitPairs = toCommit.splice(0, DB_COMMIT_BATCH_SIZE)
871878

872879
const rows = commitPairs.map(p => p.row)
873880
const createdTxs = await createManyTransactions(rows)
@@ -906,8 +913,7 @@ export class ChronikBlockchainClient {
906913

907914
// final DB flush (if fewer than DB_COMMIT_BATCH_SIZE remain)
908915
if (toCommit.length > 0) {
909-
const commitPairs = toCommit.slice()
910-
toCommit = []
916+
const commitPairs = toCommit.splice(0)
911917

912918
const rows = commitPairs.map(p => p.row)
913919
const createdTxs = await createManyTransactions(rows)

0 commit comments

Comments
 (0)