Skip to content

Commit 1f629e0

Browse files
committed
fix: first block num
1 parent 8fe74e3 commit 1f629e0

11 files changed

Lines changed: 278 additions & 31 deletions

common.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"fmt"
1111
"io"
12+
"math"
1213
"os"
1314
"path"
1415
"sort"
@@ -22,6 +23,8 @@ import (
2223
"github.com/c2h5oh/datasize"
2324
)
2425

26+
const NoBlockNum = uint64(math.MaxUint64)
27+
2528
type Dataset struct {
2629
Name string
2730
Version string

filter.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -43,8 +43,6 @@ func (o FilterBuilderOptions[T]) WithDefaults() FilterBuilderOptions[T] {
4343
}
4444

4545
type filterBuilder[T any] struct {
46-
ctx context.Context
47-
4846
indexes map[IndexName]Index[T]
4947
fs storage.FS
5048
}

filter_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ func generateMixedIntBlocks() []Block[[]int] {
5858
// 45-49 generate 5 blocks with no data
5959
// 50-69 generate 20 blocks with random but repeating huge numbers
6060

61-
for i := 1; i <= 20; i++ {
61+
for i := 0; i <= 20; i++ {
6262
blocks = append(blocks, Block[[]int]{
6363
Hash: common.BytesToHash([]byte{byte(i)}),
6464
Number: uint64(i),
@@ -288,7 +288,7 @@ func TestIntMixFiltering(t *testing.T) {
288288
}
289289

290290
onlyEvenResults := onlyEvenFilter.IndexIterator(context.Background())
291-
assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20)
291+
assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 21)
292292
for _, block := range onlyEvenResults.Bitmap().ToArray() {
293293
assert.True(t, block <= 20)
294294
}
@@ -373,7 +373,7 @@ func TestLowestIndexedBlockNum(t *testing.T) {
373373
})
374374
assert.NoError(t, err)
375375
lowestBlockIndexed = indexer.BlockNum()
376-
assert.Equal(t, uint64(0), lowestBlockIndexed)
376+
assert.Equal(t, NoBlockNum, lowestBlockIndexed)
377377
blocks := generateIntBlocks()
378378
for _, block := range blocks[:50] {
379379
err = indexer.Index(context.Background(), block)

index.go

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@ func (u *IndexUpdate) Merge(update *IndexUpdate) {
5050
u.BlockBitmap[indexValue].Or(bm)
5151
}
5252

53-
if u.LastBlockNum < update.LastBlockNum {
53+
if u.LastBlockNum == NoBlockNum || u.LastBlockNum < update.LastBlockNum {
5454
u.LastBlockNum = update.LastBlockNum
5555
}
5656
}
@@ -97,7 +97,7 @@ func (i *Index[T]) IndexBlock(ctx context.Context, fs storage.FS, block Block[T]
9797
return nil, fmt.Errorf("unexpected: failed to get number of blocks indexed: %w", err)
9898
}
9999

100-
if block.Number <= numBlocksIndexed {
100+
if numBlocksIndexed != NoBlockNum && block.Number <= numBlocksIndexed {
101101
return nil, nil
102102
}
103103
}
@@ -148,7 +148,7 @@ func (i *Index[T]) Store(ctx context.Context, fs storage.FS, indexUpdate *IndexU
148148
if err != nil {
149149
return fmt.Errorf("failed to get number of blocks indexed: %w", err)
150150
}
151-
if lastBlockNumIndexed >= indexUpdate.LastBlockNum {
151+
if lastBlockNumIndexed != NoBlockNum && lastBlockNumIndexed >= indexUpdate.LastBlockNum {
152152
return nil
153153
}
154154

@@ -191,19 +191,19 @@ func (i *Index[T]) LastBlockNumIndexed(ctx context.Context, fs storage.FS) (uint
191191
file, err := fs.Open(ctx, indexedBlockNumFilePath(string(i.name)), nil)
192192
if err != nil {
193193
// file doesn't exist
194-
return 0, nil
194+
return NoBlockNum, nil
195195
}
196196
defer file.Close()
197197

198198
buf, err := io.ReadAll(file)
199199
if err != nil {
200-
return 0, fmt.Errorf("failed to read IndexBlock file: %w", err)
200+
return NoBlockNum, fmt.Errorf("failed to read IndexBlock file: %w", err)
201201
}
202202

203203
var numBlocksIndexed uint64
204204
err = binary.Read(bytes.NewReader(buf), binary.BigEndian, &numBlocksIndexed)
205205
if err != nil {
206-
return 0, fmt.Errorf("failed to unmarshal bitmap: %w", err)
206+
return NoBlockNum, fmt.Errorf("failed to unmarshal bitmap: %w", err)
207207
}
208208

209209
i.numBlocksIndexed = &atomic.Uint64{}
@@ -219,7 +219,7 @@ func (i *Index[T]) storeLastBlockNumIndexed(ctx context.Context, fs storage.FS,
219219
prevBlockIndexed = blocksIndexed
220220
}
221221

222-
if prevBlockIndexed >= numBlocksIndexed {
222+
if prevBlockIndexed != NoBlockNum && prevBlockIndexed >= numBlocksIndexed {
223223
return nil
224224
}
225225

indexer.go

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -134,16 +134,21 @@ func (i *Indexer[T]) BlockNum() uint64 {
134134
i.mu.Lock()
135135
defer i.mu.Unlock()
136136

137+
// initialize lowestBlockNum to math.MaxUint64
137138
var lowestBlockNum uint64 = math.MaxUint64
138139
for _, indexUpdate := range i.indexUpdates {
140+
// if no blocks have been indexed, return NoBlockNum
141+
if indexUpdate.LastBlockNum == NoBlockNum {
142+
return NoBlockNum
143+
}
144+
145+
// update lowestBlockNum if the current indexUpdate has a lower last block number
139146
if indexUpdate.LastBlockNum < lowestBlockNum {
140147
lowestBlockNum = indexUpdate.LastBlockNum
141148
}
142149
}
143150

144-
if lowestBlockNum == math.MaxUint64 {
145-
return 0
146-
}
151+
// return the lowest block number indexed by all indexes
147152
return lowestBlockNum
148153
}
149154

reader.go

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"math"
89
"os"
910
"sync"
1011
"time"
@@ -29,10 +30,9 @@ type Reader[T any] interface {
2930
}
3031

3132
type reader[T any] struct {
32-
options Options
33-
path string
34-
fs storage.FS
35-
useCompression bool
33+
options Options
34+
path string
35+
fs storage.FS
3636

3737
closer io.Closer
3838

@@ -140,8 +140,11 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
140140
}
141141
}
142142

143-
var block Block[T]
144-
for structs.IsZero(block) || block.Number <= r.lastBlockNum {
143+
var (
144+
block Block[T]
145+
hasSuccessfullyReadBlock bool
146+
)
147+
for !hasSuccessfullyReadBlock || block.Number <= r.lastBlockNum && r.lastBlockNum != math.MaxUint64 {
145148
select {
146149
case <-ctx.Done():
147150
return Block[T]{}, ctx.Err()
@@ -168,6 +171,8 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
168171
r.lastBlockNum = block.Number
169172
}
170173

174+
hasSuccessfullyReadBlock = true
175+
171176
if !r.isBlockWithin(block) {
172177
currentFile := r.fileIndex.At(r.currFileIndex)
173178
return Block[T]{}, fmt.Errorf("block number %d is out of file block %d-%d range",
@@ -181,6 +186,8 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
181186
return Block[T]{}, fmt.Errorf("failed to decode file data: %w", err)
182187
}
183188

189+
hasSuccessfullyReadBlock = true
190+
184191
if !r.isBlockWithin(block) {
185192
currentFile := r.fileIndex.At(r.currFileIndex)
186193
return Block[T]{}, fmt.Errorf("block number %d is out of file block %d-%d range",

writer.go

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"math"
89
"os"
910
"sync"
1011

@@ -77,18 +78,26 @@ func NewWriter[T any](opt Options) (Writer[T], error) {
7778
return nil, fmt.Errorf("failed to load file index: %w", err)
7879
}
7980

81+
var firstBlockNum uint64
8082
var lastBlockNum uint64
8183
var fileIndexFileList = fileIndex.Files()
8284
if len(fileIndexFileList) > 0 {
8385
lastBlockNum = fileIndexFileList[len(fileIndexFileList)-1].LastBlockNum
8486
}
8587

88+
if lastBlockNum == 0 {
89+
firstBlockNum = NoBlockNum
90+
lastBlockNum = NoBlockNum
91+
} else {
92+
firstBlockNum = lastBlockNum + 1
93+
}
94+
8695
// create new writer
8796
return &writer[T]{
8897
options: opt,
8998
path: datasetPath,
9099
fs: fs,
91-
firstBlockNum: lastBlockNum + 1,
100+
firstBlockNum: firstBlockNum,
92101
lastBlockNum: lastBlockNum,
93102
fileIndex: fileIndex,
94103
buffer: bytes.NewBuffer(make([]byte, 0, defaultFileSize)),
@@ -103,7 +112,7 @@ func (w *writer[T]) Write(ctx context.Context, b Block[T]) error {
103112
w.mu.Lock()
104113
defer w.mu.Unlock()
105114

106-
if w.lastBlockNum >= b.Number {
115+
if w.lastBlockNum != math.MaxUint64 && w.lastBlockNum >= b.Number {
107116
return nil
108117
}
109118

@@ -118,6 +127,10 @@ func (w *writer[T]) Write(ctx context.Context, b Block[T]) error {
118127
return fmt.Errorf("failed to encode file data: %w", err)
119128
}
120129

130+
if w.firstBlockNum == NoBlockNum {
131+
w.firstBlockNum = b.Number
132+
}
133+
121134
w.lastBlockNum = b.Number
122135
w.options.FileRollPolicy.onBlockProcessed(w.lastBlockNum)
123136
return nil

writer_no_gap.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ type noGapWriter[T any] struct {
1313
}
1414

1515
func NewWriterNoGap[T any](w Writer[T]) Writer[T] {
16-
return &noGapWriter[T]{w: w}
16+
return &noGapWriter[T]{w: w, lastBlockNum: w.BlockNum()}
1717
}
1818

1919
func (n *noGapWriter[T]) FileSystem() storage.FS {
@@ -24,7 +24,7 @@ func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error {
2424
defer func() { n.lastBlockNum = b.Number }()
2525

2626
// skip if block number is less than or equal to last block number
27-
if b.Number <= n.lastBlockNum {
27+
if n.lastBlockNum != NoBlockNum && b.Number <= n.lastBlockNum {
2828
return nil
2929
}
3030

@@ -40,6 +40,7 @@ func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error {
4040
return err
4141
}
4242
}
43+
4344
return n.w.Write(ctx, b)
4445
}
4546

writer_no_gap_test.go

Lines changed: 60 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,9 @@ func TestWriterNoGap(t *testing.T) {
3030
ngw := NewWriterNoGap[int](w)
3131
require.NotNil(t, w)
3232

33+
err = ngw.Write(context.Background(), Block[int]{Number: 0})
34+
require.NoError(t, err)
35+
3336
err = ngw.Write(context.Background(), Block[int]{Number: 1})
3437
require.NoError(t, err)
3538

@@ -46,7 +49,7 @@ func TestWriterNoGap(t *testing.T) {
4649
require.NoError(t, err)
4750

4851
walData, err := os.ReadFile(
49-
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 1, LastBlockNum: 3}).Path()),
52+
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 0, LastBlockNum: 3}).Path()),
5053
)
5154
require.NoError(t, err)
5255

@@ -59,7 +62,57 @@ func TestWriterNoGap(t *testing.T) {
5962
blockCount++
6063
}
6164

62-
require.Equal(t, 3, blockCount)
65+
require.Equal(t, 4, blockCount)
66+
})
67+
68+
t.Run("gap_first_block", func(t *testing.T) {
69+
defer testTeardown(t)
70+
71+
opt := Options{
72+
Dataset: Dataset{
73+
Name: "int-wal",
74+
Path: testPath,
75+
Version: defaultDatasetVersion,
76+
},
77+
NewEncoder: NewJSONEncoder,
78+
}.WithDefaults()
79+
80+
w, err := NewWriter[int](opt)
81+
require.NoError(t, err)
82+
83+
ngw := NewWriterNoGap[int](w)
84+
require.NotNil(t, w)
85+
86+
err = ngw.Write(context.Background(), Block[int]{Number: 1})
87+
require.NoError(t, err)
88+
89+
err = ngw.Write(context.Background(), Block[int]{Number: 2})
90+
require.NoError(t, err)
91+
92+
err = ngw.Write(context.Background(), Block[int]{Number: 3})
93+
require.NoError(t, err)
94+
95+
err = (w.(*writer[int])).rollFile(context.Background())
96+
require.NoError(t, err)
97+
98+
err = ngw.Close(context.Background())
99+
require.NoError(t, err)
100+
101+
walData, err := os.ReadFile(
102+
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 0, LastBlockNum: 3}).Path()),
103+
)
104+
require.NoError(t, err)
105+
106+
d := NewJSONDecoder(bytes.NewBuffer(walData))
107+
108+
var b Block[int]
109+
var blockCount int
110+
for d.Decode(&b) != io.EOF {
111+
require.NoError(t, err)
112+
blockCount++
113+
}
114+
115+
require.Equal(t, 4, blockCount)
63116
})
64117

65118
t.Run("gap_3_10", func(t *testing.T) {
@@ -80,6 +133,9 @@ func TestWriterNoGap(t *testing.T) {
80133
ngw := NewWriterNoGap[int](w)
81134
require.NotNil(t, w)
82135

136+
err = ngw.Write(context.Background(), Block[int]{Number: 0})
137+
require.NoError(t, err)
138+
83139
err = ngw.Write(context.Background(), Block[int]{Number: 1})
84140
require.NoError(t, err)
85141

@@ -98,7 +154,7 @@ func TestWriterNoGap(t *testing.T) {
98154
require.NoError(t, err)
99155

100156
walData, err := os.ReadFile(
101-
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 1, LastBlockNum: 10}).Path()),
157+
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 0, LastBlockNum: 10}).Path()),
102158
)
103159
require.NoError(t, err)
104160

@@ -111,6 +167,6 @@ func TestWriterNoGap(t *testing.T) {
111167
blockCount++
112168
}
113169

114-
require.Equal(t, 10, blockCount)
170+
require.Equal(t, 11, blockCount)
115171
})
116172
}

0 commit comments

Comments
 (0)