Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ All notable changes to this project will be documented in this file.
- SDK
- Deserialize `agent_version` and `agent_commit` from device latency samples in Go, TypeScript, and Python SDKs
- Add `BGPStatus` type (Unknown/Up/Down) and `SetUserBGPStatus` executor instruction to the Go serviceability SDK
- Sentinel
- Improve `find-validator-multicast-publishers` and `create-validator-multicast-publishers` with multi-value `--client` filter, `--ip` filter, nearest-device selection, dynamic capacity re-evaluation, and a fix for multicast publisher owner being set to the sentinel payer instead of the validator's owner
- Smartcontract
- Add `agent_version` (`[u8; 16]`) and `agent_commit` (`[u8; 8]`) fields to `DeviceLatencySamplesHeader`, carved from the existing reserved region; accept both fields in the `InitializeDeviceLatencySamples` instruction via incremental deserialization (fully backward compatible)
- Implement `SetUserBGPStatus` processor: validates metrics publisher authorization, updates `bgp_status`, `last_bgp_reported_at`, and `last_bgp_up_at` fields on the user account
Expand Down
331 changes: 277 additions & 54 deletions controlplane/doublezero-admin/src/cli/sentinel.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/sentinel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ async-trait.workspace = true
backon.workspace = true
borsh.workspace = true
clap.workspace = true
doublezero-config.workspace = true
doublezero_sdk.workspace = true
doublezero-serviceability.workspace = true
doublezero-telemetry.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
reqwest.workspace = true
Expand All @@ -37,6 +39,9 @@ url.workspace = true
[dev-dependencies]
mockall.workspace = true

[lib]
path = "src/lib.rs"

[[bin]]
name = "doublezero-sentinel"
path = "src/main.rs"
181 changes: 180 additions & 1 deletion crates/sentinel/src/dz_ledger_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::{collections::HashMap, net::Ipv4Addr};

use anyhow::{bail, Context, Result};
use doublezero_sdk::{
AccountData, AccountType, DeviceStatus, MulticastGroupStatus, UserStatus, UserType,
AccountData, AccountType, DeviceStatus, LocationStatus, MulticastGroupStatus, UserStatus,
UserType,
};
use doublezero_telemetry::state::device_latency_samples::DeviceLatencySamples;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
nonblocking::rpc_client::RpcClient as NonblockingRpcClient,
Expand Down Expand Up @@ -33,6 +35,20 @@ pub struct DzLedgerCodes {
pub device_codes: HashMap<Pubkey, String>,
}

/// Device info used for nearest-device calculations.
#[derive(Debug, Clone)]
pub struct DzDeviceInfo {
pub pk: Pubkey,
pub code: String,
pub lat: f64,
pub lng: f64,
pub users_count: u16,
pub max_users: u16,
pub reserved_seats: u16,
pub multicast_publishers_count: u16,
pub max_multicast_publishers: u16,
}

// ---------------------------------------------------------------------------
// Trait
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -186,6 +202,169 @@ pub fn fetch_device_codes(client: &RpcClient, program_id: &Pubkey) -> Result<DzL
Ok(DzLedgerCodes { device_codes })
}

/// Fetch activated devices with their location coordinates.
///
/// Makes two RPC calls: one for all Device accounts, one for all Location accounts.
pub fn fetch_device_infos(
client: &RpcClient,
program_id: &Pubkey,
) -> Result<HashMap<Pubkey, DzDeviceInfo>> {
// Fetch all Device accounts.
let device_accounts = client
.get_program_accounts_with_config(
program_id,
RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
0,
vec![AccountType::Device as u8],
))]),
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
..Default::default()
},
..Default::default()
},
)
.context("failed to fetch Device accounts from DZ Ledger")?;

// Collect activated devices and their location pubkeys.
let mut devices: Vec<(Pubkey, doublezero_sdk::Device)> = Vec::new();
let mut location_pks: Vec<Pubkey> = Vec::new();
for (pk, account) in &device_accounts {
let Ok(ad) = AccountData::try_from(account.data.as_slice()) else {
continue;
};
let Ok(device) = ad.get_device() else {
continue;
};
if device.status == DeviceStatus::Activated {
location_pks.push(device.location_pk);
devices.push((*pk, device));
}
}

if devices.is_empty() {
return Ok(HashMap::new());
}

// Fetch all Location accounts in one batch call.
let location_accounts = client
.get_multiple_accounts(&location_pks)
.context("failed to fetch Location accounts from DZ Ledger")?;

// Build location_pk → (lat, lng) map.
let mut location_coords: HashMap<Pubkey, (f64, f64)> = HashMap::new();
for (pk, maybe_account) in location_pks.iter().zip(location_accounts.iter()) {
let Some(account) = maybe_account else {
continue;
};
let Ok(ad) = AccountData::try_from(account.data.as_slice()) else {
continue;
};
let Ok(loc) = ad.get_location() else {
continue;
};
if loc.status == LocationStatus::Activated {
location_coords.insert(*pk, (loc.lat, loc.lng));
}
}

// Join devices with their location coordinates.
let mut infos = HashMap::new();
for (pk, device) in devices {
let (lat, lng) = location_coords
.get(&device.location_pk)
.copied()
.unwrap_or((0.0, 0.0));
infos.insert(
pk,
DzDeviceInfo {
pk,
code: device.code.clone(),
lat,
lng,
users_count: device.users_count,
max_users: device.max_users,
reserved_seats: device.reserved_seats,
multicast_publishers_count: device.multicast_publishers_count,
max_multicast_publishers: device.max_multicast_publishers,
},
);
}

Ok(infos)
}

