diff --git a/sei-tendermint/config/config.go b/sei-tendermint/config/config.go index 692ca922c5..16b45df709 100644 --- a/sei-tendermint/config/config.go +++ b/sei-tendermint/config/config.go @@ -11,6 +11,7 @@ import ( "strings" "time" + mempoolcfg "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" tmos "github.com/sei-protocol/sei-chain/sei-tendermint/libs/os" "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) @@ -857,38 +858,60 @@ type MempoolConfig struct { DropPriorityReservoirSize int `mapstructure:"drop-priority-reservoir-size"` } +func (cfg *MempoolConfig) ToMempoolConfig() *mempoolcfg.Config { + return &mempoolcfg.Config{ + Size: cfg.Size, + MaxTxsBytes: cfg.MaxTxsBytes, + CacheSize: cfg.CacheSize, + DuplicateTxsCacheSize: cfg.DuplicateTxsCacheSize, + KeepInvalidTxsInCache: cfg.KeepInvalidTxsInCache, + MaxTxBytes: cfg.MaxTxBytes, + TTLDuration: cfg.TTLDuration, + TTLNumBlocks: cfg.TTLNumBlocks, + TxNotifyThreshold: cfg.TxNotifyThreshold, + PendingSize: cfg.PendingSize, + MaxPendingTxsBytes: cfg.MaxPendingTxsBytes, + RemoveExpiredTxsFromQueue: cfg.RemoveExpiredTxsFromQueue, + DropPriorityThreshold: cfg.DropPriorityThreshold, + DropUtilisationThreshold: cfg.DropUtilisationThreshold, + DropPriorityReservoirSize: cfg.DropPriorityReservoirSize, + } +} + // DefaultMempoolConfig returns a default configuration for the Tendermint mempool. func DefaultMempoolConfig() *MempoolConfig { + cfg := mempoolcfg.DefaultConfig() return &MempoolConfig{ - Broadcast: true, - // Each signature verification takes .5ms, Size reduced until we implement - // ABCI Recheck - Size: 5000, - MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, - DuplicateTxsCacheSize: 100000, - MaxTxBytes: 1024 * 1024, // 1MB - TTLDuration: 5 * time.Second, // prevent stale txs from filling mempool - TTLNumBlocks: 10, // remove txs after 10 blocks - TxNotifyThreshold: 0, + Broadcast: true, + Size: cfg.Size, + MaxTxsBytes: cfg.MaxTxsBytes, + CacheSize: cfg.CacheSize, + DuplicateTxsCacheSize: cfg.DuplicateTxsCacheSize, + KeepInvalidTxsInCache: cfg.KeepInvalidTxsInCache, + MaxTxBytes: cfg.MaxTxBytes, + MaxBatchBytes: 0, + TTLDuration: cfg.TTLDuration, + TTLNumBlocks: cfg.TTLNumBlocks, + TxNotifyThreshold: cfg.TxNotifyThreshold, CheckTxErrorBlacklistEnabled: true, CheckTxErrorThreshold: 50, - PendingSize: 5000, - MaxPendingTxsBytes: 1024 * 1024 * 1024, // 1GB - PendingTTLDuration: 0 * time.Second, + PendingSize: cfg.PendingSize, + MaxPendingTxsBytes: cfg.MaxPendingTxsBytes, + PendingTTLDuration: 0, PendingTTLNumBlocks: 0, - RemoveExpiredTxsFromQueue: true, - DropPriorityThreshold: 0.1, - DropUtilisationThreshold: 1.0, - DropPriorityReservoirSize: 10_240, + RemoveExpiredTxsFromQueue: cfg.RemoveExpiredTxsFromQueue, + DropPriorityThreshold: cfg.DropPriorityThreshold, + DropUtilisationThreshold: cfg.DropUtilisationThreshold, + DropPriorityReservoirSize: cfg.DropPriorityReservoirSize, } } // TestMempoolConfig returns a configuration for testing the Tendermint mempool func TestMempoolConfig() *MempoolConfig { cfg := DefaultMempoolConfig() - cfg.CacheSize = 1000 - cfg.DropUtilisationThreshold = 0.0 + testCfg := mempoolcfg.TestConfig() + cfg.CacheSize = testCfg.CacheSize + cfg.DropUtilisationThreshold = testCfg.DropUtilisationThreshold return cfg } diff --git a/sei-tendermint/internal/autobahn/producer/state.go b/sei-tendermint/internal/autobahn/producer/state.go index e39786b832..649c079bd8 100644 --- a/sei-tendermint/internal/autobahn/producer/state.go +++ b/sei-tendermint/internal/autobahn/producer/state.go @@ -6,8 +6,8 @@ import ( "time" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" "golang.org/x/time/rate" @@ -32,19 +32,18 @@ func (c *Config) maxTxsPerBlock() uint64 { // State is the block producer state. type State struct { - cfg *Config - // channel of transactions to build the next block from. - mempool chan *pb.Transaction + cfg *Config + txMempool *mempool.TxMempool // consensus state to which published blocks will be reported. consensus *consensus.State } // NewState constructs a new block producer state. // Returns an error if the current node is NOT a producer. -func NewState(cfg *Config, consensus *consensus.State) *State { +func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.State) *State { return &State{ cfg: cfg, - mempool: make(chan *pb.Transaction, cfg.MempoolSize), + txMempool: txMempool, consensus: consensus, } } @@ -54,22 +53,37 @@ func NewState(cfg *Config, consensus *consensus.State) *State { func (s *State) makePayload(ctx context.Context) *types.Payload { ctx, cancel := context.WithTimeout(ctx, s.cfg.BlockInterval) defer cancel() - maxTxs := s.cfg.maxTxsPerBlock() - var totalGas uint64 - var txs [][]byte - for totalGas < s.cfg.MaxGasPerBlock && uint64(len(txs)) < maxTxs { - tx, err := utils.Recv(ctx, s.mempool) - if err != nil { - break + + if s.txMempool.NumTxsNotPending() == 0 { + select { + case <-ctx.Done(): + case <-s.txMempool.TxsAvailable(): } - txs = append(txs, tx.Payload) - totalGas += tx.GasUsed } - return types.PayloadBuilder{ + + txs, gasEstimated := s.txMempool.PopTxs(mempool.ReapLimits{ + MaxTxs: utils.Some(min(types.MaxTxsPerBlock, s.cfg.maxTxsPerBlock())), + MaxBytes: utils.Some(utils.Clamp[int64](types.MaxTxsBytesPerBlock)), + MaxGasWanted: utils.Some(utils.Clamp[int64](s.cfg.MaxGasPerBlock)), + MaxGasEstimated: utils.Some(utils.Clamp[int64](s.cfg.MaxGasPerBlock)), + }) + payloadTxs := make([][]byte, 0, len(txs)) + for _, tx := range txs { + payloadTxs = append(payloadTxs, tx) + } + payload, err := types.PayloadBuilder{ CreatedAt: time.Now(), - TotalGas: totalGas, - Txs: txs, + // TODO: ReapMaxTxsBytesMaxGas does not handle corner cases correctly rn, which actually + // can produce negative total gas. Fixing it right away might be backward incompatible afaict, + // so we leave it as is for now. + TotalGas: uint64(gasEstimated), // nolint:gosec + Txs: payloadTxs, }.Build() + // This should never happen: we construct the payload from correctly sized data. + if err != nil { + panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err)) + } + return payload } // nextPayload constructs the payload for the next block. @@ -86,11 +100,6 @@ func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) { } } -// PushToMempool pushes the transaction to the mempool. -func (s *State) PushToMempool(ctx context.Context, tx *pb.Transaction) error { - return utils.Send(ctx, s.mempool, tx) -} - // Run runs the background tasks of the producer state. func (s *State) Run(ctx context.Context) error { return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error { diff --git a/sei-tendermint/internal/autobahn/types/block.go b/sei-tendermint/internal/autobahn/types/block.go index 6a871fd7bd..8f39301bb9 100644 --- a/sei-tendermint/internal/autobahn/types/block.go +++ b/sei-tendermint/internal/autobahn/types/block.go @@ -72,6 +72,30 @@ func (h *BlockHeader) Verify(c *Committee) error { return nil } +const standardTxBytes uint64 = 1024 + +// Maximum number of transactions in a block. +const MaxTxsPerBlock uint64 = 2000 + +// Maximum total size of all the transactions. +// It can be split arbitrarily across transactions (1 large, 2000 small ones, etc.) +// up to MaxTxsPerBlock limit. +const MaxTxsBytesPerBlock = MaxTxsPerBlock * standardTxBytes + +// Upper bound on the block proto encoding. +var MaxBlockProtoSize = func() uint64 { + // Payload.Txs represents the variable part of the Block size. + // Proto size is maximized if we distribute data evenly across transactions. + tx := make([]byte, standardTxBytes) + txs := make([][]byte, MaxTxsPerBlock) + for i := range txs { + txs[i] = tx + } + // Crude estimate of all other fields. + const otherFields = 100 * 1024 + return otherFields + uint64(protoutils.Size(&pb.Block{Payload: &pb.Payload{Txs: txs}})) +}() + // Block . type Block struct { utils.ReadOnly @@ -149,7 +173,19 @@ type Payload struct { } // Build builds the Payload. -func (b PayloadBuilder) Build() *Payload { return &Payload{p: b} } +func (b PayloadBuilder) Build() (*Payload, error) { + if uint64(len(b.Txs)) > MaxTxsPerBlock { + return nil, fmt.Errorf("too many transactions") + } + total := uint64(0) + for _, tx := range b.Txs { + total += uint64(len(tx)) + } + if total > MaxTxsBytesPerBlock { + return nil, fmt.Errorf("total txs bytes too large") + } + return &Payload{p: b}, nil +} // ToBuilder converts the Payload to a PayloadBuilder. func (p *Payload) ToBuilder() PayloadBuilder { return p.p } @@ -245,7 +281,7 @@ var PayloadConv = protoutils.Conv[*Payload, *pb.Payload]{ Coinbase: p.Coinbase, Basefee: *p.Basefee, Txs: p.Txs, - }.Build(), nil + }.Build() }, } diff --git a/sei-tendermint/internal/autobahn/types/testonly.go b/sei-tendermint/internal/autobahn/types/testonly.go index 9a920dbe94..11a126d659 100644 --- a/sei-tendermint/internal/autobahn/types/testonly.go +++ b/sei-tendermint/internal/autobahn/types/testonly.go @@ -90,14 +90,14 @@ func GenBlockHeader(rng utils.Rng) *BlockHeader { // GenPayload generates a random Payload. func GenPayload(rng utils.Rng) *Payload { - return PayloadBuilder{ + return utils.OrPanic1(PayloadBuilder{ CreatedAt: utils.GenTimestamp(rng), TotalGas: rng.Uint64(), EdgeCount: rng.Int63(), Coinbase: utils.GenBytes(rng, 10), Basefee: rng.Int63(), Txs: utils.GenSlice(rng, func(rng utils.Rng) []byte { return utils.GenBytes(rng, 10) }), - }.Build() + }.Build()) } // GenBlock generates a random Block. diff --git a/sei-tendermint/internal/blocksync/reactor_test.go b/sei-tendermint/internal/blocksync/reactor_test.go index ef63c01e23..41eaa385b0 100644 --- a/sei-tendermint/internal/blocksync/reactor_test.go +++ b/sei-tendermint/internal/blocksync/reactor_test.go @@ -93,7 +93,7 @@ func makeReactor( state, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) require.NoError(t, stateStore.Save(state)) - mp := mempool.NewTxMempool(config.TestMempoolConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) + mp := mempool.NewTxMempool(mempool.TestConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) eventbus := eventbus.NewDefault() require.NoError(t, eventbus.Start(ctx)) diff --git a/sei-tendermint/internal/consensus/common_test.go b/sei-tendermint/internal/consensus/common_test.go index da97918b42..76365cb628 100644 --- a/sei-tendermint/internal/consensus/common_test.go +++ b/sei-tendermint/internal/consensus/common_test.go @@ -465,7 +465,7 @@ func newStateWithConfigAndBlockStore( // Make Mempool mempool := mempool.NewTxMempool( - thisConfig.Mempool, + thisConfig.Mempool.ToMempoolConfig(), proxyAppConnMem, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher, diff --git a/sei-tendermint/internal/consensus/mempool_test.go b/sei-tendermint/internal/consensus/mempool_test.go index 17f66d2420..513ae8e77a 100644 --- a/sei-tendermint/internal/consensus/mempool_test.go +++ b/sei-tendermint/internal/consensus/mempool_test.go @@ -20,6 +20,7 @@ import ( sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/store" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/test/factory" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) @@ -247,7 +248,7 @@ func TestMempoolRmBadTx(t *testing.T) { // check for the tx for { - txs := cs.txMempool.ReapMaxBytesMaxGas(int64(len(txBytes)), -1, -1) + txs := cs.txMempool.ReapMaxBytesMaxGas(int64(len(txBytes)), utils.Max[int64](), utils.Max[int64]()) if len(txs) == 0 { emptyMempoolCh <- struct{}{} return diff --git a/sei-tendermint/internal/consensus/reactor_test.go b/sei-tendermint/internal/consensus/reactor_test.go index 4c1cffd5ed..05f18b2774 100644 --- a/sei-tendermint/internal/consensus/reactor_test.go +++ b/sei-tendermint/internal/consensus/reactor_test.go @@ -273,7 +273,7 @@ func TestReactorWithEvidence(t *testing.T) { proxyAppConnCon := app mempool := mempool.NewTxMempool( - thisConfig.Mempool, + thisConfig.Mempool.ToMempoolConfig(), proxyAppConnMem, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher, diff --git a/sei-tendermint/internal/consensus/replay.go b/sei-tendermint/internal/consensus/replay.go index f4d8040b78..bb4895ca4d 100644 --- a/sei-tendermint/internal/consensus/replay.go +++ b/sei-tendermint/internal/consensus/replay.go @@ -135,7 +135,7 @@ func NewHandshaker( } func newReplayTxMempool(appClient abci.Application) *mempool.TxMempool { - return mempool.NewTxMempool(config.DefaultMempoolConfig(), appClient, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) + return mempool.NewTxMempool(config.DefaultMempoolConfig().ToMempoolConfig(), appClient, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) } // NBlocks returns the number of blocks applied to the state. diff --git a/sei-tendermint/internal/mempool/mempool.go b/sei-tendermint/internal/mempool/mempool.go index d7aaf14fe1..20416b2c15 100644 --- a/sei-tendermint/internal/mempool/mempool.go +++ b/sei-tendermint/internal/mempool/mempool.go @@ -12,19 +12,21 @@ import ( "time" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/libs/clist" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/libs/reservoir" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" "github.com/sei-protocol/sei-chain/sei-tendermint/types" + "github.com/sei-protocol/seilog" ) -// errTxInCache is returned to the client if we saw tx earlier. -var errTxInCache = errors.New("tx already exists in cache") +var logger = seilog.NewLogger("tendermint", "internal", "mempool") -// errTxTooLarge defines an error when a transaction is too big to be sent to peers. -var errTxTooLarge = errors.New("tx too large") +// ErrTxInCache is returned to the client if we saw tx earlier. +var ErrTxInCache = errors.New("tx already exists in cache") + +// ErrTxTooLarge defines an error when a transaction is too big to be sent to peers. +var ErrTxTooLarge = errors.New("tx too large") // Using SHA-256 truncated to 128 bits as the cache key: At 2K tx/sec, the // collision probability is effectively zero (≈10^-29 for 120K keys in a minute, @@ -43,13 +45,127 @@ const ( MinGasEVMTx = 21000 ) +type Config struct { + // Maximum number of transactions in the mempool + Size int + + // Limit the total size of all txs in the mempool. + // This only accounts for raw transactions (e.g. given 1MB transactions and + // max-txs-bytes=5MB, mempool will only accept 5 transactions). + MaxTxsBytes int64 + + // Size of the cache (used to filter transactions we saw earlier) in transactions + CacheSize int + + // Size of the duplicate cache used to track duplicate txs + DuplicateTxsCacheSize int + + // Do not remove invalid transactions from the cache (default: false) + // Set to true if it's not possible for any invalid transaction to become + // valid again in the future. + KeepInvalidTxsInCache bool + + // Maximum size of a single transaction + // NOTE: the max size of a tx transmitted over the network is {max-tx-bytes}. + MaxTxBytes int + + // TTLDuration, if non-zero, defines the maximum amount of time a transaction + // can exist for in the mempool. + // + // Note, if TTLNumBlocks is also defined, a transaction will be removed if it + // has existed in the mempool at least TTLNumBlocks number of blocks or if it's + // insertion time into the mempool is beyond TTLDuration. + TTLDuration time.Duration + + // TTLNumBlocks, if non-zero, defines the maximum number of blocks a transaction + // can exist for in the mempool. + // + // Note, if TTLDuration is also defined, a transaction will be removed if it + // has existed in the mempool at least TTLNumBlocks number of blocks or if + // it's insertion time into the mempool is beyond TTLDuration. + TTLNumBlocks int64 + + // TxNotifyThreshold, if non-zero, defines the minimum number of transactions + // needed to trigger a notification in mempool's Tx notifier + TxNotifyThreshold uint64 + + // Maximum number of transactions in the pending set + PendingSize int + + // Limit the total size of all txs in the pending set. + MaxPendingTxsBytes int64 + + RemoveExpiredTxsFromQueue bool + + // DropPriorityThreshold defines the percentage of transactions with the lowest + // priority hint (expressed as a float in the range [0.0, 1.0]) that will be + // dropped from the mempool once the configured utilisation threshold is reached. + // + // The default value of 0.1 means that the lowest 10% of transactions by + // priority will be dropped when the mempool utilisation exceeds the + // DropUtilisationThreshold. + // + // See DropUtilisationThreshold. + DropPriorityThreshold float64 + + // DropUtilisationThreshold defines the mempool utilisation level (expressed as + // a percentage in the range [0.0, 1.0]) above which transactions will be + // selectively dropped based on their priority hint. + // + // For example, if this parameter is set to 0.8, then once the mempool reaches + // 80% capacity, transactions with priority hints below DropPriorityThreshold + // percentile will be dropped to make room for new transactions. + DropUtilisationThreshold float64 + + // DropPriorityReservoirSize defines the size of the reservoir for keeping track + // of the distribution of transaction priorities in the mempool. + // + // This is used to determine the priority threshold below which transactions will + // be dropped when the mempool utilisation exceeds DropUtilisationThreshold. + // + // The reservoir is a statistically representative sample of transaction + // priorities in the mempool, and is used to estimate the priority distribution + // without needing to store all transaction priorities. + // + // A larger reservoir size will yield a more accurate estimate of the priority + // distribution, but will consume more memory. + // + // The default value of 10,240 is a reasonable compromise between accuracy and + // memory usage for most use cases. It takes approximately 80KB of memory storing + // int64 transaction priorities. + // + // See DropUtilisationThreshold and DropPriorityThreshold. + DropPriorityReservoirSize int `mapstructure:"drop-priority-reservoir-size"` +} + +func DefaultConfig() *Config { + return &Config{ + // Each signature verification takes .5ms, Size reduced until we implement + // ABCI Recheck + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, + DuplicateTxsCacheSize: 100000, + MaxTxBytes: 1024 * 1024, // 1MB + TTLDuration: 5 * time.Second, // prevent stale txs from filling mempool + TTLNumBlocks: 10, // remove txs after 10 blocks + TxNotifyThreshold: 0, + PendingSize: 5000, + MaxPendingTxsBytes: 1024 * 1024 * 1024, // 1GB + RemoveExpiredTxsFromQueue: true, + DropPriorityThreshold: 0.1, + DropUtilisationThreshold: 1.0, + DropPriorityReservoirSize: 10_240, + } +} + // TxMempool defines a prioritized mempool data structure used by the v1 mempool // reactor. It keeps a thread-safe priority queue of transactions that is used // when a block proposer constructs a block and a thread-safe linked-list that // is used to gossip transactions to peers in a FIFO manner. type TxMempool struct { metrics *Metrics - config *config.MempoolConfig + config *Config app abci.Application // txsAvailable fires once for each height when the mempool is not empty @@ -123,7 +239,7 @@ type TxMempool struct { } func NewTxMempool( - cfg *config.MempoolConfig, + cfg *Config, app abci.Application, metrics *Metrics, txConstraintsFetcher TxConstraintsFetcher, @@ -158,6 +274,10 @@ func NewTxMempool( return txmp } +func (txmp *TxMempool) Config() *Config { return txmp.config } + +func (txmp *TxMempool) App() abci.Application { return txmp.app } + func (txmp *TxMempool) TxStore() *TxStore { return txmp.txStore } // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly @@ -204,14 +324,14 @@ func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.sizeByt func (txmp *TxMempool) PendingSizeBytes() int64 { return atomic.LoadInt64(&txmp.pendingSizeBytes) } -// WaitForNextTx returns a blocking channel that will be closed when the next -// valid transaction is available to gossip. It is thread-safe. -func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { return txmp.gossipIndex.WaitChan() } - -// NextGossipTx returns the next valid transaction to gossip. A caller must wait -// for WaitForNextTx to signal a transaction is available to gossip first. It is -// thread-safe. -func (txmp *TxMempool) NextGossipTx() *clist.CElement { return txmp.gossipIndex.Front() } +// WaitForNextTx waits until the next transaction is available for gossip. +// Returns the next valid transaction to gossip. +func (txmp *TxMempool) WaitForNextTx(ctx context.Context) (*clist.CElement, error) { + if _, _, err := utils.RecvOrClosed(ctx, txmp.gossipIndex.WaitChan()); err != nil { + return nil, err + } + return txmp.gossipIndex.Front(), nil +} // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. It is thread-safe. @@ -269,14 +389,14 @@ func (txmp *TxMempool) CheckTx( defer txmp.mtx.RUnlock() if txSize := len(tx); txSize > txmp.config.MaxTxBytes { - return fmt.Errorf("%w: max size is %d, but got %d", errTxTooLarge, txmp.config.MaxTxBytes, txSize) + return fmt.Errorf("%w: max size is %d, but got %d", ErrTxTooLarge, txmp.config.MaxTxBytes, txSize) } constraints, err := txmp.txConstraintsFetcher() if err != nil { return fmt.Errorf("txmp.txConstraintsFetcher(): %w", err) } if txSize := types.ComputeProtoSizeForTxs([]types.Tx{tx}); txSize > constraints.MaxDataBytes { - return fmt.Errorf("%w: tx size is too big: %d, max: %d", errTxTooLarge, txSize, constraints.MaxDataBytes) + return fmt.Errorf("%w: tx size is too big: %d, max: %d", ErrTxTooLarge, txSize, constraints.MaxDataBytes) } // Reject low priority transactions when the mempool is more than @@ -305,7 +425,7 @@ func (txmp *TxMempool) CheckTx( // check if we've seen this transaction and error if we have. if !txmp.cache.Push(txHash) { txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) - return errTxInCache + return ErrTxInCache } txmp.metrics.CacheSize.Set(float64(txmp.cache.Size())) @@ -482,7 +602,7 @@ func (txmp *TxMempool) Flush() { // and gas constraints. The returned list starts with EVM transactions (in priority order), // followed by non-EVM transactions (in priority order). // There are 4 types of constraints. -// 1. maxBytes - stops pulling txs from mempool once maxBytes is hit. Can be set to -1 to be ignored. +// 1. maxBytes - stops pulling txs from mempool once maxBytes is hit. // 2. maxGasWanted - stops pulling txs from mempool once total gas wanted exceeds maxGasWanted. // Can be set to -1 to be ignored. // 3. maxGasEstimated - similar to maxGasWanted but will use the estimated gas used for EVM txs @@ -494,18 +614,53 @@ func (txmp *TxMempool) Flush() { func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimated int64) types.Txs { txmp.mtx.Lock() defer txmp.mtx.Unlock() + txs, _ := txmp.reapTxs(ReapLimits{ + MaxBytes: utils.Some(maxBytes), + MaxGasWanted: utils.Some(maxGasWanted), + MaxGasEstimated: utils.Some(maxGasEstimated), + }) + return txs +} +type ReapLimits struct { + MaxTxs utils.Option[uint64] + MaxBytes utils.Option[int64] + MaxGasWanted utils.Option[int64] + MaxGasEstimated utils.Option[int64] +} + +// ReapMaxTxsBytesMaxGas returns a list of transactions within the provided tx, +// byte, and gas constraints together with the total estimated gas for the +// returned transactions. +// +// NOTE: Gas limits are enforced using int64 running totals. If those totals +// overflow, gas limit enforcement no longer works correctly. This preserves the +// historical behavior for backward compatibility. +func (txmp *TxMempool) reapTxs(l ReapLimits) (types.Txs, int64) { + maxTxs := l.MaxTxs.Or(utils.Max[uint64]()) + maxBytes := l.MaxBytes.Or(utils.Max[int64]()) + maxGasWanted := l.MaxGasWanted.Or(utils.Max[int64]()) + maxGasEstimated := l.MaxGasEstimated.Or(utils.Max[int64]()) + if maxBytes < 0 { + maxBytes = utils.Max[int64]() + } + if maxGasWanted < 0 { + maxGasWanted = utils.Max[int64]() + } + if maxGasEstimated < 0 { + maxGasEstimated = utils.Max[int64]() + } var ( totalGasWanted int64 totalGasEstimated int64 totalSize int64 ) - numTxs := 0 + numTxs := uint64(0) encounteredGasUnfit := false if uint64(txmp.NumTxsNotPending()) < txmp.config.TxNotifyThreshold { //nolint:gosec // NumTxsNotPending returns non-negative value // do not reap anything if threshold is not met - return []types.Tx{} + return []types.Tx{}, 0 } totalTxs := txmp.priorityIndex.NumTxs() evmTxs := make([]types.Tx, 0, totalTxs) @@ -514,7 +669,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) // bytes limit is a hard stop - if maxBytes > -1 && totalSize+size > maxBytes { + if totalSize+size > maxBytes || numTxs+1 > maxTxs { return false } @@ -531,8 +686,8 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate prospectiveGasWanted := totalGasWanted + wtx.gasWanted prospectiveGasEstimated := totalGasEstimated + txGasEstimate - maxGasWantedExceeded := maxGasWanted > -1 && prospectiveGasWanted > maxGasWanted - maxGasEstimatedExceeded := maxGasEstimated > -1 && prospectiveGasEstimated > maxGasEstimated + maxGasWantedExceeded := prospectiveGasWanted > maxGasWanted + maxGasEstimatedExceeded := prospectiveGasEstimated > maxGasEstimated if maxGasWantedExceeded || maxGasEstimatedExceeded { // skip this unfit-by-gas tx once and attempt to pull up to 10 smaller ones @@ -544,6 +699,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate } // include tx and update totals + numTxs += 1 totalSize += size totalGasWanted = prospectiveGasWanted totalGasEstimated = prospectiveGasEstimated @@ -553,14 +709,26 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate } else { nonEvmTxs = append(nonEvmTxs, wtx.tx) } - numTxs++ if encounteredGasUnfit && numTxs >= MinTxsToPeek { return false } return true }) - return append(evmTxs, nonEvmTxs...) + return append(evmTxs, nonEvmTxs...), totalGasEstimated +} + +// RemoveTxs removes the provided transactions from the mempool if present. +func (txmp *TxMempool) PopTxs(l ReapLimits) (types.Txs, int64) { + txmp.Lock() + defer txmp.Unlock() + txs, gasEstimated := txmp.reapTxs(l) + for _, tx := range txs { + if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { + txmp.removeTx(wtx, false, false, true) + } + } + return txs, gasEstimated } // ReapMaxTxs returns a list of transactions within the provided number of diff --git a/sei-tendermint/internal/mempool/mempool_test.go b/sei-tendermint/internal/mempool/mempool_test.go index 3f3225b200..bcb62c9314 100644 --- a/sei-tendermint/internal/mempool/mempool_test.go +++ b/sei-tendermint/internal/mempool/mempool_test.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "math/rand" - "os" "sort" "strconv" "strings" @@ -18,7 +17,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/abci/example/code" "github.com/sei-protocol/sei-chain/sei-tendermint/abci/example/kvstore" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/config" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) @@ -172,14 +171,9 @@ func (app *application) GetTxPriorityHint(context.Context, *abci.RequestGetTxPri func setup(t testing.TB, app abci.Application, cacheSize int, txConstraintsFetcher TxConstraintsFetcher) *TxMempool { t.Helper() - cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) - require.NoError(t, err) - cfg.Mempool.CacheSize = cacheSize - cfg.Mempool.DropUtilisationThreshold = 0.0 // disable dropping by priority hint to allow testing eviction logic - - t.Cleanup(func() { os.RemoveAll(cfg.RootDir) }) - - return NewTxMempool(cfg.Mempool, app, NopMetrics(), txConstraintsFetcher) + cfg := TestConfig() + cfg.CacheSize = cacheSize + return NewTxMempool(cfg, app, NopMetrics(), txConstraintsFetcher) } func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { @@ -370,7 +364,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50, -1) + reapedTxs := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), 50, utils.Max[int64]()) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -381,7 +375,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxBytesMaxGas(1000, -1, -1) + reapedTxs := txmp.ReapMaxBytesMaxGas(1000, utils.Max[int64](), utils.Max[int64]()) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -393,7 +387,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxBytesMaxGas(1500, 30, -1) + reapedTxs := txmp.ReapMaxBytesMaxGas(1500, 30, utils.Max[int64]()) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -404,7 +398,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 2, -1) + reapedTxs := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), 2, utils.Max[int64]()) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Len(t, reapedTxs, 2) @@ -414,7 +408,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, -1, 50) + reapedTxs := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), utils.Max[int64](), 50) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Len(t, reapedTxs, 50) @@ -458,7 +452,7 @@ func TestTxMempool_ReapMaxBytesMaxGas_FallbackToGasWanted(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, -1, 50) + reapedTxs := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), utils.Max[int64](), 50) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Len(t, reapedTxs, 50) @@ -504,7 +498,7 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - reapedTxs := txmp.ReapMaxTxs(-1) + reapedTxs := txmp.ReapMaxTxs(utils.Max[int]()) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) @@ -554,7 +548,7 @@ func TestTxMempool_ReapMaxBytesMaxGas_MinGasEVMTxThreshold(t *testing.T) { // With MinGasEVMTx=21000, estimatedGas (10000) is ignored and we fallback to gasWanted (50000). // Setting maxGasEstimated below gasWanted should therefore result in 0 reaped txs. - reaped := txmp.ReapMaxBytesMaxGas(-1, -1, 40000) + reaped := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), utils.Max[int64](), 40000) require.Len(t, reaped, 0) // Note: If MinGasEVMTx is changed to 0, the same scenario would use estimatedGas (10000) @@ -609,7 +603,7 @@ func TestTxMempool_Reap_SkipGasUnfitAndCollectMinTxs(t *testing.T) { } // Reap with a maxGasEstimated that makes the first tx unfit but allows many small txs - reaped := txmp.ReapMaxBytesMaxGas(-1, -1, 50) + reaped := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), utils.Max[int64](), 50) require.Len(t, reaped, MinTxsToPeek) // Ensure all reaped small txs are under gas constraint @@ -646,7 +640,7 @@ func TestTxMempool_Reap_SkipGasUnfitStopsAtMinEvenWithCapacity(t *testing.T) { } // Make the gas limit very small so the first (big) tx is unfit and we only collect MinTxsPerBlock - reaped := txmp.ReapMaxBytesMaxGas(-1, -1, 10) + reaped := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), utils.Max[int64](), 10) require.Len(t, reaped, MinTxsToPeek) } @@ -1039,7 +1033,7 @@ func TestReapMaxBytesMaxGas_EVMFirst(t *testing.T) { require.Equal(t, 5, txmp.Size()) // Reap all transactions - reapedTxs := txmp.ReapMaxBytesMaxGas(-1, -1, -1) + reapedTxs := txmp.ReapMaxBytesMaxGas(utils.Max[int64](), utils.Max[int64](), utils.Max[int64]()) require.Len(t, reapedTxs, 5) // Verify EVM transactions come first, then non-EVM @@ -1111,7 +1105,7 @@ func TestBlockFailedTxNotReAdmittedAfterSecondFailure(t *testing.T) { // Second failure: tx should remain in cache — CheckTx should reject it err := txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0}) - require.Equal(t, errTxInCache, err) + require.Equal(t, ErrTxInCache, err) require.Equal(t, 0, txmp.Size()) // A different tx (different hash) should still be admitted diff --git a/sei-tendermint/internal/mempool/ids.go b/sei-tendermint/internal/mempool/reactor/ids.go similarity index 90% rename from sei-tendermint/internal/mempool/ids.go rename to sei-tendermint/internal/mempool/reactor/ids.go index 24e96c77fe..b089526fe9 100644 --- a/sei-tendermint/internal/mempool/ids.go +++ b/sei-tendermint/internal/mempool/reactor/ids.go @@ -1,12 +1,16 @@ -package mempool +package reactor import ( "fmt" + "math" "sync" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) +const MaxActiveIDs = math.MaxUint16 + type IDs struct { mtx sync.RWMutex peerMap map[types.NodeID]uint16 @@ -19,7 +23,7 @@ func NewMempoolIDs() *IDs { peerMap: make(map[types.NodeID]uint16), // reserve UnknownPeerID for mempoolReactor.BroadcastTx - activeIDs: map[uint16]struct{}{UnknownPeerID: {}}, + activeIDs: map[uint16]struct{}{mempool.UnknownPeerID: {}}, nextID: 1, } } diff --git a/sei-tendermint/internal/mempool/ids_test.go b/sei-tendermint/internal/mempool/reactor/ids_test.go similarity index 99% rename from sei-tendermint/internal/mempool/ids_test.go rename to sei-tendermint/internal/mempool/reactor/ids_test.go index a5e75eeb68..53ad039b07 100644 --- a/sei-tendermint/internal/mempool/ids_test.go +++ b/sei-tendermint/internal/mempool/reactor/ids_test.go @@ -1,4 +1,4 @@ -package mempool +package reactor import ( "testing" diff --git a/sei-tendermint/internal/mempool/reactor.go b/sei-tendermint/internal/mempool/reactor/reactor.go similarity index 50% rename from sei-tendermint/internal/mempool/reactor.go rename to sei-tendermint/internal/mempool/reactor/reactor.go index 764d9c5140..bdc8dc6a88 100644 --- a/sei-tendermint/internal/mempool/reactor.go +++ b/sei-tendermint/internal/mempool/reactor/reactor.go @@ -1,17 +1,17 @@ -package mempool +package reactor import ( "context" "errors" "fmt" "runtime/debug" - "sync" "github.com/sei-protocol/sei-chain/sei-tendermint/config" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/libs/clist" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/service" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" pb "github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/types" "github.com/sei-protocol/seilog" @@ -23,6 +23,8 @@ var ( _ service.Service = (*Reactor)(nil) ) +const MempoolChannel p2p.ChannelID = 0x30 + // Reactor implements a service that contains mempool of txs that are broadcasted // amongst peers. It maintains a map from peer ID to counter, to prevent gossiping // txs to the peers you received it from. @@ -30,13 +32,11 @@ type Reactor struct { service.BaseService cfg *config.MempoolConfig - mempool *TxMempool + mempool *mempool.TxMempool ids *IDs router *p2p.Router - mtx sync.Mutex - peerRoutines map[types.NodeID]context.CancelFunc failedCheckTxCounts utils.Mutex[map[types.NodeID]int] channel *p2p.Channel[*pb.Message] @@ -44,18 +44,17 @@ type Reactor struct { } // NewReactor returns a reference to a new reactor. -func NewReactor(txmp *TxMempool, router *p2p.Router) (*Reactor, error) { - channel, err := p2p.OpenChannel(router, GetChannelDescriptor(txmp.config)) +func NewReactor(cfg *config.MempoolConfig, txmp *mempool.TxMempool, router *p2p.Router) (*Reactor, error) { + channel, err := p2p.OpenChannel(router, GetChannelDescriptor(cfg)) if err != nil { return nil, fmt.Errorf("router.OpenChannel(): %w", err) } r := &Reactor{ - cfg: txmp.config, + cfg: cfg, mempool: txmp, ids: NewMempoolIDs(), router: router, channel: channel, - peerRoutines: map[types.NodeID]context.CancelFunc{}, failedCheckTxCounts: utils.NewMutex(map[types.NodeID]int{}), readyToStart: make(chan struct{}, 1), } @@ -65,8 +64,8 @@ func NewReactor(txmp *TxMempool, router *p2p.Router) (*Reactor, error) { func (r *Reactor) MarkReadyToStart() { r.readyToStart <- struct{}{} } -// getChannelDescriptor produces an instance of a descriptor for this -// package's required channels. +// GetChannelDescriptor produces an instance of a descriptor for this package's +// required channels. func GetChannelDescriptor(cfg *config.MempoolConfig) p2p.ChannelDescriptor[*pb.Message] { largestTx := make([]byte, cfg.MaxTxBytes) batchMsg := &pb.Message{ @@ -85,21 +84,16 @@ func GetChannelDescriptor(cfg *config.MempoolConfig) p2p.ChannelDescriptor[*pb.M } } -// OnStart starts separate go routines for each p2p Channel and listens for +// OnStart starts separate goroutines for each p2p channel and listens for // envelopes on each. In addition, it also listens for peer updates and handles // messages on that p2p channel accordingly. The caller must be sure to execute -// OnStop to ensure the outbound p2p Channels are closed. +// OnStop to ensure the outbound p2p channels are closed. func (r *Reactor) OnStart(ctx context.Context) error { if !r.cfg.Broadcast { logger.Info("tx broadcasting is disabled") } - - if r.channel == nil { - return errors.New("mempool channel is not set") - } - go r.processMempoolCh(ctx) - go r.processPeerUpdates(ctx) - r.SpawnCritical("mempool", r.mempool.Run) + r.SpawnCritical("processMempoolCh", r.processMempoolCh) + r.SpawnCritical("processPeerUpdates", r.processPeerUpdates) return nil } @@ -119,7 +113,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, m p2p.RecvMsg[*pb.Me } protoTxs := msg.Txs.GetTxs() - txInfo := TxInfo{SenderID: r.ids.GetForPeer(m.From)} + txInfo := mempool.TxInfo{SenderID: r.ids.GetForPeer(m.From)} if len(m.From) != 0 { txInfo.SenderNodeID = m.From } @@ -127,21 +121,16 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, m p2p.RecvMsg[*pb.Me for _, tx := range protoTxs { if err := r.mempool.CheckTx(ctx, tx, nil, txInfo); err != nil { r.accountFailedCheckTx(m.From, err) - if errors.Is(err, errTxInCache) { - // if the tx is in the cache, - // then we've been gossiped a - // Tx that we've already - // got. Gossip should be - // smarter, but it's not a - // problem. + if errors.Is(err, mempool.ErrTxInCache) { + // If the tx is in the cache, then we've been gossiped a tx + // that we've already got. Gossip should be smarter, but it's + // not a problem. continue } if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - // Do not propagate context - // cancellation errors, but do - // not continue to check - // transactions from this - // message if we are shutting down. + // Do not propagate context cancellation errors, but do not + // continue to check transactions from this message if we are + // shutting down. return nil } @@ -160,7 +149,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, m p2p.RecvMsg[*pb.Me } func (r *Reactor) accountFailedCheckTx(nodeID types.NodeID, err error) { - if !r.cfg.CheckTxErrorBlacklistEnabled || !errors.Is(err, errTxTooLarge) { + if !r.cfg.CheckTxErrorBlacklistEnabled || !errors.Is(err, mempool.ErrTxTooLarge) { return } for counts := range r.failedCheckTxCounts.Lock() { @@ -174,7 +163,7 @@ func (r *Reactor) accountFailedCheckTx(nodeID types.NodeID, err error) { } } -// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. +// handleMessage handles an envelope sent from a peer on a specific p2p channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. func (r *Reactor) handleMessage(ctx context.Context, m p2p.RecvMsg[*pb.Message]) (err error) { @@ -194,13 +183,13 @@ func (r *Reactor) handleMessage(ctx context.Context, m p2p.RecvMsg[*pb.Message]) } // processMempoolCh implements a blocking event loop where we listen for p2p -// Envelope messages from the mempoolCh. -func (r *Reactor) processMempoolCh(ctx context.Context) { +// envelope messages from the mempool channel. +func (r *Reactor) processMempoolCh(ctx context.Context) error { <-r.readyToStart for { m, err := r.channel.Recv(ctx) if err != nil { - return + return err } if err := r.handleMessage(ctx, m); err != nil { r.router.Evict(m.From, fmt.Errorf("mempool: %w", err)) @@ -208,88 +197,52 @@ func (r *Reactor) processMempoolCh(ctx context.Context) { } } -// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we -// check if the reactor is running and if we've already started a tx broadcasting -// goroutine or not. If not, we start one for the newly added peer. For down or -// removed peers, we remove the peer from the mempool peer ID set and signal to -// stop the tx broadcasting goroutine. -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { - logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) - - r.mtx.Lock() - defer r.mtx.Unlock() - - switch peerUpdate.Status { - case p2p.PeerStatusUp: - for counts := range r.failedCheckTxCounts.Lock() { - counts[peerUpdate.NodeID] = 0 - } - - // Do not allow starting new tx broadcast loops after reactor shutdown - // has been initiated. This can happen after we've manually closed all - // peer broadcast, but the router still sends in-flight peer updates. - if !r.IsRunning() { - return - } +// processPeerUpdates initiates a blocking process where we listen for and +// handle PeerUpdate messages. When the reactor is stopped, we will catch the +// signal and close the p2p PeerUpdatesCh gracefully. +func (r *Reactor) processPeerUpdates(ctx context.Context) error { + if !r.cfg.Broadcast { + return nil + } + return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + recv := r.router.Subscribe() + peerRoutines := map[types.NodeID]context.CancelFunc{} + for { + update, err := recv.Recv(ctx) + if err != nil { + return err + } + logger.Debug("received peer update", "peer", update.NodeID, "status", update.Status) - if r.cfg.Broadcast { - // Check if we've already started a goroutine for this peer, if not we create - // a new done channel so we can explicitly close the goroutine if the peer - // is later removed, we increment the waitgroup so the reactor can stop - // safely, and finally start the goroutine to broadcast txs to that peer. - _, ok := r.peerRoutines[peerUpdate.NodeID] - if !ok { + switch update.Status { + case p2p.PeerStatusUp: + for counts := range r.failedCheckTxCounts.Lock() { + counts[update.NodeID] = 0 + } pctx, pcancel := context.WithCancel(ctx) - r.peerRoutines[peerUpdate.NodeID] = pcancel - - r.ids.ReserveForPeer(peerUpdate.NodeID) + peerRoutines[update.NodeID] = pcancel + r.ids.ReserveForPeer(update.NodeID) + s.Spawn(func() error { + r.broadcastTxRoutine(pctx, update.NodeID) + return nil + }) - // start a broadcast routine ensuring all txs are forwarded to the peer - go r.broadcastTxRoutine(pctx, peerUpdate.NodeID) + case p2p.PeerStatusDown: + r.ids.Reclaim(update.NodeID) + for counts := range r.failedCheckTxCounts.Lock() { + delete(counts, update.NodeID) + } + peerRoutines[update.NodeID]() + delete(peerRoutines, update.NodeID) } } - - case p2p.PeerStatusDown: - r.ids.Reclaim(peerUpdate.NodeID) - for counts := range r.failedCheckTxCounts.Lock() { - delete(counts, peerUpdate.NodeID) - } - - // Check if we've started a tx broadcasting goroutine for this peer. - // If we have, we signal to terminate the goroutine via the channel's closure. - // This will internally decrement the peer waitgroup and remove the peer - // from the map of peer tx broadcasting goroutines. - closer, ok := r.peerRoutines[peerUpdate.NodeID] - if ok { - closer() - } - } -} - -// processPeerUpdates initiates a blocking process where we listen for and handle -// PeerUpdate messages. When the reactor is stopped, we will catch the signal and -// close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context) { - recv := r.router.Subscribe() - for { - update, err := recv.Recv(ctx) - if err != nil { - return - } - r.processPeerUpdate(ctx, update) - } + }) } func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) { peerMempoolID := r.ids.GetForPeer(peerID) - var nextGossipTx *clist.CElement - - // remove the peer ID from the map of routines and mark the waitgroup as done + // TODO: this function does not call any external code, so panics should not be expected. defer func() { - r.mtx.Lock() - delete(r.peerRoutines, peerID) - r.mtx.Unlock() - if e := recover(); e != nil { logger.Error( "recovering from broadcasting mempool loop", @@ -299,45 +252,32 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) { } }() - for { - if !r.IsRunning() || ctx.Err() != nil { + for ctx.Err() == nil { + nextGossipTx, err := r.mempool.WaitForNextTx(ctx) + if err != nil { return } + for ctx.Err() == nil && nextGossipTx != nil { + memTx := nextGossipTx.Value.(*mempool.WrappedTx) + + if ok := r.mempool.TxStore().TxHasPeer(memTx.Key(), peerMempoolID); !ok { + r.channel.Send(&pb.Message{ + Sum: &pb.Message_Txs{ + Txs: &pb.Txs{Txs: [][]byte{memTx.Tx()}}, + }, + }, peerID) + logger.Debug( + "gossiped tx to peer", + "tx", memTx.Tx().Hash(), + "peer", peerID, + ) + } - // This happens because the CElement we were looking at got garbage - // collected (removed). That is, .NextWait() returned nil. Go ahead and - // start from the beginning. - if nextGossipTx == nil { - select { - case <-ctx.Done(): + if _, _, err := utils.RecvOrClosed(ctx, nextGossipTx.NextWaitChan()); err != nil { return - case <-r.mempool.WaitForNextTx(): // wait until a tx is available - if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil { - continue - } } - } - - memTx := nextGossipTx.Value.(*WrappedTx) - - // NOTE: Transaction batching was disabled due to: - // https://github.com/tendermint/tendermint/issues/5796 - if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok { - // Send the mempool tx to the corresponding peer. Note, the peer may be - // behind and thus would not be able to process the mempool tx correctly. - r.channel.Send(&pb.Message{Sum: &pb.Message_Txs{Txs: &pb.Txs{Txs: [][]byte{memTx.tx}}}}, peerID) - logger.Debug( - "gossiped tx to peer", - "tx", memTx.tx.Hash(), - "peer", peerID, - ) - } - - select { - case <-nextGossipTx.NextWaitChan(): + // WARNING: Next() may return nil in case element has been removed. nextGossipTx = nextGossipTx.Next() - case <-ctx.Done(): - return } } } diff --git a/sei-tendermint/internal/mempool/reactor_test.go b/sei-tendermint/internal/mempool/reactor/reactor_test.go similarity index 72% rename from sei-tendermint/internal/mempool/reactor_test.go rename to sei-tendermint/internal/mempool/reactor/reactor_test.go index 4b8b12b141..2506cef75a 100644 --- a/sei-tendermint/internal/mempool/reactor_test.go +++ b/sei-tendermint/internal/mempool/reactor/reactor_test.go @@ -1,8 +1,9 @@ -package mempool +package reactor import ( "context" "fmt" + "math/rand" "os" "runtime" "strings" @@ -16,6 +17,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/abci/example/kvstore" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" "github.com/sei-protocol/sei-chain/sei-tendermint/config" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" tmrand "github.com/sei-protocol/sei-chain/sei-tendermint/libs/rand" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -24,17 +26,72 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) +type testTx struct { + tx types.Tx +} + type reactorTestSuite struct { network *p2p.TestNetwork reactors map[types.NodeID]*Reactor - mempools map[types.NodeID]*TxMempool + mempools map[types.NodeID]*mempool.TxMempool kvstores map[types.NodeID]*kvstore.Application nodes []types.NodeID } +func setupMempool(t testing.TB, app abci.Application, cacheSize int, txConstraintsFetcher mempool.TxConstraintsFetcher) *mempool.TxMempool { + t.Helper() + + cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) + require.NoError(t, err) + cfg.Mempool.CacheSize = cacheSize + cfg.Mempool.DropUtilisationThreshold = 0.0 + + t.Cleanup(func() { os.RemoveAll(cfg.RootDir) }) + + return mempool.NewTxMempool(cfg.Mempool.ToMempoolConfig(), app, mempool.NopMetrics(), txConstraintsFetcher) +} + +func checkTxs(ctx context.Context, t *testing.T, txmp *mempool.TxMempool, numTxs int, peerID uint16) []testTx { + t.Helper() + + txs := make([]testTx, numTxs) + txInfo := mempool.TxInfo{SenderID: peerID} + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := range numTxs { + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(t, err) + + txs[i] = testTx{ + tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, i+1000)), + } + require.NoError(t, txmp.CheckTx(ctx, txs[i].tx, nil, txInfo)) + } + + return txs +} + +func convertTex(in []testTx) types.Txs { + out := make([]types.Tx, len(in)) + for i := range in { + out[i] = in[i].tx + } + return out +} + func setupReactors(ctx context.Context, t *testing.T, numNodes int) *reactorTestSuite { + return setupReactorsWithTxConstraintsFetchers(ctx, t, numNodes, nil) +} + +func setupReactorsWithTxConstraintsFetchers( + ctx context.Context, + t *testing.T, + numNodes int, + txConstraintsFetchers map[int]mempool.TxConstraintsFetcher, +) *reactorTestSuite { t.Helper() cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) @@ -44,19 +101,23 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int) *reactorTest rts := &reactorTestSuite{ network: p2p.MakeTestNetwork(t, p2p.TestNetworkOptions{NumNodes: numNodes}), reactors: make(map[types.NodeID]*Reactor, numNodes), - mempools: make(map[types.NodeID]*TxMempool, numNodes), + mempools: make(map[types.NodeID]*mempool.TxMempool, numNodes), kvstores: make(map[types.NodeID]*kvstore.Application, numNodes), } - for _, node := range rts.network.Nodes() { + for i, node := range rts.network.Nodes() { nodeID := node.NodeID rts.kvstores[nodeID] = kvstore.NewApplication() app := rts.kvstores[nodeID] - mempool := setup(t, app, 0, NopTxConstraintsFetcher) - rts.mempools[nodeID] = mempool + txConstraintsFetcher := mempool.NopTxConstraintsFetcher + if customFetcher, ok := txConstraintsFetchers[i]; ok { + txConstraintsFetcher = customFetcher + } + txmp := setupMempool(t, app, 0, txConstraintsFetcher) + rts.mempools[nodeID] = txmp - reactor, err := NewReactor(mempool, node.Router) + reactor, err := NewReactor(config.TestMempoolConfig(), txmp, node.Router) if err != nil { t.Fatalf("NewReactor(): %v", err) } @@ -78,20 +139,19 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int) *reactorTest return rts } -func setupReactorForTest(t *testing.T, txConstraintsFetcher TxConstraintsFetcher) (*Reactor, *TxMempool) { +func setupReactorForTest(t *testing.T, txConstraintsFetcher mempool.TxConstraintsFetcher) (*Reactor, *mempool.TxMempool) { t.Helper() - cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) - require.NoError(t, err) + cfg := config.TestConfig() + cfg.SetRoot(t.TempDir()) cfg.Mempool.DropUtilisationThreshold = 0.0 cfg.Mempool.Broadcast = false - t.Cleanup(func() { os.RemoveAll(cfg.RootDir) }) network := p2p.MakeTestNetwork(t, p2p.TestNetworkOptions{NumNodes: 1}) node := network.Nodes()[0] - txmp := NewTxMempool(cfg.Mempool, kvstore.NewApplication(), NopMetrics(), txConstraintsFetcher) - reactor, err := NewReactor(txmp, node.Router) + txmp := mempool.NewTxMempool(cfg.Mempool.ToMempoolConfig(), kvstore.NewApplication(), mempool.NopMetrics(), txConstraintsFetcher) + reactor, err := NewReactor(cfg.Mempool, txmp, node.Router) require.NoError(t, err) reactor.MarkReadyToStart() require.NoError(t, reactor.Start(t.Context())) @@ -109,8 +169,6 @@ func (rts *reactorTestSuite) start(t *testing.T) { func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...types.NodeID) { t.Helper() - // ensure that the transactions get fully broadcast to the - // rest of the network wg := &sync.WaitGroup{} for name, pool := range rts.mempools { if !p2p.NodeInSlice(name, ids) { @@ -121,7 +179,7 @@ func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...ty } wg.Add(1) - go func(name types.NodeID, pool *TxMempool) { + go func(name types.NodeID, pool *mempool.TxMempool) { defer wg.Done() require.Eventually(t, func() bool { return len(txs) == pool.Size() }, time.Minute, @@ -133,6 +191,16 @@ func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...ty wg.Wait() } +func peerFailedCheckTxCount(reactor *Reactor, nodeID types.NodeID) utils.Option[int] { + for counts := range reactor.failedCheckTxCounts.Lock() { + if count, ok := counts[nodeID]; ok { + return utils.Some(count) + } + return utils.None[int]() + } + panic("unreachable") +} + func TestReactorBroadcastTxs(t *testing.T) { numTxs := 512 numNodes := 4 @@ -144,47 +212,35 @@ func TestReactorBroadcastTxs(t *testing.T) { primary := rts.nodes[0] secondaries := rts.nodes[1:] - txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, UnknownPeerID) + txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, mempool.UnknownPeerID) require.Equal(t, numTxs, rts.reactors[primary].mempool.Size()) rts.start(t) - - // Wait till all secondary suites (reactor) received all mempool txs from the - // primary suite (node). rts.waitForTxns(t, convertTex(txs), secondaries...) } -func peerFailedCheckTxCount(reactor *Reactor, nodeID types.NodeID) utils.Option[int] { - for counts := range reactor.failedCheckTxCounts.Lock() { - if count, ok := counts[nodeID]; ok { - return utils.Some(count) - } - return utils.None[int]() - } - panic("unreachable") -} - func TestReactorFailedCheckTxCountEvictsPeer(t *testing.T) { ctx := t.Context() - rts := setupReactors(ctx, t, 2) + rts := setupReactorsWithTxConstraintsFetchers(ctx, t, 2, map[int]mempool.TxConstraintsFetcher{ + 1: func() (mempool.TxConstraints, error) { + return mempool.TxConstraints{ + MaxDataBytes: 10, + MaxGas: -1, + }, nil + }, + }) t.Cleanup(leaktest.Check(t)) sender := rts.nodes[0] receiver := rts.nodes[1] - rts.start(t) - receiverReactor := rts.reactors[receiver] receiverReactor.cfg.CheckTxErrorBlacklistEnabled = true receiverReactor.cfg.CheckTxErrorThreshold = 2 - receiverReactor.mempool.txConstraintsFetcher = TxConstraintsFetcher(func() (TxConstraints, error) { - return TxConstraints{ - MaxDataBytes: 10, - MaxGas: -1, - }, nil - }) + + rts.start(t) conn := rts.network.Node(receiver).WaitForConnAndGet(ctx, sender) msgForTx := func(tx []byte) p2p.RecvMsg[*pb.Message] { @@ -197,8 +253,7 @@ func TestReactorFailedCheckTxCountEvictsPeer(t *testing.T) { }, } } - // The network connection can be established before the reactor processes the - // async PeerStatusUp update that initializes the sender's failure count, so we need to wait. + require.Eventually(t, func() bool { return peerFailedCheckTxCount(receiverReactor, sender) == utils.Some(0) }, time.Second, 50*time.Millisecond) @@ -223,8 +278,8 @@ func TestReactorFailedCheckTxCountEvictsPeer(t *testing.T) { func TestReactorPeerDownClearsFailedCheckTxCount(t *testing.T) { reactor, _ := setupReactorForTest( t, - func() (TxConstraints, error) { - return TxConstraints{ + func() (mempool.TxConstraints, error) { + return mempool.TxConstraints{ MaxDataBytes: 10, MaxGas: -1, }, nil @@ -243,19 +298,18 @@ func TestReactorPeerDownClearsFailedCheckTxCount(t *testing.T) { } reactor.cfg.CheckTxErrorBlacklistEnabled = true - reactor.processPeerUpdate(t.Context(), p2p.PeerUpdate{ - NodeID: "sender", - Status: p2p.PeerStatusUp, - }) + for counts := range reactor.failedCheckTxCounts.Lock() { + counts["sender"] = 0 + } require.Equal(t, utils.Some(0), peerFailedCheckTxCount(reactor, "sender")) require.NoError(t, reactor.handleMempoolMessage(t.Context(), msg)) require.Equal(t, utils.Some(1), peerFailedCheckTxCount(reactor, "sender")) - reactor.processPeerUpdate(t.Context(), p2p.PeerUpdate{ - NodeID: "sender", - Status: p2p.PeerStatusDown, - }) + reactor.ids.Reclaim("sender") + for counts := range reactor.failedCheckTxCounts.Lock() { + delete(counts, "sender") + } require.Equal(t, utils.None[int](), peerFailedCheckTxCount(reactor, "sender")) require.Equal(t, utils.Some(1), peerFailedCheckTxCount(reactor, "other")) @@ -264,8 +318,8 @@ func TestReactorPeerDownClearsFailedCheckTxCount(t *testing.T) { func TestReactorMissingFailedCheckTxCountIsNotRecreated(t *testing.T) { reactor, _ := setupReactorForTest( t, - func() (TxConstraints, error) { - return TxConstraints{ + func() (mempool.TxConstraints, error) { + return mempool.TxConstraints{ MaxDataBytes: 10, MaxGas: -1, }, nil @@ -281,24 +335,19 @@ func TestReactorMissingFailedCheckTxCountIsNotRecreated(t *testing.T) { } reactor.cfg.CheckTxErrorBlacklistEnabled = true - reactor.processPeerUpdate(t.Context(), p2p.PeerUpdate{ - NodeID: "sender", - Status: p2p.PeerStatusUp, - }) - reactor.processPeerUpdate(t.Context(), p2p.PeerUpdate{ - NodeID: "sender", - Status: p2p.PeerStatusDown, - }) + for counts := range reactor.failedCheckTxCounts.Lock() { + counts["sender"] = 0 + delete(counts, "sender") + } + reactor.ids.Reclaim("sender") require.NoError(t, reactor.handleMempoolMessage(t.Context(), msg)) require.Equal(t, utils.None[int](), peerFailedCheckTxCount(reactor, "sender")) } -// regression test for https://github.com/tendermint/tendermint/issues/5408 func TestReactorConcurrency(t *testing.T) { numTxs := 10 numNodes := 2 - ctx := t.Context() rts := setupReactors(ctx, t, numNodes) @@ -311,41 +360,36 @@ func TestReactorConcurrency(t *testing.T) { var wg sync.WaitGroup - for i := 0; i < runtime.NumCPU()*2; i++ { + for range runtime.NumCPU() * 2 { wg.Add(2) - // 1. submit a bunch of txs - // 2. update the whole mempool - - txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, UnknownPeerID) + txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, mempool.UnknownPeerID) go func() { defer wg.Done() - mempool := rts.mempools[primary] + txmp := rts.mempools[primary] - mempool.Lock() - defer mempool.Unlock() + txmp.Lock() + defer txmp.Unlock() deliverTxResponses := make([]*abci.ExecTxResult, len(txs)) for i := range txs { deliverTxResponses[i] = &abci.ExecTxResult{Code: 0} } - require.NoError(t, mempool.Update(ctx, 1, convertTex(txs), deliverTxResponses, mempool.txConstraintsFetcher, true)) + require.NoError(t, txmp.Update(ctx, 1, convertTex(txs), deliverTxResponses, mempool.NopTxConstraintsFetcher, true)) }() - // 1. submit a bunch of txs - // 2. update none - _ = checkTxs(ctx, t, rts.reactors[secondary].mempool, numTxs, UnknownPeerID) + _ = checkTxs(ctx, t, rts.reactors[secondary].mempool, numTxs, mempool.UnknownPeerID) go func() { defer wg.Done() - mempool := rts.mempools[secondary] + txmp := rts.mempools[secondary] - mempool.Lock() - defer mempool.Unlock() + txmp.Lock() + defer txmp.Unlock() - err := mempool.Update(ctx, 1, []types.Tx{}, make([]*abci.ExecTxResult, 0), mempool.txConstraintsFetcher, true) + err := txmp.Update(ctx, 1, []types.Tx{}, make([]*abci.ExecTxResult, 0), mempool.NopTxConstraintsFetcher, true) require.NoError(t, err) }() } @@ -356,7 +400,6 @@ func TestReactorConcurrency(t *testing.T) { func TestReactorNoBroadcastToSender(t *testing.T) { numTxs := 1000 numNodes := 2 - ctx := t.Context() rts := setupReactors(ctx, t, numNodes) @@ -369,7 +412,6 @@ func TestReactorNoBroadcastToSender(t *testing.T) { _ = checkTxs(ctx, t, rts.mempools[primary], numTxs, peerID) rts.start(t) - time.Sleep(100 * time.Millisecond) require.Eventually(t, func() bool { @@ -380,7 +422,6 @@ func TestReactorNoBroadcastToSender(t *testing.T) { func TestReactor_MaxTxBytes(t *testing.T) { numNodes := 2 cfg := config.TestConfig() - ctx := t.Context() rts := setupReactors(ctx, t, numNodes) @@ -389,16 +430,12 @@ func TestReactor_MaxTxBytes(t *testing.T) { primary := rts.nodes[0] secondary := rts.nodes[1] - // Broadcast a tx, which has the max size and ensure it's received by the - // second reactor. tx1 := tmrand.Bytes(cfg.Mempool.MaxTxBytes) err := rts.reactors[primary].mempool.CheckTx( ctx, tx1, nil, - TxInfo{ - SenderID: UnknownPeerID, - }, + mempool.TxInfo{SenderID: mempool.UnknownPeerID}, ) require.NoError(t, err) @@ -407,32 +444,24 @@ func TestReactor_MaxTxBytes(t *testing.T) { rts.reactors[primary].mempool.Flush() rts.reactors[secondary].mempool.Flush() - // broadcast a tx, which is beyond the max size and ensure it's not sent tx2 := tmrand.Bytes(cfg.Mempool.MaxTxBytes + 1) - err = rts.mempools[primary].CheckTx(ctx, tx2, nil, TxInfo{SenderID: UnknownPeerID}) + err = rts.mempools[primary].CheckTx(ctx, tx2, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) require.Error(t, err) } func TestDontExhaustMaxActiveIDs(t *testing.T) { t.Skip("this test fails, but the property it tests is not very useful") - // we're creating a single node network, but not starting the - // network. ctx := t.Context() - rts := setupReactors(ctx, t, 1) t.Cleanup(leaktest.Check(t)) nodeID := rts.nodes[0] - // ensure the reactor does not panic (i.e. exhaust active IDs) for range MaxActiveIDs + 1 { privKey := ed25519.GenerateSecretKey() peerID := types.NodeIDFromPubKey(privKey.Public()) - rts.reactors[nodeID].processPeerUpdate(ctx, p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: peerID, - }) + rts.reactors[nodeID].ids.ReserveForPeer(peerID) } } @@ -441,7 +470,6 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { t.Skip("skipping test in short mode") } - // 0 is already reserved for UnknownPeerID ids := NewMempoolIDs() for i := range MaxActiveIDs - 1 { @@ -471,11 +499,9 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { secondary := rts.nodes[1] rts.start(t) - - // disconnect peer rts.network.Remove(t, secondary) - txs := checkTxs(ctx, t, rts.reactors[primary].mempool, 4, UnknownPeerID) + txs := checkTxs(ctx, t, rts.reactors[primary].mempool, 4, mempool.UnknownPeerID) require.Equal(t, 4, len(txs)) require.Equal(t, 4, rts.mempools[primary].Size()) require.Equal(t, 0, rts.mempools[secondary].Size()) diff --git a/sei-tendermint/internal/mempool/testonly.go b/sei-tendermint/internal/mempool/testonly.go new file mode 100644 index 0000000000..2decfd0e9f --- /dev/null +++ b/sei-tendermint/internal/mempool/testonly.go @@ -0,0 +1,8 @@ +package mempool + +func TestConfig() *Config { + cfg := DefaultConfig() + cfg.CacheSize = 1000 + cfg.DropUtilisationThreshold = 0.0 + return cfg +} diff --git a/sei-tendermint/internal/mempool/tx.go b/sei-tendermint/internal/mempool/tx.go index 1f76bc26d7..3b632e41b4 100644 --- a/sei-tendermint/internal/mempool/tx.go +++ b/sei-tendermint/internal/mempool/tx.go @@ -6,7 +6,6 @@ import ( "time" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/libs/clist" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/types" @@ -85,6 +84,10 @@ func (wtx *WrappedTx) IsBefore(tx *WrappedTx) bool { return wtx.evmNonce < tx.evmNonce } +func (wtx *WrappedTx) Tx() types.Tx { return wtx.tx } + +func (wtx *WrappedTx) Key() types.TxKey { return wtx.hash } + func (wtx *WrappedTx) Size() int { return len(wtx.tx) } @@ -308,7 +311,7 @@ func (wtl *WrappedTxList) Purge(minTime utils.Option[time.Time], minHeight utils type PendingTxs struct { mtx *sync.RWMutex txs []TxWithResponse - config *config.MempoolConfig + config *Config sizeBytes uint64 } @@ -318,7 +321,7 @@ type TxWithResponse struct { txInfo TxInfo } -func NewPendingTxs(conf *config.MempoolConfig) *PendingTxs { +func NewPendingTxs(conf *Config) *PendingTxs { return &PendingTxs{ mtx: &sync.RWMutex{}, txs: []TxWithResponse{}, diff --git a/sei-tendermint/internal/mempool/tx_test.go b/sei-tendermint/internal/mempool/tx_test.go index e4e52a53b0..5ccc36b613 100644 --- a/sei-tendermint/internal/mempool/tx_test.go +++ b/sei-tendermint/internal/mempool/tx_test.go @@ -7,7 +7,6 @@ import ( "time" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" "github.com/sei-protocol/sei-chain/sei-tendermint/types" @@ -200,9 +199,9 @@ func TestTxStore_Size(t *testing.T) { txStore := NewTxStore() numTxs := 1000 - for i := 0; i < numTxs; i++ { + for i := range numTxs { txStore.SetTx(&WrappedTx{ - tx: []byte(fmt.Sprintf("test_tx_%d", i)), + tx: fmt.Appendf(nil, "test_tx_%d", i), priority: int64(i), timestamp: time.Now(), }) @@ -271,7 +270,7 @@ func TestWrappedTxList(t *testing.T) { } func TestPendingTxsPopTxsGood(t *testing.T) { - pendingTxs := NewPendingTxs(config.TestMempoolConfig()) + pendingTxs := NewPendingTxs(DefaultConfig()) for _, test := range []struct { origLen int popIndices []int @@ -334,7 +333,7 @@ func TestPendingTxsPopTxsGood(t *testing.T) { } func TestPendingTxsPopTxsBad(t *testing.T) { - pendingTxs := NewPendingTxs(config.TestMempoolConfig()) + pendingTxs := NewPendingTxs(DefaultConfig()) // out of range require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{0}) }) // out of order @@ -345,7 +344,7 @@ func TestPendingTxsPopTxsBad(t *testing.T) { } func TestPendingTxs_InsertCondition(t *testing.T) { - mempoolCfg := config.TestMempoolConfig() + mempoolCfg := DefaultConfig() // First test exceeding number of txs mempoolCfg.PendingSize = 2 diff --git a/sei-tendermint/internal/mempool/types.go b/sei-tendermint/internal/mempool/types.go index c0b67ec78d..d3343beb37 100644 --- a/sei-tendermint/internal/mempool/types.go +++ b/sei-tendermint/internal/mempool/types.go @@ -1,22 +1,11 @@ package mempool -import ( - "math" - - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" -) +import "math" const ( - MempoolChannel = p2p.ChannelID(0x30) - - // PeerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind - PeerCatchupSleepIntervalMS = 100 - // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 - - MaxActiveIDs = math.MaxUint16 ) // TxConstraints contains the precomputed consensus-derived mempool limits for diff --git a/sei-tendermint/internal/p2p/giga/api.go b/sei-tendermint/internal/p2p/giga/api.go index 109682a486..84ef2b451f 100644 --- a/sei-tendermint/internal/p2p/giga/api.go +++ b/sei-tendermint/internal/p2p/giga/api.go @@ -2,6 +2,7 @@ package giga import ( apb "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" pb "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/giga/pb" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/rpc" ) @@ -21,7 +22,10 @@ var StreamLaneProposals = rpc.Register[API]( 1, rpc.Limit{Rate: 1, Concurrent: 1}, rpc.Msg[*pb.StreamLaneProposalsReq]{MsgSize: kB, Window: 1}, - rpc.Msg[*pb.LaneProposal]{MsgSize: 2 * MB, Window: 5}, + rpc.Msg[*pb.LaneProposal]{ + MsgSize: rpc.InBytes(types.MaxBlockProtoSize) + 10*kB, + Window: 5, + }, ) var StreamLaneVotes = rpc.Register[API]( 2, @@ -65,5 +69,8 @@ var StreamFullCommitQCs = rpc.Register[API](7, var GetBlock = rpc.Register[API](8, rpc.Limit{Rate: 10, Concurrent: 10}, rpc.Msg[*pb.GetBlockReq]{MsgSize: 10 * kB, Window: 1}, - rpc.Msg[*pb.GetBlockResp]{MsgSize: 2 * MB, Window: 1}, + rpc.Msg[*pb.GetBlockResp]{ + MsgSize: rpc.InBytes(types.MaxBlockProtoSize) + 10*kB, + Window: 1, + }, ) diff --git a/sei-tendermint/internal/p2p/giga_router.go b/sei-tendermint/internal/p2p/giga_router.go index c77b3d72df..9ef7e90436 100644 --- a/sei-tendermint/internal/p2p/giga_router.go +++ b/sei-tendermint/internal/p2p/giga_router.go @@ -11,9 +11,9 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/crypto" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/producer" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/giga" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/rpc" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -37,7 +37,7 @@ type GigaRouterConfig struct { ValidatorAddrs map[atypes.PublicKey]GigaNodeAddr Consensus *consensus.Config Producer *producer.Config - App abci.Application + TxMempool *mempool.TxMempool GenDoc *types.GenesisDoc } @@ -52,10 +52,6 @@ type GigaRouter struct { poolOut *giga.Pool[NodePublicKey, rpc.Client[giga.API]] } -func (r *GigaRouter) PushToMempool(ctx context.Context, tx *pb.Transaction) error { - return r.producer.PushToMempool(ctx, tx) -} - func NewGigaRouter(cfg *GigaRouterConfig, key NodeSecretKey) (*GigaRouter, error) { if cfg.GenDoc.InitialHeight < 1 { return nil, fmt.Errorf("GenDoc.InitialHeight = %v, want >=1", cfg.GenDoc.InitialHeight) @@ -81,7 +77,7 @@ func NewGigaRouter(cfg *GigaRouterConfig, key NodeSecretKey) (*GigaRouter, error if err != nil { return nil, fmt.Errorf("consensus.NewState(): %w", err) } - producerState := producer.NewState(cfg.Producer, consensusState) + producerState := producer.NewState(cfg.Producer, cfg.TxMempool, consensusState) logger.Info("GigaRouter initialized", "validators", len(cfg.ValidatorAddrs), "dial_interval", cfg.DialInterval) return &GigaRouter{ cfg: cfg, @@ -95,8 +91,77 @@ func NewGigaRouter(cfg *GigaRouterConfig, key NodeSecretKey) (*GigaRouter, error }, nil } +func (r *GigaRouter) executeBlock(ctx context.Context, b *atypes.GlobalBlock) (*abci.ResponseCommit, error) { + app := r.cfg.TxMempool.App() + hash := b.Header.Hash() + var proposerAddress types.Address + if vals := app.GetValidators(); len(vals) > 0 { + // Deterministically select a proposer from the app's validator committee. + // We need it so that app does not emit error logs. + proposer := slices.MinFunc(vals, func(a, b abci.ValidatorUpdate) int { return a.PubKey.Compare(b.PubKey) }) + key, err := crypto.PubKeyFromProto(proposer.PubKey) + if err != nil { + return nil, fmt.Errorf("crypto.PubKeyFromProto(): %w", err) + } + proposerAddress = key.Address() + } + + // TODO: add metrics to understand execution latency. + r.cfg.TxMempool.Lock() + defer r.cfg.TxMempool.Unlock() + + resp, err := app.FinalizeBlock(ctx, &abci.RequestFinalizeBlock{ + Txs: b.Payload.Txs(), + // Empty DecidedLastCommit does not indicate missing votes. + DecidedLastCommit: abci.CommitInfo{}, + // WARNING: this is a hash of the autobahn block header. + // It is used to identify block processed optimistically + // and is fed as block hash to EVM contracts. + Hash: hash[:], + Header: (&types.Header{ + ChainID: r.cfg.GenDoc.ChainID, + Height: int64(b.GlobalNumber), // nolint:gosec // different representations of the same value + Time: b.Timestamp, + // WARNING: the reward distribution has corner cases where it forgets the proposer, + // because reward is distributed with a delay. This is not our problem here though. + ProposerAddress: proposerAddress, + }).ToProto(), + }) + if err != nil { + return nil, fmt.Errorf("r.cfg.App.FinalizeBlock(): %w", err) + } + if err := r.data.PushAppHash(ctx, b.GlobalNumber, resp.AppHash); err != nil { + return nil, fmt.Errorf("r.data.PushAppHash(%v): %w", b.GlobalNumber, err) + } + commitResp, err := app.Commit(ctx) + if err != nil { + return nil, fmt.Errorf("r.cfg.App.Commit(): %w", err) + } + blockTxs := make(types.Txs, len(b.Payload.Txs())) + for i, tx := range b.Payload.Txs() { + blockTxs[i] = tx + } + err = r.cfg.TxMempool.Update( + ctx, + int64(b.GlobalNumber), // nolint:gosec // autobahn block numbers fit in int64. + blockTxs, + resp.TxResults, + // TODO: We need the constraints to be fixed per epoch, because we don't know where the lane blocks will be sequenced. + // Therefore we disable constraints for now, until epochs are supported AND + // chain state understands that consensus parameters can change only at the epoch boundary. + mempool.NopTxConstraintsFetcher, + true, + ) + if err != nil { + return nil, fmt.Errorf("r.cfg.TxMempool.Update(%v): %w", b.GlobalNumber, err) + } + return commitResp, nil +} + func (r *GigaRouter) runExecute(ctx context.Context) error { - info, err := r.cfg.App.Info(ctx, &version.RequestInfo) + app := r.cfg.TxMempool.App() + + info, err := app.Info(ctx, &version.RequestInfo) if err != nil { return fmt.Errorf("App.Info(): %w", err) } @@ -106,8 +171,11 @@ func (r *GigaRouter) runExecute(ctx context.Context) error { } next := last + 1 if last == 0 { - if _, err := r.cfg.App.InitChain(ctx, r.cfg.GenDoc.ToRequestInitChain()); err != nil { - return fmt.Errorf("App.InitChain(): %w", err) + if _, err := app.InitChain(ctx, r.cfg.GenDoc.ToRequestInitChain()); err != nil { + return fmt.Errorf("app.InitChain(): %w", err) + } + if _, err := app.Commit(ctx); err != nil { + return fmt.Errorf("app.Commit(): %w", err) } var ok bool next, ok = utils.SafeCast[atypes.GlobalBlockNumber](r.cfg.GenDoc.InitialHeight) @@ -125,48 +193,11 @@ func (r *GigaRouter) runExecute(ctx context.Context) error { for n := next; ; n += 1 { b, err := r.data.GlobalBlock(ctx, n) if err != nil { - return err - } - - hash := b.Header.Hash() - var proposerAddress types.Address - if vals := r.cfg.App.GetValidators(); len(vals) > 0 { - // Deterministically select a proposer from the app's validator committee. - // We need it so that app does not emit error logs. - proposer := slices.MinFunc(vals, func(a, b abci.ValidatorUpdate) int { return a.PubKey.Compare(b.PubKey) }) - key, err := crypto.PubKeyFromProto(proposer.PubKey) - if err != nil { - return fmt.Errorf("crypto.PubKeyFromProto(): %w", err) - } - proposerAddress = key.Address() - } - resp, err := r.cfg.App.FinalizeBlock(ctx, &abci.RequestFinalizeBlock{ - Txs: b.Payload.Txs(), - // Empty DecidedLastCommit does not indicate missing votes. - DecidedLastCommit: abci.CommitInfo{}, - // WARNING: this is a hash of the autobahn block header. - // It is used to identify block processed optimistically - // and is fed as block hash to EVM contracts. - Hash: hash[:], - Header: (&types.Header{ - ChainID: r.cfg.GenDoc.ChainID, - Height: int64(n), // nolint:gosec // different representations of the same value - Time: b.Timestamp, - // WARNING: the reward distribution has corner cases where it forgets the proposer, - // because reward is distributed with a delay. This is not our problem here though. - ProposerAddress: proposerAddress, - }).ToProto(), - }) - if err != nil { - return fmt.Errorf("r.cfg.App.FinalizeBlock(): %w", err) - } - // TODO: we need the block to be persisted before we vote for apphash. - if err := r.data.PushAppHash(ctx, n, resp.AppHash); err != nil { - return fmt.Errorf("r.data.PushAppHash(%v): %w", n, err) + return fmt.Errorf("r.data.GlobalBlock(%v): %w", n, err) } - commitResp, err := r.cfg.App.Commit(ctx) + commitResp, err := r.executeBlock(ctx, b) if err != nil { - return fmt.Errorf("r.cfg.App.Commit(): %w", err) + return fmt.Errorf("r.executeBlock(%v): %w", n, err) } pruneBefore, ok := utils.SafeCast[atypes.GlobalBlockNumber](commitResp.RetainHeight) if !ok { diff --git a/sei-tendermint/internal/p2p/giga_router_test.go b/sei-tendermint/internal/p2p/giga_router_test.go index 07165d6045..e7d590099e 100644 --- a/sei-tendermint/internal/p2p/giga_router_test.go +++ b/sei-tendermint/internal/p2p/giga_router_test.go @@ -17,9 +17,9 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/crypto" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" - apb "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/producer" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" @@ -36,6 +36,7 @@ type testAppState struct { Blocks []*abci.RequestFinalizeBlock Txs map[shaHash]bool AppHash shaHash + Committed bool } func testAppStateJSON(rng utils.Rng) json.RawMessage { @@ -77,6 +78,15 @@ func (a *testApp) Info(_ context.Context, _ *abci.RequestInfo) (*abci.ResponseIn panic("unreachable") } +func (a *testApp) CheckTx(context.Context, *abci.RequestCheckTxV2) (*abci.ResponseCheckTxV2, error) { + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Code: abci.CodeTypeOK, + GasWanted: 1, + }, + }, nil +} + func (a *testApp) InitChain(_ context.Context, req *abci.RequestInitChain) (*abci.ResponseInitChain, error) { for state, ctrl := range a.state.Lock() { if state.Init.IsPresent() { @@ -92,6 +102,7 @@ func (a *testApp) InitChain(_ context.Context, req *abci.RequestInitChain) (*abc state.Init = utils.Some(req) state.AppHash = sha256.Sum256(req.AppStateBytes) state.Validators = utils.Slice(val) + state.Committed = false ctrl.Updated() return &abci.ResponseInitChain{ AppHash: slices.Clone(state.AppHash[:]), @@ -103,6 +114,9 @@ func (a *testApp) InitChain(_ context.Context, req *abci.RequestInitChain) (*abc func (a *testApp) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { for state, ctrl := range a.state.Lock() { + if !state.Committed { + return nil, fmt.Errorf("not committed") + } init, ok := state.Init.Get() if !ok { return nil, fmt.Errorf("app not initialized") @@ -113,6 +127,7 @@ func (a *testApp) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBloc state.Txs[sha256.Sum256(tx)] = true } logger.Info("FinalizeBlock", "n", req.Header.Height-init.InitialHeight) + state.Committed = false ctrl.Updated() return &abci.ResponseFinalizeBlock{ AppHash: slices.Clone(state.AppHash[:]), @@ -123,6 +138,13 @@ func (a *testApp) FinalizeBlock(_ context.Context, req *abci.RequestFinalizeBloc } func (a *testApp) Commit(context.Context) (*abci.ResponseCommit, error) { + for state, ctrl := range a.state.Lock() { + if state.Committed { + return nil, fmt.Errorf("double commit") + } + state.Committed = true + ctrl.Updated() + } return &abci.ResponseCommit{ // Don't prune anything. RetainHeight: 0, @@ -145,6 +167,8 @@ func (a *testApp) Snapshot() testAppState { s := *state // Txs is derived and the only mutable field. s.Txs = nil + // "Committed" field is not guaranteed to be consistent. + s.Committed = false return s } panic("unreachable") @@ -199,6 +223,7 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { nodeInfo.Network = genDoc.ChainID e := Endpoint{AddrPort: cfg.addr} app := newTestApp() + txMempool := mempool.NewTxMempool(mempool.TestConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) router, err := NewRouter( NopMetrics(), cfg.nodeKey, @@ -228,8 +253,8 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { BlockInterval: 100 * time.Millisecond, AllowEmptyBlocks: false, }, - App: app, - GenDoc: genDoc, + TxMempool: txMempool, + GenDoc: genDoc, }), }, ) @@ -245,17 +270,9 @@ func TestGigaRouter_FinalizeBlocks(t *testing.T) { allTxs = append(allTxs, tx) } s.SpawnNamed(fmt.Sprintf("producer[%v]", i), func() error { - giga, ok := router.giga.Get() - if !ok { - panic("giga router not set up") - } for _, payload := range txs { - tx := &apb.Transaction{ - Payload: payload, - GasUsed: txGasUsed, - } - if err := giga.PushToMempool(ctx, tx); err != nil { - return fmt.Errorf("PushToMempool(): %w", err) + if err := txMempool.CheckTx(ctx, payload, nil, mempool.TxInfo{}); err != nil { + return fmt.Errorf("txMempool.CheckTx(): %w", err) } } return nil diff --git a/sei-tendermint/internal/p2p/router_test.go b/sei-tendermint/internal/p2p/router_test.go index fd79856a11..d6adaace71 100644 --- a/sei-tendermint/internal/p2p/router_test.go +++ b/sei-tendermint/internal/p2p/router_test.go @@ -14,10 +14,12 @@ import ( dbm "github.com/tendermint/tm-db" "golang.org/x/time/rate" + abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/producer" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" @@ -322,6 +324,7 @@ func TestRouter_GigaSetWhenConfigured(t *testing.T) { // Use intentionally non-default values to ensure config actually propagates. opts := makeRouterOptions() + txMempool := mempool.NewTxMempool(mempool.TestConfig(), abci.BaseApplication{}, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) opts.Giga = utils.Some(&GigaRouterConfig{ DialInterval: 7 * time.Second, ValidatorAddrs: validatorAddrs, @@ -338,7 +341,7 @@ func TestRouter_GigaSetWhenConfigured(t *testing.T) { BlockInterval: 777 * time.Millisecond, AllowEmptyBlocks: true, }, - App: nil, + TxMempool: txMempool, GenDoc: &types.GenesisDoc{ ChainID: "giga-e2e-test", InitialHeight: 42, diff --git a/sei-tendermint/internal/protoutils/msg.go b/sei-tendermint/internal/protoutils/msg.go index d71d849238..892a2973c7 100644 --- a/sei-tendermint/internal/protoutils/msg.go +++ b/sei-tendermint/internal/protoutils/msg.go @@ -16,6 +16,11 @@ func New[T Message]() T { return utils.Zero[T]().ProtoReflect().New().Interface().(T) } +// Computes the size of the message encoding. +func Size[T Message](t T) int { + return proto.Size(t) +} + func Marshal[T Message](t T) []byte { // Marshalling messages is always expected to succeed. return utils.OrPanic1(proto.Marshal(t)) diff --git a/sei-tendermint/internal/state/helpers_test.go b/sei-tendermint/internal/state/helpers_test.go index 6ddc412218..eba72df804 100644 --- a/sei-tendermint/internal/state/helpers_test.go +++ b/sei-tendermint/internal/state/helpers_test.go @@ -11,7 +11,6 @@ import ( dbm "github.com/tendermint/tm-db" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" - "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" @@ -234,9 +233,7 @@ func randomGenesisDoc() *types.GenesisDoc { func makeTxMempool(t testing.TB, app abci.Application) *mempool.TxMempool { t.Helper() - cfg := config.TestMempoolConfig() - - return mempool.NewTxMempool(cfg, app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) + return mempool.NewTxMempool(mempool.TestConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) } // used for testing by state store diff --git a/sei-tendermint/node/node.go b/sei-tendermint/node/node.go index e549b1e15d..71953fa69e 100644 --- a/sei-tendermint/node/node.go +++ b/sei-tendermint/node/node.go @@ -23,6 +23,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventlog" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/evidence" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" + mempoolreactor "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool/reactor" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" @@ -65,6 +66,7 @@ type nodeImpl struct { initialState sm.State stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk + mempool *mempool.TxMempool evPool *evidence.Pool indexerService *indexer.Service services []service.Service @@ -187,22 +189,40 @@ func makeNode( fmt.Errorf("autobahn does not support remote validator signers (priv-validator.laddr is set)"), makeCloser(closers)) } - router, peerCloser, err := createRouter(nodeMetrics.p2p, node.NodeInfo, nodeKey, utils.Some(atypes.SecretKeyFromED25519(filePrivval.Key.PrivKey)), cfg, app, genDoc, dbProvider) + gigaEnabled := cfg.AutobahnConfigFile != "" + mp := mempool.NewTxMempool(cfg.Mempool.ToMempoolConfig(), app, nodeMetrics.mempool, sm.TxConstraintsFetcherFromStore(stateStore)) + router, peerCloser, err := createRouter( + nodeMetrics.p2p, + node.NodeInfo, + nodeKey, + utils.Some(atypes.SecretKeyFromED25519(filePrivval.Key.PrivKey)), + cfg, + utils.Some(mp), + genDoc, + dbProvider, + ) closers = append(closers, peerCloser) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), makeCloser(closers)) } - mp := mempool.NewTxMempool(cfg.Mempool, app, nodeMetrics.mempool, sm.TxConstraintsFetcherFromStore(stateStore)) - mpReactor, err := mempool.NewReactor(mp, router) - if err != nil { - return nil, fmt.Errorf("mempool.NewReactor(): %w", err) - } node.router = router + node.mempool = mp node.rpcEnv.Router = router node.shutdownOps = makeCloser(closers) + // Mempool gossiping is not compatible with Giga, + // so we disable the mempool reactor. + if !gigaEnabled { + mpReactor, err := mempoolreactor.NewReactor(cfg.Mempool, mp, router) + if err != nil { + return nil, fmt.Errorf("mempoolreactor.NewReactor(): %w", err) + } + mpReactor.MarkReadyToStart() + node.services = append(node.services, mpReactor) + } + evReactor, evPool, edbCloser, err := createEvidenceReactor(cfg, dbProvider, stateStore, blockStore, node.router, nodeMetrics.evidence, eventBus) closers = append(closers, edbCloser) @@ -213,10 +233,7 @@ func makeNode( node.rpcEnv.EvidencePool = evPool node.evPool = evPool - mpReactor.MarkReadyToStart() - node.rpcEnv.Mempool = mp - node.services = append(node.services, mpReactor) // make block executor for consensus and blockchain reactors to execute blocks blockExec := sm.NewBlockExecutor( @@ -239,6 +256,10 @@ func makeNode( // Determine whether we should do block sync. This must happen after the handshake, since the // app may modify the validator set, specifying ourself as the only validator. blockSync := !onlyValidatorIsUs(state, pubKey) + if gigaEnabled { + stateSync = false + blockSync = false + } waitSync := stateSync || blockSync csState, err := consensus.NewState( @@ -258,20 +279,23 @@ func makeNode( } node.rpcEnv.ConsensusState = csState - csReactor, err := consensus.NewReactor( - csState, - node.router, - eventBus, - waitSync, - nodeMetrics.consensus, - cfg, - ) - if err != nil { - return nil, fmt.Errorf("consensus.NewReactor(): %w", err) - } + var csReactor *consensus.Reactor + if !gigaEnabled { + csReactor, err = consensus.NewReactor( + csState, + node.router, + eventBus, + waitSync, + nodeMetrics.consensus, + cfg, + ) + if err != nil { + return nil, fmt.Errorf("consensus.NewReactor(): %w", err) + } - node.services = append(node.services, csReactor) - node.rpcEnv.ConsensusReactor = csReactor + node.services = append(node.services, csReactor) + node.rpcEnv.ConsensusReactor = csReactor + } // Create the blockchain reactor. Note, we do not start block sync if we're // doing a state sync first. @@ -331,29 +355,30 @@ func makeNode( // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 - ssReactor, err := statesync.NewReactor( - genDoc.ChainID, - genDoc.InitialHeight, - *cfg.StateSync, - app, - node.router, - stateStore, - blockStore, - cfg.StateSync.TempDir, - nodeMetrics.statesync, - eventBus, - // the post-sync operation - postSyncHook, - stateSync, - restartEvent, - cfg.SelfRemediation, - ) - if err != nil { - return nil, fmt.Errorf("statesync.NewReactor(): %w", err) - } - node.shouldHandshake = !stateSync - node.services = append(node.services, ssReactor) + if !gigaEnabled { + ssReactor, err := statesync.NewReactor( + genDoc.ChainID, + genDoc.InitialHeight, + *cfg.StateSync, + app, + node.router, + stateStore, + blockStore, + cfg.StateSync.TempDir, + nodeMetrics.statesync, + eventBus, + // the post-sync operation + postSyncHook, + stateSync, + restartEvent, + cfg.SelfRemediation, + ) + if err != nil { + return nil, fmt.Errorf("statesync.NewReactor(): %w", err) + } + node.services = append(node.services, ssReactor) + } if cfg.Mode == config.ModeValidator { if privValidator != nil { @@ -470,6 +495,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { return err } n.rpcEnv.IsListening = true + n.SpawnCritical("mempool", n.mempool.Run) for _, reactor := range n.services { if err := reactor.Start(ctx); err != nil { diff --git a/sei-tendermint/node/node_test.go b/sei-tendermint/node/node_test.go index 4d2835b864..29ce64e507 100644 --- a/sei-tendermint/node/node_test.go +++ b/sei-tendermint/node/node_test.go @@ -285,7 +285,7 @@ func TestCreateProposalBlock(t *testing.T) { proposerAddr, _, ok := state.Validators.GetByIndex(0) require.True(t, ok) mp := mempool.NewTxMempool( - cfg.Mempool, + cfg.Mempool.ToMempoolConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher, @@ -382,7 +382,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { // Make Mempool mp := mempool.NewTxMempool( - cfg.Mempool, + cfg.Mempool.ToMempoolConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher, @@ -447,7 +447,7 @@ func TestMaxProposalBlockSize(t *testing.T) { // Make Mempool mp := mempool.NewTxMempool( - cfg.Mempool, + cfg.Mempool.ToMempoolConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher, diff --git a/sei-tendermint/node/seed.go b/sei-tendermint/node/seed.go index 22c83c2402..1abedb24ab 100644 --- a/sei-tendermint/node/seed.go +++ b/sei-tendermint/node/seed.go @@ -12,6 +12,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/config" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventbus" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" rpccore "github.com/sei-protocol/sei-chain/sei-tendermint/internal/rpc/core" @@ -70,7 +71,16 @@ func makeSeedNode( return nil, err } - router, peerCloser, err := createRouter(nodeMetrics.p2p, func() *types.NodeInfo { return &nodeInfo }, nodeKey, utils.None[atypes.SecretKey](), cfg, nil, genDoc, dbProvider) + router, peerCloser, err := createRouter( + nodeMetrics.p2p, + func() *types.NodeInfo { return &nodeInfo }, + nodeKey, + utils.None[atypes.SecretKey](), + cfg, + utils.None[*mempool.TxMempool](), + genDoc, + dbProvider, + ) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), diff --git a/sei-tendermint/node/setup.go b/sei-tendermint/node/setup.go index 65d001fcac..db7ae10480 100644 --- a/sei-tendermint/node/setup.go +++ b/sei-tendermint/node/setup.go @@ -11,7 +11,6 @@ import ( "strings" "time" - abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto" autobahnConsensus "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus" @@ -22,6 +21,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/eventbus" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/evidence" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" + mempoolreactor "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool/reactor" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" @@ -184,12 +184,12 @@ func loadAutobahnFileConfig(path string) (*config.AutobahnFileConfig, error) { return &fc, nil } -// buildGigaConfig constructs a GigaRouterConfig from the autobahn config file, node key, app, and genesis doc. +// buildGigaConfig constructs a GigaRouterConfig from the autobahn config file, node key, and genesis doc. func buildGigaConfig( autobahnConfigFile string, nodeKey types.NodeKey, validatorKey atypes.SecretKey, - appClient abci.Application, + txMempool *mempool.TxMempool, genDoc *types.GenesisDoc, ) (*p2p.GigaRouterConfig, error) { if autobahnConfigFile == "" { @@ -245,8 +245,8 @@ func buildGigaConfig( BlockInterval: time.Duration(fc.BlockInterval), AllowEmptyBlocks: fc.AllowEmptyBlocks, }, - App: appClient, - GenDoc: genDoc, + TxMempool: txMempool, + GenDoc: genDoc, }, nil } @@ -256,7 +256,7 @@ func createRouter( nodeKey types.NodeKey, validatorKey utils.Option[atypes.SecretKey], cfg *config.Config, - app abci.Application, + txMempool utils.Option[*mempool.TxMempool], genDoc *types.GenesisDoc, dbProvider config.DBProvider, ) (*p2p.Router, closer, error) { @@ -348,7 +348,11 @@ func createRouter( if !ok { return nil, closer, fmt.Errorf("autobahn non-validator nodes are not supported yet; a local validator key is required") } - gigaCfg, err := buildGigaConfig(cfg.AutobahnConfigFile, nodeKey, valKey, app, genDoc) + mp, ok := txMempool.Get() + if !ok { + return nil, closer, errors.New("autobahn requires a tx mempool") + } + gigaCfg, err := buildGigaConfig(cfg.AutobahnConfigFile, nodeKey, valKey, mp, genDoc) if err != nil { return nil, closer, fmt.Errorf("buildGigaConfig: %w", err) } @@ -403,7 +407,7 @@ func makeNodeInfo( byte(consensus.DataChannel), byte(consensus.VoteChannel), byte(consensus.VoteSetBitsChannel), - byte(mempool.MempoolChannel), + byte(mempoolreactor.MempoolChannel), byte(evidence.EvidenceChannel), byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel), diff --git a/sei-tendermint/node/setup_test.go b/sei-tendermint/node/setup_test.go index 9aca24b31f..6c56940b0a 100644 --- a/sei-tendermint/node/setup_test.go +++ b/sei-tendermint/node/setup_test.go @@ -10,9 +10,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/sei-protocol/sei-chain/sei-tendermint/abci/example/kvstore" "github.com/sei-protocol/sei-chain/sei-tendermint/config" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519" atypes "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/tcp" @@ -65,10 +67,23 @@ func defaultFileConfig(validators []config.AutobahnValidator) *config.AutobahnFi } } +func makeTestGigaDeps() (*mempool.TxMempool, *types.GenesisDoc) { + app := kvstore.NewApplication() + txMempool := mempool.NewTxMempool( + mempool.TestConfig(), + app, + mempool.NopMetrics(), + mempool.NopTxConstraintsFetcher, + ) + genDoc := &types.GenesisDoc{ChainID: "test-chain", InitialHeight: 1} + return txMempool, genDoc +} + func TestBuildGigaConfig_EmptyPathErrors(t *testing.T) { nodeKey := makeTestNodeKey([]byte("test-node-key")) valKey := makeTestValidatorKey([]byte("val-seed")) - _, err := buildGigaConfig("", nodeKey, valKey, nil, nil) + txMempool, genDoc := makeTestGigaDeps() + _, err := buildGigaConfig("", nodeKey, valKey, txMempool, genDoc) assert.Error(t, err, "empty path should error") } @@ -94,9 +109,9 @@ func TestBuildGigaConfig_EnabledWithValidators(t *testing.T) { nodeKey := makeTestNodeKey([]byte("node1-seed")) valKey := makeTestValidatorKey([]byte("val1-seed")) - genDoc := &types.GenesisDoc{ChainID: "test-chain", InitialHeight: 1} + txMempool, genDoc := makeTestGigaDeps() - result, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, genDoc) + result, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) require.NoError(t, err) require.NotNil(t, result) @@ -134,8 +149,9 @@ func TestBuildGigaConfig_NoneMaxTxsPerSecond(t *testing.T) { cfgFile := writeAutobahnConfig(t, fc) nodeKey := makeTestNodeKey([]byte("node-seed")) valKey := makeTestValidatorKey([]byte("val-seed")) + txMempool, genDoc := makeTestGigaDeps() - result, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + result, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) require.NoError(t, err) assert.False(t, result.Producer.MaxTxsPerSecond.IsPresent()) } @@ -146,8 +162,9 @@ func TestBuildGigaConfig_NonePersistentStateDir(t *testing.T) { cfgFile := writeAutobahnConfig(t, fc) nodeKey := makeTestNodeKey([]byte("node-seed")) valKey := makeTestValidatorKey([]byte("val-seed")) + txMempool, genDoc := makeTestGigaDeps() - result, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + result, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) require.NoError(t, err) assert.False(t, result.Consensus.PersistentStateDir.IsPresent()) } @@ -155,23 +172,24 @@ func TestBuildGigaConfig_NonePersistentStateDir(t *testing.T) { func TestBuildGigaConfig_InvalidConfigFile(t *testing.T) { nodeKey := makeTestNodeKey([]byte("node-seed")) valKey := makeTestValidatorKey([]byte("val-seed")) + txMempool, genDoc := makeTestGigaDeps() t.Run("missing file", func(t *testing.T) { - _, err := buildGigaConfig("/nonexistent/autobahn.json", nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig("/nonexistent/autobahn.json", nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) }) t.Run("invalid json", func(t *testing.T) { path := filepath.Join(t.TempDir(), "bad.json") require.NoError(t, os.WriteFile(path, []byte("not json"), 0644)) - _, err := buildGigaConfig(path, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(path, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) }) t.Run("empty validators", func(t *testing.T) { fc := defaultFileConfig([]config.AutobahnValidator{}) cfgFile := writeAutobahnConfig(t, fc) - _, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) assert.Contains(t, err.Error(), "validators must not be empty") }) @@ -181,7 +199,7 @@ func TestBuildGigaConfig_InvalidConfigFile(t *testing.T) { fc := defaultFileConfig([]config.AutobahnValidator{v}) fc.MaxGasPerBlock = 0 cfgFile := writeAutobahnConfig(t, fc) - _, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) assert.Contains(t, err.Error(), "max_gas_per_block") }) @@ -196,8 +214,9 @@ func TestBuildGigaConfig_DuplicateValidatorKey(t *testing.T) { os.WriteFile(path, data, 0644) nodeKey := makeTestNodeKey([]byte("node1")) valKey := makeTestValidatorKey([]byte("val-seed")) + txMempool, genDoc := makeTestGigaDeps() - _, err := buildGigaConfig(path, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(path, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) assert.Contains(t, err.Error(), "duplicate validator key") } @@ -211,8 +230,9 @@ func TestBuildGigaConfig_DuplicateNodeKey(t *testing.T) { os.WriteFile(path, data, 0644) nodeKey := makeTestNodeKey([]byte("same-node")) valKey := makeTestValidatorKey([]byte("val1")) + txMempool, genDoc := makeTestGigaDeps() - _, err := buildGigaConfig(path, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(path, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) assert.Contains(t, err.Error(), "duplicate node key") } @@ -222,8 +242,9 @@ func TestBuildGigaConfig_SelfNotInValidators(t *testing.T) { cfgFile := writeAutobahnConfig(t, defaultFileConfig([]config.AutobahnValidator{v1})) nodeKey := makeTestNodeKey([]byte("my-node")) valKey := makeTestValidatorKey([]byte("my-val")) + txMempool, genDoc := makeTestGigaDeps() - _, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) assert.Contains(t, err.Error(), "validator key not found") } @@ -234,8 +255,9 @@ func TestBuildGigaConfig_NodeKeyMismatch(t *testing.T) { cfgFile := writeAutobahnConfig(t, defaultFileConfig([]config.AutobahnValidator{v1})) nodeKey := makeTestNodeKey([]byte("my-node")) valKey := makeTestValidatorKey([]byte("my-val")) + txMempool, genDoc := makeTestGigaDeps() - _, err := buildGigaConfig(cfgFile, nodeKey, valKey, nil, &types.GenesisDoc{InitialHeight: 1}) + _, err := buildGigaConfig(cfgFile, nodeKey, valKey, txMempool, genDoc) assert.Error(t, err) assert.Contains(t, err.Error(), "node key mismatch") } diff --git a/sei-tendermint/test/fuzz/tests/mempool_test.go b/sei-tendermint/test/fuzz/tests/mempool_test.go index 5a7b351e88..2fb7efa9ca 100644 --- a/sei-tendermint/test/fuzz/tests/mempool_test.go +++ b/sei-tendermint/test/fuzz/tests/mempool_test.go @@ -16,7 +16,7 @@ func FuzzMempool(f *testing.F) { cfg := config.DefaultMempoolConfig() cfg.Broadcast = false - mp := mempool.NewTxMempool(cfg, app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) + mp := mempool.NewTxMempool(cfg.ToMempoolConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher) f.Fuzz(func(t *testing.T, data []byte) { _ = mp.CheckTx(t.Context(), data, nil, mempool.TxInfo{})