diff --git a/CHANGELOG.md b/CHANGELOG.md index 912343ec0..cd4ab7177 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,39 +9,43 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +* Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness [#3222](https://github.com/evstack/ev-node/pull/3222) + ## v1.1.0-rc.1 ### Added -- Add AWS & GCP KMS signer backend [#3171](https://github.com/evstack/ev-node/pull/3171) -- Subscribe to forced inclusion namespace events [#3146](https://github.com/evstack/ev-node/pull/3146) -- Display block source in sync log [#3193](https://github.com/evstack/ev-node/pull/3193) +* Add AWS & GCP KMS signer backend [#3171](https://github.com/evstack/ev-node/pull/3171) +* Subscribe to forced inclusion namespace events [#3146](https://github.com/evstack/ev-node/pull/3146) +* Display block source in sync log [#3193](https://github.com/evstack/ev-node/pull/3193) ### Fixed -- Avoid evicting yet to be processed heights [#3204](https://github.com/evstack/ev-node/pull/3204) -- Bound Badger index cache memory to prevent growth with chain length [3209](https://github.com/evstack/ev-node/pull/3209) -- Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201) -- Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162) -- Strict raft state. 
[#3167](https://github.com/evstack/ev-node/pull/3167) -- Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166) +* Avoid evicting yet to be processed heights [#3204](https://github.com/evstack/ev-node/pull/3204) +* Bound Badger index cache memory to prevent growth with chain length [#3209](https://github.com/evstack/ev-node/pull/3209) +* Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201) +* Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162) +* Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167) +* Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166) ## v1.0.0 ### Fixed -- Persist cache snapshot only once at shutdown to avoid Badger vlog +* Persist cache snapshot only once at shutdown to avoid Badger vlog increase. [#3153](https://github.com/evstack/ev-node/pull/3153) ## v1.0.0-rc.5 ### Added -- Add disaster recovery for sequencer - - Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057) - - Verify DA and P2P state on restart (prevent double-signing). [#3061](https://github.com/evstack/ev-node/pull/3061) -- Node pruning support. [#2984](https://github.com/evstack/ev-node/pull/2984) - - Two different sort of pruning implemented: +* Add disaster recovery for sequencer + * Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057) + * Verify DA and P2P state on restart (prevent double-signing). [#3061](https://github.com/evstack/ev-node/pull/3061) +* Node pruning support. [#2984](https://github.com/evstack/ev-node/pull/2984) + * Two different sorts of pruning implemented: _Classic pruning_ (`all`): prunes given `HEAD-n` blocks from the databases, including store metadatas. 
_Auto Storage Optimization_ (`metadata`): prunes only the state metadatas, keeps all blocks. By using one or the other, you are losing the ability to rollback or replay transactions earlier than `HEAD-n`. @@ -49,56 +53,56 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Fix block timer to account for execution time. Previously, the block timer reset to the full `block_time` duration +* Fix block timer to account for execution time. Previously, the block timer reset to the full `block_time` duration after `ProduceBlock` completed, making the effective interval `block_time + execution_time`. Now the timer subtracts elapsed execution time so blocks are produced at the configured cadence. ### Changes -- Store pending blocks separately from executed blocks key. [#3073](https://github.com/evstack/ev-node/pull/3073) -- Fixes issues with force inclusion verification on sync nodes. [#3057](https://github.com/evstack/ev-node/pull/3057) -- Add flag to `local-da` to produce empty DA blocks (closer to the real +* Store pending blocks separately from executed blocks key. [#3073](https://github.com/evstack/ev-node/pull/3073) +* Fixes issues with force inclusion verification on sync nodes. [#3057](https://github.com/evstack/ev-node/pull/3057) +* Add flag to `local-da` to produce empty DA blocks (closer to the real system). [#3057](https://github.com/evstack/ev-node/pull/3057) -- Validate P2P DA height hints against the latest known DA height to prevent malicious peers from triggering runaway +* Validate P2P DA height hints against the latest known DA height to prevent malicious peers from triggering runaway catchup. [#3128](https://github.com/evstack/ev-node/pull/3128) -- Replace syncer DA polling system by DA subscription via +* Replace syncer DA polling system by DA subscription via websockets. [#3131](https://github.com/evstack/ev-node/pull/3131) ## v1.0.0-rc.4 ### Changes -- Skip draining when exec client unavailable. 
[#3060](https://github.com/evstack/ev-node/pull/3060) +* Skip draining when exec client unavailable. [#3060](https://github.com/evstack/ev-node/pull/3060) ## v1.0.0-rc.3 ### Added -- Add DA Hints for P2P transactions. This allows a catching up node to be on sync with both DA and +* Add DA Hints for P2P transactions. This allows a catching up node to be on sync with both DA and P2P. ([#2891](https://github.com/evstack/ev-node/pull/2891)) ### Changes -- Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect +* Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046)) -- **BREAKING** Make pending events cache and tx cache fully ephemeral. Those will be re-fetched on restart. DA Inclusion +* **BREAKING** Make pending events cache and tx cache fully ephemeral. Those will be re-fetched on restart. DA Inclusion cache persists until cleared up after DA inclusion has been processed. Persist accross restart using store metadata. ([#3047](https://github.com/evstack/ev-node/pull/3047)) -- Replace LRU cache by standard mem cache with manual eviction in `store_adapter`. When P2P blocks were fetched too +* Replace LRU cache by standard mem cache with manual eviction in `store_adapter`. When P2P blocks were fetched too fast, they would be evicted before being executed [#3051](https://github.com/evstack/ev-node/pull/3051) -- Fix replay logic leading to app hashes by verifying against the wrong +* Fix replay logic leading to app hashes by verifying against the wrong block [#3053](https://github.com/evstack/ev-node/pull/3053). ## v1.0.0-rc.2 ### Changes -- Improve cache handling when there is a significant backlog of pending headers and +* Improve cache handling when there is a significant backlog of pending headers and data. 
([#3030](https://github.com/evstack/ev-node/pull/3030)) -- Decrease MaxBytesSize to `5MB` to increase compatibility with public +* Decrease MaxBytesSize to `5MB` to increase compatibility with public nodes. ([#3030](https://github.com/evstack/ev-node/pull/3030)) -- Proper counting of `DASubmitterPendingBlobs` metrics. [#3038](https://github.com/evstack/ev-node/pull/3038) -- Replace `go-header` store by `ev-node` store. This avoid duplication of all blocks in `go-header` and `ev-node` store. +* Proper counting of `DASubmitterPendingBlobs` metrics. [#3038](https://github.com/evstack/ev-node/pull/3038) +* Replace `go-header` store by `ev-node` store. This avoid duplication of all blocks in `go-header` and `ev-node` store. Thanks to the cached store from #3030, this should improve p2p performance as well. [#3036](https://github.com/evstack/ev-node/pull/3036) @@ -106,45 +110,45 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added OpenTelemetry tracing support with OTLP export for distributed tracing across ev-node components including block +* Added OpenTelemetry tracing support with OTLP export for distributed tracing across ev-node components including block production, syncing, DA submission/retrieval, sequencer, store operations, and RPC layer. Configurable via `instrumentation.tracing`, `instrumentation.tracing_endpoint`, `instrumentation.tracing_service_name`, and `instrumentation.tracing_sample_rate` settings. ([#2956](https://github.com/evstack/ev-node/issues/2956)) -- **BREAKING:** Implement forced inclusion and batch sequencing ([#2797](https://github.com/evstack/ev-node/pull/2797)) +* **BREAKING:** Implement forced inclusion and batch sequencing ([#2797](https://github.com/evstack/ev-node/pull/2797)) **This change requires adding a `da_epoch_forced_inclusion` field to the node's `genesis.json` file.** The recommended value is `100`. Full support for this feature will be available in a future release. 
-- Added `post-tx` command and force inclusion server to submit transactions directly to the DA +* Added `post-tx` command and force inclusion server to submit transactions directly to the DA layer. ([#2888](https://github.com/evstack/ev-node/pull/2888)) Additionally, modified the core package to support marking transactions as forced included transactions. The execution client ought to perform basic validation on those transactions as they have skipped the execution client's mempool. -- Added batching strategies (default stay time-based, unchanged from previous betas). Currently available strategies are +* Added batching strategies (default stay time-based, unchanged from previous betas). Currently available strategies are `time`, `size`, `immediate` and `adaptive`. [Full documentation can be found here](https://github.com/evstack/ev-node/blob/122486de98d09ecd37d792b88814dcf07238f28a/docs/learn/config.md?plain=1#L521-L597). -- Added `FilterTxs` method to the execution interface. This method is meant to filter txs by size and if the execution +* Added `FilterTxs` method to the execution interface. This method is meant to filter txs by size and if the execution clients allows it, by gas. This is useful for force included transactions, as those aren't filtered by the sequencer's mempool. -- Added `GetExecutionInfo` method to the execution interface. This method returns some execution information, such as +* Added `GetExecutionInfo` method to the execution interface. This method returns some execution information, such as the maximum gas per block. ### Changed -- **BREAKING:** Renamed `evm-single` to `evm` and `grpc-single` to `evgrpc` for +* **BREAKING:** Renamed `evm-single` to `evm` and `grpc-single` to `evgrpc` for clarity. [#2839](https://github.com/evstack/ev-node/pull/2839). You may need to manually modify your evnode.yaml `signer.signer_path` if your $HOME folder is changed. 
-- Split cache interface into `CacheManager` and `PendingManager` and created `da` client to easy DA +* Split cache interface into `CacheManager` and `PendingManager` and created `da` client to easy DA handling. [#2878](https://github.com/evstack/ev-node/pull/2878) -- Improved startup DA retrieval height when cache is cleared or +* Improved startup DA retrieval height when cache is cleared or empty. [#2880](https://github.com/evstack/ev-node/pull/2880) ### Removed -- **BREAKING:** Removed unused and confusing metrics from sequencers and block processing, including sequencer-specific +* **BREAKING:** Removed unused and confusing metrics from sequencers and block processing, including sequencer-specific metrics (gas price, blob size, transaction status, pending blocks), channel buffer metrics, overly granular error metrics, block production categorization metrics, and sync lag metrics. Essential metrics for DA submission health, block production, and performance monitoring are retained. [#2904](https://github.com/evstack/ev-node/pull/2904) -- **BREAKING**: Removed `core/da` package and replaced DAClient with internal implementation. The DA client is exposed +* **BREAKING**: Removed `core/da` package and replaced DAClient with internal implementation. The DA client is exposed as `block.FullDAClient`, `block.DAClient`, `block.DAVerifier` without leaking implementation details. [#2910](https://github.com/evstack/ev-node/pull/2910) @@ -152,53 +156,53 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Improvements -- Loosen syncer validation for allowing swapping sequencer and full node +* Loosen syncer validation for allowing swapping sequencer and full node state [#2925](https://github.com/evstack/ev-node/pull/2925) ## v1.0.0-beta.10 ### Added -- Enhanced health check system with separate liveness (`/health/live`) and readiness (`/health/ready`) HTTP endpoints. 
+* Enhanced health check system with separate liveness (`/health/live`) and readiness (`/health/ready`) HTTP endpoints. Readiness endpoint includes P2P listening check and aggregator block production rate validation (5x block time threshold). ([#2800](https://github.com/evstack/ev-node/pull/2800)) -- Added `GetP2PStoreInfo` RPC method to retrieve head/tail metadata for go-header stores used by P2P +* Added `GetP2PStoreInfo` RPC method to retrieve head/tail metadata for go-header stores used by P2P sync ([#2835](https://github.com/evstack/ev-node/pull/2835)) -- Added protobuf definitions for `P2PStoreEntry` and `P2PStoreSnapshot` messages to support P2P store inspection +* Added protobuf definitions for `P2PStoreEntry` and `P2PStoreSnapshot` messages to support P2P store inspection ### Changed -- Improved EVM execution client payload status validation with proper retry logic for SYNCING states in `InitChain`, +* Improved EVM execution client payload status validation with proper retry logic for SYNCING states in `InitChain`, `ExecuteTxs`, and `SetFinal` methods. The implementation now follows Engine API specification by retrying SYNCING/ACCEPTED status with exponential backoff and failing immediately on INVALID status, preventing unnecessary node shutdowns during transient execution engine sync operations. ([#2863](https://github.com/evstack/ev-node/pull/2863)) -- Remove GasPrice and GasMultiplier from DA interface and configuration to use celestia-node's native fee +* Remove GasPrice and GasMultiplier from DA interface and configuration to use celestia-node's native fee estimation. ([#2822](https://github.com/evstack/ev-node/pull/2822)) -- Use cache instead of in memory store for reaper. Persist cache on reload. Autoclean after 24 +* Use cache instead of in memory store for reaper. Persist cache on reload. Autoclean after 24 hours. 
([#2811](https://github.com/evstack/ev-node/pull/2811)) -- Improved P2P sync service store initialization to be atomic and prevent race +* Improved P2P sync service store initialization to be atomic and prevent race conditions ([#2838](https://github.com/evstack/ev-node/pull/2838)) -- Enhanced P2P bootstrap behavior to intelligently detect starting height from local store instead of requiring trusted +* Enhanced P2P bootstrap behavior to intelligently detect starting height from local store instead of requiring trusted hash -- Relaxed execution layer height validation in block replay to allow execution to be ahead of target height, enabling +* Relaxed execution layer height validation in block replay to allow execution to be ahead of target height, enabling recovery from manual intervention scenarios ### Removed -- **BREAKING:** Removed `evnode.v1.HealthService` gRPC endpoint. Use HTTP endpoints: `GET /health/live` and +* **BREAKING:** Removed `evnode.v1.HealthService` gRPC endpoint. Use HTTP endpoints: `GET /health/live` and `GET /health/ready`. ([#2800](https://github.com/evstack/ev-node/pull/2800)) -- **BREAKING:** Removed `TrustedHash` configuration option and `--evnode.node.trusted_hash` flag. Sync service now +* **BREAKING:** Removed `TrustedHash` configuration option and `--evnode.node.trusted_hash` flag. Sync service now automatically determines starting height from local store state ([#2838](https://github.com/evstack/ev-node/pull/2838)) -- **BREAKING:** Removed unused and confusing metrics from sequencers and block processing, including sequencer-specific +* **BREAKING:** Removed unused and confusing metrics from sequencers and block processing, including sequencer-specific metrics (gas price, blob size, transaction status, pending blocks), channel buffer metrics, overly granular error metrics, block production categorization metrics, and sync lag metrics. 
Essential metrics for DA submission health, block production, and performance monitoring are retained. [#2904](https://github.com/evstack/ev-node/pull/2904) ### Fixed -- Fixed sync service initialization issue when node is not on genesis but has an empty store +* Fixed sync service initialization issue when node is not on genesis but has an empty store ## v1.0.0-beta.9 @@ -206,34 +210,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 -- Added automated upgrade test for the `evm` app that verifies compatibility when moving from v1.0.0-beta.8 to HEAD in +* Added automated upgrade test for the `evm` app that verifies compatibility when moving from v1.0.0-beta.8 to HEAD in CI ([#2780](https://github.com/evstack/ev-node/pull/2780)) -- Added execution-layer replay mechanism so nodes can resynchronize by replaying missed batches against the +* Added execution-layer replay mechanism so nodes can resynchronize by replaying missed batches against the executor ([#2771](https://github.com/evstack/ev-node/pull/2771)) -- Added cache-pruning logic that evicts entries once heights are finalized to keep node memory usage +* Added cache-pruning logic that evicts entries once heights are finalized to keep node memory usage bounded ([#2761](https://github.com/evstack/ev-node/pull/2761)) -- Added Prometheus gauges and counters that surface DA submission failures, pending blobs, and resend attempts for +* Added Prometheus gauges and counters that surface DA submission failures, pending blobs, and resend attempts for easier operational monitoring ([#2756](https://github.com/evstack/ev-node/pull/2756)) -- Added gRPC execution client implementation for remote execution services using Connect-RPC +* Added gRPC execution client implementation for remote execution services using Connect-RPC protocol ([#2490](https://github.com/evstack/ev-node/pull/2490)) -- Added `ExecutorService` protobuf definition with InitChain, GetTxs, ExecuteTxs, and SetFinal +* 
Added `ExecutorService` protobuf definition with InitChain, GetTxs, ExecuteTxs, and SetFinal RPCs ([#2490](https://github.com/evstack/ev-node/pull/2490)) -- Added new `grpc` app for running EVNode with a remote execution layer via +* Added new `grpc` app for running EVNode with a remote execution layer via gRPC ([#2490](https://github.com/evstack/ev-node/pull/2490)) ### Changed -- Hardened signer CLI and block pipeline per security audit: passphrases must be provided via +* Hardened signer CLI and block pipeline per security audit: passphrases must be provided via `--evnode.signer.passphrase_file`, JWT secrets must be provided via `--evm.jwt-secret-file`, data/header validation enforces metadata and timestamp checks, and the reaper backs off on failures ( BREAKING) ([#2764](https://github.com/evstack/ev-node/pull/2764)) -- Added retries around executor `ExecuteTxs` calls to better tolerate transient execution +* Added retries around executor `ExecuteTxs` calls to better tolerate transient execution errors ([#2784](https://github.com/evstack/ev-node/pull/2784)) -- Increased default `ReadinessMaxBlocksBehind` from 3 to 30 blocks so `/health/ready` stays true during normal batch +* Increased default `ReadinessMaxBlocksBehind` from 3 to 30 blocks so `/health/ready` stays true during normal batch sync ([#2779](https://github.com/evstack/ev-node/pull/2779)) -- Updated EVM execution client to use new `txpoolExt_getTxs` RPC API for retrieving pending transactions as RLP-encoded +* Updated EVM execution client to use new `txpoolExt_getTxs` RPC API for retrieving pending transactions as RLP-encoded bytes ### Deprecated @@ -244,7 +248,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 -- Removed `LastCommitHash`, `ConsensusHash`, and `LastResultsHash` from the canonical header representation in favor of +* Removed `LastCommitHash`, `ConsensusHash`, and `LastResultsHash` from the canonical header representation in favor of slim headers 
(BREAKING; legacy hashes now live under `Header.Legacy`) ([#2766](https://github.com/evstack/ev-node/pull/2766)) @@ -326,6 +330,6 @@ Pre-release versions: 0.x.y (anything may change) -- +* [Unreleased]: https://github.com/evstack/ev-node/compare/v1.0.0-beta.1...HEAD diff --git a/node/failover.go b/node/failover.go index c60e27a5f..613c9d37f 100644 --- a/node/failover.go +++ b/node/failover.go @@ -33,6 +33,8 @@ type failoverState struct { dataSyncService *evsync.DataSyncService rpcServer *http.Server bc *block.Components + raftNode *raft.Node + isAggregator bool // catchup fields — used when the aggregator needs to sync before producing catchupEnabled bool @@ -172,6 +174,8 @@ func setupFailoverState( dataSyncService: dataSyncService, rpcServer: rpcServer, bc: bc, + raftNode: raftNode, + isAggregator: isAggregator, store: rktStore, catchupEnabled: catchupEnabled, catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration, @@ -179,6 +183,25 @@ func setupFailoverState( }, nil } +func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool { + if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() { + return false + } + + height, err := f.store.Height(ctx) + if err != nil { + f.logger.Warn().Err(err).Msg("cannot determine local height; keeping blocking sync startup") + return false + } + if height > 0 { + return false + } + + f.logger.Info(). + Msg("raft leader with empty store: starting sync services in publisher mode") + return true +} + func (f *failoverState) Run(pCtx context.Context) (multiErr error) { stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally // parent context is cancelled already, so we need to create a new one @@ -207,15 +230,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { }) // start header and data sync services concurrently to avoid cumulative startup delay. 
+ startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx) syncWg, syncCtx := errgroup.WithContext(ctx) syncWg.Go(func() error { - if err := f.headerSyncService.Start(syncCtx); err != nil { + var err error + if startSyncInPublisherMode { + err = f.headerSyncService.StartForPublishing(syncCtx) + } else { + err = f.headerSyncService.Start(syncCtx) + } + if err != nil { return fmt.Errorf("header sync service: %w", err) } return nil }) syncWg.Go(func() error { - if err := f.dataSyncService.Start(syncCtx); err != nil { + var err error + if startSyncInPublisherMode { + err = f.dataSyncService.StartForPublishing(syncCtx) + } else { + err = f.dataSyncService.Start(syncCtx) + } + if err != nil { return fmt.Errorf("data sync service: %w", err) } return nil diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index b65b855a4..947a74c70 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -143,12 +143,9 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } } - firstStart := false - if !syncService.syncerStatus.started.Load() { - firstStart = true - if err := syncService.startSyncer(ctx); err != nil { - return fmt.Errorf("failed to start syncer after initializing the store: %w", err) - } + firstStart, err := syncService.startSyncer(ctx) + if err != nil { + return fmt.Errorf("failed to start syncer after initializing the store: %w", err) } // Broadcast for subscribers @@ -190,20 +187,9 @@ func (s *SyncService[H]) AppendDAHint(ctx context.Context, daHeight uint64, heig // Start is a part of Service interface. func (syncService *SyncService[H]) Start(ctx context.Context) error { - // setup P2P infrastructure, but don't start Subscriber yet. 
- peerIDs, err := syncService.setupP2PInfrastructure(ctx) + peerIDs, err := syncService.prepareStart(ctx) if err != nil { - return fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err) - } - - // create syncer, must be before initFromP2PWithRetry which calls startSyncer. - if syncService.syncer, err = newSyncer( - syncService.ex, - syncService.store, - syncService.sub, - []goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)}, - ); err != nil { - return fmt.Errorf("failed to create syncer: %w", err) + return err } // initialize stores from P2P (blocking until genesis is fetched for followers) @@ -223,20 +209,61 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error { return nil } -// startSyncer starts the SyncService's syncer -func (syncService *SyncService[H]) startSyncer(ctx context.Context) error { - if syncService.syncerStatus.isStarted() { - return nil +// StartForPublishing starts the sync service in publisher mode. +// +// This mode is used by a raft leader with an empty local store: no peer can serve +// height 1 yet, so waiting for initFromP2PWithRetry would deadlock block production. +// We still need the P2P exchange server and pubsub subscriber to be ready before the +// first block is produced, because WriteToStoreAndBroadcast relies on them to gossip +// the block that bootstraps the network. +func (syncService *SyncService[H]) StartForPublishing(ctx context.Context) error { + if _, err := syncService.prepareStart(ctx); err != nil { + return err } - if err := syncService.syncer.Start(ctx); err != nil { - return fmt.Errorf("failed to start syncer: %w", err) + if err := syncService.startSubscriber(ctx); err != nil { + return fmt.Errorf("failed to start subscriber: %w", err) } - syncService.syncerStatus.started.Store(true) return nil } +func (syncService *SyncService[H]) prepareStart(ctx context.Context) ([]peer.ID, error) { + // setup P2P infrastructure, but don't start Subscriber yet. 
+ peerIDs, err := syncService.setupP2PInfrastructure(ctx) + if err != nil { + return nil, fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err) + } + + // create syncer, must be before initFromP2PWithRetry which calls startSyncer. + if syncService.syncer, err = newSyncer( + syncService.ex, + syncService.store, + syncService.sub, + []goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)}, + ); err != nil { + return nil, fmt.Errorf("failed to create syncer: %w", err) + } + + return peerIDs, nil +} + +// startSyncer starts the SyncService's syncer. +// It returns true when this call performed the actual start. +func (syncService *SyncService[H]) startSyncer(ctx context.Context) (bool, error) { + startedNow, err := syncService.syncerStatus.startOnce(func() error { + if err := syncService.syncer.Start(ctx); err != nil { + return fmt.Errorf("failed to start syncer: %w", err) + } + return nil + }) + if err != nil { + return false, err + } + + return startedNow, nil +} + // initStore initializes the store with the given initial header. // it is a no-op if the store is already initialized. // Returns true when the store was initialized by this call. @@ -371,7 +398,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee return false, fmt.Errorf("failed to initialize the store: %w", err) } } - if err := syncService.startSyncer(ctx); err != nil { + if _, err := syncService.startSyncer(ctx); err != nil { return false, err } return true, nil @@ -386,6 +413,8 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee p2pInitTimeout := 30 * time.Second timeoutTimer := time.NewTimer(p2pInitTimeout) defer timeoutTimer.Stop() + retryTimer := time.NewTimer(backoff) + defer retryTimer.Stop() for { ok, err := tryInit(ctx) @@ -403,13 +432,13 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee Dur("timeout", p2pInitTimeout). 
Msg("P2P header sync initialization timed out, deferring to DA sync") return nil - case <-time.After(backoff): + case <-retryTimer.C: } - backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } + retryTimer.Reset(backoff) } } @@ -424,9 +453,9 @@ func (syncService *SyncService[H]) Stop(ctx context.Context) error { syncService.ex.Stop(ctx), syncService.sub.Stop(ctx), ) - if syncService.syncerStatus.isStarted() { - err = errors.Join(err, syncService.syncer.Stop(ctx)) - } + err = errors.Join(err, syncService.syncerStatus.stopIfStarted(func() error { + return syncService.syncer.Stop(ctx) + })) // Stop the store adapter if it has a Stop method if stopper, ok := syncService.store.(interface{ Stop(context.Context) error }); ok { err = errors.Join(err, stopper.Stop(ctx)) diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go index e8b1527f7..bf096cb6d 100644 --- a/pkg/sync/sync_service_test.go +++ b/pkg/sync/sync_service_test.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -25,6 +26,76 @@ import ( "github.com/evstack/ev-node/types" ) +func TestHeaderSyncServiceStartForPublishingWithPeers(t *testing.T) { + mainKV := sync.MutexWrap(datastore.NewMapDatastore()) + pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) + require.NoError(t, err) + noopSigner, err := noop.NewNoopSigner(pk) + require.NoError(t, err) + rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only + mn := mocknet.New() + + chainID := "test-chain-id" + genesisDoc := genesispkg.Genesis{ + ChainID: chainID, + StartTime: time.Now(), + InitialHeight: 1, + ProposerAddress: []byte("test"), + } + + conf := config.DefaultConfig() + conf.RootDir = t.TempDir() + logger := zerolog.Nop() + + nodeKey1, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, 
"node1_key.json")) + require.NoError(t, err) + host1, err := mn.AddPeer(nodeKey1.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + + nodeKey2, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node2_key.json")) + require.NoError(t, err) + host2, err := mn.AddPeer(nodeKey2.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + + require.NoError(t, mn.LinkAll()) + require.NoError(t, mn.ConnectAllButSelf()) + + client1, err := p2p.NewClientWithHost(conf.P2P, nodeKey1.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host1) + require.NoError(t, err) + client2, err := p2p.NewClientWithHost(conf.P2P, nodeKey2.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host2) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, client1.Start(ctx)) + require.NoError(t, client2.Start(ctx)) + t.Cleanup(func() { _ = client1.Close() }) + t.Cleanup(func() { _ = client2.Close() }) + + require.Eventually(t, func() bool { + return len(client1.PeerIDs()) > 0 + }, time.Second, 10*time.Millisecond) + + evStore := store.New(mainKV) + svc, err := NewHeaderSyncService(evStore, conf, genesisDoc, client1, logger) + require.NoError(t, err) + require.NoError(t, svc.StartForPublishing(ctx)) + t.Cleanup(func() { _ = svc.Stop(context.Background()) }) + + headerConfig := types.HeaderConfig{ + Height: genesisDoc.InitialHeight, + DataHash: bytesN(rnd, 32), + AppHash: bytesN(rnd, 32), + Signer: noopSigner, + } + signedHeader, err := types.GetRandomSignedHeaderCustom(t.Context(), &headerConfig, genesisDoc.ChainID) + require.NoError(t, err) + require.NoError(t, signedHeader.Validate()) + + require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader})) + require.True(t, svc.storeInitialized.Load()) +} + func TestHeaderSyncServiceRestart(t *testing.T) { mainKV := sync.MutexWrap(datastore.NewMapDatastore()) pk, _, err := 
crypto.GenerateEd25519Key(cryptoRand.Reader) @@ -78,7 +149,7 @@ func TestHeaderSyncServiceRestart(t *testing.T) { require.NoError(t, signedHeader.Validate()) require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader})) - for i := genesisDoc.InitialHeight + 1; i < 2; i++ { + for i := genesisDoc.InitialHeight + 1; i < 10; i++ { signedHeader = nextHeader(t, signedHeader, genesisDoc.ChainID, noopSigner) t.Logf("signed header: %d", i) require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader})) diff --git a/pkg/sync/syncer_status.go b/pkg/sync/syncer_status.go index 1fe26008c..70364f276 100644 --- a/pkg/sync/syncer_status.go +++ b/pkg/sync/syncer_status.go @@ -1,13 +1,49 @@ package sync -import "sync/atomic" +import "sync" // SyncerStatus is used by header and block exchange service for keeping track // of the status of the syncer in them. type SyncerStatus struct { - started atomic.Bool + mu sync.Mutex + started bool } func (syncerStatus *SyncerStatus) isStarted() bool { - return syncerStatus.started.Load() + syncerStatus.mu.Lock() + defer syncerStatus.mu.Unlock() + + return syncerStatus.started +} + +func (syncerStatus *SyncerStatus) startOnce(startFn func() error) (bool, error) { + syncerStatus.mu.Lock() + defer syncerStatus.mu.Unlock() + + if syncerStatus.started { + return false, nil + } + + if err := startFn(); err != nil { + return false, err + } + + syncerStatus.started = true + return true, nil +} + +func (syncerStatus *SyncerStatus) stopIfStarted(stopFn func() error) error { + syncerStatus.mu.Lock() + defer syncerStatus.mu.Unlock() + + if !syncerStatus.started { + return nil + } + + if err := stopFn(); err != nil { + return err + } + + syncerStatus.started = false + return nil } diff --git a/pkg/sync/syncer_status_test.go b/pkg/sync/syncer_status_test.go new file mode 100644 index 000000000..01ecbdf49 --- /dev/null +++ b/pkg/sync/syncer_status_test.go @@ -0,0 +1,128 @@ 
+package sync + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSyncerStatusStartOnce(t *testing.T) { + t.Parallel() + + specs := map[string]struct { + run func(*testing.T, *SyncerStatus) + }{ + "concurrent_start_only_runs_once": { + run: func(t *testing.T, status *SyncerStatus) { + t.Helper() + + var calls atomic.Int32 + started := make(chan struct{}) + release := make(chan struct{}) + var wg sync.WaitGroup + + for range 8 { + wg.Add(1) + go func() { + defer wg.Done() + _, err := status.startOnce(func() error { + if calls.Add(1) == 1 { + close(started) + } + <-release + return nil + }) + require.NoError(t, err) + }() + } + + <-started + close(release) + wg.Wait() + + require.Equal(t, int32(1), calls.Load()) + require.True(t, status.isStarted()) + }, + }, + "failed_start_can_retry": { + run: func(t *testing.T, status *SyncerStatus) { + t.Helper() + + var calls atomic.Int32 + errBoom := errors.New("boom") + + startedNow, err := status.startOnce(func() error { + calls.Add(1) + return errBoom + }) + require.ErrorIs(t, err, errBoom) + require.False(t, startedNow) + require.False(t, status.isStarted()) + + startedNow, err = status.startOnce(func() error { + calls.Add(1) + return nil + }) + require.NoError(t, err) + require.True(t, startedNow) + require.True(t, status.isStarted()) + require.Equal(t, int32(2), calls.Load()) + }, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + t.Parallel() + spec.run(t, &SyncerStatus{}) + }) + } +} + +func TestSyncerStatusStopIfStarted(t *testing.T) { + t.Parallel() + + specs := map[string]struct { + started bool + wantErr bool + }{ + "no_op_when_not_started": { + started: false, + wantErr: false, + }, + "stop_clears_started": { + started: true, + wantErr: false, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + t.Parallel() + + status := &SyncerStatus{started: spec.started} + var stopCalls atomic.Int32 + + 
err := status.stopIfStarted(func() error { + stopCalls.Add(1) + return nil + }) + + if spec.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + if spec.started { + require.Equal(t, int32(1), stopCalls.Load()) + } else { + require.Zero(t, stopCalls.Load()) + } + require.False(t, status.isStarted()) + }) + } +} diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 30d689d00..71abb864d 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -26,6 +26,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -34,6 +35,7 @@ import ( evmtest "github.com/evstack/ev-node/execution/evm/test" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" coreda "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/rpc/client" rpcclient "github.com/evstack/ev-node/pkg/rpc/client" "github.com/evstack/ev-node/types" @@ -82,17 +84,20 @@ func TestLeaseFailoverE2E(t *testing.T) { clusterNodes := &raftClusterNodes{ nodes: make(map[string]*nodeDetails), } - node1P2PAddr := env.Endpoints.GetRollkitP2PAddress() - node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress() - node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PListen := env.Endpoints.GetRollkitP2PAddress() + node2P2PListen := env.Endpoints.GetFullNodeP2PAddress() + node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen) + node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen) + node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen) // Start node1 (bootstrap mode) go func() { 
p2pPeers := node2P2PAddr + "," + node3P2PAddr proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), - bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(), + bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile) - clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) + clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) t.Log("Node1 is up") }() @@ -100,8 +105,8 @@ func TestLeaseFailoverE2E(t *testing.T) { go func() { t.Log("Starting Node2") p2pPeers := node1P2PAddr + "," + node3P2PAddr - proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) - clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) + proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), 
env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) + clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) t.Log("Node2 is up") }() @@ -112,8 +117,8 @@ func TestLeaseFailoverE2E(t *testing.T) { p2pPeers := node1P2PAddr + "," + node2P2PAddr node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t)) ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort) - proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile) - clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr) + proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile) + clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr) t.Log("Node3 is up") }() @@ -172,11 +177,11 @@ func TestLeaseFailoverE2E(t *testing.T) { } } oldDetails := clusterNodes.Details(oldLeader) - restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile) + restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, 
clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile) t.Log("Restarted old leader to sync with cluster: " + oldLeader) if IsNodeUp(t, oldDetails.rpcAddr, NodeStartupTimeout) { - clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.engineURL, oldDetails.ethAddr) + clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.p2pPeerAddr, oldDetails.engineURL, oldDetails.ethAddr) } else { t.Log("+++ old leader did not recover on restart. Skipping node verification") } @@ -276,17 +281,20 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { clusterNodes := &raftClusterNodes{ nodes: make(map[string]*nodeDetails), } - node1P2PAddr := env.Endpoints.GetRollkitP2PAddress() - node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress() - node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PListen := env.Endpoints.GetRollkitP2PAddress() + node2P2PListen := env.Endpoints.GetFullNodeP2PAddress() + node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t)) + node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen) + node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen) + node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen) // Start node1 (bootstrap mode) go func() { p2pPeers := node2P2PAddr + "," + node3P2PAddr proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), - bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(), + bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, 
passphraseFile) - clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) + clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL()) t.Log("Node1 is up") }() @@ -294,8 +302,8 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { go func() { t.Log("Starting Node2") p2pPeers := node1P2PAddr + "," + node3P2PAddr - proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) - clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) + proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile) + clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) t.Log("Node2 is up") }() @@ -306,8 +314,8 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { p2pPeers := node1P2PAddr + "," + node2P2PAddr node3RPCListen := fmt.Sprintf("127.0.0.1:%d", 
mustGetAvailablePort(t)) ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort) - proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile) - clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr) + proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile) + clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr) t.Log("Node3 is up") }() @@ -393,7 +401,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { nodeDetails.engineURL, nodeDetails.ethAddr, false, passphraseFile) clusterNodes.Set(nodeName, nodeDetails.rpcAddr, restartedProc, nodeDetails.ethAddr, - nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.engineURL, nodeDetails.ethAddr) + nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.p2pPeerAddr, nodeDetails.engineURL, nodeDetails.ethAddr) } // Initial restart of all nodes @@ -721,6 +729,16 @@ func initChain(t *testing.T, sut *SystemUnderTest, workDir string) string { require.NoError(t, err, "failed to init node", output) return passphraseFile } + +func mustNodeP2PMultiAddr(t *testing.T, workDir, nodeID, listenAddr string) string { + t.Helper() + nodeKey, err := key.LoadOrGenNodeKey(filepath.Join(workDir, nodeID, "config")) + require.NoError(t, err) + peerID, err := peer.IDFromPrivateKey(nodeKey.PrivKey) + require.NoError(t, err) + return fmt.Sprintf("%s/p2p/%s", listenAddr, peerID.String()) +} + func setupRaftSequencerNode( t *testing.T, sut *SystemUnderTest, @@ -843,6 +861,7 @@ type nodeDetails 
struct { xRPCClient atomic.Pointer[rpcclient.Client] running atomic.Bool p2pAddr string + p2pPeerAddr string engineURL string ethURL string } @@ -895,10 +914,10 @@ type raftClusterNodes struct { nodes map[string]*nodeDetails } -func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, engineURL string, ethURL string) { +func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, p2pPeerAddr string, engineURL string, ethURL string) { c.mx.Lock() defer c.mx.Unlock() - d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, engineURL: engineURL, ethURL: ethURL} + d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, p2pPeerAddr: p2pPeerAddr, engineURL: engineURL, ethURL: ethURL} d.running.Store(true) c.nodes[node] = d } @@ -946,11 +965,9 @@ func leader(t require.TestingT, nodes map[string]*nodeDetails) (string, *nodeDet } resp, err := client.Get(details.rpcAddr + "/raft/node") require.NoError(t, err) - defer resp.Body.Close() - var status nodeStatus require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) - + _ = resp.Body.Close() if status.IsLeader { return node, details } @@ -973,16 +990,17 @@ func must[T any](r T, err error) T { func IsNodeUp(t *testing.T, rpcAddr string, timeout time.Duration) bool { t.Helper() t.Logf("Query node is up: %s", rpcAddr) - ctx, done := context.WithTimeout(context.Background(), timeout) + ctx, done := context.WithTimeout(t.Context(), timeout) defer done() - ticker := time.Tick(min(timeout/10, 200*time.Millisecond)) + ticker := time.NewTicker(min(timeout/10, 200*time.Millisecond)) + defer ticker.Stop() c := client.NewClient(rpcAddr) require.NotNil(t, c) var lastBlock uint64 for { select { - case <-ticker: + case <-ticker.C: switch s, err := c.GetState(ctx); { case err != nil: // ignore case lastBlock == 0: diff 
--git a/test/e2e/sut_helper.go b/test/e2e/sut_helper.go index f5783da8b..633d1efdb 100644 --- a/test/e2e/sut_helper.go +++ b/test/e2e/sut_helper.go @@ -162,8 +162,12 @@ func (s *SystemUnderTest) awaitProcessCleanup(cmd *exec.Cmd) { s.cmdToPids[cmdKey] = append(s.cmdToPids[cmdKey], pid) s.pidsLock.Unlock() go func() { - _ = cmd.Wait() // blocks until shutdown - s.logf("Process stopped, pid: %d\n", pid) + waitErr := cmd.Wait() // blocks until shutdown + if waitErr != nil { + s.logf("Process stopped, pid: %d, err: %v\n", pid, waitErr) + } else { + s.logf("Process stopped, pid: %d\n", pid) + } s.pidsLock.Lock() defer s.pidsLock.Unlock() delete(s.pids, pid) @@ -182,11 +186,9 @@ func (s *SystemUnderTest) watchLogs(prefix string, cmd *exec.Cmd) { outReader, err := cmd.StdoutPipe() require.NoError(s.t, err) - if s.debug { - logDir := filepath.Join(WorkDir, "testnet") + if logDir := s.processLogDir(); logDir != "" { require.NoError(s.t, os.MkdirAll(logDir, 0o750)) - testName := strings.ReplaceAll(s.t.Name(), "/", "-") - logfileName := filepath.Join(logDir, prefix+fmt.Sprintf("exec-%s-%s-%d.out", filepath.Base(cmd.Args[0]), testName, time.Now().UnixNano())) + logfileName := filepath.Join(logDir, prefix+fmt.Sprintf("exec-%s-%d.out", filepath.Base(cmd.Args[0]), time.Now().UnixNano())) logfile, err := os.Create(logfileName) require.NoError(s.t, err) errReader = io.NopCloser(io.TeeReader(errReader, logfile)) @@ -202,6 +204,19 @@ func (s *SystemUnderTest) watchLogs(prefix string, cmd *exec.Cmd) { }) } +func (s *SystemUnderTest) processLogDir() string { + logRoot := strings.TrimSpace(os.Getenv("EV_E2E_LOG_DIR")) + if logRoot == "" && s.debug { + logRoot = filepath.Join(WorkDir, "testnet") + } + if logRoot == "" { + return "" + } + + testName := strings.ReplaceAll(s.t.Name(), "/", "-") + return filepath.Join(logRoot, testName) +} + // PrintBuffer outputs the contents of outBuff and errBuff to stdout, prefixing each entry with "out>" or "err>", respectively. 
func (s *SystemUnderTest) PrintBuffer() { out := os.Stdout