-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathcommitter.go
More file actions
166 lines (134 loc) · 5.34 KB
/
committer.go
File metadata and controls
166 lines (134 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package committer
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/libs"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/types"
)
// BlockDataWithSize pairs a parsed block with its approximate in-memory
// footprint so the streaming pipeline can do memory-based flow control.
type BlockDataWithSize struct {
	BlockData *common.BlockData // Parsed block payload
	ByteSize  uint64            // Approximate size in bytes, used for semaphore accounting
	Acquired  bool              // Whether semaphore was acquired for this block
}
// tempDir is the scratch directory for downloaded staging files;
// Init narrows it to a per-chain subdirectory.
var tempDir = filepath.Join(os.TempDir(), "committer")

// blockDataChannel carries parsed blocks from the parser goroutine to the
// processor goroutine.
// NOTE(review): Init creates this channel unbuffered, so len(blockDataChannel)
// is always 0 and acquireMemoryPermit's fast path always triggers — confirm
// whether a buffered channel was intended.
var blockDataChannel chan *BlockDataWithSize

// downloadedFilePathChannel feeds local paths of downloaded S3 files to the
// parser goroutine; buffered by StagingS3MaxParallelFileDownload (see Init).
var downloadedFilePathChannel chan string

var memorySemaphore *SafeSemaphore // Safe semaphore for memory-based flow control

