Skip to content

Commit dd996f0

Browse files
committed
refactor: extend local-da to produce blocks
1 parent d7c1a28 commit dd996f0

7 files changed

Lines changed: 86 additions & 28 deletions

File tree

block/internal/syncing/syncer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
749749
}
750750

751751
// Verify forced inclusion transactions if configured.
752-
// The checks is actually only performed on DA only enabled nodes.
752+
// The check is actually only performed on DA only enabled nodes.
753753
// P2P nodes aren't actually able to verify forced inclusions txs as DA inclusion happens later (so DA hints are not available) and DA hints cannot be trusted. This is a known limitation described in the ADR.
754754
// In the future we should verify force inclusion txs completely asynchronously, while still waiting for block n-1 execution.
755755
if event.Source == common.SourceDA {

block/public.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,13 @@ func NewDAClient(
6363
return base
6464
}
6565

66-
// ErrForceInclusionNotConfigured is returned when force inclusion is not configured.
67-
// It is exported because sequencers needs to check for this error.
68-
var ErrForceInclusionNotConfigured = da.ErrForceInclusionNotConfigured
66+
// The following errors are exported because sequencers needs to check for this error.
67+
var (
68+
// ErrForceInclusionNotConfigured is returned when force inclusion is not configured.
69+
ErrForceInclusionNotConfigured = da.ErrForceInclusionNotConfigured
70+
// ErrNoBatch is returned when no batch is available.
71+
ErrNoBatch = common.ErrNoBatch
72+
)
6973

7074
// ForcedInclusionEvent represents forced inclusion transactions retrieved from DA
7175
type ForcedInclusionEvent = da.ForcedInclusionEvent

pkg/sequencers/based/sequencer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
253253
// Cache the transactions for this DA epoch
254254
s.currentBatchTxs = validTxs
255255

256-
return forcedTxsEvent.Timestamp.UTC(), forcedTxsEvent.EndDaHeight, nil
256+
return forcedTxsEvent.Timestamp, forcedTxsEvent.EndDaHeight, nil
257257
}
258258

259259
// VerifyBatch verifies a batch of transactions

