Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 73 additions & 69 deletions CHANGELOG.md

Large diffs are not rendered by default.

40 changes: 38 additions & 2 deletions node/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type failoverState struct {
dataSyncService *evsync.DataSyncService
rpcServer *http.Server
bc *block.Components
raftNode *raft.Node
isAggregator bool

// catchup fields — used when the aggregator needs to sync before producing
catchupEnabled bool
Expand Down Expand Up @@ -172,13 +174,34 @@ func setupFailoverState(
dataSyncService: dataSyncService,
rpcServer: rpcServer,
bc: bc,
raftNode: raftNode,
isAggregator: isAggregator,
store: rktStore,
catchupEnabled: catchupEnabled,
catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration,
daBlockTime: nodeConfig.DA.BlockTime.Duration,
}, nil
}

// shouldStartSyncInPublisherMode reports whether the sync services should be
// started in publisher mode rather than the regular blocking sync mode.
//
// Publisher mode only applies when this node is an aggregator that currently
// holds raft leadership AND its local store is empty; in every other case the
// normal (blocking) startup path is used.
func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
	// Non-aggregators and non-leaders always take the regular startup path.
	if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() {
		return false
	}

	height, err := f.store.Height(ctx)
	switch {
	case err != nil:
		// Unknown local height: fall back to the safe blocking default.
		f.logger.Warn().Err(err).Msg("cannot determine local height; keeping blocking sync startup")
		return false
	case height > 0:
		// A non-empty store means blocks already exist to sync against.
		return false
	}

	f.logger.Info().
		Msg("raft leader with empty store: starting sync services in publisher mode")
	return true
}

func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
// parent context is cancelled already, so we need to create a new one
Expand Down Expand Up @@ -207,15 +230,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
})

// start header and data sync services concurrently to avoid cumulative startup delay.
startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
syncWg, syncCtx := errgroup.WithContext(ctx)
syncWg.Go(func() error {
if err := f.headerSyncService.Start(syncCtx); err != nil {
var err error
if startSyncInPublisherMode {
err = f.headerSyncService.StartForPublishing(syncCtx)
} else {
err = f.headerSyncService.Start(syncCtx)
}
if err != nil {
return fmt.Errorf("header sync service: %w", err)
}
return nil
})
syncWg.Go(func() error {
if err := f.dataSyncService.Start(syncCtx); err != nil {
var err error
if startSyncInPublisherMode {
err = f.dataSyncService.StartForPublishing(syncCtx)
} else {
err = f.dataSyncService.Start(syncCtx)
}
if err != nil {
return fmt.Errorf("data sync service: %w", err)
}
return nil
Expand Down
93 changes: 61 additions & 32 deletions pkg/sync/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,9 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
}
}

firstStart := false
if !syncService.syncerStatus.started.Load() {
firstStart = true
if err := syncService.startSyncer(ctx); err != nil {
return fmt.Errorf("failed to start syncer after initializing the store: %w", err)
}
firstStart, err := syncService.startSyncer(ctx)
if err != nil {
return fmt.Errorf("failed to start syncer after initializing the store: %w", err)
}

// Broadcast for subscribers
Expand Down Expand Up @@ -190,20 +187,9 @@ func (s *SyncService[H]) AppendDAHint(ctx context.Context, daHeight uint64, heig

// Start is a part of Service interface.
func (syncService *SyncService[H]) Start(ctx context.Context) error {
// setup P2P infrastructure, but don't start Subscriber yet.
peerIDs, err := syncService.setupP2PInfrastructure(ctx)
peerIDs, err := syncService.prepareStart(ctx)
if err != nil {
return fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err)
}

// create syncer, must be before initFromP2PWithRetry which calls startSyncer.
if syncService.syncer, err = newSyncer(
syncService.ex,
syncService.store,
syncService.sub,
[]goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)},
); err != nil {
return fmt.Errorf("failed to create syncer: %w", err)
return err
}

// initialize stores from P2P (blocking until genesis is fetched for followers)
Expand All @@ -223,20 +209,61 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error {
return nil
}

// startSyncer starts the SyncService's syncer
func (syncService *SyncService[H]) startSyncer(ctx context.Context) error {
if syncService.syncerStatus.isStarted() {
return nil
// StartForPublishing starts the sync service in publisher mode.
//
// This mode is used by a raft leader with an empty local store: no peer can serve
// height 1 yet, so waiting for initFromP2PWithRetry would deadlock block production.
// We still need the P2P exchange server and pubsub subscriber to be ready before the
// first block is produced, because WriteToStoreAndBroadcast relies on them to gossip
// the block that bootstraps the network.
func (syncService *SyncService[H]) StartForPublishing(ctx context.Context) error {
if _, err := syncService.prepareStart(ctx); err != nil {
return err
}

if err := syncService.syncer.Start(ctx); err != nil {
return fmt.Errorf("failed to start syncer: %w", err)
if err := syncService.startSubscriber(ctx); err != nil {
return fmt.Errorf("failed to start subscriber: %w", err)
}

syncService.syncerStatus.started.Store(true)
return nil
}

// prepareStart performs the startup steps shared by Start and
// StartForPublishing: it sets up the P2P infrastructure (without starting the
// subscriber) and constructs the syncer. The syncer must exist before
// initFromP2PWithRetry runs, since that path calls startSyncer.
// It returns the peer IDs discovered during P2P setup.
func (syncService *SyncService[H]) prepareStart(ctx context.Context) ([]peer.ID, error) {
	peerIDs, err := syncService.setupP2PInfrastructure(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err)
	}

	opts := []goheadersync.Option{
		goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration),
	}
	syncer, err := newSyncer(syncService.ex, syncService.store, syncService.sub, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to create syncer: %w", err)
	}
	syncService.syncer = syncer

	return peerIDs, nil
}

// startSyncer starts the SyncService's syncer at most once.
// The boolean result is true only when this call performed the actual start.
func (syncService *SyncService[H]) startSyncer(ctx context.Context) (bool, error) {
	start := func() error {
		if err := syncService.syncer.Start(ctx); err != nil {
			return fmt.Errorf("failed to start syncer: %w", err)
		}
		return nil
	}
	// startOnce returns false together with any error, so the result can be
	// forwarded directly.
	return syncService.syncerStatus.startOnce(start)
}

// initStore initializes the store with the given initial header.
// it is a no-op if the store is already initialized.
// Returns true when the store was initialized by this call.
Expand Down Expand Up @@ -371,7 +398,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
return false, fmt.Errorf("failed to initialize the store: %w", err)
}
}
if err := syncService.startSyncer(ctx); err != nil {
if _, err := syncService.startSyncer(ctx); err != nil {
return false, err
}
return true, nil
Expand All @@ -386,6 +413,8 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
p2pInitTimeout := 30 * time.Second
timeoutTimer := time.NewTimer(p2pInitTimeout)
defer timeoutTimer.Stop()
retryTimer := time.NewTimer(backoff)
defer retryTimer.Stop()

for {
ok, err := tryInit(ctx)
Expand All @@ -403,13 +432,13 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
Dur("timeout", p2pInitTimeout).
Msg("P2P header sync initialization timed out, deferring to DA sync")
return nil
case <-time.After(backoff):
case <-retryTimer.C:
}

backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
retryTimer.Reset(backoff)
}
}

