Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
return nil
}

c.log.Info("initializing raft consensus", "backend", c.cfg.RaftBackend, "storage_dir", c.cfg.RaftStorageDir, "server_id", c.cfg.RaftServerID)

raftConsensusConfig := &consensus.RaftConsensusConfig{
ServerID: c.cfg.RaftServerID,
// AdvertisedAddr may be empty: the server will then default to what it binds to.
Expand Down Expand Up @@ -296,6 +298,13 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error {
Service: api,
})

// Binary SSZ commit endpoint. Sized to comfortably fit a 10MB SSZ block;
// raise alongside the JSON-RPC body limit if larger blocks are needed.
server.AddHandler(
conductorrpc.CommitUnsafePayloadPath,
conductorrpc.BinaryCommitHandler(oc.log, oc, 16*1024*1024, oc.metrics),
)

if oc.cfg.RPCEnableProxy {
execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC)
if err != nil {
Expand Down Expand Up @@ -466,7 +475,7 @@ func (oc *OpConductor) Start(ctx context.Context) error {
oc.wg.Add(1)
go oc.loop()

oc.metrics.RecordInfo(oc.version)
oc.metrics.RecordInfo(oc.version, oc.cfg.RaftBackend)
oc.metrics.RecordUp()

oc.log.Info("OpConductor started")
Expand Down Expand Up @@ -640,6 +649,12 @@ func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.Execu
return oc.cons.CommitUnsafePayload(payload)
}

// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the cluster FSM.
// Used by the binary HTTP endpoint to avoid the JSON-decode -> SSZ-marshal round trip.
func (oc *OpConductor) CommitUnsafePayloadSSZ(_ context.Context, ssz []byte) error {
return oc.cons.CommitUnsafePayloadSSZ(ssz)
}

// SequencerHealthy returns true if sequencer is healthy.
func (oc *OpConductor) SequencerHealthy(_ context.Context) bool {
return oc.healthy.Load()
Expand Down
4 changes: 4 additions & 0 deletions op-conductor/consensus/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type Consensus interface {

// CommitUnsafePayload commits latest unsafe payload to the FSM in a strongly consistent fashion.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the FSM,
// skipping the marshal step. The bytes are handed directly to raft.Apply and
// validated by the FSM on receive. Used by the binary HTTP endpoint.
CommitUnsafePayloadSSZ(ssz []byte) error
// LatestUnsafePayload returns the latest unsafe payload from FSM in a strongly consistent fashion.
LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error)

Expand Down
45 changes: 45 additions & 0 deletions op-conductor/consensus/mocks/Consensus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions op-conductor/consensus/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,30 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo
return nil
}

// CommitUnsafePayloadSSZ implements Consensus. The bytes are passed directly to
// raft.Apply; the FSM validates by attempting UnmarshalSSZ on receive. This
// avoids the SSZ marshal step (and, when callers send SSZ over the wire, the
// JSON-decode-then-SSZ-marshal round trip the typed entrypoint requires).
func (rc *RaftConsensus) CommitUnsafePayloadSSZ(ssz []byte) error {
if len(ssz) == 0 {
return errors.New("empty payload")
}

applyStart := time.Now()
f := rc.r.Apply(ssz, defaultTimeout)
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply payload envelope")
}
applyDur := time.Since(applyStart)

if rc.metrics != nil {
// No marshal step on this path.
rc.metrics.RecordCommitDuration(0, applyDur.Seconds())
rc.metrics.RecordCommitPayloadSize(float64(len(ssz)))
}
return nil
}

