Skip to content

Commit 58e9b18

Browse files
committed
feat: Implement adaptive ZSTD compression levels for DA blob submission based on batch size.
(cherry picked from commit 3ff6211ec69d482437de79a617b5b39a88c91835)
1 parent cc68320 commit 58e9b18

3 files changed

Lines changed: 102 additions & 24 deletions

File tree

block/internal/da/client.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,20 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace
8888
}
8989
}
9090

91+
// Select compression level based on backlog pressure:
92+
// large batch = high backlog = prioritize speed;
93+
// small batch = low backlog = prioritize ratio.
94+
compLevel := da.LevelBest
95+
switch {
96+
case len(data) > 10:
97+
compLevel = da.LevelFastest
98+
case len(data) > 3:
99+
compLevel = da.LevelDefault
100+
}
101+
91102
blobs := make([]*blobrpc.Blob, len(data))
92103
for i, raw := range data {
93-
// Compress blob data before submission to reduce bandwidth and storage costs
94-
compressed, compErr := da.Compress(raw)
104+
compressed, compErr := da.Compress(raw, compLevel)
95105
if compErr != nil {
96106
return datypes.ResultSubmit{
97107
BaseResult: datypes.BaseResult{
@@ -104,6 +114,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace
104114
Int("original_size", len(raw)).
105115
Int("compressed_size", len(compressed)).
106116
Float64("ratio", float64(len(compressed))/float64(len(raw))).
117+
Int("level", int(compLevel)).
107118
Msg("compressed blob for DA submission")
108119

109120
if uint64(len(compressed)) > common.DefaultMaxBlobSize {

pkg/da/compression.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,59 @@ import (
1010
// ASCII "ZSTD" = 0x5A 0x53 0x54 0x44.
1111
var magic = []byte{0x5A, 0x53, 0x54, 0x44}
1212

13-
// encoder and decoder are package-level singletons. They are safe for
14-
// concurrent use per the klauspost/compress documentation.
15-
var (
16-
encoder *zstd.Encoder
17-
decoder *zstd.Decoder
13+
// CompressionLevel controls the speed/ratio trade-off for blob compression.
14+
type CompressionLevel int
15+
16+
const (
17+
// LevelFastest prioritizes speed over compression ratio.
18+
// Use when backlog is high and throughput matters most.
19+
LevelFastest CompressionLevel = iota
20+
// LevelDefault balances speed and compression ratio.
21+
LevelDefault
22+
// LevelBest prioritizes compression ratio over speed.
23+
// Use when backlog is low to save bandwidth and storage.
24+
LevelBest
1825
)
1926

27+
// encoders holds one zstd encoder per compression level. Each is safe for
28+
// concurrent use per the klauspost/compress documentation.
29+
var encoders [3]*zstd.Encoder
30+
31+
// decoder is a package-level singleton, safe for concurrent use.
32+
var decoder *zstd.Decoder
33+
2034
func init() {
21-
var err error
22-
encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
23-
if err != nil {
24-
panic(fmt.Sprintf("compression: create zstd encoder: %v", err))
35+
levels := [3]zstd.EncoderLevel{
36+
zstd.SpeedFastest,
37+
zstd.SpeedDefault,
38+
zstd.SpeedBestCompression,
39+
}
40+
for i, lvl := range levels {
41+
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(lvl))
42+
if err != nil {
43+
panic(fmt.Sprintf("compression: create zstd encoder (level %d): %v", i, err))
44+
}
45+
encoders[i] = enc
2546
}
47+
var err error
2648
decoder, err = zstd.NewReader(nil)
2749
if err != nil {
2850
panic(fmt.Sprintf("compression: create zstd decoder: %v", err))
2951
}
3052
}
3153

32-
// Compress compresses data using zstd and prepends the magic prefix.
54+
// Compress compresses data using zstd at the given level and prepends the magic prefix.
3355
// Returns the original data unchanged if it is empty.
34-
func Compress(data []byte) ([]byte, error) {
56+
func Compress(data []byte, level CompressionLevel) ([]byte, error) {
3557
if len(data) == 0 {
3658
return data, nil
3759
}
3860

39-
compressed := encoder.EncodeAll(data, nil)
61+
if level < LevelFastest || level > LevelBest {
62+
level = LevelDefault
63+
}
64+
65+
compressed := encoders[level].EncodeAll(data, nil)
4066

4167
// Prepend magic prefix
4268
result := make([]byte, len(magic)+len(compressed))
@@ -57,7 +83,7 @@ func Decompress(data []byte) ([]byte, error) {
5783
// Strip magic prefix and decompress
5884
decompressed, err := decoder.DecodeAll(data[len(magic):], nil)
5985
if err != nil {
60-
return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
86+
return nil, fmt.Errorf("compression: zstd decompress: %w", err)
6187
}
6288

6389
return decompressed, nil

pkg/da/compression_test.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestCompressDecompress_RoundTrip(t *testing.T) {
3434

3535
for _, tt := range tests {
3636
t.Run(tt.name, func(t *testing.T) {
37-
compressed, err := Compress(tt.data)
37+
compressed, err := Compress(tt.data, LevelDefault)
3838
require.NoError(t, err)
3939

4040
assert.True(t, IsCompressed(compressed), "compressed data should have magic prefix")
@@ -47,12 +47,46 @@ func TestCompressDecompress_RoundTrip(t *testing.T) {
4747
}
4848
}
4949

50+
func TestCompress_AllLevelsRoundTrip(t *testing.T) {
51+
data := bytes.Repeat([]byte("adaptive compression level test "), 5000)
52+
levels := []struct {
53+
name string
54+
level CompressionLevel
55+
}{
56+
{"fastest", LevelFastest},
57+
{"default", LevelDefault},
58+
{"best", LevelBest},
59+
}
60+
61+
var sizes []int
62+
for _, lvl := range levels {
63+
t.Run(lvl.name, func(t *testing.T) {
64+
compressed, err := Compress(data, lvl.level)
65+
require.NoError(t, err)
66+
assert.True(t, IsCompressed(compressed))
67+
68+
sizes = append(sizes, len(compressed))
69+
70+
decompressed, err := Decompress(compressed)
71+
require.NoError(t, err)
72+
assert.Equal(t, data, decompressed)
73+
74+
t.Logf("level=%s compressed=%d ratio=%.4f", lvl.name, len(compressed), float64(len(compressed))/float64(len(data)))
75+
})
76+
}
77+
78+
// Best should produce equal or smaller output than Fastest
79+
if len(sizes) == 3 {
80+
assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest")
81+
}
82+
}
83+
5084
func TestCompress_Empty(t *testing.T) {
51-
compressed, err := Compress(nil)
85+
compressed, err := Compress(nil, LevelDefault)
5286
require.NoError(t, err)
5387
assert.Nil(t, compressed)
5488

55-
compressed, err = Compress([]byte{})
89+
compressed, err = Compress([]byte{}, LevelDefault)
5690
require.NoError(t, err)
5791
assert.Empty(t, compressed)
5892
}
@@ -112,9 +146,8 @@ func TestIsCompressed(t *testing.T) {
112146
}
113147

114148
func TestCompress_AchievesCompression(t *testing.T) {
115-
// Highly compressible data should achieve meaningful compression
116149
data := bytes.Repeat([]byte("rollkit block data with repeated content "), 10000)
117-
compressed, err := Compress(data)
150+
compressed, err := Compress(data, LevelDefault)
118151
require.NoError(t, err)
119152

120153
ratio := float64(len(compressed)) / float64(len(data))
@@ -123,12 +156,11 @@ func TestCompress_AchievesCompression(t *testing.T) {
123156
}
124157

125158
func TestCompress_RandomDataStillWorks(t *testing.T) {
126-
// Random data won't compress well but should still round-trip correctly
127159
data := make([]byte, 4096)
128160
_, err := rand.Read(data)
129161
require.NoError(t, err)
130162

131-
compressed, err := Compress(data)
163+
compressed, err := Compress(data, LevelFastest)
132164
require.NoError(t, err)
133165

134166
decompressed, err := Decompress(compressed)
@@ -137,9 +169,18 @@ func TestCompress_RandomDataStillWorks(t *testing.T) {
137169
}
138170

139171
func TestDecompress_DataStartingWithMagicButUncompressed(t *testing.T) {
140-
// Edge case: data that happens to start with the magic bytes but isn't actually compressed.
141-
// This should fail decompression (invalid zstd frame).
142172
fakeCompressed := append([]byte{0x5A, 0x53, 0x54, 0x44}, bytes.Repeat([]byte{0x00}, 100)...)
143173
_, err := Decompress(fakeCompressed)
144174
assert.Error(t, err, "data starting with magic but containing invalid zstd should error")
145175
}
176+
177+
func TestCompress_InvalidLevel(t *testing.T) {
178+
// Out-of-range level should fall back to LevelDefault
179+
data := []byte("test data for invalid level")
180+
compressed, err := Compress(data, CompressionLevel(99))
181+
require.NoError(t, err)
182+
183+
decompressed, err := Decompress(compressed)
184+
require.NoError(t, err)
185+
assert.Equal(t, data, decompressed)
186+
}

0 commit comments

Comments
 (0)