// nextBlockNumber is the next block number the committer expects to commit.
var nextBlockNumber uint64 = 0
// Init wires up all external dependencies (RPC, ClickHouse, S3, Kafka),
// prepares the per-chain temp directory, creates the memory semaphore used
// for flow control, and allocates the streaming channels.
// Must be called before CommitStreaming.
func Init() {
	libs.InitRPCClient()
	libs.InitNewClickHouseV2()
	libs.InitS3()
	libs.InitKafkaV2()

	// Per-chain scratch dir; presumably libs.ChainId is populated by the
	// init calls above — confirm against libs.InitRPCClient.
	tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", libs.ChainId.Uint64()))

	// Set up safe semaphore for memory-based flow control (convert MB to bytes)
	maxMemoryBytes := int64(config.Cfg.CommitterMaxMemoryMB) * 1024 * 1024
	memorySemaphore = NewSafeSemaphore(maxMemoryBytes)
	log.Info().
		Int("max_memory_mb", config.Cfg.CommitterMaxMemoryMB).
		Int64("max_memory_bytes", maxMemoryBytes).
		Msg("Initialized committer with safe semaphore memory limiting")

	// streaming channels
	// NOTE(review): blockDataChannel is unbuffered, which makes the
	// len(blockDataChannel) == 0 check in acquireMemoryPermit always true —
	// confirm whether a buffered channel was intended.
	blockDataChannel = make(chan *BlockDataWithSize)
	downloadedFilePathChannel = make(chan string, config.Cfg.StagingS3MaxParallelFileDownload)
}
// CommitStreaming runs the streaming commit pipeline: it determines the next
// block to commit from ClickHouse (optionally overridden by
// CommitterStartBlock), drains any staged S3 block ranges through the
// parser/processor goroutines, then switches to polling the RPC for the
// latest blocks via pollLatest. It returns an error only if the initial
// ClickHouse/S3 lookup fails.
func CommitStreaming() error {
	log.Info().Str("chain_id", libs.ChainIdStr).Msg("Starting streaming commit process")

	// Initialize metrics labels
	chainIdStr := libs.ChainIdStr
	indexerName := config.Cfg.ZeetProjectName

	maxBlockNumber, blockRanges, err := getLastTrackedBlockNumberAndBlockRangesFromS3()
	if err != nil {
		log.Error().
			Err(err).
			Int64("max_block_number", maxBlockNumber).
			Msg("Failed to get last tracked block number and block ranges from S3")
		return err
	}

	nextBlockNumber = uint64(maxBlockNumber + 1)

	// Update next block number metric
	metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))

	if len(blockRanges) != 0 {
		log.Info().Uint64("next_commit_block", nextBlockNumber).Msg("Streaming data from s3")

		blockParserDone := make(chan struct{})
		blockProcessorDone := make(chan struct{})

		go blockParserRoutine(blockParserDone)
		go blockProcessorRoutine(blockProcessorDone)

		// Download sequentially, then close the channels in pipeline order so
		// each downstream stage drains fully before the next close.
		downloadFilesForBlockRange(blockRanges)
		close(downloadedFilePathChannel)
		<-blockParserDone
		close(blockDataChannel)
		<-blockProcessorDone
	} else {
		// Bug fix: this message was previously logged unconditionally, even
		// when staged S3 ranges were about to be processed. It only applies
		// when nothing is staged.
		log.Debug().
			Int64("maxBlockNumber", maxBlockNumber).
			Msg("No files to process - all blocks are up to date from S3")
	}

	log.Info().Msg("Consuming latest blocks from RPC")
	pollLatest()
	return nil
}
// getLastTrackedBlockNumberAndBlockRangesFromS3 returns the highest committed
// block number known to ClickHouse (possibly raised by the
// CommitterStartBlock override) and the staged S3 block ranges after it that
// still need committing. Returns -1 as the block number on error; a returned
// -1 without error means nothing was committed yet and committing starts at 0.
func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange, error) {
	// Bug fix: this log previously duplicated CommitStreaming's
	// "Starting streaming commit process" message (copy-paste).
	log.Info().Str("chain_id", libs.ChainIdStr).Msg("Fetching last tracked block number and block ranges from S3")

	maxBlockNumber, err := libs.GetMaxBlockNumberFromClickHouseV2(libs.ChainId)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get max block number from ClickHouse")
		return -1, nil, err
	}
	log.Debug().Int64("max_block_number", maxBlockNumber).Msg("Retrieved max block number from ClickHouse.(-1 means nothing committed yet, start from 0)")

	// Optional override: force the committer to start from a specific block number.
	// We implement this by pretending ClickHouse max is (startBlock - 1), so both S3
	// range scanning and live RPC polling begin at startBlock. The override can
	// only move the cursor forward (skip ahead), never backward.
	if config.Cfg.CommitterStartBlock > 0 {
		overrideMax := int64(config.Cfg.CommitterStartBlock) - 1
		if maxBlockNumber < overrideMax {
			// Bug fix: log the original ClickHouse cursor BEFORE overwriting
			// it; previously this logged the already-overridden value. The
			// message also wrongly said "starting earlier" — the `<` guard
			// means the override only ever skips ahead.
			log.Info().
				Int64("clickhouse_max_block", maxBlockNumber).
				Uint64("override_start_block", config.Cfg.CommitterStartBlock).
				Msg("CommitterStartBlock override enabled; skipping ahead of ClickHouse cursor")
			maxBlockNumber = overrideMax
		}
	}

	blockRanges, err := libs.GetBlockRangesFromS3(maxBlockNumber)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get block ranges from S3")
		return -1, nil, err
	}
	log.Debug().Int("filtered_ranges", len(blockRanges)).Msg("Got block ranges from S3")

	return maxBlockNumber, blockRanges, nil
}
// downloadFilesForBlockRange sequentially downloads each staged S3 file into
// tempDir and feeds the resulting local path to downloadedFilePathChannel for
// the parser goroutine. A failed download panics via log.Panic.
func downloadFilesForBlockRange(blockRanges []types.BlockRange) {
	for i, blockRange := range blockRanges {
		log.Info().
			Int("processing", i+1).
			Int("total", len(blockRanges)).
			Str("file", blockRange.S3Key).
			Uint64("start_block", blockRange.StartBlock).
			Uint64("end_block", blockRange.EndBlock).
			Msg("Starting download")

		filePath, err := libs.DownloadFile(tempDir, &blockRange)
		if err != nil {
			log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
		}
		// Blocks when the channel buffer is full, throttling downloads to the
		// parser's consumption pace.
		downloadedFilePathChannel <- filePath
	}
	// Bug fix: the previous message claimed this function closes the download
	// channel, but the caller (CommitStreaming) is the one that closes it.
	log.Info().Msg("All downloads completed")
}
// Helper functions for safe semaphore memory tracking
func acquireMemoryPermit(size uint64) (bool, error) {
// If channel is empty, processor is ready to handle immediately - skip semaphore
if len(blockDataChannel) == 0 {
return false, nil // false = didn't acquire semaphore
}
// If channel has data, processor is busy - apply semaphore locking
err := memorySemaphore.Acquire(context.Background(), int64(size))
return true, err // true = acquired semaphore
}
// releaseMemoryPermit returns size bytes to the memory semaphore. Call only
// when the matching acquireMemoryPermit reported that it acquired a permit.
func releaseMemoryPermit(size uint64) {
	memorySemaphore.Release(int64(size))
}