diff --git a/Cargo.lock b/Cargo.lock index 62ec469e9..1a8e0dd97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3770,6 +3770,7 @@ version = "0.12.0" dependencies = [ "agave-feature-set", "anyhow", + "async-trait", "borsh", "fd-lock", "magic-domain-program", @@ -3875,6 +3876,7 @@ dependencies = [ "solana-transaction-error 3.2.0", "spl-token-2022-interface", "spl-token-interface", + "tempfile", "thiserror 2.0.18", "tokio", "tokio-stream", diff --git a/magicblock-api/Cargo.toml b/magicblock-api/Cargo.toml index cdef84a27..d33762012 100644 --- a/magicblock-api/Cargo.toml +++ b/magicblock-api/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } borsh = "1.5.3" fd-lock = { workspace = true } tracing = { workspace = true } diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 7515f2431..996f0646f 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -19,8 +19,10 @@ use magicblock_aperture::{ state::{NodeContext, SharedState}, }; use magicblock_chainlink::{ - config::ChainlinkConfig, remote_account_provider::Endpoints, ProdChainlink, - ProdInnerChainlink, + config::ChainlinkConfig, + fetch_cloner::{UndelegationScheduleRequest, UndelegationScheduler}, + remote_account_provider::Endpoints, + ProdChainlink, ProdInnerChainlink, }; use magicblock_committor_service::{ config::ChainConfig, BaseIntentCommittor, CommittorService, @@ -99,6 +101,29 @@ use crate::{ type InnerChainlinkImpl = ProdInnerChainlink; type ChainlinkImpl = ProdChainlink; +/// Bridges chainlink's [`UndelegationScheduler`] to the committor service so a +/// delegated clone rejected by AML is undelegated on the base layer. +struct CommittorUndelegationScheduler(Arc); + +#[async_trait::async_trait] +impl UndelegationScheduler for CommittorUndelegationScheduler { + async fn schedule_undelegation( + &self, + request: UndelegationScheduleRequest, + ) -> magicblock_chainlink::errors::ChainlinkResult<()> { + let pubkey = request.pubkey; + self.0 + .schedule_undelegation(pubkey, request.account) + .await + .map_err(|err| format!("committor response channel closed: {err}")) + .and_then(|result| result.map_err(|err| err.to_string())) + .map_err(|message| { + magicblock_chainlink::errors::ChainlinkError::FailedToScheduleUndelegationAfterAmlRejection( + pubkey, message, + ) + }) + } +} // ----------------- // MagicValidator @@ -238,6 +263,7 @@ impl MagicValidator { &ledger.latest_block().clone(), &accountsdb, shared_chain_slot.clone(), + committor_service.clone(), ) .await?, ); @@ -495,6 +521,7 @@ impl MagicValidator { latest_block: &LatestBlock, accountsdb: &Arc, chain_slot: Option>, + committor_service: Option>, ) -> ApiResult { if Self::replication_mode_uses_disabled_chainlink( &config.validator.replication_mode, @@ -550,6 +577,10 @@ impl MagicValidator { &config.chainlink, config.storage.as_path(), chain_slot.unwrap_or_default(), + committor_service.map(|committor_service| { + Arc::new(CommittorUndelegationScheduler(committor_service)) + as Arc + }), ) .await?; diff --git a/magicblock-chainlink/Cargo.toml b/magicblock-chainlink/Cargo.toml index 29f664fcf..ce757d365 100644 --- a/magicblock-chainlink/Cargo.toml +++ b/magicblock-chainlink/Cargo.toml @@ -58,6 +58,7 @@ url = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } magicblock-chainlink = { path = ".", features = ["dev-context"] } +tempfile = { workspace = true } [features] default = [] diff --git a/magicblock-chainlink/src/chainlink/errors.rs b/magicblock-chainlink/src/chainlink/errors.rs index 6370d9037..145bdfebf 100644 --- a/magicblock-chainlink/src/chainlink/errors.rs +++ b/magicblock-chainlink/src/chainlink/errors.rs @@ -67,6 +67,11 @@ pub enum ChainlinkError { #[error("Failed to perform Range risk check: {0}")] RangeRisk(#[from] RiskError), + #[error( + "Failed to schedule undelegation for {0} after AML rejection: {1}" + )] + FailedToScheduleUndelegationAfterAmlRejection(Pubkey, String), + #[error("Chainlink is disabled for non-primary mode")] DisabledForNonPrimaryMode, } diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index fe323029f..b0a336c53 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -8,6 +8,7 @@ use std::{ time::Duration, }; +use async_trait::async_trait; use dlp_api::{ pda::delegation_record_pda_from_delegated_account, state::DelegationRecord, }; @@ -85,6 +86,23 @@ use crate::{ }, }; +#[derive(Clone)] +pub struct UndelegationScheduleRequest { + pub pubkey: Pubkey, + pub account: AccountSharedData, +} + +/// Schedules an undelegation when a delegated clone is rejected by AML before +/// it enters the local bank. Implemented outside chainlink (e.g. by the API +/// layer bridging to the committor service) to keep chainlink decoupled. +#[async_trait] +pub trait UndelegationScheduler: Send + Sync { + async fn schedule_undelegation( + &self, + request: UndelegationScheduleRequest, + ) -> ChainlinkResult<()>; +} + pub struct FetchCloner where T: ChainRpcClient, @@ -138,6 +156,10 @@ where /// Risk checker for post-delegation action addresses. risk_service: Option>, + + /// Schedules undelegation when post-delegation action AML checks reject + /// a delegated clone before it enters the local bank. + undelegation_scheduler: Option>, } /// Negative-cache capacity for known-empty eATAs. @@ -177,6 +199,7 @@ where .pending_operation_timeout_ms .clone(), risk_service: self.risk_service.clone(), + undelegation_scheduler: self.undelegation_scheduler.clone(), } } } @@ -198,6 +221,7 @@ where subscription_updates_rx: mpsc::Receiver, allowed_programs: Option>, risk_service: Option>, + undelegation_scheduler: Option>, ) -> Arc { let validator_pubkey = validator_keypair.pubkey(); let blacklisted_accounts = blacklisted_accounts(&validator_pubkey); @@ -225,6 +249,7 @@ where FETCH_CLONE_OPERATION_TIMEOUT.as_millis() as u64, )), risk_service, + undelegation_scheduler, }); let accounts_bank_for_eviction = accounts_bank.clone(); @@ -510,16 +535,49 @@ where )); } - self.ensure_delegation_action_dependencies( - request.pubkey, - request.account.remote_slot(), - &request.delegation_actions, - ) - .await?; + if let Err(err) = self + .ensure_delegation_action_dependencies( + request.pubkey, + request.account.remote_slot(), + &request.delegation_actions, + ) + .await + { + if matches!( + err, + ChainlinkError::RangeRisk( + magicblock_aml::RiskError::HighRiskAddresses(_) + ) + ) { + self.schedule_undelegation_after_aml_rejection(&request) + .await?; + } + return Err(err); + } Ok(self.clone_account_with_ownership(request).await?) } + async fn schedule_undelegation_after_aml_rejection( + &self, + request: &AccountCloneRequest, + ) -> ChainlinkResult<()> { + let Some(scheduler) = self.undelegation_scheduler.as_ref() else { + warn!( + pubkey = %request.pubkey, + "AML rejected post-delegation actions but undelegation scheduler is unavailable" + ); + return Ok(()); + }; + + scheduler + .schedule_undelegation(UndelegationScheduleRequest { + pubkey: request.pubkey, + account: request.account.clone(), + }) + .await + } + fn normalize_unresolved_dlp_clone_request( &self, request: &mut AccountCloneRequest, diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs index 434614157..7ed5027e6 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs @@ -1,6 +1,13 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + io::{Read, Write}, + net::TcpListener, + sync::{Arc, Mutex}, + time::Duration, +}; use dlp_api::state::DelegationRecord; +use magicblock_config::config::RiskConfig; use solana_account::{ Account, AccountSharedData, ReadableAccount, WritableAccount, }; @@ -203,6 +210,111 @@ where } } +#[derive(Default)] +struct RecordingUndelegationScheduler { + requests: Mutex>, +} + +impl RecordingUndelegationScheduler { + fn requests(&self) -> Vec { + self.requests.lock().unwrap().clone() + } +} + +#[async_trait::async_trait] +impl UndelegationScheduler for RecordingUndelegationScheduler { + async fn schedule_undelegation( + &self, + request: UndelegationScheduleRequest, + ) -> ChainlinkResult<()> { + self.requests.lock().unwrap().push(request); + Ok(()) + } +} + +struct MockRiskServer { + base_url: String, + worker: tokio::task::JoinHandle<()>, +} + +impl MockRiskServer { + async fn start( + address_scores: Vec<(String, u64)>, + expected_calls: usize, + ) -> Self { + let listener = + TcpListener::bind("127.0.0.1:0").expect("bind mock risk server"); + let addr = listener.local_addr().expect("mock risk server address"); + let score_by_address: HashMap = + address_scores.into_iter().collect(); + + let worker = tokio::task::spawn_blocking(move || { + for _ in 0..expected_calls { + let (mut stream, _) = + listener.accept().expect("accept mock risk request"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set mock risk read timeout"); + + let mut buffer = [0u8; 4096]; + let read = stream.read(&mut buffer).expect("read request"); + let request = String::from_utf8_lossy(&buffer[..read]); + let address = extract_query_value(&request, "address") + .expect("missing address query"); + assert!(request.starts_with("GET /risk/address?")); + assert!(request.contains("network=solana")); + + let score = score_by_address + .get(&address) + .expect("unexpected risk address"); + let body = format!(r#"{{"riskScore":{score}}}"#); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + body.len(), + body + ); + stream + .write_all(response.as_bytes()) + .expect("write mock risk response"); + } + }); + + Self { + base_url: format!("http://{addr}"), + worker, + } + } + + async fn join(self) { + self.worker.await.expect("mock risk server panicked"); + } +} + +fn extract_query_value(request: &str, key: &str) -> Option { + let query = request + .lines() + .next()? + .split_whitespace() + .nth(1)? + .split('?') + .nth(1)?; + query.split('&').find_map(|part| { + let (k, v) = part.split_once('=')?; + (k == key).then(|| v.to_string()) + }) +} + +fn make_risk_config(base_url: String) -> RiskConfig { + RiskConfig { + enabled: true, + base_url, + api_key: Some("test-api-key".to_string()), + cache_ttl: Duration::from_secs(60), + request_timeout: Duration::from_secs(2), + risk_score_threshold: 7, + } +} + fn insert_plain_ata_in_bank( accounts_bank: &Arc, ata_pubkey: Pubkey, @@ -292,6 +404,7 @@ fn init_fetch_cloner( subscription_rx, None, None, + None, ); (fetch_cloner, subscription_tx, cloner) } @@ -2434,6 +2547,7 @@ async fn test_allowed_programs_filters_programs() { subscription_rx, allowed_programs, None, + None, ); // Fetch and clone both programs @@ -2506,6 +2620,7 @@ async fn test_allowed_programs_none_allows_all() { subscription_rx, None, // No restriction None, + None, ); // Fetch and clone both programs @@ -2577,6 +2692,7 @@ async fn test_allowed_programs_empty_allows_all() { subscription_rx, allowed_programs, None, + None, ); // Fetch and clone both programs @@ -4278,6 +4394,98 @@ async fn test_post_delegation_actions_reject_non_delegated_clone_target() { ); } +#[tokio::test] +async fn test_post_delegation_action_aml_rejection_schedules_undelegation() { + init_logger(); + let validator_keypair = Keypair::new(); + const CURRENT_SLOT: u64 = 100; + + let FetcherTestCtx { + accounts_bank, + remote_account_provider, + .. + } = setup( + std::iter::empty::<(Pubkey, Account)>(), + CURRENT_SLOT, + validator_keypair.insecure_clone(), + ) + .await; + + let high_risk_signer = random_pubkey(); + let server = + MockRiskServer::start(vec![(high_risk_signer.to_string(), 9)], 1).await; + let temp_ledger = tempfile::tempdir().expect("temp ledger"); + let risk_service = RiskService::try_from_config( + &make_risk_config(server.base_url.clone()), + temp_ledger.path(), + ) + .expect("risk config should be valid") + .expect("risk service should be enabled"); + let risk_service = Arc::new(risk_service); + + let scheduler = Arc::new(RecordingUndelegationScheduler::default()); + let cloner = Arc::new(ClonerStub::new(accounts_bank.clone())); + let (_subscription_tx, subscription_rx) = mpsc::channel(100); + let fetch_cloner = FetchCloner::new( + &remote_account_provider, + &accounts_bank, + &cloner, + validator_keypair.insecure_clone(), + subscription_rx, + None, + Some(risk_service), + Some(scheduler.clone() as Arc), + ); + + let target_pubkey = random_pubkey(); + let mut target_account = AccountSharedData::from(Account { + lamports: 1_000_000, + data: vec![1, 2, 3, 4], + owner: system_program::id(), + executable: false, + rent_epoch: 0, + }); + target_account.set_remote_slot(CURRENT_SLOT); + target_account.set_delegated(true); + + let actions = DelegationActions::from(vec![Instruction::new_with_bytes( + system_program::id(), + &[1], + vec![AccountMeta::new_readonly(high_risk_signer, true)], + )]); + + let err = fetch_cloner + .clone_account_with_post_delegation_action_invariants( + AccountCloneRequest { + pubkey: target_pubkey, + account: target_account.clone(), + commit_frequency_ms: None, + delegation_actions: actions, + delegated_to_other: None, + }, + ) + .await + .expect_err("high-risk signer should reject the clone"); + + assert!(matches!( + err, + ChainlinkError::RangeRisk( + magicblock_aml::RiskError::HighRiskAddresses(_) + ) + )); + assert!( + cloner.clone_requests().is_empty(), + "AML-rejected action target must not be cloned" + ); + + let scheduled = scheduler.requests(); + assert_eq!(scheduled.len(), 1); + assert_eq!(scheduled[0].pubkey, target_pubkey); + assert_eq!(scheduled[0].account, target_account); + + server.join().await; +} + #[tokio::test] async fn test_dlp_owned_clone_without_actions_clears_stale_delegated_flag() { init_logger(); @@ -5275,6 +5483,7 @@ async fn test_fetch_subscription_race_duplicate_clone() { subscription_rx, None, None, + None, ); // Send subscription update (this will become the owner). @@ -5396,6 +5605,7 @@ async fn test_delegated_account_fetch_subscription_race() { subscription_rx, None, None, + None, ); // Send subscription update. @@ -5503,6 +5713,7 @@ async fn test_clone_ownership_failure_propagates_to_waiters() { subscription_rx, None, None, + None, ); // Send subscription update (becomes owner, will fail). @@ -6373,6 +6584,7 @@ async fn test_owned_operation_owner_timeout_cleans_up_pending() { subscription_rx, None, None, + None, ); fetch_cloner.set_pending_operation_timeout(TEST_PENDING_REQUEST_TIMEOUT); @@ -6478,6 +6690,7 @@ async fn test_cancel_pending_terminates_owner_and_all_waiters() { subscription_rx, None, None, + None, ); let owner_task = { @@ -6599,6 +6812,7 @@ async fn test_cancel_all_pending_on_shutdown() { subscription_rx, None, None, + None, ); let mut tasks = Vec::new(); diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index 37dd5084b..293bdc9dc 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ b/magicblock-chainlink/src/chainlink/mod.rs @@ -5,7 +5,7 @@ use std::{ use dlp_api::pda::ephemeral_balance_pda_from_payer; use errors::{ChainlinkError, ChainlinkResult}; -use fetch_cloner::FetchCloner; +use fetch_cloner::{FetchCloner, UndelegationScheduler}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_aml::RiskService; use magicblock_config::config::ChainLinkConfig; @@ -220,7 +220,8 @@ impl accounts_bank, cloner, config, - chainlink_config + chainlink_config, + undelegation_scheduler ))] pub async fn try_new_from_endpoints( endpoints: &Endpoints, @@ -232,6 +233,7 @@ impl chainlink_config: &ChainLinkConfig, ledger_path: &Path, chain_slot: Arc, + undelegation_scheduler: Option>, ) -> ChainlinkResult< InnerChainlink< ChainRpcClientImpl, @@ -266,6 +268,7 @@ impl rx, chainlink_config.allowed_programs.clone(), risk_service, + undelegation_scheduler, ); Some(fetch_cloner) } else { diff --git a/magicblock-chainlink/tests/10_aml_undelegation.rs b/magicblock-chainlink/tests/10_aml_undelegation.rs new file mode 100644 index 000000000..ece558da4 --- /dev/null +++ b/magicblock-chainlink/tests/10_aml_undelegation.rs @@ -0,0 +1,252 @@ +use std::{ + collections::HashMap, + io::{Read, Write}, + net::TcpListener, + sync::{Arc, Mutex}, + time::Duration, +}; + +use dlp_api::{ + args::{ + EncryptedBuffer, MaybeEncryptedAccountMeta, MaybeEncryptedInstruction, + MaybeEncryptedIxData, PostDelegationActions, + }, + pda::delegation_record_pda_from_delegated_account, + state::DelegationRecord, +}; +use magicblock_aml::RiskService; +use magicblock_chainlink::{ + chainlink::errors::ChainlinkResult, + fetch_cloner::{UndelegationScheduleRequest, UndelegationScheduler}, + testing::init_logger, +}; +use magicblock_config::config::RiskConfig; +use solana_account::{Account, ReadableAccount}; +use solana_pubkey::Pubkey; +use solana_sdk_ids::system_program; +use tokio::task::JoinHandle; +use utils::test_context::TestContext; + +mod utils; + +#[derive(Default)] +struct RecordingUndelegationScheduler { + requests: Mutex>, +} + +impl RecordingUndelegationScheduler { + fn requests(&self) -> Vec { + self.requests.lock().unwrap().clone() + } +} + +#[async_trait::async_trait] +impl UndelegationScheduler for RecordingUndelegationScheduler { + async fn schedule_undelegation( + &self, + request: UndelegationScheduleRequest, + ) -> ChainlinkResult<()> { + self.requests.lock().unwrap().push(request); + Ok(()) + } +} + +struct MockRiskServer { + base_url: String, + worker: JoinHandle<()>, +} + +impl MockRiskServer { + async fn start( + address_scores: Vec<(String, u64)>, + expected_calls: usize, + ) -> Self { + let listener = + TcpListener::bind("127.0.0.1:0").expect("bind mock risk server"); + let addr = listener.local_addr().expect("mock risk server address"); + let score_by_address: HashMap = + address_scores.into_iter().collect(); + + let worker = tokio::task::spawn_blocking(move || { + for _ in 0..expected_calls { + let (mut stream, _) = + listener.accept().expect("accept mock risk request"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set mock risk read timeout"); + + let mut buffer = [0u8; 4096]; + let read = stream.read(&mut buffer).expect("read request"); + let request = String::from_utf8_lossy(&buffer[..read]); + let address = extract_query_value(&request, "address") + .expect("missing address query"); + assert!(request.starts_with("GET /risk/address?")); + assert!(request.contains("network=solana")); + + let score = score_by_address + .get(&address) + .expect("unexpected risk address"); + let body = format!(r#"{{"riskScore":{score}}}"#); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + body.len(), + body + ); + stream + .write_all(response.as_bytes()) + .expect("write mock risk response"); + } + }); + + Self { + base_url: format!("http://{addr}"), + worker, + } + } + + async fn join(self) { + self.worker.await.expect("mock risk server panicked"); + } +} + +fn extract_query_value(request: &str, key: &str) -> Option { + let query = request + .lines() + .next()? + .split_whitespace() + .nth(1)? + .split('?') + .nth(1)?; + query.split('&').find_map(|part| { + let (k, v) = part.split_once('=')?; + (k == key).then(|| v.to_string()) + }) +} + +fn risk_config(base_url: String) -> RiskConfig { + RiskConfig { + enabled: true, + base_url, + api_key: Some("test-api-key".to_string()), + cache_ttl: Duration::from_secs(60), + request_timeout: Duration::from_secs(2), + risk_score_threshold: 7, + } +} + +fn add_delegation_record_with_signer_action( + ctx: &TestContext, + delegated_pubkey: Pubkey, + owner: Pubkey, + signer: Pubkey, +) { + let record = DelegationRecord { + authority: ctx.validator_pubkey, + owner, + delegation_slot: 1, + lamports: 1_000, + commit_frequency_ms: 2_000, + }; + let mut data = vec![0; DelegationRecord::size_with_discriminator()]; + record.to_bytes_with_discriminator(&mut data).unwrap(); + + let actions = PostDelegationActions { + inserted_signers: 0, + inserted_non_signers: 0, + signers: vec![*signer.as_array(), *system_program::id().as_array()], + non_signers: vec![], + instructions: vec![MaybeEncryptedInstruction { + program_id: 1, + accounts: vec![MaybeEncryptedAccountMeta::ClearText( + dlp_api::compact::AccountMeta::new_readonly(0, true), + )], + data: MaybeEncryptedIxData { + prefix: vec![1], + suffix: EncryptedBuffer::default(), + }, + }], + }; + data.extend_from_slice(&borsh::to_vec(&actions).unwrap()); + + ctx.rpc_client.add_account( + delegation_record_pda_from_delegated_account(&delegated_pubkey), + Account { + owner: dlp_api::id(), + data, + ..Default::default() + }, + ); +} + +#[tokio::test] +async fn post_delegation_aml_rejection_schedules_undelegation() { + init_logger(); + + let high_risk_signer = Pubkey::new_unique(); + let server = + MockRiskServer::start(vec![(high_risk_signer.to_string(), 9)], 1).await; + let temp_ledger = tempfile::tempdir().expect("temp ledger"); + let risk_service = RiskService::try_from_config( + &risk_config(server.base_url.clone()), + temp_ledger.path(), + ) + .expect("risk config should be valid") + .expect("risk service should be enabled"); + let risk_service = Arc::new(risk_service); + let scheduler = Arc::new(RecordingUndelegationScheduler::default()); + + let slot = 100; + let ctx = TestContext::init_with_services( + slot, + Some(risk_service), + Some(scheduler.clone() as Arc), + ) + .await; + + let delegated_pubkey = Pubkey::new_unique(); + let owner = system_program::id(); + ctx.rpc_client.add_account( + delegated_pubkey, + Account { + lamports: 1_000_000, + data: vec![1, 2, 3, 4], + owner: dlp_api::id(), + executable: false, + rent_epoch: 0, + }, + ); + add_delegation_record_with_signer_action( + &ctx, + delegated_pubkey, + owner, + high_risk_signer, + ); + + let err = ctx + .ensure_account(&delegated_pubkey) + .await + .expect_err("high-risk signer should reject the clone"); + + assert!( + matches!( + &err, + magicblock_chainlink::errors::ChainlinkError::PendingRequestOwnerFailed( + pubkey, + message, + ) if *pubkey == delegated_pubkey && message.contains("high risk") + ), + "unexpected error: {err:?}" + ); + assert!( + ctx.cloner.clone_requests().is_empty(), + "AML-rejected action target must not be cloned" + ); + + let scheduled = scheduler.requests(); + assert_eq!(scheduled.len(), 1); + assert_eq!(scheduled[0].pubkey, delegated_pubkey); + assert!(scheduled[0].account.delegated()); + assert_eq!(scheduled[0].account.owner(), &owner); + + server.join().await; +} diff --git a/magicblock-chainlink/tests/utils/test_context.rs b/magicblock-chainlink/tests/utils/test_context.rs index c78854c7f..bfcf4ddeb 100644 --- a/magicblock-chainlink/tests/utils/test_context.rs +++ b/magicblock-chainlink/tests/utils/test_context.rs @@ -5,10 +5,11 @@ use std::{ time::{Duration, Instant}, }; +use magicblock_aml::RiskService; use magicblock_chainlink::{ accounts_bank::mock::AccountsBankStub, errors::ChainlinkResult, - fetch_cloner::{FetchAndCloneResult, FetchCloner}, + fetch_cloner::{FetchAndCloneResult, FetchCloner, UndelegationScheduler}, remote_account_provider::{ chain_pubsub_client::{mock::ChainPubsubClientMock, ChainPubsubClient}, config::RemoteAccountProviderConfig, @@ -55,6 +56,14 @@ pub struct TestContext { impl TestContext { pub async fn init(slot: Slot) -> Self { + Self::init_with_services(slot, None, None).await + } + + pub async fn init_with_services( + slot: Slot, + risk_service: Option>, + undelegation_scheduler: Option>, + ) -> Self { let (rpc_client, pubsub_client) = { let rpc_client = ChainRpcClientMockBuilder::new().slot(slot).build(); @@ -102,7 +111,8 @@ impl TestContext { validator_keypair.insecure_clone(), rx, None, - None, + risk_service, + undelegation_scheduler, )), Some(provider), ) diff --git a/magicblock-committor-service/src/committor_processor.rs b/magicblock-committor-service/src/committor_processor.rs index 5061eb4b0..2649dd746 100644 --- a/magicblock-committor-service/src/committor_processor.rs +++ b/magicblock-committor-service/src/committor_processor.rs @@ -14,7 +14,7 @@ use magicblock_program::magic_scheduled_base_intent::{ }; use magicblock_rpc_client::MagicblockRpcClient; use magicblock_table_mania::{GarbageCollectorConfig, TableMania}; -use solana_account::Account; +use solana_account::{Account, AccountSharedData}; use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_rpc_client::nonblocking::rpc_client::RpcClient; @@ -229,6 +229,18 @@ impl CommittorProcessor { Ok(()) } + #[instrument(skip(self, account))] + pub async fn schedule_undelegation( + &self, + pubkey: Pubkey, + account: AccountSharedData, + ) -> CommittorServiceResult<()> { + self.schedule_intent_bundle(vec![undelegation_intent_bundle( + pubkey, account, + )]) + .await + } + #[instrument(skip(self, intent_bundles))] pub async fn schedule_recovered_intent_bundles( &self, @@ -263,6 +275,69 @@ impl CommittorProcessor { } } +pub(crate) fn undelegation_intent_bundle( + pubkey: Pubkey, + account: AccountSharedData, +) -> ScheduledIntentBundle { + let committed_account = CommittedAccount::from((pubkey, account)); + let remote_slot = committed_account.remote_slot; + let intent_bundle = MagicIntentBundle { + commit_and_undelegate: Some(CommitAndUndelegate { + commit_action: CommitType::Standalone(vec![committed_account]), + undelegate_action: UndelegateType::Standalone, + }), + ..Default::default() + }; + + ScheduledIntentBundle { + id: intent_id(), + slot: remote_slot, + blockhash: Default::default(), + sent_transaction: Default::default(), + payer: Default::default(), + intent_bundle, + } +} + +fn intent_id() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + .try_into() + .unwrap_or(u64::MAX) +} + +/// Shared assertion that `bundle` is a standalone commit-and-undelegate for a +/// single `(pubkey, owner, slot)` account. Used by the builder and stub tests. +#[cfg(test)] +pub(crate) fn assert_undelegation_bundle( + bundle: &ScheduledIntentBundle, + pubkey: Pubkey, + owner: Pubkey, + slot: u64, +) { + assert_eq!(bundle.slot, slot); + assert!(bundle.intent_bundle.commit.is_none()); + let commit_and_undelegate = bundle + .intent_bundle + .commit_and_undelegate + .as_ref() + .expect("commit-and-undelegate intent should be present"); + assert!(matches!( + commit_and_undelegate.undelegate_action, + UndelegateType::Standalone + )); + let CommitType::Standalone(accounts) = &commit_and_undelegate.commit_action + else { + panic!("commit action should be standalone"); + }; + assert_eq!(accounts.len(), 1); + assert_eq!(accounts[0].pubkey, pubkey); + assert_eq!(accounts[0].remote_slot, slot); + assert_eq!(accounts[0].account.owner, owner); +} + fn pending_rows_to_scheduled_intent_bundles( rows: Vec, payer: Pubkey, @@ -448,6 +523,18 @@ mod tests { row } + #[test] + fn undelegation_intent_bundle_builds_commit_and_undelegate() { + let pubkey = Pubkey::new_unique(); + let owner = Pubkey::new_unique(); + let mut account = AccountSharedData::new(1_000, 4, &owner); + account.set_remote_slot(42); + + let bundle = undelegation_intent_bundle(pubkey, account); + + assert_undelegation_bundle(&bundle, pubkey, owner, 42); + } + #[test] fn pending_rows_reconstruct_commit_finalize_bundle() { let payer = Pubkey::new_unique(); diff --git a/magicblock-committor-service/src/service.rs b/magicblock-committor-service/src/service.rs index b1415b8bc..21959db12 100644 --- a/magicblock-committor-service/src/service.rs +++ b/magicblock-committor-service/src/service.rs @@ -7,6 +7,7 @@ use std::{ use magicblock_core::traits::ActionsCallbackScheduler; use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; +use solana_account::AccountSharedData; use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_signature::Signature; @@ -64,6 +65,11 @@ pub enum CommittorMessage { intent_bundles: Vec, respond_to: oneshot::Sender>, }, + ScheduleUndelegation { + pubkey: Pubkey, + account: AccountSharedData, + respond_to: oneshot::Sender>, + }, GetPendingIntentBundles { respond_to: oneshot::Sender>>, @@ -207,6 +213,17 @@ impl CommittorActor { error!(message_type = "ScheduleBaseIntents", error = ?e, "Failed to send response"); } } + ScheduleUndelegation { + pubkey, + account, + respond_to, + } => { + let result = + self.processor.schedule_undelegation(pubkey, account).await; + if let Err(e) = respond_to.send(result) { + error!(message_type = "ScheduleUndelegation", error = ?e, "Failed to send response"); + } + } GetPendingIntentBundles { respond_to } => { let pending_intents = self.processor.pending_intent_bundles().await; @@ -502,6 +519,20 @@ impl BaseIntentCommittor for CommittorService { rx } + fn schedule_undelegation( + &self, + pubkey: Pubkey, + account: AccountSharedData, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + self.try_send(CommittorMessage::ScheduleUndelegation { + pubkey, + account, + respond_to: tx, + }); + rx + } + fn get_commit_statuses( &self, message_id: u64, @@ -591,6 +622,13 @@ pub trait BaseIntentCommittor: Send + Sync + 'static { intent_bundles: Vec, ) -> oneshot::Receiver>; + /// Schedules commit-and-undelegate for a single delegated account. + fn schedule_undelegation( + &self, + pubkey: Pubkey, + account: AccountSharedData, + ) -> oneshot::Receiver>; + /// Subscribes for results of BaseIntent execution fn subscribe_for_results( &self, diff --git a/magicblock-committor-service/src/service_ext.rs b/magicblock-committor-service/src/service_ext.rs index 024d0423e..ea067da81 100644 --- a/magicblock-committor-service/src/service_ext.rs +++ b/magicblock-committor-service/src/service_ext.rs @@ -8,6 +8,7 @@ use std::{ use async_trait::async_trait; use futures_util::future::join_all; use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; +use solana_account::AccountSharedData; use solana_pubkey::Pubkey; use solana_signature::Signature; use solana_transaction_status_client_types::EncodedConfirmedTransactionWithStatusMeta; @@ -175,6 +176,14 @@ impl BaseIntentCommittor for CommittorServiceExt { self.inner.schedule_intent_bundles(intent_bundles) } + fn schedule_undelegation( + &self, + pubkey: Pubkey, + account: AccountSharedData, + ) -> oneshot::Receiver> { + self.inner.schedule_undelegation(pubkey, account) + } + fn subscribe_for_results( &self, ) -> oneshot::Receiver> diff --git a/magicblock-committor-service/src/stubs/changeset_committor_stub.rs b/magicblock-committor-service/src/stubs/changeset_committor_stub.rs index 0b417db47..2c63c7920 100644 --- a/magicblock-committor-service/src/stubs/changeset_committor_stub.rs +++ b/magicblock-committor-service/src/stubs/changeset_committor_stub.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use magicblock_program::magic_scheduled_base_intent::{ CommitType, ScheduledIntentBundle, UndelegateType, }; -use solana_account::Account; +use solana_account::{Account, AccountSharedData}; use solana_pubkey::Pubkey; use solana_signature::Signature; use solana_transaction_status_client_types::{ @@ -19,6 +19,7 @@ use tokio::sync::{broadcast, oneshot}; use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; use crate::{ + committor_processor::undelegation_intent_bundle, error::CommittorServiceResult, intent_execution_manager::BroadcastedIntentExecutionResult, intent_executor::ExecutionOutput, @@ -100,6 +101,16 @@ impl BaseIntentCommittor for ChangesetCommittorStub { receiver } + fn schedule_undelegation( + &self, + pubkey: Pubkey, + account: AccountSharedData, + ) -> oneshot::Receiver> { + self.schedule_intent_bundles(vec![undelegation_intent_bundle( + pubkey, account, + )]) + } + fn subscribe_for_results( &self, ) -> oneshot::Receiver> @@ -297,3 +308,31 @@ fn now() -> u64 { .expect("Time went backwards") .as_secs() } + +#[cfg(test)] +mod tests { + use super::*; + use crate::committor_processor::assert_undelegation_bundle; + + #[tokio::test] + async fn schedule_undelegation_exposes_commit_and_undelegate_api() { + let committor = ChangesetCommittorStub::default(); + let pubkey = Pubkey::new_unique(); + let owner = Pubkey::new_unique(); + let mut account = AccountSharedData::new(1_000, 4, &owner); + account.set_remote_slot(42); + + committor + .schedule_undelegation(pubkey, account.clone()) + .await + .expect("committor response channel should be open") + .expect("undelegation should be scheduled"); + + assert_eq!(committor.len(), 1); + assert_eq!(committor.committed(&pubkey), Some(account.into())); + + let changesets = committor.committed_changesets.lock().unwrap(); + let intent = changesets.values().next().expect("scheduled intent"); + assert_undelegation_bundle(intent, pubkey, owner, 42); + } +} diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index d98545855..07d2b22ac 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -4237,7 +4237,7 @@ dependencies = [ "flate2", "lmdb-rkv", "magicblock-config", - "magicblock-magic-program-api 0.11.4", + "magicblock-magic-program-api 0.12.0", "memmap2 0.9.9", "parking_lot", "reflink-copy", @@ -4318,6 +4318,7 @@ version = "0.12.0" dependencies = [ "agave-feature-set", "anyhow", + "async-trait", "borsh", "fd-lock", "magic-domain-program", diff --git a/test-integration/test-chainlink/src/ixtest_context.rs b/test-integration/test-chainlink/src/ixtest_context.rs index 76744f288..4f6c41b61 100644 --- a/test-integration/test-chainlink/src/ixtest_context.rs +++ b/test-integration/test-chainlink/src/ixtest_context.rs @@ -130,6 +130,7 @@ impl IxtestContext { rx, None, None, + None, )), Some(provider), ) diff --git a/test-integration/test-chainlink/src/test_context.rs b/test-integration/test-chainlink/src/test_context.rs index cd6485f94..4966a8d43 100644 --- a/test-integration/test-chainlink/src/test_context.rs +++ b/test-integration/test-chainlink/src/test_context.rs @@ -104,6 +104,7 @@ impl TestContext { rx, None, None, + None, )), Some(provider), )