Skip to content

Commit 2a808a8

Browse files
committed
reorg handles transaction/logs count
1 parent 19296e1 commit 2a808a8

2 files changed

Lines changed: 223 additions & 6 deletions

File tree

internal/committer/reorg.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
105105
return nil
106106
}
107107

108-
// finding the reorg start and end block
108+
// 1) Block verification: find reorg range from header continuity (existing behavior)
109109
reorgStartBlock := int64(-1)
110110
reorgEndBlock := int64(-1)
111111
for i := 1; i < len(blockHeaders); i++ {
@@ -131,18 +131,68 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
131131
}
132132

133133
// set end to the last block if not set
134+
lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()
134135
if reorgEndBlock == -1 {
135-
reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
136+
// No header-based end detected; default to the last header for last-valid-block tracking.
137+
reorgEndBlock = lastHeaderBlock
136138
}
137139

140+
// 2) Transaction verification: check for mismatches between block.transaction_count
141+
// and the number of transactions stored per block in ClickHouse.
142+
txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
143+
if err != nil {
144+
return fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
145+
}
146+
147+
// 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse.
148+
logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
149+
if err != nil {
150+
return fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
151+
}
152+
153+
// 4) Combine all ranges:
154+
// - If all three ranges (blocks, tx, logs) are empty, then there is no reorg.
155+
// - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range.
156+
finalStart := int64(-1)
157+
finalEnd := int64(-1)
158+
159+
// block headers range
138160
if reorgStartBlock > -1 {
139-
if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
140-
return err
161+
finalStart = reorgStartBlock
162+
finalEnd = reorgEndBlock
163+
}
164+
165+
// transactions range
166+
if txStart > -1 {
167+
if finalStart == -1 || txStart < finalStart {
168+
finalStart = txStart
169+
}
170+
if finalEnd == -1 || txEnd > finalEnd {
171+
finalEnd = txEnd
141172
}
142173
}
143174

144-
// update last valid block. if there was no reorg, this will update to the last block
145-
libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)
175+
// logs range
176+
if logsStart > -1 {
177+
if finalStart == -1 || logsStart < finalStart {
178+
finalStart = logsStart
179+
}
180+
if finalEnd == -1 || logsEnd > finalEnd {
181+
finalEnd = logsEnd
182+
}
183+
}
184+
185+
if finalStart > -1 {
186+
// We found at least one inconsistent range; reorg from min(start) to max(end).
187+
if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil {
188+
return err
189+
}
190+
libs.SetReorgLastValidBlock(libs.ChainIdStr, finalEnd)
191+
} else {
192+
// No inconsistencies across blocks, transactions, or logs; mark the last checked
193+
// header block as the last valid block.
194+
libs.SetReorgLastValidBlock(libs.ChainIdStr, lastHeaderBlock)
195+
}
146196

147197
return nil
148198
}

internal/libs/clickhouse.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ var defaultTraceFields = []string{
4444
"reward_type", "refund_address",
4545
}
4646

47+
type blockTxAggregate struct {
48+
BlockNumber *big.Int `ch:"block_number"`
49+
TxCount uint64 `ch:"tx_count"`
50+
}
51+
52+
type blockLogAggregate struct {
53+
BlockNumber *big.Int `ch:"block_number"`
54+
LogCount uint64 `ch:"log_count"`
55+
MaxLogIndex uint64 `ch:"max_log_index"`
56+
}
57+
4758
// only use this for backfill or getting old data.
4859
var ClickhouseConnV1 clickhouse.Conn
4960

@@ -253,6 +264,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
253264
return blockData, nil
254265
}
255266