type instrumentedLogStore struct {
raft.LogStore
metrics ConsensusMetrics
Expand Down
34 changes: 26 additions & 8 deletions op-conductor/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
const Namespace = "op_conductor"

type Metricer interface {
RecordInfo(version string)
RecordInfo(version, raftBackend string)
RecordUp()
RecordStateChange(leader bool, healthy bool, active bool)
RecordLeaderTransfer(success bool)
Expand All @@ -22,6 +22,10 @@ type Metricer interface {
RecordLoopExecutionTime(duration float64)
RecordRollupBoostConnectionAttempts(success bool, source string)
RecordWebSocketClientCount(count int)
// RecordBinaryCommitDuration records end-to-end handler duration for
// POST /commit-unsafe-payload requests. The equivalent metric for the
// JSON-RPC path is rpc_server_request_duration_seconds{method=conductor_commitUnsafePayload}.
RecordBinaryCommitDuration(seconds float64, success bool)
opmetrics.RPCMetricer
consensus.ConsensusMetrics
}
Expand Down Expand Up @@ -50,11 +54,12 @@ type Metrics struct {
loopExecutionTime prometheus.Histogram
webSocketClients prometheus.Gauge

commitMarshalDuration prometheus.Histogram
commitRaftApplyDuration prometheus.Histogram
commitPayloadSize prometheus.Histogram
fsmApplyDuration prometheus.Histogram
logStoreDuration prometheus.Histogram
commitMarshalDuration prometheus.Histogram
commitRaftApplyDuration prometheus.Histogram
commitPayloadSize prometheus.Histogram
fsmApplyDuration prometheus.Histogram
logStoreDuration prometheus.Histogram
binaryCommitRequestDuration *prometheus.HistogramVec
}

func (m *Metrics) Registry() *prometheus.Registry {
Expand All @@ -80,6 +85,7 @@ func NewMetrics() *Metrics {
Help: "Pseudo-metric tracking version and config info",
}, []string{
"version",
"raft_backend",
}),
up: factory.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Expand Down Expand Up @@ -161,6 +167,14 @@ func NewMetrics() *Metrics {
Help: "Time (in seconds) spent writing raft log entries to the underlying log store",
Buckets: []float64{.0001, .00025, .0005, .001, .0025, .005, .01, .025, .05},
}),
binaryCommitRequestDuration: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Name: "binary_commit_request_duration_seconds",
Help: "End-to-end handler duration for POST /commit-unsafe-payload requests. " +
"Directly comparable to rpc_server_request_duration_seconds{method=conductor_commitunsafepayload} " +
"on the JSON-RPC path.",
Buckets: []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5},
}, []string{"success"}),
}
}