/// Fetch a map of min-latency (in microseconds) between device pairs for the current epoch.
///
/// Returns `HashMap<(origin_device_pk, target_device_pk), min_latency_us>`.
/// The map is derived from `DeviceLatencySamples` onchain telemetry accounts.
pub fn fetch_device_latency_map(
client: &RpcClient,
telemetry_program_id: &Pubkey,
) -> Result<HashMap<(Pubkey, Pubkey), f64>> {
const DEVICE_LATENCY_SAMPLES_ACCOUNT_TYPE: u8 = 3;

let epoch_info = client
.get_epoch_info()
.context("failed to fetch epoch info")?;
let epoch = epoch_info.epoch;

let accounts = client
.get_program_accounts_with_config(
telemetry_program_id,
RpcProgramAccountsConfig {
filters: Some(vec![
RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
0,
vec![DEVICE_LATENCY_SAMPLES_ACCOUNT_TYPE],
)),
RpcFilterType::Memcmp(Memcmp::new_raw_bytes(1, epoch.to_le_bytes().to_vec())),
]),
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
..Default::default()
},
..Default::default()
},
)
.context("failed to fetch DeviceLatencySamples accounts")?;

let mut map: HashMap<(Pubkey, Pubkey), f64> = HashMap::new();

for (_pk, account) in accounts {
let Ok(samples) = DeviceLatencySamples::try_from(account.data.as_slice()) else {
continue;
};
if samples.samples.is_empty() {
continue;
}
let origin = samples.header.origin_device_pk;
let target = samples.header.target_device_pk;
// Zero RTT indicates packet loss — exclude from min calculation.
// If all samples are zero (total loss), skip this account entirely.
let Some(min_us) = samples
.samples
.iter()
.copied()
.filter(|&s| s > 0)
.min()
.map(|v| v as f64)
else {
continue;
};
map.entry((origin, target))
.and_modify(|e| {
if min_us < *e {
*e = min_us;
}
})
.or_insert(min_us);
}

Ok(map)
}

/// Resolve a multicast group key-or-code to a pubkey.
pub async fn resolve_multicast_group(
key_or_code: &str,
Expand Down
20 changes: 15 additions & 5 deletions crates/sentinel/src/dz_ledger_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ pub struct CreateMulticastPublisherInstructions {
}

/// Build the three instructions needed to create a multicast publisher for a user.
///
/// `payer` signs and pays for all transactions. `owner` is the validator's owner pubkey;
/// the access pass and user account are created under this owner, not the payer.
pub fn build_create_multicast_publisher_instructions(
program_id: &Pubkey,
payer: &Pubkey,
owner: &Pubkey,
multicast_group_pk: &Pubkey,
user: &DzUser,
) -> Result<CreateMulticastPublisherInstructions> {
let (accesspass_pda, _) = get_accesspass_pda(program_id, &user.client_ip, payer);
let (accesspass_pda, _) = get_accesspass_pda(program_id, &user.client_ip, owner);
let (globalstate_pda, _) = get_globalstate_pda(program_id);

// Step 1: set_access_pass
Expand All @@ -48,7 +52,7 @@ pub fn build_create_multicast_publisher_instructions(
vec![
AccountMeta::new(accesspass_pda, false),
AccountMeta::new_readonly(globalstate_pda, false),
AccountMeta::new(*payer, false),
AccountMeta::new(*owner, false),
AccountMeta::new(*payer, true),
AccountMeta::new_readonly(solana_sdk::system_program::ID, false),
],
Expand All @@ -59,7 +63,7 @@ pub fn build_create_multicast_publisher_instructions(
program_id,
DoubleZeroInstruction::AddMulticastGroupPubAllowlist(AddMulticastGroupPubAllowlistArgs {
client_ip: user.client_ip,
user_payer: *payer,
user_payer: *owner,
}),
vec![
AccountMeta::new(*multicast_group_pk, false),
Expand All @@ -82,7 +86,7 @@ pub fn build_create_multicast_publisher_instructions(
subscriber: false,
tunnel_endpoint: std::net::Ipv4Addr::UNSPECIFIED,
dz_prefix_count: 0,
owner: Pubkey::default(),
owner: *owner,
}),
vec![
AccountMeta::new(user_pda, false),
Expand Down Expand Up @@ -137,9 +141,12 @@ mod tests {
publishers: vec![],
};

let owner = Pubkey::new_unique();

let ixs = build_create_multicast_publisher_instructions(
&program_id,
&payer,
&owner,
&multicast_group,
&user,
)
Expand All @@ -155,7 +162,7 @@ mod tests {
assert!(!ixs.add_allowlist.data.is_empty());
assert!(!ixs.create_user.data.is_empty());

// set_access_pass: 5 accounts (accesspass_pda, globalstate, payer×2, system_program)
// set_access_pass: 5 accounts (accesspass_pda, globalstate, owner, payer, system_program)
assert_eq!(ixs.set_access_pass.accounts.len(), 5);
// add_allowlist: 5 accounts (multicast_group, accesspass_pda, globalstate, payer, system_program)
assert_eq!(ixs.add_allowlist.accounts.len(), 5);
Expand All @@ -177,9 +184,12 @@ mod tests {
publishers: vec![],
};

let owner = Pubkey::new_unique();

let ixs = build_create_multicast_publisher_instructions(
&program_id,
&payer,
&owner,
&multicast_group,
&user,
)
Expand Down
1 change: 1 addition & 0 deletions crates/sentinel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod error;
pub mod multicast_create;
pub mod multicast_find;
pub mod multicast_publisher;
pub mod nearest_device;
pub mod output;
pub mod settings;
pub mod validator_metadata_reader;
Loading
Loading