Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ tracing-subscriber = { version = "0.3.22", features = ["json"] }
url = "2.5.0"
vergen = "8"
winner-selection = { path = "crates/winner-selection" }
yellowstone-grpc-client = "13.1.0"
yellowstone-grpc-proto = { version = "12.4.0", default-features = false }

[workspace.lints]
Expand Down
3 changes: 3 additions & 0 deletions crates/solana-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ path = "src/main.rs"
[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
derive_more = { workspace = true }
solana-client = { workspace = true }
solana-sdk = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
tracing = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

[lints]
Expand Down
71 changes: 71 additions & 0 deletions crates/solana-indexer/src/indexer/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#![expect(dead_code)]
//! The decoder pulls `StreamUpdate`s from the ingester, decodes
//! settlement-program and SolFlow transactions, joins account-update snapshots,
//! and persists typed events.

// TODO: This file only declares the component skeleton. The `run` body is
// `unimplemented!`; the dispatch logic and persist path arrive in a later
// change.

use {
crate::{
persistence::Persistence,
types::{
errors::PersistenceError,
shared::{PartialEvent, PartialEventKey, StreamUpdate},
},
},
dashmap::DashMap,
solana_sdk::pubkey::Pubkey,
std::sync::Arc,
tokio::sync::mpsc::Receiver,
};

/// Decoder component.
///
/// The watchdog holds a clone of the same `partials` map, so the two operate on
/// the same concurrent map without any message passing between them.
pub(crate) struct Decoder {
/// Persistence layer.
pub persistence: Persistence,

/// Incoming `StreamUpdate` from the ingester.
pub rx: Receiver<StreamUpdate>,

/// Shared in-memory map of partial events keyed by `PartialEventKey`,
/// holding either-half events waiting for their pair. The watchdog holds a
/// clone of this `Arc`.
pub partials: Arc<DashMap<PartialEventKey, PartialEvent>>,

/// Settlement program id (filter target for the decoder).
pub settlement_program: Pubkey,

/// SolFlow program id (filter target for the decoder).
pub solflow_program: Pubkey,
}

impl Decoder {
/// Construct a new decoder. The caller owns the channel capacity decision.
pub fn new(
persistence: Persistence,
rx: Receiver<StreamUpdate>,
partials: Arc<DashMap<PartialEventKey, PartialEvent>>,
settlement_program: Pubkey,
solflow_program: Pubkey,
) -> Self {
Self {
persistence,
rx,
partials,
settlement_program,
solflow_program,
}
}

/// Main loop. Pulls `StreamUpdate` from the receiver, runs the decode
/// pipeline, persists, and records partial events in the shared map for the
/// watchdog to read.
pub async fn run(&mut self) -> Result<(), PersistenceError> {
unimplemented!()
}
}
66 changes: 66 additions & 0 deletions crates/solana-indexer/src/indexer/finalization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#![expect(dead_code)]
//! The finalization worker updates the commitment level of the transactions
//! tracked by the indexer, promoting rows written at `confirmed` to
//! `finalized`.
//!
//! It does so through two flows. Two are needed because the relevant RPC
//! methods trade off differently: `getSignatureStatuses` is batchable but the
//! node only retains statuses for recent slots, while `getTransaction` reaches
//! arbitrarily old transactions on archival nodes but costs one call per
//! signature. The batched pass handles the common case cheaply; the per-row
//! sweep catches rows that age out of it.
//!
//! - **Promotion pass**: batch-polls `getSignatureStatuses` (at most
//! [`PROMOTION_BATCH_LIMIT`] signatures per call) over rows still at
//! `confirmed` that are at least [`FINALIZATION_WINDOW_SLOTS`] behind the
//! chain tip, and promotes rows whose `confirmationStatus` is `"finalized"`.
//!
//! - **Aged-row sweep**: fallback for rows past the signature-status retention
//! horizon ([`SIGNATURE_STATUS_RETENTION_SLOTS`]), which the promotion pass
//! can no longer check. Each row costs one `getTransaction` call; a non-null
//! response promotes to `finalized`, a null response marks `rolled_back`.

// TODO: This file only declares the component skeleton. The `run` body is
// `unimplemented!`; both flows arrive in a later change.

use {
crate::{persistence::Persistence, traits::solana_client::SolanaClient},
std::sync::Arc,
};

/// Slots a transaction usually needs to finalize (~12.8 s at 400 ms/slot).
/// A heuristic floor, not a guarantee: the promotion pass skips rows fresher
/// than this because they cannot have finalized yet, and degraded consensus
/// can push real finalization later (the aged-row sweep catches those).
pub const FINALIZATION_WINDOW_SLOTS: u64 = 32;

/// Upper limit for the `getSignatureStatuses` batch RPC call.
pub const PROMOTION_BATCH_LIMIT: usize = 256;

/// Approximate slot horizon past which `getSignatureStatuses` no longer returns
/// a result.
pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150;

/// Transaction finalization worker. See the module docs for the two flows it
/// runs.
pub(crate) struct FinalizationWorker {
/// Persistence layer.
pub persistence: Persistence,

/// RPC implementor.
pub rpc: Arc<dyn SolanaClient>,
}

impl FinalizationWorker {
/// Construct a new finalization worker.
pub fn new(persistence: Persistence, rpc: Arc<dyn SolanaClient>) -> Self {
Self { persistence, rpc }
}

/// Outer loop. Runs the promotion pass and the aged-row sweep on a timer.
///
/// Placeholder for now; implemented in a later change.
pub async fn run(&mut self) {
unimplemented!("implemented in PR 11–12")
}
}
63 changes: 63 additions & 0 deletions crates/solana-indexer/src/indexer/ingester.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#![expect(dead_code)]
//! The ingester owns the yellowstone gRPC stream. It drains the socket as fast
//! as yellowstone delivers, pushes tagged updates into the channel, and updates
//! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding.

// TODO: This file only declares the component skeleton. The `run` body is
// `unimplemented!`; the actual drain and reconnect with backoff logic arrives
// in a later change.

use {
crate::{persistence::Persistence, types::shared::StreamUpdate},
std::sync::atomic::AtomicU64,
tokio::sync::mpsc::Sender,
yellowstone_grpc_client::GrpcConnector,
};

/// The sole writer is the ingester, on every slot-filter message. Anchors the
/// partial-event watchdog and the finalization worker. Cold start is zero; the
/// watchdog skips its comparison on the first tick.
///
/// This is the chain tip, not indexing progress. How far the indexer has
/// actually persisted is the watermark in `solana.indexer_state`, written by
/// the decoder, which is a separate value.
pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using a global static AtomicU64 for LATEST_CHAIN_SLOT introduces shared mutable state across the entire process. This causes race conditions and flakiness when running unit/integration tests in parallel (Cargo's default behavior). It also prevents running multiple indexer instances in the same process.

Actionable Suggestion:
Remove the global static and instead pass an Arc<AtomicU64> (or a shared state struct) to the constructors of Ingester, Decoder, PartialEventWatchdog, and FinalizationWorker.

References
  1. Focus exclusively on identifying missing edge cases, potential race conditions, or logic that deviates from the PR's stated goals. (link)

Comment on lines +17 to +24

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason that this value does not live inside a specific component that gets accessed via a getter?
Also the API is currently fragile. The type is pub and allows anyone to access it. A safer API would be a new type wrapping the counter with an update function that is only accessible in this module for example.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Half of this lands in #4549, where the global static becomes an Arc<AtomicU64> owned by the Ingester, so it's no longer a free-floating global. The newtype-with-restricted-write part isn't done yet though, it's still a raw Arc<AtomicU64> any holder can write to. I'll wrap it in a small ChainTip type in #4549 then.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason that this value does not live inside a specific component that gets accessed via a getter?

Good point, I hadn't thought much about it, but that makes more sense. Thanks @squadgazzz for handling this.

Comment on lines +17 to +24

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be documented that this slot number is completely unrelated to the indexing progress of the system. Obviously depends on the rest of the indexer logic but it sounds like a counter that tracks fully processed slots might be more useful.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented it. The doc now says LATEST_CHAIN_SLOT is the chain tip, not indexing progress. The processed-slots counter you're describing already exists separately, it's the watermark in solana.indexer_state written by the decoder.


/// Cap on the exponential backoff between reconnect attempts.
pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30);

/// Capacity of the channel from the ingester to the decoder.
pub const INGEST_TO_DECODER_CAPACITY: usize = 1024;
Comment on lines +29 to +30

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and a few other consts seems like it should be configurable by a config file.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These move to the config module in #4549.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, it's just that setting up the configuration scheme was deferred to the end of the PR plan. Happy to have it earlier though.

Comment on lines +29 to +30

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why the ingester is necessary? Why is it better to have it forward the events to a different channel? If the other channel it pushed into does not get cleared fast enough the backpressure will end up here anyway.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UUIC, the ingester exists to keep reading from the stream without pausing since yellowstone disconnects slow readers, and the slowest part of our work is writing to the DB. If one loop reads a message and then writes it to the DB, it stops reading while it writes, messages pile up, and the server drops us. A bigger buffer in the client shouldn't help, because a buffer still needs something to keep emptying it, and that same loop is busy writing.

You're right that a long overload still backs up to the ingester and drops us anyway. The buffer only smooths over short slow spikes, like a burst in one slot or one slow write. So the split isn't about speed or handling more load. It only keeps us from getting disconnected during short slow moments.

@tilacog please correct me if I am wrong.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squadgazzz summed it up perfectly.

Comment thread
squadgazzz marked this conversation as resolved.

/// Ingester component.
///
/// Generic over a `GrpcConnector` implementor so the unit tests can drive it
/// with a mock.
pub(crate) struct Ingester<C: GrpcConnector> {
/// gRPC connector implementor
pub connector: C,

/// Sends `StreamUpdate` to the decoder. Should be bounded to
/// `INGEST_TO_DECODER_CAPACITY` entries.
pub tx: Sender<StreamUpdate>,
Comment on lines +40 to +42

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment says the size of the channel should be bounded to a unit of time which does not seem to make sense.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, the doc now references INGEST_TO_DECODER_CAPACITY (the channel capacity), not the backoff duration.


/// Persistence layer, used to checkpoint the slot.
pub persistence: Persistence,
}

impl<C: GrpcConnector> Ingester<C> {
/// Construct a new ingester. The caller owns the channel capacity decision.
pub fn new(connector: C, tx: Sender<StreamUpdate>, persistence: Persistence) -> Self {
Self {
connector,
tx,
persistence,
}
}

/// TODO: Outer loop: open the subscription, drain it, push into the
/// channel, reconnect on failure with exponential backoff.
pub async fn run(&mut self) {
unimplemented!()
}
}
Loading
Loading