Skip to content

Commit 7bea95a

Browse files
authored
Merge pull request #1065 from PayButton/feat/separate-sync-process
feat: separate process to do initial syncing
2 parents 9acfa66 + b50f0f3 commit 7bea95a

4 files changed

Lines changed: 73 additions & 30 deletions

File tree

jobs/initJobs.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
22
import { Queue } from 'bullmq'
33
import { redisBullMQ } from 'redis/clientInstance'
44
import EventEmitter from 'events'
5-
import { syncCurrentPricesWorker } from './workers'
5+
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker } from './workers'
66

77
EventEmitter.defaultMaxListeners = 20
88

@@ -20,6 +20,10 @@ const main = async (): Promise<void> => {
2020
)
2121

2222
await syncCurrentPricesWorker(pricesQueue.name)
23+
24+
const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ })
25+
await blockchainQueue.add('syncBlockchainAndPrices', {}, { jobId: 'syncBlockchainAndPrices' })
26+
await syncBlockchainAndPricesWorker(blockchainQueue.name)
2327
}
2428

2529
void main()

jobs/workers.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { Worker } from 'bullmq'
22
import { redisBullMQ } from 'redis/clientInstance'
33
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'
4+
import { multiBlockchainClient } from 'services/chronikService'
5+
import { connectAllTransactionsToPrices } from 'services/transactionService'
46

57
import * as priceService from 'services/priceService'
68

@@ -17,13 +19,44 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise<void>
1719
}
1820
)
1921
worker.on('completed', job => {
20-
console.log(`syncing of ${job.data.syncType as string} prices finished`)
22+
console.log('syncing of current prices finished')
2123
})
2224

2325
worker.on('failed', (job, err) => {
2426
if (job !== undefined) {
25-
console.log(`syncing of ${job.data.syncType as string} prices FAILED`)
26-
console.log(`error for initial syncing of ${job.data.syncType as string} prices: ${err.message}`)
27+
console.log('syncing of current prices FAILED')
28+
console.log(`error for initial syncing of current prices: ${err.message}`)
29+
}
30+
})
31+
}
32+
33+
export const syncBlockchainAndPricesWorker = async (queueName: string): Promise<void> => {
34+
const worker = new Worker(
35+
queueName,
36+
async (job) => {
37+
console.log(`job ${job.id as string}: syncing missed transactions and connecting prices...`)
38+
await multiBlockchainClient.syncMissedTransactions()
39+
await connectAllTransactionsToPrices()
40+
},
41+
{
42+
connection: redisBullMQ,
43+
lockDuration: DEFAULT_WORKER_LOCK_DURATION
44+
}
45+
)
46+
47+
worker.on('completed', (job) => {
48+
// teardown
49+
void (async () => {
50+
console.log('Cleaning up MultiBlockchainClient global instance...')
51+
await multiBlockchainClient.destroy()
52+
console.log('Done.')
53+
console.log(`job ${job.id as string}: blockchain + prices sync finished`)
54+
})()
55+
})
56+
57+
worker.on('failed', (job, err) => {
58+
if (job != null) {
59+
console.error(`job ${job.id as string}: FAILED — ${err.message}`)
2760
}
2861
})
2962
}

scripts/docker-exec-shortcuts.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ case "$command" in
4949
eval "$base_command_db" mariadb-dump -h "$MAIN_DB_HOST" -u root -p"$MAIN_DB_ROOT_PASSWORD" "$@"
5050
;;
5151
"databaseshell" | "dbs")
52-
eval "$base_command_db" bash -l "$@"
52+
eval "$base_command_db" bash -c bash -l "$@"
5353
;;
5454
"databasetest" | "dbt")
5555
eval "$base_command_db" mariadb -h "$MAIN_DB_HOST" -u "$MAIN_DB_USER"-test -p"$MAIN_DB_PASSWORD" -D "$MAIN_DB_NAME"-test "$@"
@@ -70,10 +70,10 @@ case "$command" in
7070
eval "$base_command_node" yarn test --coverage --verbose "$@"
7171
;;
7272
"nodeshell" | "ns")
73-
eval "$base_command_node" bash -l
73+
eval "$base_command_node" bash -c bash -l
7474
;;
7575
"rootnodeshell" | "rns")
76-
eval "$base_command_node_root" bash -l
76+
eval "$base_command_node_root" bash -c bash -l
7777
;;
7878
"jobslogs" | "jl")
7979
eval "$base_command_node" pm2 logs jobs