Expand All @@ -424,9 +453,9 @@ func (syncService *SyncService[H]) Stop(ctx context.Context) error {
syncService.ex.Stop(ctx),
syncService.sub.Stop(ctx),
)
if syncService.syncerStatus.isStarted() {
err = errors.Join(err, syncService.syncer.Stop(ctx))
}
err = errors.Join(err, syncService.syncerStatus.stopIfStarted(func() error {
return syncService.syncer.Stop(ctx)
}))
// Stop the store adapter if it has a Stop method
if stopper, ok := syncService.store.(interface{ Stop(context.Context) error }); ok {
err = errors.Join(err, stopper.Stop(ctx))
Expand Down
73 changes: 72 additions & 1 deletion pkg/sync/sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/crypto"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

Expand All @@ -25,6 +26,76 @@ import (
"github.com/evstack/ev-node/types"
)

// TestHeaderSyncServiceStartForPublishingWithPeers verifies that a header sync
// service started via StartForPublishing (publisher mode, no blocking P2P
// init) can still broadcast the very first signed header through
// WriteToStoreAndBroadcast, which also initializes the store.
func TestHeaderSyncServiceStartForPublishingWithPeers(t *testing.T) {
mainKV := sync.MutexWrap(datastore.NewMapDatastore())
pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
require.NoError(t, err)
noopSigner, err := noop.NewNoopSigner(pk)
require.NoError(t, err)
rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only
mn := mocknet.New()

chainID := "test-chain-id"
genesisDoc := genesispkg.Genesis{
ChainID: chainID,
StartTime: time.Now(),
InitialHeight: 1,
ProposerAddress: []byte("test"),
}

conf := config.DefaultConfig()
conf.RootDir = t.TempDir()
logger := zerolog.Nop()

// Build two mocknet hosts so the publishing node has at least one peer.
nodeKey1, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node1_key.json"))
require.NoError(t, err)
host1, err := mn.AddPeer(nodeKey1.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0"))
require.NoError(t, err)

nodeKey2, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node2_key.json"))
require.NoError(t, err)
host2, err := mn.AddPeer(nodeKey2.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0"))
require.NoError(t, err)

require.NoError(t, mn.LinkAll())
require.NoError(t, mn.ConnectAllButSelf())

client1, err := p2p.NewClientWithHost(conf.P2P, nodeKey1.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host1)
require.NoError(t, err)
client2, err := p2p.NewClientWithHost(conf.P2P, nodeKey2.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host2)
require.NoError(t, err)

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
require.NoError(t, client1.Start(ctx))
require.NoError(t, client2.Start(ctx))
t.Cleanup(func() { _ = client1.Close() })
t.Cleanup(func() { _ = client2.Close() })

// Wait until the two clients have actually discovered each other before
// starting the sync service.
require.Eventually(t, func() bool {
return len(client1.PeerIDs()) > 0
}, time.Second, 10*time.Millisecond)

evStore := store.New(mainKV)
svc, err := NewHeaderSyncService(evStore, conf, genesisDoc, client1, logger)
require.NoError(t, err)
// Publisher mode: must not block waiting for a peer that can serve height 1.
require.NoError(t, svc.StartForPublishing(ctx))
t.Cleanup(func() { _ = svc.Stop(context.Background()) })

// Produce a valid signed header at the genesis height to bootstrap the chain.
headerConfig := types.HeaderConfig{
Height: genesisDoc.InitialHeight,
DataHash: bytesN(rnd, 32),
AppHash: bytesN(rnd, 32),
Signer: noopSigner,
}
signedHeader, err := types.GetRandomSignedHeaderCustom(t.Context(), &headerConfig, genesisDoc.ChainID)
require.NoError(t, err)
require.NoError(t, signedHeader.Validate())

// Broadcasting the first header should also initialize the store.
require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader}))
require.True(t, svc.storeInitialized.Load())
}

