diff --git a/Cargo.lock b/Cargo.lock index d506bd8ceb..669099e54f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5722,7 +5722,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "system-configuration 0.5.1", "tokio", "tower-service", @@ -7944,7 +7944,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.4", "thiserror 2.0.18", "tokio", "tracing", @@ -7984,7 +7984,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.4", "tracing", "windows-sys 0.60.2", ] @@ -8589,6 +8589,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b92b125634d9b795e7beca796cc790df15a7fb38323bf3196fda83292d06b1f" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -9852,11 +9853,14 @@ version = "0.1.0" dependencies = [ "async-trait", "bytes", + "dashmap 6.1.0", "derive_more 1.0.0", "solana-client", "solana-sdk", "thiserror 1.0.69", + "tokio", "tracing", + "yellowstone-grpc-client", "yellowstone-grpc-proto", ] @@ -12289,8 +12293,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f32a6f80051a4111560201420c7885d0082ba9efe2ab61875c587bb6b18b9a0" dependencies = [ "async-trait", + "axum 0.8.8", "base64 0.22.1", "bytes", + "flate2", + "h2 0.4.13", "http 1.4.0", "http-body 1.0.1", "http-body-util", @@ -12299,13 +12306,17 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", + "rustls-native-certs", + "socket2 0.6.4", "sync_wrapper 1.0.2", "tokio", + "tokio-rustls", "tokio-stream", "tower 0.5.3", "tower-layer", "tower-service", "tracing", + "zstd", ] [[package]] @@ -12320,6 +12331,19 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "tonic-health" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ff0636fef47afb3ec02818f5bceb4377b8abb9d6a386aeade18bd6212f8eb7" +dependencies = [ + "prost 0.14.3", + "tokio", + "tokio-stream", + "tonic 0.14.4", + "tonic-prost", +] + [[package]] name = "tonic-prost" version = "0.14.4" @@ -13600,6 +13624,26 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "yellowstone-grpc-client" +version = "13.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8d3827e1b27fe865ded7a4b1afdab34fbd427f9ffdd966a7c17c9fcde43181" +dependencies = [ + "arc-swap", + "bytes", + "futures", + "hyper-util", + "log", + "pin-project", + "thiserror 2.0.18", + "tokio", + "tonic 0.14.4", + "tonic-health", + "tower 0.4.13", + "yellowstone-grpc-proto", +] + [[package]] name = "yellowstone-grpc-proto" version = "12.5.0" @@ -13613,6 +13657,8 @@ dependencies = [ "siphasher 1.0.3", "solana-pubkey 4.2.0", "thiserror 2.0.18", + "tonic 0.14.4", + "tonic-prost", "tonic-prost-build", ] diff --git a/Cargo.toml b/Cargo.toml index b1fed72582..5892528422 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ tracing-subscriber = { version = "0.3.22", features = ["json"] } url = "2.5.0" vergen = "8" winner-selection = { path = "crates/winner-selection" } +yellowstone-grpc-client = "13.1.0" yellowstone-grpc-proto = { version = "12.4.0", default-features = false } [workspace.lints] diff --git a/crates/solana-indexer/Cargo.toml b/crates/solana-indexer/Cargo.toml index f9f840b4e5..b4333d83a4 100644 --- a/crates/solana-indexer/Cargo.toml +++ b/crates/solana-indexer/Cargo.toml @@ -18,11 +18,14 @@ path = "src/main.rs" [dependencies] async-trait = { workspace = true } bytes = { workspace = true } +dashmap = { workspace = true } derive_more = { workspace = true } solana-client = { workspace = true } solana-sdk = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] } tracing = { workspace = true } +yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } [lints] diff --git a/crates/solana-indexer/src/indexer/decoder.rs b/crates/solana-indexer/src/indexer/decoder.rs new file mode 100644 index 0000000000..783d9e3b67 --- /dev/null +++ b/crates/solana-indexer/src/indexer/decoder.rs @@ -0,0 +1,71 @@ +#![expect(dead_code)] +//! The decoder pulls `StreamUpdate`s from the ingester, decodes +//! settlement-program and SolFlow transactions, joins account-update snapshots, +//! and persists typed events. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; the dispatch logic and persist path arrive in a later +// change. + +use { + crate::{ + persistence::Persistence, + types::{ + errors::PersistenceError, + shared::{PartialEvent, PartialEventKey, StreamUpdate}, + }, + }, + dashmap::DashMap, + solana_sdk::pubkey::Pubkey, + std::sync::Arc, + tokio::sync::mpsc::Receiver, +}; + +/// Decoder component. +/// +/// The watchdog holds a clone of the same `partials` map, so the two operate on +/// the same concurrent map without any message passing between them. +pub(crate) struct Decoder { + /// Persistence layer. + pub persistence: Persistence, + + /// Incoming `StreamUpdate` from the ingester. + pub rx: Receiver, + + /// Shared in-memory map of partial events keyed by `PartialEventKey`, + /// holding either-half events waiting for their pair. The watchdog holds a + /// clone of this `Arc`. + pub partials: Arc>, + + /// Settlement program id (filter target for the decoder). + pub settlement_program: Pubkey, + + /// SolFlow program id (filter target for the decoder). + pub solflow_program: Pubkey, +} + +impl Decoder { + /// Construct a new decoder. The caller owns the channel capacity decision. + pub fn new( + persistence: Persistence, + rx: Receiver, + partials: Arc>, + settlement_program: Pubkey, + solflow_program: Pubkey, + ) -> Self { + Self { + persistence, + rx, + partials, + settlement_program, + solflow_program, + } + } + + /// Main loop. Pulls `StreamUpdate` from the receiver, runs the decode + /// pipeline, persists, and records partial events in the shared map for the + /// watchdog to read. + pub async fn run(&mut self) -> Result<(), PersistenceError> { + unimplemented!() + } +} diff --git a/crates/solana-indexer/src/indexer/finalization.rs b/crates/solana-indexer/src/indexer/finalization.rs new file mode 100644 index 0000000000..54a18b219a --- /dev/null +++ b/crates/solana-indexer/src/indexer/finalization.rs @@ -0,0 +1,66 @@ +#![expect(dead_code)] +//! The finalization worker updates the commitment level of the transactions +//! tracked by the indexer, promoting rows written at `confirmed` to +//! `finalized`. +//! +//! It does so through two flows. Two are needed because the relevant RPC +//! methods trade off differently: `getSignatureStatuses` is batchable but the +//! node only retains statuses for recent slots, while `getTransaction` reaches +//! arbitrarily old transactions on archival nodes but costs one call per +//! signature. The batched pass handles the common case cheaply; the per-row +//! sweep catches rows that age out of it. +//! +//! - **Promotion pass**: batch-polls `getSignatureStatuses` (at most +//! [`PROMOTION_BATCH_LIMIT`] signatures per call) over rows still at +//! `confirmed` that are at least [`FINALIZATION_WINDOW_SLOTS`] behind the +//! chain tip, and promotes rows whose `confirmationStatus` is `"finalized"`. +//! +//! - **Aged-row sweep**: fallback for rows past the signature-status retention +//! horizon ([`SIGNATURE_STATUS_RETENTION_SLOTS`]), which the promotion pass +//! can no longer check. Each row costs one `getTransaction` call; a non-null +//! response promotes to `finalized`, a null response marks `rolled_back`. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; both flows arrive in a later change. + +use { + crate::{persistence::Persistence, traits::solana_client::SolanaClient}, + std::sync::Arc, +}; + +/// Slots a transaction usually needs to finalize (~12.8 s at 400 ms/slot). +/// A heuristic floor, not a guarantee: the promotion pass skips rows fresher +/// than this because they cannot have finalized yet, and degraded consensus +/// can push real finalization later (the aged-row sweep catches those). +pub const FINALIZATION_WINDOW_SLOTS: u64 = 32; + +/// Upper limit for the `getSignatureStatuses` batch RPC call. +pub const PROMOTION_BATCH_LIMIT: usize = 256; + +/// Approximate slot horizon past which `getSignatureStatuses` no longer returns +/// a result. +pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150; + +/// Transaction finalization worker. See the module docs for the two flows it +/// runs. +pub(crate) struct FinalizationWorker { + /// Persistence layer. + pub persistence: Persistence, + + /// RPC implementor. + pub rpc: Arc, +} + +impl FinalizationWorker { + /// Construct a new finalization worker. + pub fn new(persistence: Persistence, rpc: Arc) -> Self { + Self { persistence, rpc } + } + + /// Outer loop. Runs the promotion pass and the aged-row sweep on a timer. + /// + /// Placeholder for now; implemented in a later change. + pub async fn run(&mut self) { + unimplemented!("implemented in PR 11–12") + } +} diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs new file mode 100644 index 0000000000..6dfb84ce99 --- /dev/null +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -0,0 +1,63 @@ +#![expect(dead_code)] +//! The ingester owns the yellowstone gRPC stream. It drains the socket as fast +//! as yellowstone delivers, pushes tagged updates into the channel, and updates +//! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; the actual drain and reconnect with backoff logic arrives +// in a later change. + +use { + crate::{persistence::Persistence, types::shared::StreamUpdate}, + std::sync::atomic::AtomicU64, + tokio::sync::mpsc::Sender, + yellowstone_grpc_client::GrpcConnector, +}; + +/// The sole writer is the ingester, on every slot-filter message. Anchors the +/// partial-event watchdog and the finalization worker. Cold start is zero; the +/// watchdog skips its comparison on the first tick. +/// +/// This is the chain tip, not indexing progress. How far the indexer has +/// actually persisted is the watermark in `solana.indexer_state`, written by +/// the decoder, which is a separate value. +pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); + +/// Cap on the exponential backoff between reconnect attempts. +pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); + +/// Capacity of the channel from the ingester to the decoder. +pub const INGEST_TO_DECODER_CAPACITY: usize = 1024; + +/// Ingester component. +/// +/// Generic over a `GrpcConnector` implementor so the unit tests can drive it +/// with a mock. +pub(crate) struct Ingester { + /// gRPC connector implementor + pub connector: C, + + /// Sends `StreamUpdate` to the decoder. Should be bounded to + /// `INGEST_TO_DECODER_CAPACITY` entries. + pub tx: Sender, + + /// Persistence layer, used to checkpoint the slot. + pub persistence: Persistence, +} + +impl Ingester { + /// Construct a new ingester. The caller owns the channel capacity decision. + pub fn new(connector: C, tx: Sender, persistence: Persistence) -> Self { + Self { + connector, + tx, + persistence, + } + } + + /// TODO: Outer loop: open the subscription, drain it, push into the + /// channel, reconnect on failure with exponential backoff. + pub async fn run(&mut self) { + unimplemented!() + } +} diff --git a/crates/solana-indexer/src/indexer/mod.rs b/crates/solana-indexer/src/indexer/mod.rs new file mode 100644 index 0000000000..ab816b7bec --- /dev/null +++ b/crates/solana-indexer/src/indexer/mod.rs @@ -0,0 +1,40 @@ +//! Consumer components of the Solana settlement indexer. +//! +//! The four components and their roles: +//! +//! - [`Ingester`]: subscribes to the Yellowstone gRPC stream and drains it as +//! fast as updates arrive, forwarding them to the decoder. It does no +//! decoding itself, so the socket never backs up behind slow processing. It +//! is also the single writer of the "latest chain slot" counter that the +//! other components use to know how far the chain has advanced. +//! +//! - [`Decoder`]: receives the raw stream updates, picks out transactions +//! belonging to the settlement and SolFlow programs, matches each transaction +//! with its corresponding account-update snapshot, and persists the resulting +//! typed events to the store. +//! +//! - [`PartialEventWatchdog`]: some events arrive in two halves (a transaction +//! update and an account update) that don't always land together. The decoder +//! parks the half it has in a map shared with the watchdog; the watchdog +//! periodically scans that map and dead-letters any entry whose other half +//! never showed up within the slot window, recording which half went missing. +//! +//! - [`FinalizationWorker`]: rows are first written at the `confirmed` +//! commitment level. This worker re-checks them against the chain and +//! promotes them to `finalized`, or marks them rolled back if the transaction +//! disappeared. It uses a cheap batched RPC call for recent rows and falls +//! back to one-call-per-row lookups for rows old enough that the batched +//! method no longer reports them. + +pub mod decoder; +pub mod finalization; +pub mod ingester; +pub mod watchdog; + +#[expect(unused_imports)] +pub(crate) use { + decoder::Decoder, + finalization::FinalizationWorker, + ingester::Ingester, + watchdog::PartialEventWatchdog, +}; diff --git a/crates/solana-indexer/src/indexer/watchdog.rs b/crates/solana-indexer/src/indexer/watchdog.rs new file mode 100644 index 0000000000..f5245da41a --- /dev/null +++ b/crates/solana-indexer/src/indexer/watchdog.rs @@ -0,0 +1,60 @@ +#![expect(dead_code)] +//! The partial-event watchdog. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; the lag-detection and dead-letter logic arrive in a later +// change. + +use { + crate::{ + persistence::Persistence, + types::{ + errors::PersistenceError, + shared::{PartialEvent, PartialEventKey}, + }, + }, + dashmap::DashMap, + std::sync::Arc, +}; + +#[allow(unused_imports)] +use crate::indexer::ingester::LATEST_CHAIN_SLOT; + +/// Partial-event watchdog component. +/// +/// The watchdog holds a view of the partial-event map the decoder mutates. +/// +/// Every 500 ms it scans the map and gives up on any partial more than 32 slots +/// behind `LATEST_CHAIN_SLOT`. +/// +/// Those entries are flushed to `solana.dead_letter` with a reason of +/// `AccountUpdateMissing` or `TxUpdateMissing` depending on which half was +/// missing. +pub(crate) struct PartialEventWatchdog { + /// Persistence layer. + pub persistence: Persistence, + + /// Shared in-memory map of partial events keyed by `PartialEventKey`. + /// + /// The decoder holds a clone of this `Arc` and both inserts and removes + /// halves as it processes them. + pub partials: Arc>, +} + +impl PartialEventWatchdog { + /// Construct a new watchdog. + pub fn new( + persistence: Persistence, + partials: Arc>, + ) -> Self { + Self { + persistence, + partials, + } + } + + /// Outer loop. Runs the periodic scan over the shared partial-event map. + pub async fn run(&mut self) -> Result<(), PersistenceError> { + unimplemented!() + } +} diff --git a/crates/solana-indexer/src/lib.rs b/crates/solana-indexer/src/lib.rs index aada6dc51b..2c12177d12 100644 --- a/crates/solana-indexer/src/lib.rs +++ b/crates/solana-indexer/src/lib.rs @@ -2,6 +2,7 @@ #![warn(missing_docs)] +pub mod indexer; pub mod persistence; pub mod traits; pub mod types; diff --git a/crates/solana-indexer/src/types/mod.rs b/crates/solana-indexer/src/types/mod.rs index ef83c3612e..1bcb7b6f96 100644 --- a/crates/solana-indexer/src/types/mod.rs +++ b/crates/solana-indexer/src/types/mod.rs @@ -1,12 +1,12 @@ //! Domain types for the Solana settlement indexer. -pub mod channel; pub mod commitment; pub mod dead_letter; pub mod errors; pub mod events; pub mod order; pub mod recovery; +pub mod shared; pub mod slot; pub mod tx; pub mod wire; diff --git a/crates/solana-indexer/src/types/channel.rs b/crates/solana-indexer/src/types/shared.rs similarity index 51% rename from crates/solana-indexer/src/types/channel.rs rename to crates/solana-indexer/src/types/shared.rs index bcfb25fa5f..d7fbc937a7 100644 --- a/crates/solana-indexer/src/types/channel.rs +++ b/crates/solana-indexer/src/types/shared.rs @@ -1,9 +1,5 @@ #![expect(dead_code)] -//! Message types passed over the internal channels. -//! -//! The ingester pushes [`StreamUpdate`] into the channel to the decoder; the -//! decoder pushes [`PartialEvent`] / [`PartialHalf`] to the partial-event -//! watchdog. +//! Types shared across the internal components of this crate. use crate::types::{ Signature, @@ -38,24 +34,26 @@ pub(crate) enum StreamUpdate { }, } -/// From `Decoder` → `PartialEventWatchdog`. -/// -/// The watchdog holds incomplete `(slot, signature)` pairs until both halves -/// arrive; each delivery carries the half that just landed. -#[derive(Debug, Clone, Copy)] -pub(crate) struct PartialEvent { - /// Slot the partial was observed at. - pub slot: Slot, - /// Transaction signature the partial corresponds to. - pub signature: Signature, -} +/// Key for the shared decoder↔watchdog partials map: the `(slot, signature)` +/// pair identifying which on-chain event a `PartialEvent` belongs to. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub(crate) struct PartialEventKey(pub Slot, pub Signature); -/// One of the two halves a [`StreamUpdate`] can produce. +/// One half of a paired on-chain event, recorded by the decoder when only +/// one of the two matching `StreamUpdate` messages has been observed for a +/// given `PartialEventKey`. +/// +/// The other half is expected to arrive shortly; until it does, the entry +/// lives in the shared decoder↔watchdog map. The watchdog scans the map and +/// dead-letters any partial that has aged out (the matching half never +/// arrived within the slot window), using the variant to report which half +/// was missing. /// -/// The decoder pushes one `PartialEvent` per `StreamUpdate` it processes; the -/// watchdog uses the `(slot, signature)` key to match pairs. +/// Both components hold a clone of the same +/// `Arc>`, so there is no message +/// passing between them — the watchdog simply reads what the decoder wrote. #[derive(Debug, Clone)] -pub(crate) enum PartialHalf { +pub(crate) enum PartialEvent { /// Transaction-update half. Tx(Box), /// Account-update half.