Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions magicblock-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
borsh = "1.5.3"
fd-lock = { workspace = true }
tracing = { workspace = true }
Expand Down
35 changes: 33 additions & 2 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use magicblock_aperture::{
state::{NodeContext, SharedState},
};
use magicblock_chainlink::{
config::ChainlinkConfig, remote_account_provider::Endpoints, ProdChainlink,
ProdInnerChainlink,
config::ChainlinkConfig,
fetch_cloner::{UndelegationScheduleRequest, UndelegationScheduler},
remote_account_provider::Endpoints,
ProdChainlink, ProdInnerChainlink,
};
use magicblock_committor_service::{
config::ChainConfig, BaseIntentCommittor, CommittorService,
Expand Down Expand Up @@ -99,6 +101,29 @@ use crate::{
type InnerChainlinkImpl = ProdInnerChainlink<ChainlinkCloner>;

type ChainlinkImpl = ProdChainlink<ChainlinkCloner>;
/// Bridges chainlink's [`UndelegationScheduler`] to the committor service so a
/// delegated clone rejected by AML is undelegated on the base layer.
struct CommittorUndelegationScheduler(Arc<CommittorService>);

#[async_trait::async_trait]
impl UndelegationScheduler for CommittorUndelegationScheduler {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

magic_validator.rs is already massive, let's extract this into separate file

async fn schedule_undelegation(
&self,
request: UndelegationScheduleRequest,
) -> magicblock_chainlink::errors::ChainlinkResult<()> {
let pubkey = request.pubkey;
self.0
.schedule_undelegation(pubkey, request.account)
.await
.map_err(|err| format!("committor response channel closed: {err}"))
.and_then(|result| result.map_err(|err| err.to_string()))
.map_err(|message| {
Comment on lines +115 to +120

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add a timeout around committor scheduling await.

Line 115 awaits the committor response without a deadline. If that actor stalls, this task can hang indefinitely and back up AML-rejection handling. Bound this external call with a timeout and map timeout to the same chainlink error variant.

Suggested fix
         let pubkey = request.pubkey;
-        self.0
-            .schedule_undelegation(pubkey, request.account)
-            .await
+        tokio::time::timeout(
+            Duration::from_secs(5),
+            self.0.schedule_undelegation(pubkey, request.account),
+        )
+        .await
+            .map_err(|_| "committor schedule_undelegation timed out".to_string())?
             .map_err(|err| format!("committor response channel closed: {err}"))
             .and_then(|result| result.map_err(|err| err.to_string()))
             .map_err(|message| {
                 magicblock_chainlink::errors::ChainlinkError::FailedToScheduleUndelegationAfterAmlRejection(
                     pubkey, message,
                 )
             })
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-api/src/magic_validator.rs` around lines 115 - 120, Wrap the await
on self.0.schedule_undelegation(pubkey, request.account).await in a tokio
timeout (e.g., tokio::time::timeout(Duration::from_secs(...), ...)) so the call
cannot hang indefinitely; after awaiting the timeout, map a timeout error to the
same chainlink/committor error variant currently produced (the "committor
response channel closed" mapping) so both channel-closure and timeout produce
the same error path, and then continue to .and_then(|result| ...) as before;
update imports if necessary to include tokio::time::timeout and Duration and
ensure the timeout mapping converts into a String consistent with the existing
.map_err(|message| ...) flow.

magicblock_chainlink::errors::ChainlinkError::FailedToScheduleUndelegationAfterAmlRejection(
pubkey, message,
)
})
}
}

// -----------------
// MagicValidator
Expand Down Expand Up @@ -238,6 +263,7 @@ impl MagicValidator {
&ledger.latest_block().clone(),
&accountsdb,
shared_chain_slot.clone(),
committor_service.clone(),
)
.await?,
);
Expand Down Expand Up @@ -495,6 +521,7 @@ impl MagicValidator {
latest_block: &LatestBlock,
accountsdb: &Arc<AccountsDb>,
chain_slot: Option<Arc<AtomicU64>>,
committor_service: Option<Arc<CommittorService>>,
) -> ApiResult<ChainlinkImpl> {
if Self::replication_mode_uses_disabled_chainlink(
&config.validator.replication_mode,
Expand Down Expand Up @@ -550,6 +577,10 @@ impl MagicValidator {
&config.chainlink,
config.storage.as_path(),
chain_slot.unwrap_or_default(),
committor_service.map(|committor_service| {
Arc::new(CommittorUndelegationScheduler(committor_service))
as Arc<dyn UndelegationScheduler>
}),
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions magicblock-chainlink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ url = { workspace = true }
[dev-dependencies]
assert_matches = { workspace = true }
magicblock-chainlink = { path = ".", features = ["dev-context"] }
tempfile = { workspace = true }

[features]
default = []
Expand Down
5 changes: 5 additions & 0 deletions magicblock-chainlink/src/chainlink/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ pub enum ChainlinkError {
#[error("Failed to perform Range risk check: {0}")]
RangeRisk(#[from] RiskError),

#[error(
"Failed to schedule undelegation for {0} after AML rejection: {1}"
)]
FailedToScheduleUndelegationAfterAmlRejection(Pubkey, String),

#[error("Chainlink is disabled for non-primary mode")]
DisabledForNonPrimaryMode,
}
70 changes: 64 additions & 6 deletions magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
time::Duration,
};

use async_trait::async_trait;
use dlp_api::{
pda::delegation_record_pda_from_delegated_account, state::DelegationRecord,
};
Expand Down Expand Up @@ -85,6 +86,23 @@ use crate::{
},
};

#[derive(Clone)]
pub struct UndelegationScheduleRequest {
pub pubkey: Pubkey,
pub account: AccountSharedData,
}

/// Schedules an undelegation when a delegated clone is rejected by AML before
/// it enters the local bank. Implemented outside chainlink (e.g. by the API
/// layer bridging to the committor service) to keep chainlink decoupled.
#[async_trait]
pub trait UndelegationScheduler: Send + Sync {
async fn schedule_undelegation(
&self,
request: UndelegationScheduleRequest,
) -> ChainlinkResult<()>;
}

pub struct FetchCloner<T, U, V, C>
where
T: ChainRpcClient,
Expand Down Expand Up @@ -138,6 +156,10 @@ where

/// Risk checker for post-delegation action addresses.
risk_service: Option<Arc<RiskService>>,

/// Schedules undelegation when post-delegation action AML checks reject
/// a delegated clone before it enters the local bank.
undelegation_scheduler: Option<Arc<dyn UndelegationScheduler>>,
}

/// Negative-cache capacity for known-empty eATAs.
Expand Down Expand Up @@ -177,6 +199,7 @@ where
.pending_operation_timeout_ms
.clone(),
risk_service: self.risk_service.clone(),
undelegation_scheduler: self.undelegation_scheduler.clone(),
}
}
}
Expand All @@ -198,6 +221,7 @@ where
subscription_updates_rx: mpsc::Receiver<ForwardedSubscriptionUpdate>,
allowed_programs: Option<Vec<AllowedProgram>>,
risk_service: Option<Arc<RiskService>>,
undelegation_scheduler: Option<Arc<dyn UndelegationScheduler>>,
) -> Arc<Self> {
let validator_pubkey = validator_keypair.pubkey();
let blacklisted_accounts = blacklisted_accounts(&validator_pubkey);
Expand Down Expand Up @@ -225,6 +249,7 @@ where
FETCH_CLONE_OPERATION_TIMEOUT.as_millis() as u64,
)),
risk_service,
undelegation_scheduler,
});

let accounts_bank_for_eviction = accounts_bank.clone();
Expand Down Expand Up @@ -510,16 +535,49 @@ where
));
}

self.ensure_delegation_action_dependencies(
request.pubkey,
request.account.remote_slot(),
&request.delegation_actions,
)
.await?;
if let Err(err) = self
.ensure_delegation_action_dependencies(
request.pubkey,
request.account.remote_slot(),
&request.delegation_actions,
)
.await
{
if matches!(
err,
ChainlinkError::RangeRisk(
magicblock_aml::RiskError::HighRiskAddresses(_)
)
) {
self.schedule_undelegation_after_aml_rejection(&request)
.await?;
}
return Err(err);
}

Ok(self.clone_account_with_ownership(request).await?)
}

async fn schedule_undelegation_after_aml_rejection(
&self,
request: &AccountCloneRequest,
) -> ChainlinkResult<()> {
let Some(scheduler) = self.undelegation_scheduler.as_ref() else {
warn!(
pubkey = %request.pubkey,
"AML rejected post-delegation actions but undelegation scheduler is unavailable"
);
return Ok(());
};

scheduler
.schedule_undelegation(UndelegationScheduleRequest {
pubkey: request.pubkey,
account: request.account.clone(),
})
.await
}

fn normalize_unresolved_dlp_clone_request(
&self,
request: &mut AccountCloneRequest,
Expand Down
Loading
Loading