func TestHeaderSyncServiceRestart(t *testing.T) {
mainKV := sync.MutexWrap(datastore.NewMapDatastore())
pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
Expand Down Expand Up @@ -78,7 +149,7 @@ func TestHeaderSyncServiceRestart(t *testing.T) {
require.NoError(t, signedHeader.Validate())
require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader}))

for i := genesisDoc.InitialHeight + 1; i < 2; i++ {
for i := genesisDoc.InitialHeight + 1; i < 10; i++ {
signedHeader = nextHeader(t, signedHeader, genesisDoc.ChainID, noopSigner)
t.Logf("signed header: %d", i)
require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader}))
Expand Down
42 changes: 39 additions & 3 deletions pkg/sync/syncer_status.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,49 @@
package sync

import "sync/atomic"
import "sync"

// SyncerStatus is used by header and block exchange service for keeping track
// of the status of the syncer in them.
//
// The mutex serializes start/stop transitions so that startOnce and
// stopIfStarted can run their callbacks exactly once per transition.
type SyncerStatus struct {
	mu      sync.Mutex
	started bool // true while the syncer is running; guarded by mu
}

// isStarted reports whether the syncer is currently started.
func (syncerStatus *SyncerStatus) isStarted() bool {
	syncerStatus.mu.Lock()
	defer syncerStatus.mu.Unlock()

	return syncerStatus.started
}

// startOnce runs startFn under the status lock unless the syncer was already
// started. It reports true only when this call performed the start; a failed
// startFn leaves the status unchanged.
func (syncerStatus *SyncerStatus) startOnce(startFn func() error) (bool, error) {
	syncerStatus.mu.Lock()
	defer syncerStatus.mu.Unlock()

	if syncerStatus.started {
		// Already running: nothing to do, not an error.
		return false, nil
	}

	err := startFn()
	if err == nil {
		syncerStatus.started = true
		return true, nil
	}
	return false, err
}

// stopIfStarted runs stopFn under the status lock, but only when the syncer is
// currently started. A failed stopFn leaves the status marked as started so a
// later retry is possible; when the syncer was never started this is a no-op.
func (syncerStatus *SyncerStatus) stopIfStarted(stopFn func() error) error {
	syncerStatus.mu.Lock()
	defer syncerStatus.mu.Unlock()

	if syncerStatus.started {
		if err := stopFn(); err != nil {
			return err
		}
		syncerStatus.started = false
	}
	return nil
}
Loading
Loading