Skip to content

Commit 7f67067

Browse files
authored
Incremental syncing (#1119)
* Deduplicate the Transaction commit code during sync This is a simple refactor, the only change in behavior being the logged message. * Avoid copying the Transactions to commit Removes a needless copy. * Save lastSynced after processing a batch of txs This updates lastSynced incrementally during the syncing process instead of once at the end of the syncing jobs. This kills 2 birds with one stone: - If the server fails syncing for whatever reason (e.g. out of memory), it will resume syncing from the last good state and not from the beginning. - It fixes a subtle bug where the server might be missing transactions: since lastSynced was updated once after all addresses were synced, all the txs received by these addresses during that time would be lost. It comes at the cost of one write per address, after each batch has been committed, so performance drawbacks are expected. * Let updateLastSynced take the timestamp as an argument Consistently set the lastSynced value to the higher timestamp of the committed transactions and not the system time. * Also update lastSynced when no new transaction is added This should be unnecessary under normal circumstances, but it makes sure to update the state if the db was left in an inconsistent state with txs stored but lastSynced not updated accordingly. This is overall more robust. * Enforce lastSynced to be monotonic There are a few circumstances where the last committed tx timestamp is less than the current lastSynced value, e.g. if more than 200 txs were committed from the same address (since chronik stores txs in reverse time order, digging into history means traveling through tx time backwards). This commit enforces the lastSynced value to grow monotonically. This is achieved by a lookup before the update, and it's not done in updateLastSynced for performance reasons: the use of findMany during syncing has a big impact on performance.
1 parent 4fb9f5f commit 7f67067

2 files changed

Lines changed: 98 additions & 72 deletions

File tree

services/addressService.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -272,28 +272,14 @@ export async function getLatestConfirmedTxTimestampForAddress (addressId: string
272272
return tx?.timestamp
273273
}
274274

275-
export async function updateManyLastSynced (addressStringArray: string[]): Promise<void> {
276-
await prisma.address.updateMany({
277-
where: {
278-
address: {
279-
in: addressStringArray
280-
}
281-
},
282-
data: {
283-
syncing: false,
284-
lastSynced: new Date()
285-
}
286-
})
287-
}
288-
289-
export async function updateLastSynced (addressString: string): Promise<void> {
275+
export async function updateLastSynced (addressString: string, timestampSeconds: number): Promise<void> {
290276
await prisma.address.update({
291277
where: {
292278
address: addressString
293279
},
294280
data: {
295281
syncing: false,
296-
lastSynced: new Date()
282+
lastSynced: new Date(timestampSeconds * 1000)
297283
}
298284
})
299285
}

services/chronikService.ts

Lines changed: 96 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
33
import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
44
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, MAX_TXS_PER_ADDRESS } from 'constants/index'
55
import { productionAddresses } from 'prisma-local/seeds/addresses'
6+
import prisma from 'prisma-local/clientInstance'
67
import {
78
TransactionWithAddressAndPrices,
89
createManyTransactions,
@@ -20,7 +21,7 @@ import {
2021
import { Address, Prisma, ClientPaymentStatus } from '@prisma/client'
2122
import xecaddr from 'xecaddrjs'
2223
import { getAddressPrefix, satoshisToUnit } from 'utils/index'
23-
import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced } from './addressService'
24+
import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced } from './addressService'
2425
import * as ws from 'ws'
2526
import { BroadcastTxData } from 'ws-service/types'
2627
import config from 'config'
@@ -436,6 +437,7 @@ export class ChronikBlockchainClient {
436437
let page = 0
437438
const earliestUnconfirmedTxTimestamp = await getEarliestUnconfirmedTxTimestampForAddress(address.id)
438439
const latestTimestamp = earliestUnconfirmedTxTimestamp ?? await getLatestConfirmedTxTimestampForAddress(address.id) ?? 0
440+
let maxTimestamp = 0
439441

440442
while (true) {
441443
let transactions = await this.getPaginatedTxs(address.address, page, pageSize)
@@ -458,6 +460,11 @@ export class ChronikBlockchainClient {
458460
const transactionsToPersist = [...confirmedTransactions, ...unconfirmedTransactions].map(tx => this.getTransactionFromChronikTransaction(tx, address))
459461
const persistedTransactions = await createManyTransactions(transactionsToPersist)
460462
if (persistedTransactions.length > 0) {
463+
// Track the max timestamp from persisted transactions
464+
for (const tx of persistedTransactions) {
465+
maxTimestamp = Math.max(maxTimestamp, tx.timestamp)
466+
}
467+
461468
const simplifiedTransactions = getSimplifiedTransactions(persistedTransactions)
462469

463470
console.log(`${this.CHRONIK_MSG_PREFIX}: added ${simplifiedTransactions.length} txs to ${address.address}`)
@@ -476,7 +483,18 @@ export class ChronikBlockchainClient {
476483
yield persistedTransactions
477484
}
478485
await setSyncing(address.address, false)
479-
await updateLastSynced(address.address)
486+
487+
// Only update lastSynced if new value is greater than current (or if current is null)
488+
const currentAddress = await prisma.address.findUnique({
489+
where: { address: address.address },
490+
select: { lastSynced: true }
491+
})
492+
const currentLastSynced = currentAddress?.lastSynced ?? null
493+
const newDate = new Date(maxTimestamp * 1000)
494+
495+
if ((currentLastSynced == null) || currentLastSynced < newDate) {
496+
await updateLastSynced(address.address, maxTimestamp)
497+
}
480498
}
481499

482500
private async getUtxos (address: string): Promise<ScriptUtxo[]> {
@@ -803,6 +821,75 @@ export class ChronikBlockchainClient {
803821
}
804822
}
805823

824+
private async commitTransactionsBatch (
825+
commitTuples: Array<{ row: Prisma.TransactionUncheckedCreateInput, raw: Tx, addressString: string }>,
826+
productionAddressesIds: string[],
827+
runTriggers: boolean
828+
): Promise<void> {
829+
const rows = commitTuples.map(p => p.row)
830+
const createdTxs = await createManyTransactions(rows)
831+
console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}/${commitTuples.length}`)
832+
833+
const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
834+
if (createdForProd.length > 0) {
835+
await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
836+
}
837+
838+
if (createdTxs.length > 0) {
839+
const rawByHash = new Map(commitTuples.map(p => [p.raw.txid, p.raw]))
840+
const triggerBatch: BroadcastTxData[] = []
841+
for (const createdTx of createdTxs) {
842+
const raw = rawByHash.get(createdTx.hash)
843+
if (raw == null) {
844+
continue
845+
}
846+
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
847+
triggerBatch.push(bd)
848+
}
849+
if (runTriggers && triggerBatch.length > 0) {
850+
await executeTriggersBatch(triggerBatch, this.networkId)
851+
}
852+
}
853+
854+
// Get the latest timestamp of all committed transactions (including pre-existent) for each address.
855+
// This is redundant under normal circumstances, but is more robust than only updating for the newly created transactions.
856+
const addressMaxTimestamp = new Map<string, number>()
857+
for (const { row, addressString } of commitTuples) {
858+
const currentMax = addressMaxTimestamp.get(addressString) ?? 0
859+
addressMaxTimestamp.set(addressString, Math.max(currentMax, row.timestamp))
860+
}
861+
862+
// Fetch current lastSynced values for all addresses
863+
const addressesToUpdate = Array.from(addressMaxTimestamp.keys())
864+
const currentAddresses = await prisma.address.findMany({
865+
where: {
866+
address: { in: addressesToUpdate }
867+
},
868+
select: {
869+
address: true,
870+
lastSynced: true
871+
}
872+
})
873+
const currentLastSyncedMap = new Map<string, Date | null>(
874+
currentAddresses.map((a: { address: string, lastSynced: Date | null }) => [a.address, a.lastSynced])
875+
)
876+
877+
// Update lastSynced for the processed addresses (only if new value is greater)
878+
for (const [addr, maxTs] of addressMaxTimestamp) {
879+
const currentLastSynced = currentLastSyncedMap.get(addr)
880+
const newDate = new Date(maxTs * 1000)
881+
882+
// Only update if new value is greater than current (or if current is null)
883+
if ((currentLastSynced == null) || currentLastSynced < newDate) {
884+
try {
885+
await updateLastSynced(addr, maxTs)
886+
} catch (err: any) {
887+
console.error(`${this.CHRONIK_MSG_PREFIX}: Failed to update lastSynced for ${addr}: ${err.message as string}`)
888+
}
889+
}
890+
}
891+
}
892+
806893
public async syncAddresses (addresses: Address[], runTriggers = false): Promise<SyncAndSubscriptionReturn> {
807894
const failedAddressesWithErrors: KeyValueT<string> = {}
808895
const successfulAddressesWithCount: KeyValueT<number> = {}
@@ -819,7 +906,7 @@ export class ChronikBlockchainClient {
819906
const perAddrCount = new Map<string, number>()
820907
addresses.forEach(a => perAddrCount.set(a.id, 0))
821908

822-
interface RowWithRaw { row: Prisma.TransactionUncheckedCreateInput, raw: Tx }
909+
interface RowWithRaw { row: Prisma.TransactionUncheckedCreateInput, raw: Tx, addressString: string }
823910
let toCommit: RowWithRaw[] = []
824911

825912
try {
@@ -836,43 +923,21 @@ export class ChronikBlockchainClient {
836923
const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id))
837924

838925
try {
839-
const pairsFromBatch: RowWithRaw[] = batch.chronikTxs.map(({ tx, address }) => {
926+
const tupleFromBatch: RowWithRaw[] = batch.chronikTxs.map(({ tx, address }) => {
840927
const row = this.getTransactionFromChronikTransaction(tx, address)
841-
return { row, raw: tx }
928+
return { row, raw: tx, addressString: address.address }
842929
})
843930

844-
for (const { row } of pairsFromBatch) {
931+
for (const { row } of tupleFromBatch) {
845932
perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 0) + 1)
846933
}
847934

848-
toCommit.push(...pairsFromBatch)
935+
toCommit.push(...tupleFromBatch)
849936

850937
if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
851938
const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
852939
toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)
853-
854-
const rows = commitPairs.map(p => p.row)
855-
const createdTxs = await createManyTransactions(rows)
856-
console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}`)
857-
858-
const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
859-
if (createdForProd.length > 0) {
860-
await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
861-
}
862-
863-
if (createdTxs.length > 0) {
864-
const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw]))
865-
const triggerBatch: BroadcastTxData[] = []
866-
for (const createdTx of createdTxs) {
867-
const raw = rawByHash.get(createdTx.hash)
868-
if (raw == null) continue
869-
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
870-
triggerBatch.push(bd)
871-
}
872-
if (runTriggers && triggerBatch.length > 0) {
873-
await executeTriggersBatch(triggerBatch, this.networkId)
874-
}
875-
}
940+
await this.commitTransactionsBatch(commitPairs, productionAddressesIds, runTriggers)
876941
}
877942
} catch (err: any) {
878943
console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR in batch (scoped): ${err.message as string}`)
@@ -888,39 +953,14 @@ export class ChronikBlockchainClient {
888953

889954
// final DB flush (if fewer than DB_COMMIT_BATCH_SIZE txs remain)
890955
if (toCommit.length > 0) {
891-
const commitPairs = toCommit.slice()
956+
await this.commitTransactionsBatch(toCommit, productionAddressesIds, runTriggers)
892957
toCommit = []
893-
894-
const rows = commitPairs.map(p => p.row)
895-
const createdTxs = await createManyTransactions(rows)
896-
console.log(`${this.CHRONIK_MSG_PREFIX} committed FINAL — created=${createdTxs.length}`)
897-
898-
const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId))
899-
if (createdForProd.length > 0) {
900-
await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[])
901-
}
902-
903-
if (createdTxs.length > 0) {
904-
const rawByHash = new Map(commitPairs.map(p => [p.raw.txid, p.raw]))
905-
const triggerBatch: BroadcastTxData[] = []
906-
for (const createdTx of createdTxs) {
907-
const raw = rawByHash.get(createdTx.hash)
908-
if (raw == null) continue
909-
const bd = this.broadcastIncomingTx(createdTx.address.address, raw, createdTx)
910-
triggerBatch.push(bd)
911-
}
912-
if (runTriggers && triggerBatch.length > 0) {
913-
await executeTriggersBatch(triggerBatch, this.networkId)
914-
}
915-
}
916958
}
917959

918960
// build success map
919961
addresses.forEach(a => {
920962
successfulAddressesWithCount[a.address] = perAddrCount.get(a.id) ?? 0
921963
})
922-
const okAddresses = addresses.filter(a => !(a.address in failedAddressesWithErrors))
923-
await updateManyLastSynced(okAddresses.map(a => a.address))
924964
} catch (err: any) {
925965
console.error(`${this.CHRONIK_MSG_PREFIX}: FATAL ERROR in parallel sync: ${err.message as string}`)
926966
addresses.forEach(a => {

0 commit comments

Comments
 (0)