Skip to content

Commit 092c81e

Browse files
committed
fix: first block num
1 parent 8fe74e3 commit 092c81e

11 files changed

Lines changed: 282 additions & 31 deletions

common.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"fmt"
1111
"io"
12+
"math"
1213
"os"
1314
"path"
1415
"sort"
@@ -22,6 +23,8 @@ import (
2223
"github.com/c2h5oh/datasize"
2324
)
2425

26+
const NoBlockNum = uint64(math.MaxUint64)
27+
2528
type Dataset struct {
2629
Name string
2730
Version string

filter.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ func (o FilterBuilderOptions[T]) WithDefaults() FilterBuilderOptions[T] {
4343
}
4444

4545
type filterBuilder[T any] struct {
46-
ctx context.Context
47-
4846
indexes map[IndexName]Index[T]
4947
fs storage.FS
5048
}

filter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func generateMixedIntBlocks() []Block[[]int] {
5858
// 45-49 generate 5 blocks with no data
5959
// 50-69 generate 20 blocks with random but repeating huge numbers
6060

61-
for i := 1; i <= 20; i++ {
61+
for i := 0; i <= 20; i++ {
6262
blocks = append(blocks, Block[[]int]{
6363
Hash: common.BytesToHash([]byte{byte(i)}),
6464
Number: uint64(i),
@@ -288,7 +288,7 @@ func TestIntMixFiltering(t *testing.T) {
288288
}
289289

290290
onlyEvenResults := onlyEvenFilter.IndexIterator(context.Background())
291-
assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20)
291+
assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 21)
292292
for _, block := range onlyEvenResults.Bitmap().ToArray() {
293293
assert.True(t, block <= 20)
294294
}
@@ -373,7 +373,7 @@ func TestLowestIndexedBlockNum(t *testing.T) {
373373
})
374374
assert.NoError(t, err)
375375
lowestBlockIndexed = indexer.BlockNum()
376-
assert.Equal(t, uint64(0), lowestBlockIndexed)
376+
assert.Equal(t, NoBlockNum, lowestBlockIndexed)
377377
blocks := generateIntBlocks()
378378
for _, block := range blocks[:50] {
379379
err = indexer.Index(context.Background(), block)

index.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (u *IndexUpdate) Merge(update *IndexUpdate) {
5050
u.BlockBitmap[indexValue].Or(bm)
5151
}
5252

53-
if u.LastBlockNum < update.LastBlockNum {
53+
if u.LastBlockNum == NoBlockNum || u.LastBlockNum < update.LastBlockNum {
5454
u.LastBlockNum = update.LastBlockNum
5555
}
5656
}
@@ -97,7 +97,7 @@ func (i *Index[T]) IndexBlock(ctx context.Context, fs storage.FS, block Block[T]
9797
return nil, fmt.Errorf("unexpected: failed to get number of blocks indexed: %w", err)
9898
}
9999

100-
if block.Number <= numBlocksIndexed {
100+
if numBlocksIndexed != NoBlockNum && block.Number <= numBlocksIndexed {
101101
return nil, nil
102102
}
103103
}
@@ -148,7 +148,7 @@ func (i *Index[T]) Store(ctx context.Context, fs storage.FS, indexUpdate *IndexU
148148
if err != nil {
149149
return fmt.Errorf("failed to get number of blocks indexed: %w", err)
150150
}
151-
if lastBlockNumIndexed >= indexUpdate.LastBlockNum {
151+
if lastBlockNumIndexed != NoBlockNum && lastBlockNumIndexed >= indexUpdate.LastBlockNum {
152152
return nil
153153
}
154154

@@ -169,12 +169,14 @@ func (i *Index[T]) Store(ctx context.Context, fs storage.FS, indexUpdate *IndexU
169169

170170
bmap.Or(bmUpdate)
171171

172+
fmt.Println("---- writing bm", i.name, indexValue, bmap.GetCardinality())
172173
err = file.Write(ctx, bmap)
173174
if err != nil {
174175
return err
175176
}
176177
}
177178

179+
fmt.Println("---- writing last block num", i.name, indexUpdate.LastBlockNum)
178180
err = i.storeLastBlockNumIndexed(ctx, fs, indexUpdate.LastBlockNum)
179181
if err != nil {
180182
return fmt.Errorf("failed to index number of blocks indexed: %w", err)
@@ -191,19 +193,19 @@ func (i *Index[T]) LastBlockNumIndexed(ctx context.Context, fs storage.FS) (uint
191193
file, err := fs.Open(ctx, indexedBlockNumFilePath(string(i.name)), nil)
192194
if err != nil {
193195
// file doesn't exist
194-
return 0, nil
196+
return NoBlockNum, nil
195197
}
196198
defer file.Close()
197199

198200
buf, err := io.ReadAll(file)
199201
if err != nil {
200-
return 0, fmt.Errorf("failed to read IndexBlock file: %w", err)
202+
return NoBlockNum, fmt.Errorf("failed to read IndexBlock file: %w", err)
201203
}
202204

203205
var numBlocksIndexed uint64
204206
err = binary.Read(bytes.NewReader(buf), binary.BigEndian, &numBlocksIndexed)
205207
if err != nil {
206-
return 0, fmt.Errorf("failed to unmarshal bitmap: %w", err)
208+
return NoBlockNum, fmt.Errorf("failed to unmarshal bitmap: %w", err)
207209
}
208210

209211
i.numBlocksIndexed = &atomic.Uint64{}
@@ -219,7 +221,7 @@ func (i *Index[T]) storeLastBlockNumIndexed(ctx context.Context, fs storage.FS,
219221
prevBlockIndexed = blocksIndexed
220222
}
221223

222-
if prevBlockIndexed >= numBlocksIndexed {
224+
if prevBlockIndexed != NoBlockNum && prevBlockIndexed >= numBlocksIndexed {
223225
return nil
224226
}
225227

indexer.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,21 @@ func (i *Indexer[T]) BlockNum() uint64 {
134134
i.mu.Lock()
135135
defer i.mu.Unlock()
136136

137+
// initialize lowestBlockNum to math.MaxUint64
137138
var lowestBlockNum uint64 = math.MaxUint64
138139
for _, indexUpdate := range i.indexUpdates {
140+
// if no blocks have been indexed, return NoBlockNum
141+
if indexUpdate.LastBlockNum == NoBlockNum {
142+
return NoBlockNum
143+
}
144+
145+
// update lowestBlockNum if the current indexUpdate has a lower last block number
139146
if indexUpdate.LastBlockNum < lowestBlockNum {
140147
lowestBlockNum = indexUpdate.LastBlockNum
141148
}
142149
}
143150

144-
if lowestBlockNum == math.MaxUint64 {
145-
return 0
146-
}
151+
// return the lowest block number indexed by all indexes
147152
return lowestBlockNum
148153
}
149154

reader.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"math"
89
"os"
910
"sync"
1011
"time"
@@ -29,10 +30,9 @@ type Reader[T any] interface {
2930
}
3031

3132
type reader[T any] struct {
32-
options Options
33-
path string
34-
fs storage.FS
35-
useCompression bool
33+
options Options
34+
path string
35+
fs storage.FS
3636

3737
closer io.Closer
3838

@@ -140,8 +140,11 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
140140
}
141141
}
142142

143-
var block Block[T]
144-
for structs.IsZero(block) || block.Number <= r.lastBlockNum {
143+
var (
144+
block Block[T]
145+
hasSuccessfullyReadBlock bool
146+
)
147+
for !hasSuccessfullyReadBlock || block.Number <= r.lastBlockNum && r.lastBlockNum != math.MaxUint64 {
145148
select {
146149
case <-ctx.Done():
147150
return Block[T]{}, ctx.Err()
@@ -168,6 +171,8 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
168171
r.lastBlockNum = block.Number
169172
}
170173

174+
hasSuccessfullyReadBlock = true
175+
171176
if !r.isBlockWithin(block) {
172177
currentFile := r.fileIndex.At(r.currFileIndex)
173178
return Block[T]{}, fmt.Errorf("block number %d is out of file block %d-%d range",
@@ -181,6 +186,8 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
181186
return Block[T]{}, fmt.Errorf("failed to decode file data: %w", err)
182187
}
183188

189+
hasSuccessfullyReadBlock = true
190+
184191
if !r.isBlockWithin(block) {
185192
currentFile := r.fileIndex.At(r.currFileIndex)
186193
return Block[T]{}, fmt.Errorf("block number %d is out of file block %d-%d range",

writer.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"math"
89
"os"
910
"sync"
1011

@@ -77,18 +78,26 @@ func NewWriter[T any](opt Options) (Writer[T], error) {
7778
return nil, fmt.Errorf("failed to load file index: %w", err)
7879
}
7980

81+
var firstBlockNum uint64
8082
var lastBlockNum uint64
8183
var fileIndexFileList = fileIndex.Files()
8284
if len(fileIndexFileList) > 0 {
8385
lastBlockNum = fileIndexFileList[len(fileIndexFileList)-1].LastBlockNum
8486
}
8587

88+
if lastBlockNum == 0 {
89+
firstBlockNum = NoBlockNum
90+
lastBlockNum = NoBlockNum
91+
} else {
92+
firstBlockNum = lastBlockNum + 1
93+
}
94+
8695
// create new writer
8796
return &writer[T]{
8897
options: opt,
8998
path: datasetPath,
9099
fs: fs,
91-
firstBlockNum: lastBlockNum + 1,
100+
firstBlockNum: firstBlockNum,
92101
lastBlockNum: lastBlockNum,
93102
fileIndex: fileIndex,
94103
buffer: bytes.NewBuffer(make([]byte, 0, defaultFileSize)),
@@ -103,7 +112,7 @@ func (w *writer[T]) Write(ctx context.Context, b Block[T]) error {
103112
w.mu.Lock()
104113
defer w.mu.Unlock()
105114

106-
if w.lastBlockNum >= b.Number {
115+
if w.lastBlockNum != math.MaxUint64 && w.lastBlockNum >= b.Number {
107116
return nil
108117
}
109118

@@ -118,6 +127,10 @@ func (w *writer[T]) Write(ctx context.Context, b Block[T]) error {
118127
return fmt.Errorf("failed to encode file data: %w", err)
119128
}
120129

130+
if w.firstBlockNum == NoBlockNum {
131+
w.firstBlockNum = b.Number
132+
}
133+
121134
w.lastBlockNum = b.Number
122135
w.options.FileRollPolicy.onBlockProcessed(w.lastBlockNum)
123136
return nil

writer_no_gap.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ethwal
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/0xsequence/ethwal/storage"
78
)
@@ -13,7 +14,8 @@ type noGapWriter[T any] struct {
1314
}
1415

1516
func NewWriterNoGap[T any](w Writer[T]) Writer[T] {
16-
return &noGapWriter[T]{w: w}
17+
fmt.Println("NewWriterNoGap", w.BlockNum())
18+
return &noGapWriter[T]{w: w, lastBlockNum: w.BlockNum()}
1719
}
1820

1921
func (n *noGapWriter[T]) FileSystem() storage.FS {
@@ -24,7 +26,7 @@ func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error {
2426
defer func() { n.lastBlockNum = b.Number }()
2527

2628
// skip if block number is less than or equal to last block number
27-
if b.Number <= n.lastBlockNum {
29+
if n.lastBlockNum != NoBlockNum && b.Number <= n.lastBlockNum {
2830
return nil
2931
}
3032

@@ -40,6 +42,7 @@ func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error {
4042
return err
4143
}
4244
}
45+
4346
return n.w.Write(ctx, b)
4447
}
4548

0 commit comments

Comments
 (0)