test/e2e/evm_force_inclusion_e2e_test.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -808,13 +808,7 @@ func TestEvmSequencerCatchUpBasedSequencerE2E(t *testing.T) {
808808

809809
// Hack for local-da for advancing past multiple epochs
810810
t.Log("Advancing DA past multiple epochs...")
811-
for i := 0; i < 5; i++ {
812-
dummyData := [][]byte{[]byte(fmt.Sprintf("dummy-%d", i))}
813-
result := daClient.Submit(ctx, dummyData, -1, daClient.GetDataNamespace(), nil)
814-
require.Equal(t, da.StatusSuccess, result.Code, "Failed to submit dummy data %d to DA: %s", i+1, result.Message)
815-
time.Sleep(100 * time.Millisecond)
816-
}
817-
t.Log("DA advanced past multiple epochs")
811+
time.Sleep(6 * time.Second)
818812

819813
// ===== PHASE 6: Verify Based Sequencer Includes Forced Txs =====
820814
t.Log("Phase 6: Verify Based Sequencer Includes Forced Txs")
@@ -1058,13 +1052,7 @@ func TestEvmBasedSequencerBaselineE2E(t *testing.T) {
10581052
// Advance DA past epoch boundary by submitting dummy data
10591053
// With epoch=2, we need at least 2 DA blocks per epoch
10601054
t.Log("Advancing DA past epoch boundary...")
1061-
for i := 0; i < 5; i++ {
1062-
dummyData := [][]byte{[]byte(fmt.Sprintf("dummy-%d", i))}
1063-
result := daClient.Submit(ctx, dummyData, -1, daClient.GetDataNamespace(), nil)
1064-
require.Equal(t, da.StatusSuccess, result.Code, "Failed to submit dummy data %d to DA: %s", i+1, result.Message)
1065-
time.Sleep(100 * time.Millisecond)
1066-
}
1067-
t.Log("DA advanced past epoch boundary")
1055+
time.Sleep(4 * time.Second)
10681056

10691057
// ===== VERIFY BASED SEQUENCER INCLUDES FORCED TXS =====
10701058
t.Log("Waiting for based sequencer to include forced inclusion txs")

test/e2e/evm_test_common.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func setupCommonEVMEnv(t testing.TB, sut *SystemUnderTest, client tastoratypes.T
554554
if evmSingleBinaryPath != "evm" {
555555
localDABinary = filepath.Join(filepath.Dir(evmSingleBinaryPath), "local-da")
556556
}
557-
sut.ExecCmd(localDABinary, "-port", dynEndpoints.DAPort)
557+
sut.ExecCmd(localDABinary, "-port", dynEndpoints.DAPort, "-block-time", "1s")
558558
t.Logf("Started local DA on port %s", dynEndpoints.DAPort)
559559

560560
require.NotNil(t, client, "docker client is required")

tools/local-da/local.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@ import (
1919
datypes "github.com/evstack/ev-node/pkg/da/types"
2020
)
2121

22-
// DefaultMaxBlobSize is the default max blob size
23-
const DefaultMaxBlobSize uint64 = 7 * 1024 * 1024 // 7MB
22+
const (
23+
// DefaultMaxBlobSize is the default max blob size
24+
DefaultMaxBlobSize uint64 = 7 * 1024 * 1024 // 7MB
25+
26+
// DefaultBlockTime is the default time between empty blocks
27+
DefaultBlockTime = 1 * time.Second
28+
)
2429

2530
// LocalDA is a simple implementation of in-memory DA. Not production ready! Intended only for testing!
2631
//
@@ -35,6 +40,8 @@ type LocalDA struct {
3540
height uint64
3641
privKey ed25519.PrivateKey
3742
pubKey ed25519.PublicKey
43+
blockTime time.Duration
44+
lastTime time.Time // tracks last timestamp to ensure monotonicity
3845

3946
logger zerolog.Logger
4047
}
@@ -51,6 +58,8 @@ func NewLocalDA(logger zerolog.Logger, opts ...func(*LocalDA) *LocalDA) *LocalDA
5158
timestamps: make(map[uint64]time.Time),
5259
blobData: make(map[uint64][]*blobrpc.Blob),
5360
maxBlobSize: DefaultMaxBlobSize,
61+
blockTime: DefaultBlockTime,
62+
lastTime: time.Now(),
5463
logger: logger,
5564
}
5665
for _, f := range opts {
@@ -194,7 +203,7 @@ func (d *LocalDA) SubmitWithOptions(ctx context.Context, blobs []datypes.Blob, g
194203
defer d.mu.Unlock()
195204
ids := make([]datypes.ID, len(blobs))
196205
d.height += 1
197-
d.timestamps[d.height] = time.Now()
206+
d.timestamps[d.height] = d.monotonicTime()
198207
for i, blob := range blobs {
199208
ids[i] = append(d.nextID(), d.getHash(blob)...)
200209

@@ -224,7 +233,7 @@ func (d *LocalDA) Submit(ctx context.Context, blobs []datypes.Blob, gasPrice flo
224233
defer d.mu.Unlock()
225234
ids := make([]datypes.ID, len(blobs))
226235
d.height += 1
227-
d.timestamps[d.height] = time.Now()
236+
d.timestamps[d.height] = d.monotonicTime()
228237
for i, blob := range blobs {
229238
ids[i] = append(d.nextID(), d.getHash(blob)...)
230239

@@ -274,10 +283,57 @@ func (d *LocalDA) getProof(id, blob []byte) []byte {
274283
return sign
275284
}
276285

286+
// monotonicTime returns a timestamp that is guaranteed to be after the last recorded timestamp.
287+
func (d *LocalDA) monotonicTime() time.Time {
288+
now := time.Now()
289+
if now.After(d.lastTime) {
290+
d.lastTime = now
291+
return now
292+
}
293+
d.lastTime = d.lastTime.Add(1)
294+
return d.lastTime
295+
}
296+
277297
// WithMaxBlobSize returns a function that sets the max blob size of LocalDA
278298
func WithMaxBlobSize(maxBlobSize uint64) func(*LocalDA) *LocalDA {
279299
return func(da *LocalDA) *LocalDA {
280300
da.maxBlobSize = maxBlobSize
281301
return da
282302
}
283303
}
304+
305+
// WithBlockTime returns a function that sets the block time for empty block production
306+
func WithBlockTime(blockTime time.Duration) func(*LocalDA) *LocalDA {
307+
return func(da *LocalDA) *LocalDA {
308+
da.blockTime = blockTime
309+
return da
310+
}
311+
}
312+
313+
// Start begins producing empty blocks at the configured block time interval.
314+
func (d *LocalDA) Start(ctx context.Context) {
315+
go func() {
316+
ticker := time.NewTicker(d.blockTime)
317+
defer ticker.Stop()
318+
319+
for {
320+
select {
321+
case <-ctx.Done():
322+
d.logger.Info().Msg("LocalDA: stopping empty block production")
323+
return
324+
case <-ticker.C:
325+
d.produceEmptyBlock()
326+
}
327+
}
328+
}()
329+
d.logger.Info().Dur("blockTime", d.blockTime).Msg("LocalDA: started empty block production")
330+
}
331+
332+
// produceEmptyBlock creates a new empty block at the next height.
333+
func (d *LocalDA) produceEmptyBlock() {
334+
d.mu.Lock()
335+
defer d.mu.Unlock()
336+
d.height++
337+
d.timestamps[d.height] = d.monotonicTime()
338+
d.logger.Debug().Uint64("height", d.height).Msg("produced empty block")
339+
}

tools/local-da/main.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ func main() {
2323
port string
2424
listenAll bool
2525
maxBlobSize uint64
26+
blockTime time.Duration
2627
)
2728
flag.StringVar(&port, "port", defaultPort, "listening port")
2829
flag.StringVar(&host, "host", defaultHost, "listening address")
2930
flag.BoolVar(&listenAll, "listen-all", false, "listen on all network interfaces (0.0.0.0) instead of just localhost")
3031
flag.Uint64Var(&maxBlobSize, "max-blob-size", DefaultMaxBlobSize, "maximum blob size in bytes")
32+
flag.DurationVar(&blockTime, "block-time", DefaultBlockTime, "time between empty blocks (e.g., 1s, 500ms)")
3133
flag.Parse()
3234

3335
if listenAll {
@@ -43,25 +45,33 @@ func main() {
4345
if maxBlobSize != DefaultMaxBlobSize {
4446
opts = append(opts, WithMaxBlobSize(maxBlobSize))
4547
}
48+
if blockTime != DefaultBlockTime {
49+
opts = append(opts, WithBlockTime(blockTime))
50+
}
4651
da := NewLocalDA(logger, opts...)
4752

53+
ctx, cancel := context.WithCancel(context.Background())
54+
defer cancel()
55+
56+
da.Start(ctx)
57+
4858
addr := fmt.Sprintf("%s:%s", host, port)
4959
srv, err := startBlobServer(logger, addr, da)
5060
if err != nil {
5161
logger.Error().Err(err).Msg("error while creating blob RPC server")
5262
os.Exit(1)
5363
}
5464

55-
logger.Info().Str("host", host).Str("port", port).Uint64("maxBlobSize", maxBlobSize).Msg("Listening on")
65+
logger.Info().Str("host", host).Str("port", port).Uint64("maxBlobSize", maxBlobSize).Dur("blockTime", blockTime).Msg("Listening on")
5666

5767
interrupt := make(chan os.Signal, 1)
5868
signal.Notify(interrupt, os.Interrupt, syscall.SIGINT)
5969
<-interrupt
6070
fmt.Println("\nCtrl+C pressed. Exiting...")
6171

62-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
63-
defer cancel()
64-
if err := srv.Shutdown(ctx); err != nil {
72+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
73+
defer shutdownCancel()
74+
if err := srv.Shutdown(shutdownCtx); err != nil {
6575
logger.Error().Err(err).Msg("error shutting down server")
6676
}
6777
os.Exit(0)

0 commit comments

Comments
 (0)