services/chronikService.ts

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
upsertTransaction,
1212
getSimplifiedTransactions,
1313
getSimplifiedTrasaction,
14-
connectAllTransactionsToPrices,
1514
updateClientPaymentStatus,
1615
getClientPayment
1716
} from './transactionService'
@@ -28,7 +27,6 @@ import { OpReturnData, parseError, parseOpReturnData } from 'utils/validators'
2827
import { executeAddressTriggers, executeTriggersBatch } from './triggerService'
2928
import { appendTxsToFile } from 'prisma-local/seeds/transactions'
3029
import { PHASE_PRODUCTION_BUILD } from 'next/dist/shared/lib/constants'
31-
import { syncPastDaysNewerPrices } from './priceService'
3230
import { AddressType } from 'ecashaddrjs/dist/types'
3331
import { DecimalJsLike } from '@prisma/client/runtime/library'
3432

@@ -957,25 +955,14 @@ class MultiBlockchainClient {
957955
console.log('Initializing MultiBlockchainClient...')
958956
this.initializing = true
959957
void (async () => {
960-
if (this.isRunningApp()) {
961-
await syncPastDaysNewerPrices()
962-
const asyncOperations: Array<Promise<void>> = []
963-
this.clients = {
964-
ecash: this.instantiateChronikClient('ecash', asyncOperations),
965-
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
966-
}
967-
await Promise.all(asyncOperations)
968-
this.setInitialized()
969-
await connectAllTransactionsToPrices()
970-
} else if (process.env.NODE_ENV === 'test') {
971-
const asyncOperations: Array<Promise<void>> = []
972-
this.clients = {
973-
ecash: this.instantiateChronikClient('ecash', asyncOperations),
974-
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
975-
}
976-
await Promise.all(asyncOperations)
977-
this.setInitialized()
958+
const asyncOperations: Array<Promise<void>> = []
959+
this.clients = {
960+
ecash: this.instantiateChronikClient('ecash', asyncOperations),
961+
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
978962
}
963+
await Promise.all(asyncOperations)
964+
this.setInitialized()
965+
console.log('Finished initializing MultiBlockchainClient.')
979966
})()
980967
}
981968

@@ -1020,9 +1007,6 @@ class MultiBlockchainClient {
10201007
await newClient.waitForLatencyTest()
10211008
console.log(`[CHRONIK — ${networkSlug}] Subscribing addresses in database...`)
10221009
await newClient.subscribeInitialAddresses()
1023-
console.log(`[CHRONIK — ${networkSlug}] Syncing missed transactions...`)
1024-
await newClient.syncMissedTransactions()
1025-
console.log(`[CHRONIK — ${networkSlug}] Finished instantiating client.`)
10261010
})()
10271011
)
10281012
} else if (process.env.NODE_ENV === 'test') {
@@ -1033,6 +1017,7 @@ class MultiBlockchainClient {
10331017
)
10341018
}
10351019

1020+
console.log(`Finished instantiating ${networkSlug} client.`)
10361021
return newClient
10371022
}
10381023

@@ -1090,10 +1075,31 @@ class MultiBlockchainClient {
10901075
return await this.clients[networkSlug as MainNetworkSlugsType].getBalance(address)
10911076
}
10921077

1078+
public async syncMissedTransactions (): Promise<void> {
1079+
await this.waitForStart()
1080+
await Promise.all([
1081+
this.clients.ecash.syncMissedTransactions(),
1082+
this.clients.bitcoincash.syncMissedTransactions()
1083+
])
1084+
}
1085+
10931086
public async syncAndSubscribeAddresses (addresses: Address[]): Promise<SyncAndSubscriptionReturn> {
10941087
await this.subscribeAddresses(addresses)
10951088
return await this.syncAddresses(addresses)
10961089
}
1090+
1091+
public async destroy (): Promise<void> {
1092+
await Promise.all(
1093+
Object.values(this.clients).map(async (c) => {
1094+
try {
1095+
c.chronikWSEndpoint.close()
1096+
c.wsEndpoint.close()
1097+
} catch (err: any) {
1098+
console.error(`Failed to close connections for client: ${err.message as string}`)
1099+
}
1100+
})
1101+
)
1102+
}
10971103
}
10981104

10991105
export interface NodeJsGlobalMultiBlockchainClient extends NodeJS.Global {

0 commit comments

Comments
 (0)