diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 7d3fe9c685..785c5ac4ff 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -401,3 +401,6 @@ fn subscribe_request( ..Default::default() } } + +#[cfg(test)] +mod tests; diff --git a/crates/solana-indexer/src/indexer/ingester/tests.rs b/crates/solana-indexer/src/indexer/ingester/tests.rs new file mode 100644 index 0000000000..b23d030f7c --- /dev/null +++ b/crates/solana-indexer/src/indexer/ingester/tests.rs @@ -0,0 +1,215 @@ +use { + super::{Error, INGEST_TO_DECODER_CAPACITY, Ingester}, + crate::types::{ + Signature, + shared::StreamUpdate, + slot::Slot, + wire::{ + SubscribeUpdate, + SubscribeUpdateAccount, + SubscribeUpdateAccountInfo, + SubscribeUpdateBlock, + SubscribeUpdateBlockMeta, + SubscribeUpdateEntry, + SubscribeUpdatePing, + SubscribeUpdatePong, + SubscribeUpdateSlot, + SubscribeUpdateTransaction, + SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, + UpdateOneof, + }, + }, + futures::stream, + std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + tokio::sync::mpsc::channel, + yellowstone_grpc_proto::tonic::Status, +}; + +fn signature(n: u8) -> Signature { + Signature::from([n; 64]) +} + +fn signature_bytes(n: u8) -> Vec { + signature(n).as_ref().to_vec() +} + +fn tx_update(slot: u64, sig: u8) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Transaction(SubscribeUpdateTransaction { + slot, + transaction: Some(SubscribeUpdateTransactionInfo { + signature: signature_bytes(sig), + ..Default::default() + }), + })), + ..Default::default() + }) +} + +fn account_update(slot: u64, sig: u8) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + slot, + account: Some(SubscribeUpdateAccountInfo { + txn_signature: Some(signature_bytes(sig)), + ..Default::default() + }), + ..Default::default() + })), + ..Default::default() + }) +} + +fn slot_update(slot: u64) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Slot(SubscribeUpdateSlot { + slot, + ..Default::default() + })), + ..Default::default() + }) +} + +fn update_of(update: UpdateOneof) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(update), + ..Default::default() + }) +} + +fn ingester( + stream: impl stream::Stream> + Unpin + Send, +) -> ( + Ingester> + Unpin + Send>, + tokio::sync::mpsc::Receiver, + Arc, +) { + let (tx, rx) = channel(INGEST_TO_DECODER_CAPACITY); + let latest_chain_slot = Arc::new(AtomicU64::new(0)); + ( + Ingester::new(stream, tx, latest_chain_slot.clone()), + rx, + latest_chain_slot, + ) +} + +#[tokio::test] +async fn transaction_update_with_valid_signature_is_forwarded() { + let signature = signature(1); + let (mut ingester, mut rx, _) = ingester(stream::iter([tx_update(42, 1)])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!( + matches!(update, StreamUpdate::Tx { slot: Slot(42), signature: s, .. } if s == signature) + ); + assert!(rx.is_empty()); +} + +#[tokio::test] +async fn account_update_with_body_is_forwarded() { + let signature = signature(2); + let (mut ingester, mut rx, _) = ingester(stream::iter([account_update(100, 2)])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!( + matches!(update, StreamUpdate::Account { slot: Slot(100), txn_signature: Some(s), .. } if s == signature) + ); + assert!(rx.is_empty()); +} + +#[tokio::test] +async fn slot_update_advances_latest_chain_slot() { + let (mut ingester, _rx, slot) = ingester(stream::iter([slot_update(9_001)])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + assert_eq!(slot.load(Ordering::Relaxed), 9_001); +} + +#[tokio::test] +async fn unrelated_and_empty_updates_are_ignored() { + let (mut ingester, mut rx, slot) = ingester(stream::iter([ + Ok(SubscribeUpdate::default()), + update_of(UpdateOneof::Ping(SubscribeUpdatePing::default())), + update_of(UpdateOneof::Pong(SubscribeUpdatePong::default())), + update_of(UpdateOneof::TransactionStatus( + SubscribeUpdateTransactionStatus::default(), + )), + update_of(UpdateOneof::Block(SubscribeUpdateBlock::default())), + update_of(UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta::default())), + update_of(UpdateOneof::Entry(SubscribeUpdateEntry::default())), + tx_update(7, 3), + ])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!( + matches!(update, StreamUpdate::Tx { slot: Slot(7), signature: s, .. } if s == signature(3)) + ); + assert!(rx.is_empty()); + assert_eq!(slot.load(Ordering::Relaxed), 0); +} + +#[tokio::test] +async fn transaction_without_body_or_malformed_signature_is_skipped() { + let signature = signature(4); + let (mut ingester, mut rx, _) = ingester(stream::iter([ + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Transaction(SubscribeUpdateTransaction { + slot: 1, + transaction: None, + })), + ..Default::default() + }), + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Transaction(SubscribeUpdateTransaction { + slot: 2, + transaction: Some(SubscribeUpdateTransactionInfo { + signature: vec![1, 2, 3], + ..Default::default() + }), + })), + ..Default::default() + }), + tx_update(3, 4), + ])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!( + matches!(update, StreamUpdate::Tx { slot: Slot(3), signature: s, .. } if s == signature) + ); + assert!(rx.is_empty()); +} + +#[tokio::test] +async fn terminal_grpc_status_returns_stream_error() { + let status = Status::invalid_argument("boom"); + let (mut ingester, _rx, _) = ingester(stream::iter([Err(status.clone())])); + + let result = ingester.run().await; + assert!( + matches!(result, Err(Error::Stream(s)) if s.code() == status.code() && s.message() == status.message()) + ); +} + +#[tokio::test] +async fn clean_stream_end_returns_stream_ended() { + let (mut ingester, _rx, _) = + ingester(stream::iter(Vec::>::new())); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); +} + +#[tokio::test] +async fn closed_decoder_receiver_stops_cleanly() { + let (mut ingester, rx, _) = ingester(stream::iter([tx_update(1, 5)])); + drop(rx); + + assert!(ingester.run().await.is_ok()); +} diff --git a/crates/solana-indexer/src/types/wire.rs b/crates/solana-indexer/src/types/wire.rs index c52e94703a..25216531bb 100644 --- a/crates/solana-indexer/src/types/wire.rs +++ b/crates/solana-indexer/src/types/wire.rs @@ -13,10 +13,15 @@ pub use yellowstone_grpc_proto::{ SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, + SubscribeUpdateBlock, + SubscribeUpdateBlockMeta, + SubscribeUpdateEntry, SubscribeUpdatePing, + SubscribeUpdatePong, SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, subscribe_update::UpdateOneof, }, solana::storage::confirmed_block::{