Expand All @@ -170,8 +184,8 @@ func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {

// RecordInfo sets a pseudo-metric that contains versioning and
// config info for the op-proposer.
func (m *Metrics) RecordInfo(version string) {
m.info.WithLabelValues(version).Set(1)
func (m *Metrics) RecordInfo(version, raftBackend string) {
m.info.WithLabelValues(version, raftBackend).Set(1)
}

// RecordUp sets the up metric to 1.
Expand Down Expand Up @@ -239,3 +253,7 @@ func (m *Metrics) RecordFSMApplyDuration(seconds float64) {
func (m *Metrics) RecordLogStoreDuration(seconds float64) {
m.logStoreDuration.Observe(seconds)
}

func (m *Metrics) RecordBinaryCommitDuration(seconds float64, success bool) {
m.binaryCommitRequestDuration.WithLabelValues(strconv.FormatBool(success)).Observe(seconds)
}
7 changes: 4 additions & 3 deletions op-conductor/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type NoopMetricsImpl struct {

var NoopMetrics Metricer = new(NoopMetricsImpl)

func (*NoopMetricsImpl) RecordInfo(version string) {}
func (*NoopMetricsImpl) RecordInfo(version, raftBackend string) {}
func (*NoopMetricsImpl) RecordUp() {}
func (*NoopMetricsImpl) RecordStateChange(leader bool, healthy bool, active bool) {}
func (*NoopMetricsImpl) RecordLeaderTransfer(success bool) {}
Expand All @@ -20,5 +20,6 @@ func (*NoopMetricsImpl) RecordRollupBoostConnectionAttempts(success bool, source
func (*NoopMetricsImpl) RecordWebSocketClientCount(count int) {}
func (*NoopMetricsImpl) RecordCommitDuration(marshalSec, raftApplySec float64) {}
func (*NoopMetricsImpl) RecordCommitPayloadSize(payloadBytes float64) {}
func (*NoopMetricsImpl) RecordFSMApplyDuration(seconds float64) {}
func (*NoopMetricsImpl) RecordLogStoreDuration(seconds float64) {}
func (*NoopMetricsImpl) RecordFSMApplyDuration(seconds float64) {}
func (*NoopMetricsImpl) RecordLogStoreDuration(seconds float64) {}
func (*NoopMetricsImpl) RecordBinaryCommitDuration(seconds float64, success bool) {}
112 changes: 112 additions & 0 deletions op-conductor/rpc/binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package rpc

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/ethereum/go-ethereum/log"
)

// CommitUnsafePayloadPath is the HTTP route for the SSZ binary commit endpoint.
// External clients (op-node, base's Rust CL replacement, etc.) POST a raw
// SSZ-encoded ExecutionPayloadEnvelope here. The body is handed verbatim to
// raft.Apply; the FSM validates by attempting UnmarshalSSZ on receive.
//
// Wire format:
// - method: POST
// - path: /commit-unsafe-payload
// - content-type: application/octet-stream
// - body: SSZ-encoded ExecutionPayloadEnvelope (no length prefix,
// body length implies SSZ scope; current FSM tries V4 then
// V3, matching the JSON-RPC path).
// - response: 200 on success, 4xx for client errors, 5xx for raft
// errors. Body is empty on 200, plain-text error message
// otherwise.
const CommitUnsafePayloadPath = "/commit-unsafe-payload"

// SSZContentType is the content type clients should send for the binary endpoint.
const SSZContentType = "application/octet-stream"

// commitSSZBackend is the subset of the conductor backend the binary endpoint needs.
type commitSSZBackend interface {
CommitUnsafePayloadSSZ(ctx context.Context, ssz []byte) error
}

// BinaryCommitRecorder records latency for the binary commit endpoint.
// Implement this with a Prometheus histogram to get a metric comparable to
// op_conductor_rpc_server_request_duration_seconds on the JSON-RPC path.
type BinaryCommitRecorder interface {
RecordBinaryCommitDuration(seconds float64, success bool)
}

// BinaryCommitHandler returns an http.Handler that accepts SSZ-encoded payloads
// and forwards them to the conductor's raft layer. maxBodyBytes caps the
// request body to prevent DoS; 0 means no cap (not recommended).
// recorder may be nil (metrics disabled).
func BinaryCommitHandler(lgr log.Logger, backend commitSSZBackend, maxBodyBytes int64, recorder BinaryCommitRecorder) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
if r.Method != http.MethodPost {
w.Header().Set("Allow", "POST")
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if ct := r.Header.Get("Content-Type"); ct != "" && ct != SSZContentType {
http.Error(w, fmt.Sprintf("unsupported content-type %q, want %s", ct, SSZContentType), http.StatusUnsupportedMediaType)
return
}

body := r.Body
if maxBodyBytes > 0 {
// Reject upfront if Content-Length declares an over-limit body.
if r.ContentLength > maxBodyBytes {
http.Error(w, fmt.Sprintf("payload too large: %d > %d", r.ContentLength, maxBodyBytes), http.StatusRequestEntityTooLarge)
return
}
body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
}

// When Content-Length is set, pre-allocate the exact buffer and use
// ReadFull. Avoids io.ReadAll's grow-and-copy. ~10% faster end-to-end
// for multi-MB bodies; pure win when the client sends Content-Length
// (every standard HTTP client does).
var ssz []byte
var err error
if r.ContentLength > 0 {
ssz = make([]byte, r.ContentLength)
_, err = io.ReadFull(body, ssz)
} else {
ssz, err = io.ReadAll(body)
}
if err != nil {
var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) {
http.Error(w, fmt.Sprintf("payload too large: > %d bytes", maxErr.Limit), http.StatusRequestEntityTooLarge)
return
}
http.Error(w, fmt.Sprintf("read body: %v", err), http.StatusBadRequest)
return
}
if len(ssz) == 0 {
http.Error(w, "empty payload", http.StatusBadRequest)
return
}

if err := backend.CommitUnsafePayloadSSZ(r.Context(), ssz); err != nil {
lgr.Warn("failed to commit unsafe payload (binary)", "err", err, "size", len(ssz))
if recorder != nil {
recorder.RecordBinaryCommitDuration(time.Since(start).Seconds(), false)
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if recorder != nil {
recorder.RecordBinaryCommitDuration(time.Since(start).Seconds(), true)
}
w.WriteHeader(http.StatusOK)
})
}
Loading