Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
3b3071d
removed unused function
pompon0 Apr 8, 2026
3c866a6
snapshot
pompon0 Apr 8, 2026
4764164
updated test
pompon0 Apr 8, 2026
3b992a1
more cleanup
pompon0 Apr 8, 2026
9fd05fc
snapshot
pompon0 Apr 8, 2026
1fd3453
wip
pompon0 Apr 8, 2026
a0ef9c7
WIP
pompon0 Apr 8, 2026
63e6bd3
wip
pompon0 Apr 8, 2026
254426b
wip
pompon0 Apr 8, 2026
44b9288
wip
pompon0 Apr 8, 2026
3061e49
wip
pompon0 Apr 8, 2026
47aa688
wip
pompon0 Apr 8, 2026
aaf3ee9
wip
pompon0 Apr 8, 2026
4e0faa9
addressed a flake
pompon0 Apr 8, 2026
bd8db80
Merge branch 'gprusak-mempool2' into gprusak-mempool3
pompon0 Apr 8, 2026
d2b6a21
wip
pompon0 Apr 8, 2026
4a3a579
snapshot
pompon0 Apr 9, 2026
59b9f59
removed notion of precheck
pompon0 Apr 9, 2026
28a2ff1
simplified errors
pompon0 Apr 9, 2026
fab6e8c
Merge remote-tracking branch 'origin/main' into gprusak-mempool2
pompon0 Apr 9, 2026
ea8519e
Merge branch 'gprusak-mempool2' into gprusak-mempool3
pompon0 Apr 9, 2026
15ed859
compilation fix
pompon0 Apr 9, 2026
ba81b8b
separated reactor
pompon0 Apr 9, 2026
4601391
wip
pompon0 Apr 9, 2026
b30fc91
concurrency fix
pompon0 Apr 9, 2026
05e69c6
non-nil args to buildGigaConfig
pompon0 Apr 9, 2026
0d3b1d0
mempool in GigaRouter
pompon0 Apr 9, 2026
f13a871
-1 in Reap
pompon0 Apr 9, 2026
cdc7047
disabled some components for autobahn mode
pompon0 Apr 10, 2026
a21d315
disabled mempool notifications when WaitForTxs is disabled
pompon0 Apr 10, 2026
0b1ed51
Merge branch 'main' into gprusak-mempool4
pompon0 Apr 10, 2026
b84cfda
Merge remote-tracking branch 'origin/main' into gprusak-mempool3
pompon0 Apr 10, 2026
a86ce40
Merge branch 'gprusak-mempool3' into gprusak-mempool4
pompon0 Apr 10, 2026
062f712
reverted err visibility
pompon0 Apr 10, 2026
9b5b8e6
restored comments
pompon0 Apr 10, 2026
c8e6d17
more comments
pompon0 Apr 10, 2026
708a690
broadcast fix
pompon0 Apr 10, 2026
b94c719
SpawnCritical
pompon0 Apr 10, 2026
029113b
moved ownership of mempool
pompon0 Apr 10, 2026
67ea52a
lint
pompon0 Apr 10, 2026
1e74249
Merge remote-tracking branch 'origin/main' into gprusak-mempool3
pompon0 Apr 10, 2026
f6be363
wip
pompon0 Apr 10, 2026
de584fa
mempool fix
pompon0 Apr 10, 2026
70c75a1
Merge remote-tracking branch 'origin/main' into gprusak-mempool3
pompon0 Apr 10, 2026
f5a18e0
Merge branch 'gprusak-mempool3' into gprusak-mempool4
pompon0 Apr 10, 2026
26e74ea
applied comments
pompon0 Apr 13, 2026
be79fac
keeping a tx mempool lock for Finalize + Commit + Update
pompon0 Apr 13, 2026
baac6b0
applied comments
pompon0 Apr 14, 2026
121b2e4
applied comments
pompon0 Apr 14, 2026
4dbc0d7
Merge remote-tracking branch 'origin/main' into gprusak-mempool4
pompon0 Apr 14, 2026
d217ee0
moved config
pompon0 Apr 14, 2026
5ed9b06
extracted config
pompon0 Apr 14, 2026
6a2cd95
deflake
pompon0 Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions sei-tendermint/internal/autobahn/producer/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"time"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope"
"golang.org/x/time/rate"
Expand All @@ -32,19 +32,18 @@ func (c *Config) maxTxsPerBlock() uint64 {

// State is the block producer state.
type State struct {
cfg *Config
// channel of transactions to build the next block from.
mempool chan *pb.Transaction
cfg *Config
txMempool *mempool.TxMempool
// consensus state to which published blocks will be reported.
consensus *consensus.State
}

// NewState constructs a new block producer state.
// Returns an error if the current node is NOT a producer.
func NewState(cfg *Config, consensus *consensus.State) *State {
func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.State) *State {
return &State{
cfg: cfg,
mempool: make(chan *pb.Transaction, cfg.MempoolSize),
txMempool: txMempool,
consensus: consensus,
}
}
Expand All @@ -54,21 +53,32 @@ func NewState(cfg *Config, consensus *consensus.State) *State {
func (s *State) makePayload(ctx context.Context) *types.Payload {
ctx, cancel := context.WithTimeout(ctx, s.cfg.BlockInterval)
defer cancel()
maxTxs := s.cfg.maxTxsPerBlock()
var totalGas uint64
var txs [][]byte
for totalGas < s.cfg.MaxGasPerBlock && uint64(len(txs)) < maxTxs {
tx, err := utils.Recv(ctx, s.mempool)
if err != nil {
break

if s.txMempool.NumTxsNotPending() == 0 {
select {
case <-ctx.Done():
case <-s.txMempool.TxsAvailable():
}
txs = append(txs, tx.Payload)
totalGas += tx.GasUsed
}

txs, totalGas := s.txMempool.ReapMaxTxsBytesMaxGas(
int(s.cfg.maxTxsPerBlock()), // nolint:gosec // config values fit into int on supported platforms.
Comment thread
pompon0 marked this conversation as resolved.
Outdated
utils.Max[int64](),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should have maxBytes as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eventually yes

int64(s.cfg.MaxGasPerBlock), // nolint:gosec // config values stay within int64 range.
int64(s.cfg.MaxGasPerBlock), // nolint:gosec // config values stay within int64 range.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we won't have different maxGasWanted and maxGasEstimated in the future?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, probably we will

)
s.txMempool.RemoveTxs(txs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an observation: I think it's okay here to acquire mutex twice, since if the tx disappeared in between RemoveTxs still work.
But maybe some day we should re-think the mutex use in mempool, it looks correct and efficient to me in the current context, but if we add more and more features then it might be harder to reason.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, Mempool handling is already very fragile imo

payloadTxs := make([][]byte, 0, len(txs))
Comment thread
wen-coding marked this conversation as resolved.
for _, tx := range txs {
payloadTxs = append(payloadTxs, tx)
}
return types.PayloadBuilder{
CreatedAt: time.Now(),
TotalGas: totalGas,
Txs: txs,
// TODO: ReapMaxTxsBytesMaxGas does not handle corner cases correctly rn, which actually
// can produce negative total gas. Fixing it right away might be backward incompatible afaict,
// so we leave it as is for now.
Comment thread
pompon0 marked this conversation as resolved.
TotalGas: uint64(totalGas), // nolint:gosec // guaranteed to be positive
Txs: payloadTxs,
}.Build()
}

Expand All @@ -86,11 +96,6 @@ func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) {
}
}

// PushToMempool pushes the transaction to the mempool.
func (s *State) PushToMempool(ctx context.Context, tx *pb.Transaction) error {
return utils.Send(ctx, s.mempool, tx)
}

// Run runs the background tasks of the producer state.
func (s *State) Run(ctx context.Context) error {
return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error {
Expand Down
3 changes: 2 additions & 1 deletion sei-tendermint/internal/consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/store"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/test/factory"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
)

Expand Down Expand Up @@ -247,7 +248,7 @@ func TestMempoolRmBadTx(t *testing.T) {

// check for the tx
for {
txs := cs.txMempool.ReapMaxBytesMaxGas(int64(len(txBytes)), -1, -1)
txs := cs.txMempool.ReapMaxBytesMaxGas(int64(len(txBytes)), utils.Max[int64](), utils.Max[int64]())
if len(txs) == 0 {
emptyMempoolCh <- struct{}{}
return
Expand Down
89 changes: 68 additions & 21 deletions sei-tendermint/internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope"
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
"github.com/sei-protocol/seilog"
)

// errTxInCache is returned to the client if we saw tx earlier.
var errTxInCache = errors.New("tx already exists in cache")
var logger = seilog.NewLogger("tendermint", "internal", "mempool")

// errTxTooLarge defines an error when a transaction is too big to be sent to peers.
var errTxTooLarge = errors.New("tx too large")
// ErrTxInCache is returned to the client if we saw tx earlier.
var ErrTxInCache = errors.New("tx already exists in cache")

// ErrTxTooLarge defines an error when a transaction is too big to be sent to peers.
var ErrTxTooLarge = errors.New("tx too large")

// Using SHA-256 truncated to 128 bits as the cache key: At 2K tx/sec, the
// collision probability is effectively zero (≈10^-29 for 120K keys in a minute,
Expand Down Expand Up @@ -158,6 +161,10 @@ func NewTxMempool(
return txmp
}

func (txmp *TxMempool) Config() *config.MempoolConfig { return txmp.config }

func (txmp *TxMempool) App() abci.Application { return txmp.app }

func (txmp *TxMempool) TxStore() *TxStore { return txmp.txStore }

// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
Expand Down Expand Up @@ -204,14 +211,14 @@ func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.sizeByt

func (txmp *TxMempool) PendingSizeBytes() int64 { return atomic.LoadInt64(&txmp.pendingSizeBytes) }

// WaitForNextTx returns a blocking channel that will be closed when the next
// valid transaction is available to gossip. It is thread-safe.
func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { return txmp.gossipIndex.WaitChan() }

// NextGossipTx returns the next valid transaction to gossip. A caller must wait
// for WaitForNextTx to signal a transaction is available to gossip first. It is
// thread-safe.
func (txmp *TxMempool) NextGossipTx() *clist.CElement { return txmp.gossipIndex.Front() }
// WaitForNextTx waits until the next transaction is available for gossip.
// Returns the next valid transaction to gossip.
func (txmp *TxMempool) WaitForNextTx(ctx context.Context) (*clist.CElement, error) {
if _, _, err := utils.RecvOrClosed(ctx, txmp.gossipIndex.WaitChan()); err != nil {
return nil, err
}
return txmp.gossipIndex.Front(), nil
}

// TxsAvailable returns a channel which fires once for every height, and only
// when transactions are available in the mempool. It is thread-safe.
Expand Down Expand Up @@ -269,14 +276,14 @@ func (txmp *TxMempool) CheckTx(
defer txmp.mtx.RUnlock()

if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
return fmt.Errorf("%w: max size is %d, but got %d", errTxTooLarge, txmp.config.MaxTxBytes, txSize)
return fmt.Errorf("%w: max size is %d, but got %d", ErrTxTooLarge, txmp.config.MaxTxBytes, txSize)
}
constraints, err := txmp.txConstraintsFetcher()
if err != nil {
return fmt.Errorf("txmp.txConstraintsFetcher(): %w", err)
}
if txSize := types.ComputeProtoSizeForTxs([]types.Tx{tx}); txSize > constraints.MaxDataBytes {
return fmt.Errorf("%w: tx size is too big: %d, max: %d", errTxTooLarge, txSize, constraints.MaxDataBytes)
return fmt.Errorf("%w: tx size is too big: %d, max: %d", ErrTxTooLarge, txSize, constraints.MaxDataBytes)
}

// Reject low priority transactions when the mempool is more than
Expand Down Expand Up @@ -305,7 +312,7 @@ func (txmp *TxMempool) CheckTx(
// check if we've seen this transaction and error if we have.
if !txmp.cache.Push(txHash) {
txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
return errTxInCache
return ErrTxInCache
}
txmp.metrics.CacheSize.Set(float64(txmp.cache.Size()))

Expand Down Expand Up @@ -482,7 +489,7 @@ func (txmp *TxMempool) Flush() {
// and gas constraints. The returned list starts with EVM transactions (in priority order),
// followed by non-EVM transactions (in priority order).
// There are 4 types of constraints.
// 1. maxBytes - stops pulling txs from mempool once maxBytes is hit. Can be set to -1 to be ignored.
// 1. maxBytes - stops pulling txs from mempool once maxBytes is hit.
// 2. maxGasWanted - stops pulling txs from mempool once total gas wanted exceeds maxGasWanted.
// Can be set to -1 to be ignored.
// 3. maxGasEstimated - similar to maxGasWanted but will use the estimated gas used for EVM txs
Expand All @@ -492,6 +499,31 @@ func (txmp *TxMempool) Flush() {
// - Transactions returned are not removed from the mempool transaction
// store or indexes.
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimated int64) types.Txs {
txs, _ := txmp.ReapMaxTxsBytesMaxGas(utils.Max[int](), maxBytes, maxGasWanted, maxGasEstimated)
return txs
}

// ReapMaxTxsBytesMaxGas returns a list of transactions within the provided tx,
// byte, and gas constraints together with the total estimated gas for the
// returned transactions.
//
// NOTE: Gas limits are enforced using int64 running totals. If those totals
// overflow, gas limit enforcement no longer works correctly. This preserves the
// historical behavior for backward compatibility.
func (txmp *TxMempool) ReapMaxTxsBytesMaxGas(maxTxs int, maxBytes, maxGasWanted, maxGasEstimated int64) (types.Txs, int64) {
if maxTxs < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's okay for maxTxs to be equal to 0?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose so

maxTxs = utils.Max[int]()
}
if maxBytes < 0 {
maxBytes = utils.Max[int64]()
}
if maxGasWanted < 0 {
maxGasWanted = utils.Max[int64]()
}
if maxGasEstimated < 0 {
maxGasEstimated = utils.Max[int64]()
}

txmp.mtx.Lock()
defer txmp.mtx.Unlock()

Expand All @@ -505,7 +537,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate
encounteredGasUnfit := false
if uint64(txmp.NumTxsNotPending()) < txmp.config.TxNotifyThreshold { //nolint:gosec // NumTxsNotPending returns non-negative value
// do not reap anything if threshold is not met
return []types.Tx{}
return []types.Tx{}, 0
}
totalTxs := txmp.priorityIndex.NumTxs()
evmTxs := make([]types.Tx, 0, totalTxs)
Expand All @@ -514,7 +546,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate
size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx})

// bytes limit is a hard stop
if maxBytes > -1 && totalSize+size > maxBytes {
if totalSize+size > maxBytes {
Comment thread
pompon0 marked this conversation as resolved.
Outdated
return false
}

Expand All @@ -531,8 +563,8 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate
prospectiveGasWanted := totalGasWanted + wtx.gasWanted
prospectiveGasEstimated := totalGasEstimated + txGasEstimate

maxGasWantedExceeded := maxGasWanted > -1 && prospectiveGasWanted > maxGasWanted
maxGasEstimatedExceeded := maxGasEstimated > -1 && prospectiveGasEstimated > maxGasEstimated
maxGasWantedExceeded := prospectiveGasWanted > maxGasWanted
maxGasEstimatedExceeded := prospectiveGasEstimated > maxGasEstimated

if maxGasWantedExceeded || maxGasEstimatedExceeded {
// skip this unfit-by-gas tx once and attempt to pull up to 10 smaller ones
Expand All @@ -557,10 +589,25 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGasWanted, maxGasEstimate
if encounteredGasUnfit && numTxs >= MinTxsToPeek {
return false
}
if numTxs >= maxTxs {
return false
}
return true
})

return append(evmTxs, nonEvmTxs...)
return append(evmTxs, nonEvmTxs...), totalGasEstimated
}

// RemoveTxs removes the provided transactions from the mempool if present.
func (txmp *TxMempool) RemoveTxs(txs types.Txs) {
txmp.Lock()
defer txmp.Unlock()

for _, tx := range txs {
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
txmp.removeTx(wtx, false, true, true)
}
}
}

// ReapMaxTxs returns a list of transactions within the provided number of
Expand Down
Loading
Loading