-
Notifications
You must be signed in to change notification settings - Fork 879
Expand file tree
/
Copy pathstate.go
More file actions
130 lines (118 loc) · 4.01 KB
/
state.go
File metadata and controls
130 lines (118 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package producer
import (
"context"
"fmt"
"time"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope"
"golang.org/x/time/rate"
)
// Config holds the settings of the block producer.
type Config struct {
// MaxGasPerBlock is the gas budget of a single block. makePayload passes it to
// the mempool as both the wanted-gas and the estimated-gas reap limit.
MaxGasPerBlock uint64
// MaxTxsPerBlock is the configured cap on the number of transactions per block.
// The effective cap is additionally bounded by MaxGasPerBlock/minTxGas.
MaxTxsPerBlock uint64
// MaxTxsPerSecond optionally throttles block production in Run.
// When it is not set, Run uses an infinite rate limit.
MaxTxsPerSecond utils.Option[uint64]
// MempoolSize is not read by the code visible in this file.
// NOTE(review): presumably the mempool capacity; confirm against callers.
MempoolSize uint64
// BlockInterval bounds how long makePayload waits for transactions to become
// available before it builds a payload from whatever the mempool returns.
BlockInterval time.Duration
// AllowEmptyBlocks makes nextPayload accept payloads without transactions.
// When false, nextPayload keeps retrying until a payload has at least one tx.
AllowEmptyBlocks bool
}
// minTxGas is the minimum gas cost of an EVM transaction.
// Config.maxTxsPerBlock divides the block gas budget by it to bound the
// number of transactions that can fit into a block.
const minTxGas = 21000
// maxTxsPerBlock returns the effective per-block transaction cap: the smaller
// of the configured MaxTxsPerBlock and the number of minimum-gas transactions
// that fit into MaxGasPerBlock.
func (c *Config) maxTxsPerBlock() uint64 {
	byGas := c.MaxGasPerBlock / minTxGas
	if byGas < c.MaxTxsPerBlock {
		return byGas
	}
	return c.MaxTxsPerBlock
}
// State is the block producer state.
// It ties together the producer configuration, the mempool that transactions
// are popped from, and the consensus state that produced blocks are handed to.
type State struct {
// cfg holds the block limits and timing settings used when building payloads.
cfg *Config
// txMempool is the source of transactions for new payloads.
txMempool *mempool.TxMempool
// consensus state to which published blocks will be reported.
consensus *consensus.State
}
// NewState constructs a new block producer state from the given config,
// mempool and consensus state.
// It only stores the arguments: no validation is performed and it cannot fail.
func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.State) *State {
	s := new(State)
	s.cfg = cfg
	s.txMempool = txMempool
	s.consensus = consensus
	return s
}
// makePayload builds the payload for the next produced block.
// If the mempool reports no non-pending transactions, it first blocks until
// the mempool signals that transactions are available, `cfg.BlockInterval`
// elapses, or ctx is done. It then pops whatever the mempool returns within
// the configured block limits, so the resulting payload may be empty.
// Panics if the payload cannot be built from the popped transactions.
func (s *State) makePayload(ctx context.Context) *types.Payload {
	// The timeout only bounds the wait below; popping always happens afterwards.
	waitCtx, stop := context.WithTimeout(ctx, s.cfg.BlockInterval)
	defer stop()
	if s.txMempool.NumTxsNotPending() == 0 {
		select {
		case <-s.txMempool.TxsAvailable():
		case <-waitCtx.Done():
		}
	}

	limits := mempool.ReapLimits{
		MaxTxs:          utils.Some(min(types.MaxTxsPerBlock, s.cfg.maxTxsPerBlock())),
		MaxBytes:        utils.Some(utils.Clamp[int64](types.MaxTxsBytesPerBlock)),
		MaxGasWanted:    utils.Some(utils.Clamp[int64](s.cfg.MaxGasPerBlock)),
		MaxGasEstimated: utils.Some(utils.Clamp[int64](s.cfg.MaxGasPerBlock)),
	}
	reaped, gasEstimated := s.txMempool.PopTxs(limits)

	// The payload builder takes raw byte slices, so convert element by element.
	rawTxs := make([][]byte, len(reaped))
	for i, tx := range reaped {
		rawTxs[i] = tx
	}

	builder := types.PayloadBuilder{
		CreatedAt: time.Now(),
		// TODO: ReapMaxTxsBytesMaxGas does not handle corner cases correctly rn, which actually
		// can produce negative total gas. Fixing it right away might be backward incompatible afaict,
		// so we leave it as is for now.
		TotalGas: uint64(gasEstimated), // nolint:gosec
		Txs:      rawTxs,
	}
	payload, err := builder.Build()
	if err != nil {
		// This should never happen: we construct the payload from correctly sized data.
		panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err))
	}
	return payload
}
// nextPayload returns the payload for the next block.
// It calls makePayload repeatedly and discards empty payloads unless
// `cfg.AllowEmptyBlocks` is set. Returns ctx.Err() as soon as ctx is done.
func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) {
	for {
		candidate := s.makePayload(ctx)
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		// Empty payloads are skipped when empty blocks are not allowed.
		if len(candidate.Txs()) == 0 && !s.cfg.AllowEmptyBlocks {
			continue
		}
		return candidate, nil
	}
}
// Run runs the background tasks of the producer state.
// Inside a scope it loops over: wait for consensus capacity, build the next
// payload from the mempool, hand it to consensus, and then throttle on the
// configured transactions-per-second limit (if any).
// The loop only exits by returning an error, which wraps the error of the
// failing step (including ctx cancellation surfaced by one of the steps).
func (s *State) Run(ctx context.Context) error {
	return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error {
		// Construct blocks from mempool.
		// Without a configured limit, the infinite rate makes the limiter a no-op.
		limit := rate.Inf
		burst := 1
		if l, ok := s.cfg.MaxTxsPerSecond.Get(); ok {
			limit = rate.Limit(l)
			// WaitN fails for requests larger than the burst, so the burst has to
			// cover at least one full block of transactions.
			burst = int(l + s.cfg.MaxTxsPerBlock) // nolint:gosec
		}
		limiter := rate.NewLimiter(limit, burst)
		for {
			if err := s.consensus.WaitForCapacity(ctx); err != nil {
				return fmt.Errorf("s.consensus.WaitForCapacity(): %w", err)
			}
			payload, err := s.nextPayload(ctx)
			if err != nil {
				return fmt.Errorf("s.nextPayload(): %w", err)
			}
			if _, err := s.consensus.ProduceBlock(ctx, payload); err != nil {
				return fmt.Errorf("s.consensus.ProduceBlock(): %w", err)
			}
			// Charge the limiter for the transactions just produced, delaying the
			// next block when the configured rate is exceeded.
			if err := limiter.WaitN(ctx, len(payload.Txs())); err != nil {
				return fmt.Errorf("limiter(): %w", err)
			}
		}
	})
}