From b29a0b54ff3219d6e60baf0a2ea32137c1e92955 Mon Sep 17 00:00:00 2001 From: tilacog Date: Tue, 9 Jun 2026 11:33:00 -0300 Subject: [PATCH 1/6] =?UTF-8?q?feat(solana-indexer):=20PR=204=20=E2=80=94?= =?UTF-8?q?=20Component=20structs=20(skeleton=20declarations)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 52 +++++++++++++- Cargo.toml | 1 + crates/solana-indexer/Cargo.toml | 3 + crates/solana-indexer/src/indexer/decoder.rs | 70 +++++++++++++++++++ .../src/indexer/finalization.rs | 63 +++++++++++++++++ crates/solana-indexer/src/indexer/ingester.rs | 59 ++++++++++++++++ crates/solana-indexer/src/indexer/mod.rs | 13 ++++ crates/solana-indexer/src/indexer/watchdog.rs | 53 ++++++++++++++ crates/solana-indexer/src/lib.rs | 1 + crates/solana-indexer/src/types/mod.rs | 2 +- .../src/types/{channel.rs => shared.rs} | 38 +++++----- 11 files changed, 331 insertions(+), 24 deletions(-) create mode 100644 crates/solana-indexer/src/indexer/decoder.rs create mode 100644 crates/solana-indexer/src/indexer/finalization.rs create mode 100644 crates/solana-indexer/src/indexer/ingester.rs create mode 100644 crates/solana-indexer/src/indexer/mod.rs create mode 100644 crates/solana-indexer/src/indexer/watchdog.rs rename crates/solana-indexer/src/types/{channel.rs => shared.rs} (54%) diff --git a/Cargo.lock b/Cargo.lock index 692cdd8b79..45edab9ebc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5445,7 +5445,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "system-configuration", "tokio", "tower-service", @@ -7514,7 +7514,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.4", "thiserror 2.0.18", "tokio", "tracing", @@ -7554,7 +7554,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.4", "tracing", "windows-sys 0.60.2", ] @@ -8151,6 +8151,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -9396,13 +9397,16 @@ dependencies = [ name = "solana-indexer" version = "0.1.0" dependencies = [ + "dashmap 6.1.0", "observe", "prometheus", "prometheus-metric-storage", "solana-client", "solana-sdk", "thiserror 1.0.69", + "tokio", "tracing", + "yellowstone-grpc-client", "yellowstone-grpc-proto", ] @@ -11809,8 +11813,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f32a6f80051a4111560201420c7885d0082ba9efe2ab61875c587bb6b18b9a0" dependencies = [ "async-trait", + "axum 0.8.8", "base64 0.22.1", "bytes", + "flate2", + "h2 0.4.13", "http 1.4.0", "http-body 1.0.1", "http-body-util", @@ -11819,13 +11826,17 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", + "rustls-native-certs", + "socket2 0.6.4", "sync_wrapper 1.0.2", "tokio", + "tokio-rustls", "tokio-stream", "tower 0.5.3", "tower-layer", "tower-service", "tracing", + "zstd", ] [[package]] @@ -11840,6 +11851,19 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "tonic-health" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ff0636fef47afb3ec02818f5bceb4377b8abb9d6a386aeade18bd6212f8eb7" +dependencies = [ + "prost 0.14.3", + "tokio", + "tokio-stream", + "tonic 0.14.4", + "tonic-prost", +] + [[package]] name = "tonic-prost" version = "0.14.4" @@ -13104,6 +13128,26 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "yellowstone-grpc-client" +version = "13.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a66894f04139a5c4a2c1108fed68fc3e7efdf4120464ad86e14013c7e2cbeec5" +dependencies = [ + "arc-swap", + "bytes", + "futures", + "hyper-util", + "log", + "pin-project", + "thiserror 2.0.18", + "tokio", + "tonic 0.14.4", + "tonic-health", + "tower 0.4.13", + "yellowstone-grpc-proto", +] + [[package]] name = "yellowstone-grpc-proto" version = "12.4.0" @@ -13117,6 +13161,8 @@ dependencies = [ "siphasher 1.0.3", "solana-pubkey 4.1.0", "thiserror 2.0.18", + "tonic 0.14.4", + "tonic-prost", "tonic-prost-build", ] diff --git a/Cargo.toml b/Cargo.toml index bfcc8bd5bc..afe8bf9bf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,6 +148,7 @@ tracing-subscriber = { version = "0.3.22", features = ["json"] } url = "2.5.0" vergen = "8" winner-selection = { path = "crates/winner-selection" } +yellowstone-grpc-client = "13.1.0" yellowstone-grpc-proto = { version = "12.4.0", default-features = false } [workspace.lints] diff --git a/crates/solana-indexer/Cargo.toml b/crates/solana-indexer/Cargo.toml index 1d871b51c5..f086ae9997 100644 --- a/crates/solana-indexer/Cargo.toml +++ b/crates/solana-indexer/Cargo.toml @@ -16,13 +16,16 @@ name = "solana-indexer" path = "src/main.rs" [dependencies] +dashmap = { workspace = true } observe = { workspace = true } prometheus = { workspace = true } prometheus-metric-storage = { workspace = true } solana-client = { workspace = true } solana-sdk = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] } tracing = { workspace = true } +yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } [lints] diff --git a/crates/solana-indexer/src/indexer/decoder.rs b/crates/solana-indexer/src/indexer/decoder.rs new file mode 100644 index 0000000000..db41baef81 --- /dev/null +++ b/crates/solana-indexer/src/indexer/decoder.rs @@ -0,0 +1,70 @@ +//! The decoder pulls `StreamUpdate`s from the ingester, decodes +//! settlement-program and SolFlow transactions, joins account-update snapshots, +//! and persists typed events. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; the dispatch logic and persist path arrive in a later +// change. + +use { + crate::{ + traits::store::Store, + types::{ + errors::StoreError, + shared::{PartialEvent, PartialEventKey, StreamUpdate}, + }, + }, + dashmap::DashMap, + solana_sdk::pubkey::Pubkey, + std::sync::Arc, + tokio::sync::mpsc::Receiver, +}; + +/// Decoder component. +/// +/// The watchdog holds a clone of the same `partials` map, so the two operate on +/// the same concurrent map without any message passing between them. +pub struct Decoder { + /// Store implementor. + pub store: St, + + /// Incoming `StreamUpdate` from the ingester. + pub rx: Receiver, + + /// Shared in-memory map of partial events keyed by `PartialEventKey`, + /// holding either-half events waiting for their pair. The watchdog holds a + /// clone of this `Arc`. + pub partials: Arc>, + + /// Settlement program id (filter target for the decoder). + pub settlement_program: Pubkey, + + /// SolFlow program id (filter target for the decoder). + pub solflow_program: Pubkey, +} + +impl Decoder { + /// Construct a new decoder. The caller owns the channel capacity decision. + pub fn new( + store: St, + rx: Receiver, + partials: Arc>, + settlement_program: Pubkey, + solflow_program: Pubkey, + ) -> Self { + Self { + store, + rx, + partials, + settlement_program, + solflow_program, + } + } + + /// Main loop. Pulls `StreamUpdate` from the receiver, runs the decode + /// pipeline, persists, and records partial events in the shared map for the + /// watchdog to read. + pub async fn run(&mut self) -> Result<(), StoreError> { + unimplemented!() + } +} diff --git a/crates/solana-indexer/src/indexer/finalization.rs b/crates/solana-indexer/src/indexer/finalization.rs new file mode 100644 index 0000000000..26b494ccf0 --- /dev/null +++ b/crates/solana-indexer/src/indexer/finalization.rs @@ -0,0 +1,63 @@ +//! The finalization worker updates the commitment level of the transactions +//! tracked by the indexer, promoting rows written at `confirmed` to +//! `finalized`. +//! +//! It does so through two flows. Two are needed because the relevant RPC +//! methods trade off differently: `getSignatureStatuses` is batchable but the +//! node only retains statuses for recent slots, while `getTransaction` reaches +//! arbitrarily old transactions on archival nodes but costs one call per +//! signature. The batched pass handles the common case cheaply; the per-row +//! sweep catches rows that age out of it. +//! +//! - **Promotion pass**: batch-polls `getSignatureStatuses` (at most +//! [`PROMOTION_BATCH_LIMIT`] signatures per call) over rows still at +//! `confirmed` that are at least [`FINALIZATION_WINDOW_SLOTS`] behind the +//! chain tip, and promotes rows whose `confirmationStatus` is `"finalized"`. +//! +//! - **Aged-row sweep**: fallback for rows past the signature-status retention +//! horizon ([`SIGNATURE_STATUS_RETENTION_SLOTS`]), which the promotion pass +//! can no longer check. Each row costs one `getTransaction` call; a non-null +//! response promotes to `finalized`, a null response marks `rolled_back`. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; both flows arrive in a later change. + +use crate::traits::{solana_client::SolanaClient, store::Store}; + +/// Typical number of slots for a transaction to finalize (~12.8 s). The +/// promotion pass skips rows fresher than this. +#[allow(dead_code)] +pub const FINALIZATION_WINDOW_SLOTS: u64 = 32; + +/// Upper limit for the `getSignatureStatuses` batch RPC call. +#[allow(dead_code)] +pub const PROMOTION_BATCH_LIMIT: usize = 256; + +/// Approximate slot horizon past which `getSignatureStatuses` no longer returns +/// a result. +#[allow(dead_code)] +pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150; + +/// Transaction finalization worker. See the module docs for the two flows it +/// runs. +pub struct FinalizationWorker { + /// Store implementor. + pub store: St, + + /// RPC implementor. + pub rpc: R, +} + +impl FinalizationWorker { + /// Construct a new finalization worker. + pub fn new(store: St, rpc: R) -> Self { + Self { store, rpc } + } + + /// Outer loop. Runs the promotion pass and the aged-row sweep on a timer. + /// + /// Placeholder for now; implemented in a later change. + pub async fn run(&mut self) { + unimplemented!("implemented in PR 11–12") + } +} diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs new file mode 100644 index 0000000000..c5afb566d5 --- /dev/null +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -0,0 +1,59 @@ +//! The ingester owns the yellowstone gRPC stream. It drains the socket as fast +//! as yellowstone delivers, pushes tagged updates into the channel, and updates +//! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; the actual drain and reconnect with backoff logic arrives +// in a later change. + +use { + crate::{traits::store::Store, types::shared::StreamUpdate}, + std::sync::atomic::AtomicU64, + tokio::sync::mpsc::Sender, + yellowstone_grpc_client::GrpcConnector, +}; + +/// The sole writer is the ingester, on every slot-filter message. Anchors the +/// partial-event watchdog and the finalization worker. Cold start is zero; the +/// watchdog skips its comparison on the first tick. +pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); + +/// Cap on the exponential backoff between reconnect attempts. +#[allow(dead_code)] +pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); + +/// Capacity of the channel from the ingester to the decoder. +pub const INGEST_TO_DECODER_CAPACITY: usize = 1024; + +/// Ingester component. +/// +/// Generic over a `GrpcConnector` implementor so the unit tests can drive it +/// with a mock. +pub struct Ingester { + /// gRPC connector implementor + pub connector: C, + + /// Sends `StreamUpdate` to the decoder. Should be bounded to + /// `RECONNECT_BACKOFF_CAP` entries. + pub tx: Sender, + + /// Store implementor; used to checkpoint the slot. + pub store: St, +} + +impl Ingester { + /// Construct a new ingester. The caller owns the channel capacity decision. + pub fn new(connector: C, tx: Sender, store: St) -> Self { + Self { + connector, + tx, + store, + } + } + + /// TODO: Outer loop: open the subscription, drain it, push into the + /// channel, reconnect on failure with exponential backoff. + pub async fn run(&mut self) { + unimplemented!() + } +} diff --git a/crates/solana-indexer/src/indexer/mod.rs b/crates/solana-indexer/src/indexer/mod.rs new file mode 100644 index 0000000000..19065ef29a --- /dev/null +++ b/crates/solana-indexer/src/indexer/mod.rs @@ -0,0 +1,13 @@ +//! Consumer components of the Solana settlement indexer. + +pub mod decoder; +pub mod finalization; +pub mod ingester; +pub mod watchdog; + +pub use { + decoder::Decoder, + finalization::FinalizationWorker, + ingester::Ingester, + watchdog::PartialEventWatchdog, +}; diff --git a/crates/solana-indexer/src/indexer/watchdog.rs b/crates/solana-indexer/src/indexer/watchdog.rs new file mode 100644 index 0000000000..5115749afa --- /dev/null +++ b/crates/solana-indexer/src/indexer/watchdog.rs @@ -0,0 +1,53 @@ +//! The partial-event watchdog. + +// TODO: This file only declares the component skeleton. The `run` body is +// `unimplemented!`; the lag-detection and dead-letter logic arrive in a later +// change. + +use { + crate::{ + traits::store::Store, + types::{ + errors::StoreError, + shared::{PartialEvent, PartialEventKey}, + }, + }, + dashmap::DashMap, + std::sync::Arc, +}; + +#[allow(unused_imports)] +use crate::indexer::ingester::LATEST_CHAIN_SLOT; + +/// Partial-event watchdog component. +/// +/// The watchdog holds a view of the partial-event map the decoder mutates. +/// +/// Every 500 ms it scans the map and gives up on any partial more than 32 slots +/// behind `LATEST_CHAIN_SLOT`. +/// +/// Those entries are flushed to `solana.dead_letter` with a reason of +/// `AccountUpdateMissing` or `TxUpdateMissing` depending on which half was +/// missing. +pub struct PartialEventWatchdog { + /// Store implementor. + pub store: St, + + /// Shared in-memory map of partial events keyed by `PartialEventKey`. + /// + /// The decoder holds a clone of this `Arc` and both inserts and removes + /// halves as it processes them. + pub partials: Arc>, +} + +impl PartialEventWatchdog { + /// Construct a new watchdog. + pub fn new(store: St, partials: Arc>) -> Self { + Self { store, partials } + } + + /// Outer loop. Runs the periodic scan over the shared partial-event map. + pub async fn run(&mut self) -> Result<(), StoreError> { + unimplemented!() + } +} diff --git a/crates/solana-indexer/src/lib.rs b/crates/solana-indexer/src/lib.rs index 24f37b8f46..39835ea56b 100644 --- a/crates/solana-indexer/src/lib.rs +++ b/crates/solana-indexer/src/lib.rs @@ -3,5 +3,6 @@ #![allow(async_fn_in_trait)] #![warn(missing_docs)] +pub mod indexer; pub mod traits; pub mod types; diff --git a/crates/solana-indexer/src/types/mod.rs b/crates/solana-indexer/src/types/mod.rs index 34a654e98c..f95a53ae85 100644 --- a/crates/solana-indexer/src/types/mod.rs +++ b/crates/solana-indexer/src/types/mod.rs @@ -1,12 +1,12 @@ //! Domain types for the Solana settlement indexer. -pub mod channel; pub mod commitment; pub mod dead_letter; pub mod errors; pub mod events; pub mod metrics; pub mod recovery; +pub mod shared; pub mod tx; pub mod wire; diff --git a/crates/solana-indexer/src/types/channel.rs b/crates/solana-indexer/src/types/shared.rs similarity index 54% rename from crates/solana-indexer/src/types/channel.rs rename to crates/solana-indexer/src/types/shared.rs index 8c9d52fc7b..2b02efea22 100644 --- a/crates/solana-indexer/src/types/channel.rs +++ b/crates/solana-indexer/src/types/shared.rs @@ -1,8 +1,4 @@ -//! Message types passed over the internal channels. -//! -//! The ingester pushes [`StreamUpdate`] into the channel to the decoder; the -//! decoder pushes [`PartialEvent`] / [`PartialHalf`] to the partial-event -//! watchdog. +//! Types shared across the internal compoents of this crate. use crate::types::{ Signature, @@ -37,24 +33,26 @@ pub enum StreamUpdate { }, } -/// From `Decoder` → `PartialEventWatchdog`. -/// -/// The watchdog holds incomplete `(slot, signature)` pairs until both halves -/// arrive; each delivery carries the half that just landed. -#[derive(Debug, Clone, Copy)] -pub struct PartialEvent { - /// Slot the partial was observed at. - pub slot: u64, - /// Transaction signature the partial corresponds to. - pub signature: Signature, -} +/// Key for the shared decoder↔watchdog partials map: the `(slot, signature)` +/// pair identifying which on-chain event a `PartialEvent` belongs to. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub struct PartialEventKey(pub u64, pub Signature); -/// One of the two halves a [`StreamUpdate`] can produce. +/// One half of a paired on-chain event, recorded by the decoder when only +/// one of the two matching `StreamUpdate` messages has been observed for a +/// given `PartialEventKey`. +/// +/// The other half is expected to arrive shortly; until it does, the entry +/// lives in the shared decoder↔watchdog map. The watchdog scans the map and +/// dead-letters any partial that has aged out (the matching half never +/// arrived within the slot window), using the variant to report which half +/// was missing. /// -/// The decoder pushes one `PartialEvent` per `StreamUpdate` it processes; the -/// watchdog uses the `(slot, signature)` key to match pairs. +/// Both components hold a clone of the same +/// `Arc>`, so there is no message +/// passing between them — the watchdog simply reads what the decoder wrote. #[derive(Debug, Clone)] -pub enum PartialHalf { +pub enum PartialEvent { /// Transaction-update half. Tx(Box), /// Account-update half. From 126a8cdd840665670e01417185da166df924537f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 10 Jun 2026 14:33:32 -0300 Subject: [PATCH 2/6] docs(solana-indexer): describe component roles in indexer module docs --- crates/solana-indexer/src/indexer/mod.rs | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/crates/solana-indexer/src/indexer/mod.rs b/crates/solana-indexer/src/indexer/mod.rs index 19065ef29a..ff5eb46c01 100644 --- a/crates/solana-indexer/src/indexer/mod.rs +++ b/crates/solana-indexer/src/indexer/mod.rs @@ -1,4 +1,30 @@ //! Consumer components of the Solana settlement indexer. +//! +//! The four components and their roles: +//! +//! - [`Ingester`]: subscribes to the Yellowstone gRPC stream and drains it as +//! fast as updates arrive, forwarding them to the decoder. It does no +//! decoding itself, so the socket never backs up behind slow processing. It +//! is also the single writer of the "latest chain slot" counter that the +//! other components use to know how far the chain has advanced. +//! +//! - [`Decoder`]: receives the raw stream updates, picks out transactions +//! belonging to the settlement and SolFlow programs, matches each transaction +//! with its corresponding account-update snapshot, and persists the resulting +//! typed events to the store. +//! +//! - [`PartialEventWatchdog`]: some events arrive in two halves (a transaction +//! update and an account update) that don't always land together. The decoder +//! parks the half it has in a map shared with the watchdog; the watchdog +//! periodically scans that map and dead-letters any entry whose other half +//! never showed up within the slot window, recording which half went missing. +//! +//! - [`FinalizationWorker`]: rows are first written at the `confirmed` +//! commitment level. This worker re-checks them against the chain and +//! promotes them to `finalized`, or marks them rolled back if the transaction +//! disappeared. It uses a cheap batched RPC call for recent rows and falls +//! back to one-call-per-row lookups for rows old enough that the batched +//! method no longer reports them. pub mod decoder; pub mod finalization; From c6172c60f0671e80c19e18278902b17db9e83d24 Mon Sep 17 00:00:00 2001 From: squadgazzz Date: Fri, 26 Jun 2026 09:25:04 +0000 Subject: [PATCH 3/6] refactor(solana-indexer): address PR4 review comments - PartialEventKey uses Slot instead of u64, matching StreamUpdate - finalization: clarify FINALIZATION_WINDOW_SLOTS doc, drop the per-const #[allow(dead_code)] in favor of the module-level #![expect(dead_code)] --- crates/solana-indexer/src/indexer/finalization.rs | 9 ++++----- crates/solana-indexer/src/indexer/ingester.rs | 1 - crates/solana-indexer/src/types/shared.rs | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/solana-indexer/src/indexer/finalization.rs b/crates/solana-indexer/src/indexer/finalization.rs index e1ad7f4205..9671bd39db 100644 --- a/crates/solana-indexer/src/indexer/finalization.rs +++ b/crates/solana-indexer/src/indexer/finalization.rs @@ -25,18 +25,17 @@ use crate::traits::{solana_client::SolanaClient, store::Store}; -/// Typical number of slots for a transaction to finalize (~12.8 s). The -/// promotion pass skips rows fresher than this. -#[allow(dead_code)] +/// Slots a transaction usually needs to finalize (~12.8 s at 400 ms/slot). +/// A heuristic floor, not a guarantee: the promotion pass skips rows fresher +/// than this because they cannot have finalized yet, and degraded consensus +/// can push real finalization later (the aged-row sweep catches those). pub const FINALIZATION_WINDOW_SLOTS: u64 = 32; /// Upper limit for the `getSignatureStatuses` batch RPC call. -#[allow(dead_code)] pub const PROMOTION_BATCH_LIMIT: usize = 256; /// Approximate slot horizon past which `getSignatureStatuses` no longer returns /// a result. -#[allow(dead_code)] pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150; /// Transaction finalization worker. See the module docs for the two flows it diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 27b07f6061..be7f4efcad 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -20,7 +20,6 @@ use { pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); /// Cap on the exponential backoff between reconnect attempts. -#[allow(dead_code)] pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); /// Capacity of the channel from the ingester to the decoder. diff --git a/crates/solana-indexer/src/types/shared.rs b/crates/solana-indexer/src/types/shared.rs index e56122acd1..d7fbc937a7 100644 --- a/crates/solana-indexer/src/types/shared.rs +++ b/crates/solana-indexer/src/types/shared.rs @@ -37,7 +37,7 @@ pub(crate) enum StreamUpdate { /// Key for the shared decoder↔watchdog partials map: the `(slot, signature)` /// pair identifying which on-chain event a `PartialEvent` belongs to. #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] -pub(crate) struct PartialEventKey(pub u64, pub Signature); +pub(crate) struct PartialEventKey(pub Slot, pub Signature); /// One half of a paired on-chain event, recorded by the decoder when only /// one of the two matching `StreamUpdate` messages has been observed for a From 067cd4d797043a09c453672542d502959f606715 Mon Sep 17 00:00:00 2001 From: squadgazzz Date: Fri, 26 Jun 2026 09:40:10 +0000 Subject: [PATCH 4/6] refactor(solana-indexer): hold Store/SolanaClient as Arc Replace the generic type params on the four components with Arc / Arc fields, per review feedback. Avoids propagating type params through the codebase, and dyn dispatch is negligible for these I/O-bound components. Ingester keeps its GrpcConnector generic for stream mocking. --- crates/solana-indexer/src/indexer/decoder.rs | 8 ++++---- crates/solana-indexer/src/indexer/finalization.rs | 15 +++++++++------ crates/solana-indexer/src/indexer/ingester.rs | 10 +++++----- crates/solana-indexer/src/indexer/watchdog.rs | 11 +++++++---- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/crates/solana-indexer/src/indexer/decoder.rs b/crates/solana-indexer/src/indexer/decoder.rs index ea340f81c2..855ade4928 100644 --- a/crates/solana-indexer/src/indexer/decoder.rs +++ b/crates/solana-indexer/src/indexer/decoder.rs @@ -25,9 +25,9 @@ use { /// /// The watchdog holds a clone of the same `partials` map, so the two operate on /// the same concurrent map without any message passing between them. -pub(crate) struct Decoder { +pub(crate) struct Decoder { /// Store implementor. - pub store: St, + pub store: Arc, /// Incoming `StreamUpdate` from the ingester. pub rx: Receiver, @@ -44,10 +44,10 @@ pub(crate) struct Decoder { pub solflow_program: Pubkey, } -impl Decoder { +impl Decoder { /// Construct a new decoder. The caller owns the channel capacity decision. pub fn new( - store: St, + store: Arc, rx: Receiver, partials: Arc>, settlement_program: Pubkey, diff --git a/crates/solana-indexer/src/indexer/finalization.rs b/crates/solana-indexer/src/indexer/finalization.rs index 9671bd39db..11b53fa920 100644 --- a/crates/solana-indexer/src/indexer/finalization.rs +++ b/crates/solana-indexer/src/indexer/finalization.rs @@ -23,7 +23,10 @@ // TODO: This file only declares the component skeleton. The `run` body is // `unimplemented!`; both flows arrive in a later change. -use crate::traits::{solana_client::SolanaClient, store::Store}; +use { + crate::traits::{solana_client::SolanaClient, store::Store}, + std::sync::Arc, +}; /// Slots a transaction usually needs to finalize (~12.8 s at 400 ms/slot). /// A heuristic floor, not a guarantee: the promotion pass skips rows fresher @@ -40,17 +43,17 @@ pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150; /// Transaction finalization worker. See the module docs for the two flows it /// runs. -pub(crate) struct FinalizationWorker { +pub(crate) struct FinalizationWorker { /// Store implementor. - pub store: St, + pub store: Arc, /// RPC implementor. - pub rpc: R, + pub rpc: Arc, } -impl FinalizationWorker { +impl FinalizationWorker { /// Construct a new finalization worker. - pub fn new(store: St, rpc: R) -> Self { + pub fn new(store: Arc, rpc: Arc) -> Self { Self { store, rpc } } diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index be7f4efcad..2248bf9e55 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -9,7 +9,7 @@ use { crate::{traits::store::Store, types::shared::StreamUpdate}, - std::sync::atomic::AtomicU64, + std::sync::{Arc, atomic::AtomicU64}, tokio::sync::mpsc::Sender, yellowstone_grpc_client::GrpcConnector, }; @@ -29,7 +29,7 @@ pub const INGEST_TO_DECODER_CAPACITY: usize = 1024; /// /// Generic over a `GrpcConnector` implementor so the unit tests can drive it /// with a mock. -pub(crate) struct Ingester { +pub(crate) struct Ingester { /// gRPC connector implementor pub connector: C, @@ -38,12 +38,12 @@ pub(crate) struct Ingester { pub tx: Sender, /// Store implementor; used to checkpoint the slot. - pub store: St, + pub store: Arc, } -impl Ingester { +impl Ingester { /// Construct a new ingester. The caller owns the channel capacity decision. - pub fn new(connector: C, tx: Sender, store: St) -> Self { + pub fn new(connector: C, tx: Sender, store: Arc) -> Self { Self { connector, tx, diff --git a/crates/solana-indexer/src/indexer/watchdog.rs b/crates/solana-indexer/src/indexer/watchdog.rs index 1f63c2ce0a..ef1f408207 100644 --- a/crates/solana-indexer/src/indexer/watchdog.rs +++ b/crates/solana-indexer/src/indexer/watchdog.rs @@ -30,9 +30,9 @@ use crate::indexer::ingester::LATEST_CHAIN_SLOT; /// Those entries are flushed to `solana.dead_letter` with a reason of /// `AccountUpdateMissing` or `TxUpdateMissing` depending on which half was /// missing. -pub(crate) struct PartialEventWatchdog { +pub(crate) struct PartialEventWatchdog { /// Store implementor. - pub store: St, + pub store: Arc, /// Shared in-memory map of partial events keyed by `PartialEventKey`. /// @@ -41,9 +41,12 @@ pub(crate) struct PartialEventWatchdog { pub partials: Arc>, } -impl PartialEventWatchdog { +impl PartialEventWatchdog { /// Construct a new watchdog. - pub fn new(store: St, partials: Arc>) -> Self { + pub fn new( + store: Arc, + partials: Arc>, + ) -> Self { Self { store, partials } } From 4527782a330a5bd61d4d2113a80a70c8eeb7129e Mon Sep 17 00:00:00 2001 From: squadgazzz Date: Fri, 26 Jun 2026 09:48:55 +0000 Subject: [PATCH 5/6] docs(solana-indexer): clarify LATEST_CHAIN_SLOT and fix tx capacity doc - note LATEST_CHAIN_SLOT is the chain tip, not indexing progress (the watermark in solana.indexer_state is the separate processed-slot counter) - fix the decoder channel doc to reference INGEST_TO_DECODER_CAPACITY, not the unrelated RECONNECT_BACKOFF_CAP duration --- crates/solana-indexer/src/indexer/ingester.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 2248bf9e55..94e92a69ca 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -17,6 +17,10 @@ use { /// The sole writer is the ingester, on every slot-filter message. Anchors the /// partial-event watchdog and the finalization worker. Cold start is zero; the /// watchdog skips its comparison on the first tick. +/// +/// This is the chain tip, not indexing progress. How far the indexer has +/// actually persisted is the watermark in `solana.indexer_state`, written by +/// the decoder, which is a separate value. pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); /// Cap on the exponential backoff between reconnect attempts. @@ -34,7 +38,7 @@ pub(crate) struct Ingester { pub connector: C, /// Sends `StreamUpdate` to the decoder. Should be bounded to - /// `RECONNECT_BACKOFF_CAP` entries. + /// `INGEST_TO_DECODER_CAPACITY` entries. pub tx: Sender, /// Store implementor; used to checkpoint the slot. From 571a25dc0f4d863c8dd88478e849012139fa85d8 Mon Sep 17 00:00:00 2001 From: squadgazzz Date: Tue, 30 Jun 2026 07:56:22 +0000 Subject: [PATCH 6/6] refactor(solana-indexer): rename store fields to persistence --- crates/solana-indexer/src/indexer/decoder.rs | 8 ++++---- crates/solana-indexer/src/indexer/finalization.rs | 8 ++++---- crates/solana-indexer/src/indexer/ingester.rs | 8 ++++---- crates/solana-indexer/src/indexer/watchdog.rs | 14 ++++++++++---- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/crates/solana-indexer/src/indexer/decoder.rs b/crates/solana-indexer/src/indexer/decoder.rs index f7ea5a0524..783d9e3b67 100644 --- a/crates/solana-indexer/src/indexer/decoder.rs +++ b/crates/solana-indexer/src/indexer/decoder.rs @@ -26,8 +26,8 @@ use { /// The watchdog holds a clone of the same `partials` map, so the two operate on /// the same concurrent map without any message passing between them. pub(crate) struct Decoder { - /// Store implementor. - pub store: Persistence, + /// Persistence layer. + pub persistence: Persistence, /// Incoming `StreamUpdate` from the ingester. pub rx: Receiver, @@ -47,14 +47,14 @@ pub(crate) struct Decoder { impl Decoder { /// Construct a new decoder. The caller owns the channel capacity decision. pub fn new( - store: Persistence, + persistence: Persistence, rx: Receiver, partials: Arc>, settlement_program: Pubkey, solflow_program: Pubkey, ) -> Self { Self { - store, + persistence, rx, partials, settlement_program, diff --git a/crates/solana-indexer/src/indexer/finalization.rs b/crates/solana-indexer/src/indexer/finalization.rs index 49309a4091..54a18b219a 100644 --- a/crates/solana-indexer/src/indexer/finalization.rs +++ b/crates/solana-indexer/src/indexer/finalization.rs @@ -44,8 +44,8 @@ pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150; /// Transaction finalization worker. See the module docs for the two flows it /// runs. pub(crate) struct FinalizationWorker { - /// Store implementor. - pub store: Persistence, + /// Persistence layer. + pub persistence: Persistence, /// RPC implementor. pub rpc: Arc, @@ -53,8 +53,8 @@ pub(crate) struct FinalizationWorker { impl FinalizationWorker { /// Construct a new finalization worker. - pub fn new(store: Persistence, rpc: Arc) -> Self { - Self { store, rpc } + pub fn new(persistence: Persistence, rpc: Arc) -> Self { + Self { persistence, rpc } } /// Outer loop. Runs the promotion pass and the aged-row sweep on a timer. diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 5e6f5fa4a3..6dfb84ce99 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -41,17 +41,17 @@ pub(crate) struct Ingester { /// `INGEST_TO_DECODER_CAPACITY` entries. pub tx: Sender, - /// Store implementor; used to checkpoint the slot. - pub store: Persistence, + /// Persistence layer, used to checkpoint the slot. + pub persistence: Persistence, } impl Ingester { /// Construct a new ingester. The caller owns the channel capacity decision. - pub fn new(connector: C, tx: Sender, store: Persistence) -> Self { + pub fn new(connector: C, tx: Sender, persistence: Persistence) -> Self { Self { connector, tx, - store, + persistence, } } diff --git a/crates/solana-indexer/src/indexer/watchdog.rs b/crates/solana-indexer/src/indexer/watchdog.rs index 398b556074..f5245da41a 100644 --- a/crates/solana-indexer/src/indexer/watchdog.rs +++ b/crates/solana-indexer/src/indexer/watchdog.rs @@ -31,8 +31,8 @@ use crate::indexer::ingester::LATEST_CHAIN_SLOT; /// `AccountUpdateMissing` or `TxUpdateMissing` depending on which half was /// missing. pub(crate) struct PartialEventWatchdog { - /// Store implementor. - pub store: Persistence, + /// Persistence layer. + pub persistence: Persistence, /// Shared in-memory map of partial events keyed by `PartialEventKey`. /// @@ -43,8 +43,14 @@ pub(crate) struct PartialEventWatchdog { impl PartialEventWatchdog { /// Construct a new watchdog. - pub fn new(store: Persistence, partials: Arc>) -> Self { - Self { store, partials } + pub fn new( + persistence: Persistence, + partials: Arc>, + ) -> Self { + Self { + persistence, + partials, + } } /// Outer loop. Runs the periodic scan over the shared partial-event map.