diff --git a/round-based/Cargo.toml b/round-based/Cargo.toml index 7ee4949..19f8f43 100644 --- a/round-based/Cargo.toml +++ b/round-based/Cargo.toml @@ -36,7 +36,7 @@ udigest = { version = "0.2", default-features = false, features = ["alloc", "dig trybuild = "1" matches = "0.1" futures = { version = "0.3", default-features = false } -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "time", "rt"] } hex = "0.4" rand = "0.8" @@ -48,14 +48,20 @@ sha2 = "0.10" [features] default = [] +perf-profiler = ["std"] state-machine = [] sim = ["state-machine"] sim-async = ["sim", "tokio/sync", "tokio-stream", "futures-util/alloc"] derive = ["round-based-derive"] runtime-tokio = ["tokio"] +std = [] echo-broadcast = ["dep:digest", "dep:udigest"] [[test]] name = "derive" required-features = ["derive"] + +[[test]] +name = "perf_test" +required-features = ["perf-profiler"] diff --git a/round-based/src/lib.rs b/round-based/src/lib.rs index b6859b0..67d2554 100644 --- a/round-based/src/lib.rs +++ b/round-based/src/lib.rs @@ -125,12 +125,14 @@ //! ## Join us in Discord! //! Feel free to reach out to us [in Discord](https://discordapp.com/channels/905194001349627914/1285268686147424388)! +#![no_std] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg, doc_cfg_hide))] #![warn(unused_crate_dependencies, missing_docs)] #![allow(async_fn_in_trait)] -#![no_std] extern crate alloc; +#[cfg(feature = "std")] +extern crate std; /// Fixes false-positive of `unused_crate_dependencies` lint that only occur in the tests #[cfg(test)] diff --git a/round-based/src/mpc/mod.rs b/round-based/src/mpc/mod.rs index 2eda512..a827fce 100644 --- a/round-based/src/mpc/mod.rs +++ b/round-based/src/mpc/mod.rs @@ -346,3 +346,6 @@ where { MpcParty::connected_halves(incomings, outgoings) } + +#[cfg(feature = "perf-profiler")] +pub mod profiler; diff --git a/round-based/src/mpc/profiler/mod.rs b/round-based/src/mpc/profiler/mod.rs new file mode 100644 index 0000000..497ccd2 --- /dev/null +++ b/round-based/src/mpc/profiler/mod.rs @@ -0,0 +1,8 @@ +//! Performance profiler for MPC execution. + +/// Performance reporting. +pub mod profiling; +/// Statistics aggregation. +pub mod stats; +/// Profiler wrapper. +pub mod wrapper; diff --git a/round-based/src/mpc/profiler/profiling.rs b/round-based/src/mpc/profiler/profiling.rs new file mode 100644 index 0000000..2776490 --- /dev/null +++ b/round-based/src/mpc/profiler/profiling.rs @@ -0,0 +1,207 @@ +use std::fmt; +use std::time::{Duration, Instant}; +use std::vec::Vec; + +/// An event captured during MPC execution. +#[derive(Debug, Clone)] +pub enum Event { + /// Sent a message. + SendMsg { + /// Number of the round. + round: u16, + /// Time when `.send().await` was called. + started: Instant, + /// Time when `.send().await` has returned. + finished: Instant, + }, + /// Received messages (completed a round). + RecvMsgs { + /// Number of the round. + round: u16, + /// Time when `.complete().await` was called. + started: Instant, + /// Time when `.complete().await` has returned. + finished: Instant, + }, + /// Yielded to the scheduler. + Yielded { + /// Time when `.yield_now().await` was called. + started: Instant, + /// Time when `.yield_now().await` has returned. + finished: Instant, + }, +} + +impl Event { + /// Returns the round number associated with the event. + pub fn round(&self) -> u16 { + match self { + Event::SendMsg { round, .. } => *round, + Event::RecvMsgs { round, .. } => *round, + Event::Yielded { .. } => 0, // Global + } + } + + /// Returns the duration of the event. + pub fn duration(&self) -> Duration { + match self { + Event::SendMsg { + started, finished, .. + } => *finished - *started, + Event::RecvMsgs { + started, finished, .. + } => *finished - *started, + Event::Yielded { + started, finished, .. + } => *finished - *started, + } + } +} + +/// Statistics for a single round of an MPC protocol. +#[derive(Debug, Clone, Default)] +pub struct RoundStats { + /// Number of the round. + pub round: usize, + /// Time spent on computation during this round. + pub computation_time: Duration, + /// Time spent on sending messages during this round. + pub sent_io_time: Duration, + /// Time spent on receiving messages during this round. + pub recv_io_time: Duration, + /// Time spent on waiting for the scheduler (yield_now). + pub yield_time: Duration, +} + +/// A full performance report for a single protocol execution. +#[derive(Debug, Clone, Default)] +pub struct PerfReport { + /// Statistics for each round. + pub rounds: Vec, +} + +impl PerfReport { + /// Builds a report from a sequence of events. + pub fn from_events(start_time: Instant, end_time: Instant, events: Vec) -> Self { + let mut report = Self::default(); + let mut last_finished = start_time; + + for event in events { + // Computation is the gap since the last event finished + let computation = event.started().duration_since(last_finished); + let round = event.round() as usize; + + let (sent, recv, yielded) = match &event { + Event::SendMsg { .. } => (event.duration(), Duration::ZERO, Duration::ZERO), + Event::RecvMsgs { .. } => (Duration::ZERO, event.duration(), Duration::ZERO), + Event::Yielded { .. } => (Duration::ZERO, Duration::ZERO, event.duration()), + }; + + report.apply_stats(round, computation, sent, recv, yielded); + last_finished = event.finished(); + } + + // Add trailing computation + if end_time > last_finished { + report.apply_stats( + 0, + end_time - last_finished, + Duration::ZERO, + Duration::ZERO, + Duration::ZERO, + ); + } + + report + } + + fn apply_stats( + &mut self, + round: usize, + computation: Duration, + sent_io: Duration, + recv_io: Duration, + yield_time: Duration, + ) { + if let Some(existing) = self.rounds.iter_mut().find(|r| r.round == round) { + existing.computation_time += computation; + existing.sent_io_time += sent_io; + existing.recv_io_time += recv_io; + existing.yield_time += yield_time; + return; + } + self.rounds.push(RoundStats { + round, + computation_time: computation, + sent_io_time: sent_io, + recv_io_time: recv_io, + yield_time, + }); + } + + /// Calculates the total computation time across all rounds. + pub fn total_computation(&self) -> Duration { + self.rounds.iter().map(|r| r.computation_time).sum() + } + /// Calculates the total I/O time spent on sending across all rounds. + pub fn total_sent_io(&self) -> Duration { + self.rounds.iter().map(|r| r.sent_io_time).sum() + } + /// Calculates the total I/O time spent on receiving across all rounds. + pub fn total_recv_io(&self) -> Duration { + self.rounds.iter().map(|r| r.recv_io_time).sum() + } + /// Calculates the total I/O time spent on yielding across all rounds. + pub fn total_yield(&self) -> Duration { + self.rounds.iter().map(|r| r.yield_time).sum() + } + /// Calculates the total I/O time across all rounds (send + recv + yield). + pub fn total_io(&self) -> Duration { + self.total_sent_io() + self.total_recv_io() + self.total_yield() + } + /// Calculates the total execution time (computation + I/O). + pub fn total_time(&self) -> Duration { + self.total_computation() + self.total_io() + } +} + +impl Event { + fn started(&self) -> Instant { + match self { + Event::SendMsg { started, .. } => *started, + Event::RecvMsgs { started, .. } => *started, + Event::Yielded { started, .. } => *started, + } + } + fn finished(&self) -> Instant { + match self { + Event::SendMsg { finished, .. } => *finished, + Event::RecvMsgs { finished, .. } => *finished, + Event::Yielded { finished, .. } => *finished, + } + } +} + +impl fmt::Display for PerfReport { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "=== MPC Performance Report ===")?; + for stat in &self.rounds { + writeln!( + f, + "Round {}: Computation: {:?}, Sent I/O: {:?}, Recv I/O: {:?}, Yield: {:?}", + stat.round, + stat.computation_time, + stat.sent_io_time, + stat.recv_io_time, + stat.yield_time + )?; + } + writeln!(f, "------------------------------")?; + writeln!(f, "Total Computation: {:?}", self.total_computation())?; + writeln!(f, "Total Sent I/O: {:?}", self.total_sent_io())?; + writeln!(f, "Total Recv I/O: {:?}", self.total_recv_io())?; + writeln!(f, "Total Yield: {:?}", self.total_yield())?; + writeln!(f, "Total Time: {:?}", self.total_time())?; + Ok(()) + } +} diff --git a/round-based/src/mpc/profiler/stats.rs b/round-based/src/mpc/profiler/stats.rs new file mode 100644 index 0000000..63ca99b --- /dev/null +++ b/round-based/src/mpc/profiler/stats.rs @@ -0,0 +1,121 @@ +//! Statistics for MPC protocol execution. + +use super::profiling::PerfReport; +use std::string::{String, ToString}; +use std::time::Duration; +use std::vec::Vec; +use std::{format, println}; + +/// Aggregated statistics for a set of durations. +#[derive(Debug)] +pub struct AggregatedStats { + /// Name of the metric. + pub metric_name: String, + /// Mean duration. + pub mean: Duration, + /// Standard deviation of durations. + pub std_dev: Duration, + /// Median (50th percentile) duration. + pub p50: Duration, + /// 75th percentile duration. + pub p75: Duration, + /// 90th percentile duration. + pub p90: Duration, +} + +impl std::fmt::Display for AggregatedStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Table-like formatting: {:) -> AggregatedStats { + if durations.is_empty() { + return AggregatedStats { + metric_name: name.to_string(), + mean: Duration::ZERO, + std_dev: Duration::ZERO, + p50: Duration::ZERO, + p75: Duration::ZERO, + p90: Duration::ZERO, + }; + } + + durations.sort_unstable(); + let len = durations.len(); + + let sum: Duration = durations.iter().sum(); + let mean = sum / len as u32; + + let variance_secs = durations + .iter() + .map(|&d| { + let diff = d.abs_diff(mean); + diff.as_secs_f64().powi(2) + }) + .sum::() + / len as f64; + + let std_dev = Duration::from_secs_f64(variance_secs.sqrt()); + + let p50 = durations[(len as f64 * 0.50).floor() as usize]; + let p75 = durations[(len as f64 * 0.75).floor() as usize]; + let p90 = durations[(len as f64 * 0.90).floor() as usize]; + + AggregatedStats { + metric_name: name.to_string(), + mean, + std_dev, + p50, + p75, + p90, + } +} + +/// Helper to consume multiple reports and print aggregated analytics for all metrics. +pub fn analyze_reports(reports: &[PerfReport]) { + if reports.is_empty() { + println!("No reports to analyze."); + return; + } + + let mut total_times = Vec::with_capacity(reports.len()); + let mut comp_times = Vec::with_capacity(reports.len()); + let mut sent_io_times = Vec::with_capacity(reports.len()); + let mut recv_io_times = Vec::with_capacity(reports.len()); + let mut yield_times = Vec::with_capacity(reports.len()); + + for report in reports { + total_times.push(report.total_time()); + comp_times.push(report.total_computation()); + sent_io_times.push(report.total_sent_io()); + recv_io_times.push(report.total_recv_io()); + yield_times.push(report.total_yield()); + } + + println!("\n=== MPC Execution Analytics ({} runs) ===", reports.len()); + println!( + "{:<20} | {:<12} | {:<12} | {:<12} | {:<12} | {:<12}", + "Metric", "Mean", "Std Dev", "p50", "p75", "p90" + ); + println!("{}", "-".repeat(90)); + println!("{}", analyze_durations("Total Time", total_times)); + println!("{}", analyze_durations("Computation", comp_times)); + println!("{}", analyze_durations("Sent I/O", sent_io_times)); + println!("{}", analyze_durations("Recv I/O", recv_io_times)); + println!("{}", analyze_durations("Yield", yield_times)); + println!( + "========================================================================================\n" + ); +} diff --git a/round-based/src/mpc/profiler/wrapper.rs b/round-based/src/mpc/profiler/wrapper.rs new file mode 100644 index 0000000..5434143 --- /dev/null +++ b/round-based/src/mpc/profiler/wrapper.rs @@ -0,0 +1,208 @@ +use std::sync::{Arc, Mutex}; +use std::time::Instant; +use std::vec::Vec; + +use crate::{ + Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, + mpc::SendMany, + round::{RoundInfo, RoundStore}, +}; + +use super::profiling::{Event, PerfReport}; + +/// A handle to the performance profiler that can be used to generate a report +/// even after the profiler itself has been consumed. +#[derive(Clone)] +pub struct PerfProfilerHandle { + events: Arc>>, + start_time: Instant, +} + +impl PerfProfilerHandle { + /// Consumes the handle and returns the performance report. + pub fn into_report(self) -> PerfReport { + let end_time = Instant::now(); + let events = self.events.lock().unwrap().clone(); + PerfReport::from_events(self.start_time, end_time, events) + } +} + +/// A wrapper around an MPC engine or execution that measures performance. +/// +/// It stores a sequence of events (I/O and Yield) and uses them to calculate performance stats. +pub struct PerfProfiler { + inner: M, + events: Arc>>, + start_time: Instant, +} + +impl PerfProfiler { + /// Creates a new performance profiler and a handle to retrieve the report. + pub fn new(inner: M) -> (Self, PerfProfilerHandle) { + let start_time = Instant::now(); + let events = Arc::new(Mutex::new(Vec::new())); + ( + Self { + inner, + events: events.clone(), + start_time, + }, + PerfProfilerHandle { events, start_time }, + ) + } + + /// Consumes the profiler and returns the performance report. + pub fn into_report(self) -> PerfReport { + let end_time = Instant::now(); + let events = self.events.lock().unwrap().clone(); + PerfReport::from_events(self.start_time, end_time, events) + } + + /// Returns a reference to the inner MPC execution. + pub fn get_ref(&self) -> &M { + &self.inner + } + + /// Returns a mutable reference to the inner MPC execution. + pub fn get_mut(&mut self) -> &mut M { + &mut self.inner + } + + /// Consumes the profiler and returns the inner MPC execution. + pub fn into_inner(self) -> M { + self.inner + } +} + +impl Mpc for PerfProfiler +where + M::Msg: ProtocolMsg, +{ + type Msg = M::Msg; + type Exec = PerfProfiler; + type SendErr = M::SendErr; + + fn add_round(&mut self, round: R) -> ::Round + where + R: RoundStore, + Self::Msg: RoundMsg, + { + self.inner.add_round(round) + } + + fn finish_setup(self) -> Self::Exec { + PerfProfiler { + inner: self.inner.finish_setup(), + events: self.events, + start_time: self.start_time, + } + } +} + +impl MpcExecution for PerfProfiler +where + M::Msg: ProtocolMsg, +{ + type Round = M::Round; + type Msg = M::Msg; + type CompleteRoundErr = M::CompleteRoundErr; + type SendErr = M::SendErr; + type SendMany = ProfilerSendMany; + + async fn complete( + &mut self, + round: Self::Round, + ) -> Result> + where + R: RoundInfo, + Self::Msg: RoundMsg, + { + let started = Instant::now(); + let result = self.inner.complete(round).await; + let finished = Instant::now(); + + let round_idx = >::ROUND; + self.events.lock().unwrap().push(Event::RecvMsgs { + round: round_idx, + started, + finished, + }); + + result + } + + async fn send(&mut self, msg: Outgoing) -> Result<(), Self::SendErr> { + let round_idx = msg.msg.round(); + let started = Instant::now(); + let result = self.inner.send(msg).await; + let finished = Instant::now(); + + self.events.lock().unwrap().push(Event::SendMsg { + round: round_idx, + started, + finished, + }); + + result + } + + fn send_many(self) -> Self::SendMany { + ProfilerSendMany { + inner: self.inner.send_many(), + events: self.events, + start_time: self.start_time, + } + } + + async fn yield_now(&self) { + let started = Instant::now(); + self.inner.yield_now().await; + let finished = Instant::now(); + + self.events + .lock() + .unwrap() + .push(Event::Yielded { started, finished }); + } +} + +/// A wrapper around [`SendMany`] that measures performance. +pub struct ProfilerSendMany { + inner: S, + events: Arc>>, + start_time: Instant, +} + +impl SendMany for ProfilerSendMany +where + S::Msg: ProtocolMsg, +{ + type Exec = PerfProfiler; + type Msg = S::Msg; + type SendErr = S::SendErr; + + async fn send(&mut self, msg: Outgoing) -> Result<(), S::SendErr> { + let round_idx = msg.msg.round(); + let started = Instant::now(); + let result = self.inner.send(msg).await; + let finished = Instant::now(); + + self.events.lock().unwrap().push(Event::SendMsg { + round: round_idx, + started, + finished, + }); + + result + } + + async fn flush(self) -> Result { + let result = self.inner.flush().await; + + Ok(PerfProfiler { + inner: result?, + events: self.events, + start_time: self.start_time, + }) + } +} diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs new file mode 100644 index 0000000..a46b656 --- /dev/null +++ b/round-based/tests/perf_test.rs @@ -0,0 +1,220 @@ +#[cfg(feature = "perf-profiler")] +mod tests { + use core::cell::RefCell; + use round_based::{ + Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, mpc::SendMany, mpc::profiler::stats, + mpc::profiler::wrapper::PerfProfiler, round::RoundInfo, + }; + use std::time::Duration; + + #[derive(Debug, Clone)] + struct ManualEvent; + + struct MockMpc { + manual_events: RefCell>, + } + + /// Random Beacon Messages + #[derive(Clone, Debug)] + enum RandomBeaconMsg { + Commit([u8; 32]), // Round 1 + Decommit, // Round 2 + } + + impl ProtocolMsg for RandomBeaconMsg { + fn round(&self) -> u16 { + match self { + RandomBeaconMsg::Commit(_) => 1, + RandomBeaconMsg::Decommit => 2, + } + } + } + + // Round 1 Marker + struct Round1; + impl RoundMsg<[u8; 32]> for RandomBeaconMsg { + const ROUND: u16 = 1; + fn to_protocol_msg(m: [u8; 32]) -> Self { + RandomBeaconMsg::Commit(m) + } + fn from_protocol_msg(msg: Self) -> Result<[u8; 32], Self> { + match msg { + RandomBeaconMsg::Commit(m) => Ok(m), + _ => Err(msg), + } + } + } + + // Round 2 Marker + struct Round2; + impl RoundMsg for RandomBeaconMsg { + const ROUND: u16 = 2; + fn to_protocol_msg(_m: u64) -> Self { + RandomBeaconMsg::Decommit + } + fn from_protocol_msg(msg: Self) -> Result { + match msg { + RandomBeaconMsg::Decommit => Ok(0), + _ => Err(msg), + } + } + } + + impl RoundInfo for Round1 { + type Msg = [u8; 32]; + type Output = Vec<[u8; 32]>; + type Error = core::convert::Infallible; + } + impl RoundInfo for Round2 { + type Msg = u64; + type Output = Vec; + type Error = core::convert::Infallible; + } + + impl Mpc for MockMpc { + type Msg = RandomBeaconMsg; + type Exec = MockMpc; + type SendErr = core::convert::Infallible; + fn add_round(&mut self, _round: R) -> ::Round + where + R: round_based::round::RoundStore, + Self::Msg: RoundMsg, + { + } + fn finish_setup(self) -> Self::Exec { + self + } + } + + impl MpcExecution for MockMpc { + type Round = (); + type Msg = RandomBeaconMsg; + type CompleteRoundErr = core::convert::Infallible; + type SendErr = core::convert::Infallible; + type SendMany = MockSendMany; + + async fn complete( + &mut self, + _round: Self::Round, + ) -> Result> + where + R: RoundInfo, + Self::Msg: RoundMsg, + { + tokio::time::sleep(Duration::from_millis(40)).await; + self.manual_events.borrow_mut().push(ManualEvent); + + let res = Vec::::new(); + let ptr = Box::into_raw(Box::new(res)); + Ok(unsafe { *Box::from_raw(ptr as *mut R::Output) }) + } + + async fn send(&mut self, _msg: Outgoing) -> Result<(), Self::SendErr> { + tokio::time::sleep(Duration::from_millis(20)).await; + self.manual_events.borrow_mut().push(ManualEvent); + Ok(()) + } + + fn send_many(self) -> Self::SendMany { + MockSendMany { + manual_events: self.manual_events.clone(), + } + } + + async fn yield_now(&self) { + tokio::time::sleep(Duration::from_millis(10)).await; + self.manual_events.borrow_mut().push(ManualEvent); + } + } + + struct MockSendMany { + manual_events: RefCell>, + } + impl SendMany for MockSendMany { + type Exec = MockMpc; + type Msg = RandomBeaconMsg; + type SendErr = core::convert::Infallible; + async fn send(&mut self, _msg: Outgoing) -> Result<(), Self::SendErr> { + tokio::time::sleep(Duration::from_millis(20)).await; + self.manual_events.borrow_mut().push(ManualEvent); + Ok(()) + } + async fn flush(self) -> Result { + Ok(MockMpc { + manual_events: self.manual_events, + }) + } + } + + /// Random Beacon Example + async fn run_random_beacon(mut mpc: PerfProfiler) -> [u8; 32] { + // --- Round 1: Commit --- + tokio::time::sleep(Duration::from_millis(10)).await; + mpc.send(Outgoing::all_parties(RandomBeaconMsg::Commit([1u8; 32]))) + .await + .ok(); + let _hashes = mpc.complete::(()).await.expect("round 1"); + + // --- Round 2: Reveal --- + tokio::time::sleep(Duration::from_millis(5)).await; + mpc.send(Outgoing::all_parties(RandomBeaconMsg::Decommit)) + .await + .ok(); + let _numbers = mpc.complete::(()).await.expect("round 2"); + + // --- Final: XOR --- + tokio::time::sleep(Duration::from_millis(5)).await; + + // --- Yield: Let others run --- + mpc.yield_now().await; + + [0u8; 32] + } + + #[tokio::test] + async fn test_profiler_random_beacon() { + let inner = MockMpc { + manual_events: RefCell::new(Vec::new()), + }; + let (profiler, handle) = PerfProfiler::new(inner); + + let _result = run_random_beacon(profiler).await; + + let report = handle.into_report(); + println!("{}", report); + + let r1 = report.rounds.iter().find(|r| r.round == 1).unwrap(); + let r2 = report.rounds.iter().find(|r| r.round == 2).unwrap(); + let r0 = report.rounds.iter().find(|r| r.round == 0).unwrap(); + + assert!(r1.computation_time >= Duration::from_millis(10)); + assert!(r1.sent_io_time >= Duration::from_millis(20)); + assert!(r1.recv_io_time >= Duration::from_millis(40)); + + assert!(r2.computation_time >= Duration::from_millis(5)); + assert!(r2.sent_io_time >= Duration::from_millis(20)); + assert!(r2.recv_io_time >= Duration::from_millis(40)); + + // Yield Check + assert!(r0.yield_time >= Duration::from_millis(10)); + } + + #[tokio::test] + async fn test_profiler_statistics() { + let mut reports = Vec::new(); + + // Run the protocol 5 times and collect reports + for _ in 0..5 { + let inner = MockMpc { + manual_events: RefCell::new(Vec::new()), + }; + let (profiler, handle) = PerfProfiler::new(inner); + + let _result = run_random_beacon(profiler).await; + reports.push(handle.into_report()); + } + + // Analyze reports and print aggregated analytics + stats::analyze_reports(&reports); + } +}