267+
// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
268+
// where the stored transaction_count in the blocks table does not match the number
269+
// of transactions in the transactions table. It returns the minimum and maximum
270+
// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent.
271+
func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
272+
if endBlockNumber < startBlockNumber {
273+
return -1, -1, nil
274+
}
275+
276+
blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
277+
if err != nil {
278+
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
279+
}
280+
281+
// Aggregate transaction counts per block from the transactions table.
282+
query := fmt.Sprintf(
283+
"SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
284+
config.Cfg.CommitterClickhouseDatabase,
285+
chainId,
286+
startBlockNumber,
287+
endBlockNumber,
288+
)
289+
290+
txAggRows, err := execQueryV2[blockTxAggregate](query)
291+
if err != nil {
292+
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err)
293+
}
294+
295+
txCounts := make(map[uint64]uint64, len(txAggRows))
296+
for _, row := range txAggRows {
297+
if row.BlockNumber == nil {
298+
continue
299+
}
300+
txCounts[row.BlockNumber.Uint64()] = row.TxCount
301+
}
302+
303+
var mismatchStart int64 = -1
304+
var mismatchEnd int64 = -1
305+
306+
for _, block := range blocksRaw {
307+
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
308+
continue
309+
}
310+
311+
bn := block.Number.Uint64()
312+
expectedTxCount := block.TransactionCount
313+
actualTxCount, hasTx := txCounts[bn]
314+
315+
mismatch := false
316+
if expectedTxCount == 0 {
317+
// Header says no transactions; ensure there are none in the table.
318+
if hasTx && actualTxCount > 0 {
319+
mismatch = true
320+
}
321+
} else {
322+
// Header says there should be transactions.
323+
if !hasTx || actualTxCount != expectedTxCount {
324+
mismatch = true
325+
}
326+
}
327+
328+
if mismatch {
329+
if mismatchStart == -1 || int64(bn) < mismatchStart {
330+
mismatchStart = int64(bn)
331+
}
332+
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
333+
mismatchEnd = int64(bn)
334+
}
335+
}
336+
}
337+
338+
return mismatchStart, mismatchEnd, nil
339+
}
340+
341+
// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
342+
// where logs in the logs table are inconsistent with the block's logs_bloom:
343+
// - logsBloom is non-empty but there are no logs for that block
344+
// - logsBloom is empty/zero but logs exist
345+
// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist)
346+
// It returns the minimum and maximum block numbers that have a mismatch, or
347+
// (-1, -1) if all blocks are consistent.
348+
func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
349+
if endBlockNumber < startBlockNumber {
350+
return -1, -1, nil
351+
}
352+
353+
blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
354+
if err != nil {
355+
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
356+
}
357+
358+
// Aggregate log counts and max log_index per block from the logs table.
359+
query := fmt.Sprintf(
360+
"SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
361+
config.Cfg.CommitterClickhouseDatabase,
362+
chainId,
363+
startBlockNumber,
364+
endBlockNumber,
365+
)
366+
367+
logAggRows, err := execQueryV2[blockLogAggregate](query)
368+
if err != nil {
369+
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err)
370+
}
371+
372+
logAggs := make(map[uint64]blockLogAggregate, len(logAggRows))
373+
for _, row := range logAggRows {
374+
if row.BlockNumber == nil {
375+
continue
376+
}
377+
bn := row.BlockNumber.Uint64()
378+
logAggs[bn] = row
379+
}
380+
381+
var mismatchStart int64 = -1
382+
var mismatchEnd int64 = -1
383+
384+
for _, block := range blocksRaw {
385+
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
386+
continue
387+
}
388+
389+
bn := block.Number.Uint64()
390+
hasLogsBloom := block.LogsBloom != "" && block.LogsBloom != EMPTY_LOGS_BLOOM
391+
logAgg, hasLogAgg := logAggs[bn]
392+
393+
mismatch := false
394+
395+
if hasLogsBloom {
396+
// logsBloom indicates logs should exist
397+
if !hasLogAgg || logAgg.LogCount == 0 {
398+
mismatch = true
399+
} else if logAgg.MaxLogIndex+1 != logAgg.LogCount {
400+
// log_index should be contiguous from 0..log_count-1
401+
mismatch = true
402+
}
403+
} else {
404+
// logsBloom is empty/zero; there should be no logs
405+
if hasLogAgg && logAgg.LogCount > 0 {
406+
mismatch = true
407+
}
408+
}
409+
410+
if mismatch {
411+
if mismatchStart == -1 || int64(bn) < mismatchStart {
412+
mismatchStart = int64(bn)
413+
}
414+
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
415+
mismatchEnd = int64(bn)
416+
}
417+
}
418+
}
419+
420+
return mismatchStart, mismatchEnd, nil
421+
}
422+
256423
func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
257424
sb := startBlockNumber
258425
length := endBlockNumber - startBlockNumber + 1

0 commit comments

Comments
 (0)