Skip to content

Commit 53f1895

Browse files
committed
committer start block
1 parent e8fa8b5 commit 53f1895

3 files changed

Lines changed: 30 additions & 0 deletions

File tree

cmd/committer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,31 @@ import (
77
"github.com/prometheus/client_golang/prometheus/promhttp"
88
"github.com/rs/zerolog/log"
99
"github.com/spf13/cobra"
10+
config "github.com/thirdweb-dev/indexer/configs"
1011
"github.com/thirdweb-dev/indexer/internal/committer"
1112
)
1213

14+
var committerStartBlock uint64
15+
1316
var committerCmd = &cobra.Command{
1417
Use: "committer",
1518
Short: "run committer",
1619
Long: "published data from s3 to kafka. if block is not found in s3, it will panic",
1720
Run: RunCommitter,
1821
}
1922

23+
func init() {
24+
committerCmd.Flags().Uint64Var(&committerStartBlock, "start-block", 0, "start committing from this block number (overrides ClickHouse max block when > 0)")
25+
}
26+
2027
func RunCommitter(cmd *cobra.Command, args []string) {
2128
fmt.Println("running committer")
2229

30+
if committerStartBlock > 0 {
31+
config.Cfg.CommitterStartBlock = committerStartBlock
32+
log.Info().Uint64("start_block", committerStartBlock).Msg("Committer start block override enabled")
33+
}
34+
2335
// Start Prometheus metrics server
2436
log.Info().Msg("Starting Metrics Server on port 2112")
2537
go func() {

configs/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ type Config struct {
6262
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
6363
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
6464
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
65+
// CommitterStartBlock, when set (>0), forces the committer to start publishing
66+
// from this block number regardless of what ClickHouse says is already committed.
67+
// This can cause duplicate publishing if ClickHouse already contains higher blocks.
68+
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
6569
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
6670
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
6771
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`

internal/committer/committer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,20 @@ func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange,
104104
}
105105
log.Debug().Int64("max_block_number", maxBlockNumber).Msg("Retrieved max block number from ClickHouse.(-1 means nothing committed yet, start from 0)")
106106

107+
// Optional override: force the committer to start from a specific block number.
108+
// We implement this by pretending ClickHouse max is (startBlock - 1), so both S3
109+
// range scanning and live RPC polling begin at startBlock.
110+
if config.Cfg.CommitterStartBlock > 0 {
111+
overrideMax := int64(config.Cfg.CommitterStartBlock) - 1
112+
if maxBlockNumber < overrideMax {
113+
maxBlockNumber = overrideMax
114+
log.Info().
115+
Int64("clickhouse_max_block", maxBlockNumber).
116+
Uint64("override_start_block", config.Cfg.CommitterStartBlock).
117+
Msg("CommitterStartBlock override enabled; starting earlier than ClickHouse cursor")
118+
}
119+
}
120+
107121
blockRanges, err := libs.GetBlockRangesFromS3(maxBlockNumber)
108122
if err != nil {
109123
log.Error().Err(err).Msg("Failed to get block ranges from S3")

0 commit comments

Comments
 (0)