|
9 | 9 | "time" |
10 | 10 |
|
11 | 11 | "github.com/celestiaorg/go-square/v3/share" |
| 12 | + "github.com/evstack/ev-node/pkg/da" |
12 | 13 | "github.com/rs/zerolog" |
13 | 14 |
|
14 | 15 | "github.com/evstack/ev-node/block/internal/common" |
@@ -89,15 +90,31 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace |
89 | 90 |
|
90 | 91 | blobs := make([]*blobrpc.Blob, len(data)) |
91 | 92 | for i, raw := range data { |
92 | | - if uint64(len(raw)) > common.DefaultMaxBlobSize { |
| 93 | + // Compress blob data before submission to reduce bandwidth and storage costs |
| 94 | + compressed, compErr := da.Compress(raw) |
| 95 | + if compErr != nil { |
| 96 | + return datypes.ResultSubmit{ |
| 97 | + BaseResult: datypes.BaseResult{ |
| 98 | + Code: datypes.StatusError, |
| 99 | + Message: fmt.Sprintf("compress blob %d: %v", i, compErr), |
| 100 | + }, |
| 101 | + } |
| 102 | + } |
| 103 | + c.logger.Debug(). |
| 104 | + Int("original_size", len(raw)). |
| 105 | + Int("compressed_size", len(compressed)). |
| 106 | + Float64("ratio", float64(len(compressed))/float64(len(raw))). |
| 107 | + Msg("compressed blob for DA submission") |
| 108 | + |
| 109 | + if uint64(len(compressed)) > common.DefaultMaxBlobSize { |
93 | 110 | return datypes.ResultSubmit{ |
94 | 111 | BaseResult: datypes.BaseResult{ |
95 | 112 | Code: datypes.StatusTooBig, |
96 | 113 | Message: datypes.ErrBlobSizeOverLimit.Error(), |
97 | 114 | }, |
98 | 115 | } |
99 | 116 | } |
100 | | - blobs[i], err = blobrpc.NewBlobV0(ns, raw) |
| 117 | + blobs[i], err = blobrpc.NewBlobV0(ns, compressed) |
101 | 118 | if err != nil { |
102 | 119 | return datypes.ResultSubmit{ |
103 | 120 | BaseResult: datypes.BaseResult{ |
@@ -278,12 +295,22 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) |
278 | 295 | } |
279 | 296 | } |
280 | 297 |
|
281 | | - // Extract IDs and data from the blobs. |
| 298 | + // Extract IDs and data from the blobs, decompressing if needed. |
282 | 299 | ids := make([]datypes.ID, len(blobs)) |
283 | 300 | data := make([]datypes.Blob, len(blobs)) |
284 | 301 | for i, b := range blobs { |
285 | 302 | ids[i] = blobrpc.MakeID(height, b.Commitment) |
286 | | - data[i] = b.Data() |
| 303 | + decompressed, decompErr := da.Decompress(b.Data()) |
| 304 | + if decompErr != nil { |
| 305 | + return datypes.ResultRetrieve{ |
| 306 | + BaseResult: datypes.BaseResult{ |
| 307 | + Code: datypes.StatusError, |
| 308 | + Message: fmt.Sprintf("decompress blob %d at height %d: %v", i, height, decompErr), |
| 309 | + Height: height, |
| 310 | + }, |
| 311 | + } |
| 312 | + } |
| 313 | + data[i] = decompressed |
287 | 314 | } |
288 | 315 |
|
289 | 316 | c.logger.Debug().Int("num_blobs", len(blobs)).Msg("retrieved blobs") |
@@ -361,7 +388,11 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([ |
361 | 388 | if b == nil { |
362 | 389 | continue |
363 | 390 | } |
364 | | - res = append(res, b.Data()) |
| 391 | + decompressed, decompErr := da.Decompress(b.Data()) |
| 392 | + if decompErr != nil { |
| 393 | + return nil, fmt.Errorf("decompress blob: %w", decompErr) |
| 394 | + } |
| 395 | + res = append(res, decompressed) |
365 | 396 | } |
366 | 397 |
|
367 | 398 | return res, nil |
|
0 commit comments