From da76e14f83edf8e4ae4a7c691dee0b44f938fc0a Mon Sep 17 00:00:00 2001 From: theroguevigilante Date: Mon, 13 Apr 2026 00:05:35 +0530 Subject: [PATCH 1/6] feat(profiler): implement generic PerfProfiler for MPC protocols Signed-off-by: theroguevigilante --- round-based/Cargo.toml | 5 +- round-based/src/lib.rs | 2 +- round-based/src/mpc/mod.rs | 3 + round-based/src/mpc/profiler/mod.rs | 8 + round-based/src/mpc/profiler/profiling.rs | 55 ++++++ round-based/src/mpc/profiler/stats.rs | 94 ++++++++++ round-based/src/mpc/profiler/wrapper.rs | 218 ++++++++++++++++++++++ round-based/tests/perf_test.rs | 163 ++++++++++++++++ 8 files changed, 545 insertions(+), 3 deletions(-) create mode 100644 round-based/src/mpc/profiler/mod.rs create mode 100644 round-based/src/mpc/profiler/profiling.rs create mode 100644 round-based/src/mpc/profiler/stats.rs create mode 100644 round-based/src/mpc/profiler/wrapper.rs create mode 100644 round-based/tests/perf_test.rs diff --git a/round-based/Cargo.toml b/round-based/Cargo.toml index 7ee4949..08ee80b 100644 --- a/round-based/Cargo.toml +++ b/round-based/Cargo.toml @@ -23,7 +23,7 @@ thiserror = { version = "2", default-features = false } round-based-derive = { version = "0.5.0-alpha.0", optional = true, path = "../round-based-derive" } -tokio = { version = "1", features = ["rt"], optional = true } +tokio = { version = "1", features = ["rt", "time"], optional = true } tokio-stream = { version = "0.1", features = ["sync"], optional = true } pin-project-lite = "0.2" @@ -36,7 +36,7 @@ udigest = { version = "0.2", default-features = false, features = ["alloc", "dig trybuild = "1" matches = "0.1" futures = { version = "0.3", default-features = false } -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "time", "rt"] } hex = "0.4" rand = "0.8" @@ -53,6 +53,7 @@ sim = ["state-machine"] sim-async = ["sim", "tokio/sync", "tokio-stream", "futures-util/alloc"] derive = ["round-based-derive"] runtime-tokio = ["tokio"] +std = [] echo-broadcast = ["dep:digest", "dep:udigest"] diff --git a/round-based/src/lib.rs b/round-based/src/lib.rs index b6859b0..ce0dee0 100644 --- a/round-based/src/lib.rs +++ b/round-based/src/lib.rs @@ -128,7 +128,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg, doc_cfg_hide))] #![warn(unused_crate_dependencies, missing_docs)] #![allow(async_fn_in_trait)] -#![no_std] +#![cfg_attr(not(feature = "std"), no_std)] extern crate alloc; diff --git a/round-based/src/mpc/mod.rs b/round-based/src/mpc/mod.rs index 2eda512..041aa2c 100644 --- a/round-based/src/mpc/mod.rs +++ b/round-based/src/mpc/mod.rs @@ -346,3 +346,6 @@ where { MpcParty::connected_halves(incomings, outgoings) } + +#[cfg(feature = "std")] +pub mod profiler; diff --git a/round-based/src/mpc/profiler/mod.rs b/round-based/src/mpc/profiler/mod.rs new file mode 100644 index 0000000..72b6334 --- /dev/null +++ b/round-based/src/mpc/profiler/mod.rs @@ -0,0 +1,8 @@ +//! Performance profiler for MPC execution. + +/// Statistics aggregation. +pub mod stats; +/// Performance reporting. +pub mod profiling; +/// Profiler wrapper. +pub mod wrapper; diff --git a/round-based/src/mpc/profiler/profiling.rs b/round-based/src/mpc/profiler/profiling.rs new file mode 100644 index 0000000..8b1b087 --- /dev/null +++ b/round-based/src/mpc/profiler/profiling.rs @@ -0,0 +1,55 @@ +use std::fmt; +use std::time::Duration; + +/// Statistics for a single round of an MPC protocol. +#[derive(Debug, Clone, Default)] +pub struct RoundStats { + /// Number of the round. + pub round: usize, + /// Time spent on computation during this round. + pub computation_time: Duration, + /// Time spent on I/O operations during this round. + pub io_time: Duration, +} + +/// A full performance report for a single protocol execution. +#[derive(Debug, Clone, Default)] +pub struct PerfReport { + /// Statistics for each round. + pub rounds: Vec, +} + +impl PerfReport { + /// Calculates the total computation time across all rounds. + pub fn total_computation(&self) -> Duration { + self.rounds.iter().map(|r| r.computation_time).sum() + } + + /// Calculates the total I/O time across all rounds. + pub fn total_io(&self) -> Duration { + self.rounds.iter().map(|r| r.io_time).sum() + } + + /// Calculates the total execution time (computation + I/O). + pub fn total_time(&self) -> Duration { + self.total_computation() + self.total_io() + } +} + +impl fmt::Display for PerfReport { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "=== MPC Performance Report ===")?; + for stat in &self.rounds { + writeln!( + f, + "Round {}: Computation: {:?}, I/O: {:?}", + stat.round, stat.computation_time, stat.io_time + )?; + } + writeln!(f, "------------------------------")?; + writeln!(f, "Total Computation: {:?}", self.total_computation())?; + writeln!(f, "Total I/O: {:?}", self.total_io())?; + writeln!(f, "Total Time: {:?}", self.total_time())?; + Ok(()) + } +} diff --git a/round-based/src/mpc/profiler/stats.rs b/round-based/src/mpc/profiler/stats.rs new file mode 100644 index 0000000..a38812b --- /dev/null +++ b/round-based/src/mpc/profiler/stats.rs @@ -0,0 +1,94 @@ +//! Statistics for MPC protocol execution. + +use super::profiling::PerfReport; +use std::time::Duration; + +/// Aggregated statistics for a set of durations. +#[derive(Debug)] +pub struct AggregatedStats { + /// Name of the metric. + pub metric_name: String, + /// Mean duration. + pub mean: Duration, + /// Standard deviation of durations. + pub std_dev: Duration, + /// Median (50th percentile) duration. + pub p50: Duration, + /// 75th percentile duration. + pub p75: Duration, + /// 90th percentile duration. + pub p90: Duration, +} + +impl std::fmt::Display for AggregatedStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{:<20} | Mean: {:<10} | Dev: {:<10} | p50: {:<10} | p75: {:<10} | p90: {:<10}", + self.metric_name, + format!("{:?}", self.mean), + format!("{:?}", self.std_dev), + format!("{:?}", self.p50), + format!("{:?}", self.p75), + format!("{:?}", self.p90) + ) + } +} + +/// Analyzes a set of durations and returns aggregated statistics. +pub fn analyze_durations(name: &str, mut durations: Vec) -> AggregatedStats { + if durations.is_empty() { + return AggregatedStats { + metric_name: name.to_string(), + mean: Duration::ZERO, + std_dev: Duration::ZERO, + p50: Duration::ZERO, + p75: Duration::ZERO, + p90: Duration::ZERO, + }; + } + + durations.sort_unstable(); + let len = durations.len(); + + let sum: Duration = durations.iter().sum(); + let mean = sum / len as u32; + + let variance_secs = durations + .iter() + .map(|&d| { + let diff = d.abs_diff(mean); + diff.as_secs_f64().powi(2) + }) + .sum::() + / len as f64; + + let std_dev = Duration::from_secs_f64(variance_secs.sqrt()); + + let p50 = durations[(len as f64 * 0.50).floor() as usize]; + let p75 = durations[(len as f64 * 0.75).floor() as usize]; + let p90 = durations[(len as f64 * 0.90).floor() as usize]; + + AggregatedStats { + metric_name: name.to_string(), + mean, + std_dev, + p50, + p75, + p90, + } +} + +/// Helper to consume multiple reports and print aggregated analytics +pub fn analyze_reports(reports: &[PerfReport]) { + // Here you would extract vectors of total_computation, total_io, etc. + // across all reports and pass them to `analyze_durations`. + + let mut total_times = Vec::with_capacity(reports.len()); + for report in reports { + total_times.push(report.total_time()); + } + + let stats = analyze_durations("Total Execution Time", total_times); + println!("{}", stats); +} diff --git a/round-based/src/mpc/profiler/wrapper.rs b/round-based/src/mpc/profiler/wrapper.rs new file mode 100644 index 0000000..ea8fa7e --- /dev/null +++ b/round-based/src/mpc/profiler/wrapper.rs @@ -0,0 +1,218 @@ +use std::time::{Duration, Instant}; + +use crate::{MpcExecution, Outgoing, RoundMsg, mpc::SendMany, round::RoundInfo}; + +use super::profiling::{PerfReport, RoundStats}; + +/// Extension trait that allows to wrap any MPC execution with a performance profiler. +pub trait ProfilerExt: MpcExecution + Sized { + /// Wraps the MPC execution with a performance profiler. + fn profile(self) -> PerfProfiler { + PerfProfiler::new(self) + } +} + +impl ProfilerExt for M {} + +/// A wrapper around an MPC execution that measures performance. +/// +/// It measures computation time (time between MPC calls) and I/O time (time spent inside MPC calls). +pub struct PerfProfiler { + inner: M, + report: PerfReport, + last_resume: Instant, + current_round: usize, + current_comp_time: Duration, +} + +impl PerfProfiler { + /// Creates a new performance profiler. + pub fn new(inner: M) -> Self { + Self { + inner, + report: PerfReport::default(), + last_resume: Instant::now(), + current_round: 1, + current_comp_time: Duration::ZERO, + } + } + + /// Consumes the profiler and returns the performance report. + pub fn into_report(mut self) -> PerfReport { + self.current_comp_time += self.last_resume.elapsed(); + if self.current_comp_time != Duration::ZERO { + self.update_report(self.current_round, self.current_comp_time, Duration::ZERO); + } + self.report + } + + /// Returns a reference to the inner MPC execution. + pub fn get_ref(&self) -> &M { + &self.inner + } + + /// Returns a mutable reference to the inner MPC execution. + pub fn get_mut(&mut self) -> &mut M { + &mut self.inner + } + + /// Consumes the profiler and returns the inner MPC execution. + pub fn into_inner(self) -> M { + self.inner + } + + fn update_report(&mut self, round: usize, comp_time: Duration, io_time: Duration) { + if let Some(last) = self.report.rounds.last_mut() + && last.round == round + { + last.computation_time += comp_time; + last.io_time += io_time; + return; + } + self.report.rounds.push(RoundStats { + round, + computation_time: comp_time, + io_time, + }); + } +} + +impl MpcExecution for PerfProfiler { + type Round = M::Round; + type Msg = M::Msg; + type CompleteRoundErr = M::CompleteRoundErr; + type SendErr = M::SendErr; + type SendMany = ProfilerSendMany; + + async fn complete( + &mut self, + round: Self::Round, + ) -> Result> + where + R: RoundInfo, + Self::Msg: RoundMsg, + { + self.current_comp_time += self.last_resume.elapsed(); + + let io_start = Instant::now(); + let result = self.inner.complete(round).await; + let io_time = io_start.elapsed(); + + let round_idx = >::ROUND as usize; + + self.update_report(round_idx, self.current_comp_time, io_time); + + self.current_round = round_idx + 1; + self.current_comp_time = Duration::ZERO; + self.last_resume = Instant::now(); + + result + } + + async fn send(&mut self, msg: Outgoing) -> Result<(), Self::SendErr> { + self.current_comp_time += self.last_resume.elapsed(); + + let io_start = Instant::now(); + let result = self.inner.send(msg).await; + let io_time = io_start.elapsed(); + + self.update_report(self.current_round, self.current_comp_time, io_time); + + self.current_comp_time = Duration::ZERO; + self.last_resume = Instant::now(); + + result + } + + fn send_many(self) -> Self::SendMany { + ProfilerSendMany { + inner: self.inner.send_many(), + report: self.report, + last_resume: self.last_resume, + current_round: self.current_round, + current_comp_time: self.current_comp_time, + } + } + + async fn yield_now(&self) { + self.inner.yield_now().await; + } +} + +/// A wrapper around [`SendMany`] that measures performance. +pub struct ProfilerSendMany { + inner: S, + report: PerfReport, + last_resume: Instant, + current_round: usize, + current_comp_time: Duration, +} + +impl SendMany for ProfilerSendMany { + type Exec = PerfProfiler; + type Msg = S::Msg; + type SendErr = S::SendErr; + + async fn send(&mut self, msg: Outgoing) -> Result<(), S::SendErr> { + self.current_comp_time += self.last_resume.elapsed(); + + let io_start = Instant::now(); + let result = self.inner.send(msg).await; + let io_time = io_start.elapsed(); + + self.update_report(self.current_round, self.current_comp_time, io_time); + + self.current_comp_time = Duration::ZERO; + self.last_resume = Instant::now(); + + result + } + + async fn flush(self) -> Result { + let current_comp_time = self.current_comp_time + self.last_resume.elapsed(); + + let io_start = Instant::now(); + let result = self.inner.flush().await; + let io_time = io_start.elapsed(); + + let mut report = self.report; + let profiler_inner = result?; + + // We need to update the report before returning + update_report_static(&mut report, self.current_round, current_comp_time, io_time); + + Ok(PerfProfiler { + inner: profiler_inner, + report, + last_resume: Instant::now(), + current_round: self.current_round, + current_comp_time: Duration::ZERO, + }) + } +} + +impl ProfilerSendMany { + fn update_report(&mut self, round: usize, comp_time: Duration, io_time: Duration) { + update_report_static(&mut self.report, round, comp_time, io_time); + } +} + +fn update_report_static( + report: &mut PerfReport, + round: usize, + comp_time: Duration, + io_time: Duration, +) { + if let Some(last) = report.rounds.last_mut() + && last.round == round + { + last.computation_time += comp_time; + last.io_time += io_time; + return; + } + report.rounds.push(RoundStats { + round, + computation_time: comp_time, + io_time, + }); +} diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs new file mode 100644 index 0000000..dab7a92 --- /dev/null +++ b/round-based/tests/perf_test.rs @@ -0,0 +1,163 @@ +#[cfg(feature = "std")] +mod tests { + use round_based::{mpc::profiler::{wrapper::PerfProfiler, stats}, MpcExecution, Outgoing, RoundMsg, ProtocolMsg}; + use std::time::Duration; + use std::thread; + + struct MockMpc; + + #[derive(Clone, Debug)] + enum MockMsg { + Round1(()), + } + + impl ProtocolMsg for MockMsg { + fn round(&self) -> u16 { + match self { + MockMsg::Round1(_) => 1, + } + } + } + + impl RoundMsg<()> for MockMsg { + const ROUND: u16 = 1; + fn to_protocol_msg(m: ()) -> Self { MockMsg::Round1(m) } + fn from_protocol_msg(protocol_msg: Self) -> Result<(), Self> { + match protocol_msg { + MockMsg::Round1(m) => Ok(m), + } + } + } + + impl MpcExecution for MockMpc { + type Round = (); + type Msg = MockMsg; + type CompleteRoundErr = core::convert::Infallible; + type SendErr = core::convert::Infallible; + type SendMany = MockSendMany; + + async fn complete( + &mut self, + _round: Self::Round, + ) -> Result> + where + R: round_based::round::RoundInfo, + Self::Msg: RoundMsg, + { + tokio::time::sleep(Duration::from_millis(100)).await; + unreachable!() + } + + async fn send(&mut self, _msg: Outgoing) -> Result<(), Self::SendErr> { + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(()) + } + + fn send_many(self) -> Self::SendMany { + MockSendMany + } + + async fn yield_now(&self) {} + } + + struct MockSendMany; + impl round_based::mpc::SendMany for MockSendMany { + type Exec = MockMpc; + type Msg = MockMsg; + type SendErr = core::convert::Infallible; + + async fn send(&mut self, _msg: Outgoing) -> Result<(), Self::SendErr> { + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(()) + } + + async fn flush(self) -> Result { + Ok(MockMpc) + } + } + + impl MockMpc { + async fn simulate_round(&self) { + // Simulate "Pure Computation" + thread::sleep(Duration::from_millis(50)); + } + } + + #[tokio::test] + async fn test_profiler_captures_correct_times() { + let inner = MockMpc; + let mut profiler = PerfProfiler::new(inner); + + // --- ROUND 1 --- + // 1. Computation happens + profiler.get_ref().simulate_round().await; + + // 2. I/O happens via send + profiler.send(Outgoing::all_parties(MockMsg::Round1(()))).await.unwrap(); + + let report = profiler.into_report(); + + // Check if computation is at least 50ms + assert!(report.total_computation() >= Duration::from_millis(50), "Computation time was {:?}", report.total_computation()); + // Check if I/O is at least 100ms + assert!(report.total_io() >= Duration::from_millis(100), "IO time was {:?}", report.total_io()); + + println!("{}", report); + } + + #[test] + fn test_statistical_analysis() { + use round_based::mpc::profiler::{profiling::PerfReport, profiling::RoundStats}; + + // Create dummy reports to test the math + let mut reports = Vec::new(); + for i in 1..=10 { + reports.push(PerfReport { + rounds: vec![RoundStats { + round: 1, + computation_time: Duration::from_millis(i * 10), // 10, 20, ... 100 + io_time: Duration::from_millis(50), + }], + }); + } + + // Capture total times + let total_times: Vec = reports.iter().map(|r| r.total_time()).collect(); + let analysis = stats::analyze_durations("Batch Execution", total_times); + + // Verification + assert_eq!(analysis.metric_name, "Batch Execution"); + assert!(analysis.p50 >= Duration::from_millis(50)); + assert!(analysis.p90 >= Duration::from_millis(90)); + + // Test the Display for stats + let stats_output = format!("{}", analysis); + assert!(stats_output.contains("Mean")); + println!("{}", stats_output); + } + + #[test] + fn test_report_display_formatting() { + use round_based::mpc::profiler::{profiling::PerfReport, profiling::RoundStats}; + + let report = PerfReport { + rounds: vec![ + RoundStats { + round: 1, + computation_time: Duration::from_millis(15), + io_time: Duration::from_millis(45), + }, + RoundStats { + round: 2, + computation_time: Duration::from_millis(20), + io_time: Duration::from_millis(30), + }, + ], + }; + + let output = format!("{}", report); + assert!(output.contains("Round 1")); + assert!(output.contains("Round 2")); + assert!(output.contains("Total Time")); + } +} From 21c4c7dbb9c403705baad6e0c921f557e7796b0a Mon Sep 17 00:00:00 2001 From: theroguevigilante Date: Wed, 15 Apr 2026 00:00:55 +0530 Subject: [PATCH 2/6] feat(profiler): feature-gate profiler behind perf-profiler Signed-off-by: theroguevigilante --- round-based/Cargo.toml | 3 ++- round-based/src/mpc/mod.rs | 2 +- round-based/tests/perf_test.rs | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/round-based/Cargo.toml b/round-based/Cargo.toml index 08ee80b..1316c11 100644 --- a/round-based/Cargo.toml +++ b/round-based/Cargo.toml @@ -23,7 +23,7 @@ thiserror = { version = "2", default-features = false } round-based-derive = { version = "0.5.0-alpha.0", optional = true, path = "../round-based-derive" } -tokio = { version = "1", features = ["rt", "time"], optional = true } +tokio = { version = "1", features = ["rt"], optional = true } tokio-stream = { version = "0.1", features = ["sync"], optional = true } pin-project-lite = "0.2" @@ -48,6 +48,7 @@ sha2 = "0.10" [features] default = [] +perf-profiler = ["std"] state-machine = [] sim = ["state-machine"] sim-async = ["sim", "tokio/sync", "tokio-stream", "futures-util/alloc"] diff --git a/round-based/src/mpc/mod.rs b/round-based/src/mpc/mod.rs index 041aa2c..a827fce 100644 --- a/round-based/src/mpc/mod.rs +++ b/round-based/src/mpc/mod.rs @@ -347,5 +347,5 @@ where MpcParty::connected_halves(incomings, outgoings) } -#[cfg(feature = "std")] +#[cfg(feature = "perf-profiler")] pub mod profiler; diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs index dab7a92..8a36f39 100644 --- a/round-based/tests/perf_test.rs +++ b/round-based/tests/perf_test.rs @@ -1,4 +1,4 @@ -#[cfg(feature = "std")] +#[cfg(feature = "perf-profiler")] mod tests { use round_based::{mpc::profiler::{wrapper::PerfProfiler, stats}, MpcExecution, Outgoing, RoundMsg, ProtocolMsg}; use std::time::Duration; From 10e74cb80c0e5d78b5d84b353a2d3238a1fb93e7 Mon Sep 17 00:00:00 2001 From: theroguevigilante Date: Wed, 15 Apr 2026 21:10:39 +0530 Subject: [PATCH 3/6] fix(round-based): enforce strict no_std and fix tests, fmt Signed-off-by: theroguevigilante --- round-based/Cargo.toml | 4 +++ round-based/src/lib.rs | 4 ++- round-based/src/mpc/profiler/mod.rs | 4 +-- round-based/src/mpc/profiler/profiling.rs | 1 + round-based/src/mpc/profiler/stats.rs | 3 +++ round-based/src/mpc/profiler/wrapper.rs | 16 +++++++----- round-based/tests/perf_test.rs | 32 +++++++++++++++++------ 7 files changed, 47 insertions(+), 17 deletions(-) diff --git a/round-based/Cargo.toml b/round-based/Cargo.toml index 1316c11..19f8f43 100644 --- a/round-based/Cargo.toml +++ b/round-based/Cargo.toml @@ -61,3 +61,7 @@ echo-broadcast = ["dep:digest", "dep:udigest"] [[test]] name = "derive" required-features = ["derive"] + +[[test]] +name = "perf_test" +required-features = ["perf-profiler"] diff --git a/round-based/src/lib.rs b/round-based/src/lib.rs index ce0dee0..67d2554 100644 --- a/round-based/src/lib.rs +++ b/round-based/src/lib.rs @@ -125,12 +125,14 @@ //! ## Join us in Discord! //! Feel free to reach out to us [in Discord](https://discordapp.com/channels/905194001349627914/1285268686147424388)! +#![no_std] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg, doc_cfg_hide))] #![warn(unused_crate_dependencies, missing_docs)] #![allow(async_fn_in_trait)] -#![cfg_attr(not(feature = "std"), no_std)] extern crate alloc; +#[cfg(feature = "std")] +extern crate std; /// Fixes false-positive of `unused_crate_dependencies` lint that only occur in the tests #[cfg(test)] diff --git a/round-based/src/mpc/profiler/mod.rs b/round-based/src/mpc/profiler/mod.rs index 72b6334..497ccd2 100644 --- a/round-based/src/mpc/profiler/mod.rs +++ b/round-based/src/mpc/profiler/mod.rs @@ -1,8 +1,8 @@ //! Performance profiler for MPC execution. -/// Statistics aggregation. -pub mod stats; /// Performance reporting. pub mod profiling; +/// Statistics aggregation. +pub mod stats; /// Profiler wrapper. pub mod wrapper; diff --git a/round-based/src/mpc/profiler/profiling.rs b/round-based/src/mpc/profiler/profiling.rs index 8b1b087..c78708b 100644 --- a/round-based/src/mpc/profiler/profiling.rs +++ b/round-based/src/mpc/profiler/profiling.rs @@ -1,5 +1,6 @@ use std::fmt; use std::time::Duration; +use std::vec::Vec; /// Statistics for a single round of an MPC protocol. #[derive(Debug, Clone, Default)] diff --git a/round-based/src/mpc/profiler/stats.rs b/round-based/src/mpc/profiler/stats.rs index a38812b..d8768a8 100644 --- a/round-based/src/mpc/profiler/stats.rs +++ b/round-based/src/mpc/profiler/stats.rs @@ -1,7 +1,10 @@ //! Statistics for MPC protocol execution. use super::profiling::PerfReport; +use std::string::{String, ToString}; use std::time::Duration; +use std::vec::Vec; +use std::{format, println}; /// Aggregated statistics for a set of durations. #[derive(Debug)] diff --git a/round-based/src/mpc/profiler/wrapper.rs b/round-based/src/mpc/profiler/wrapper.rs index ea8fa7e..812b21d 100644 --- a/round-based/src/mpc/profiler/wrapper.rs +++ b/round-based/src/mpc/profiler/wrapper.rs @@ -1,20 +1,24 @@ use std::time::{Duration, Instant}; -use crate::{MpcExecution, Outgoing, RoundMsg, mpc::SendMany, round::RoundInfo}; +use crate::{ + Mpc, MpcExecution, Outgoing, RoundMsg, + mpc::SendMany, + round::{RoundInfo, RoundStore}, +}; use super::profiling::{PerfReport, RoundStats}; -/// Extension trait that allows to wrap any MPC execution with a performance profiler. -pub trait ProfilerExt: MpcExecution + Sized { - /// Wraps the MPC execution with a performance profiler. +/// Extension trait that allows to wrap any MPC engine or execution with a performance profiler. +pub trait ProfilerExt: Sized { + /// Wraps the MPC engine or execution with a performance profiler. fn profile(self) -> PerfProfiler { PerfProfiler::new(self) } } -impl ProfilerExt for M {} +impl ProfilerExt for M {} -/// A wrapper around an MPC execution that measures performance. +/// A wrapper around an MPC engine or execution that measures performance. /// /// It measures computation time (time between MPC calls) and I/O time (time spent inside MPC calls). pub struct PerfProfiler { diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs index 8a36f39..45bb773 100644 --- a/round-based/tests/perf_test.rs +++ b/round-based/tests/perf_test.rs @@ -1,8 +1,11 @@ #[cfg(feature = "perf-profiler")] mod tests { - use round_based::{mpc::profiler::{wrapper::PerfProfiler, stats}, MpcExecution, Outgoing, RoundMsg, ProtocolMsg}; - use std::time::Duration; + use round_based::{ + MpcExecution, Outgoing, ProtocolMsg, RoundMsg, + mpc::profiler::{stats, wrapper::PerfProfiler}, + }; use std::thread; + use std::time::Duration; struct MockMpc; @@ -21,7 +24,9 @@ mod tests { impl RoundMsg<()> for MockMsg { const ROUND: u16 = 1; - fn to_protocol_msg(m: ()) -> Self { MockMsg::Round1(m) } + fn to_protocol_msg(m: ()) -> Self { + MockMsg::Round1(m) + } fn from_protocol_msg(protocol_msg: Self) -> Result<(), Self> { match protocol_msg { MockMsg::Round1(m) => Ok(m), @@ -91,17 +96,28 @@ mod tests { // --- ROUND 1 --- // 1. Computation happens profiler.get_ref().simulate_round().await; - + // 2. I/O happens via send - profiler.send(Outgoing::all_parties(MockMsg::Round1(()))).await.unwrap(); + profiler + .send(Outgoing::all_parties(MockMsg::Round1(()))) + .await + .unwrap(); let report = profiler.into_report(); // Check if computation is at least 50ms - assert!(report.total_computation() >= Duration::from_millis(50), "Computation time was {:?}", report.total_computation()); + assert!( + report.total_computation() >= Duration::from_millis(50), + "Computation time was {:?}", + report.total_computation() + ); // Check if I/O is at least 100ms - assert!(report.total_io() >= Duration::from_millis(100), "IO time was {:?}", report.total_io()); - + assert!( + report.total_io() >= Duration::from_millis(100), + "IO time was {:?}", + report.total_io() + ); + println!("{}", report); } From b3bb7528b02fd5ec90d0cf5abb6d4f8347d2c4fc Mon Sep 17 00:00:00 2001 From: theroguevigilante Date: Fri, 17 Apr 2026 01:05:48 +0530 Subject: [PATCH 4/6] feat(profiler): redesign MPC profiler for full lifecycle - Wrap `Mpc` trait to profile both setup and execution - Split I/O into sent, received, and yield (scheduler wait) - Use interior mutability (RefCell) to profile `&self` calls - Centralize merging logic in `PerfReport::apply_stats` - Remove redundant fields and deprecated `ProfilerExt` - Refactor existing tests to match new schema Signed-off-by: theroguevigilante --- round-based/src/mpc/profiler/profiling.rs | 67 ++++++- round-based/src/mpc/profiler/stats.rs | 1 + round-based/src/mpc/profiler/wrapper.rs | 226 ++++++++++++---------- round-based/tests/perf_test.rs | 87 +++++---- 4 files changed, 240 insertions(+), 141 deletions(-) diff --git a/round-based/src/mpc/profiler/profiling.rs b/round-based/src/mpc/profiler/profiling.rs index c78708b..cc40239 100644 --- a/round-based/src/mpc/profiler/profiling.rs +++ b/round-based/src/mpc/profiler/profiling.rs @@ -9,8 +9,12 @@ pub struct RoundStats { pub round: usize, /// Time spent on computation during this round. pub computation_time: Duration, - /// Time spent on I/O operations during this round. - pub io_time: Duration, + /// Time spent on sending messages during this round. + pub sent_io_time: Duration, + /// Time spent on receiving messages during this round. + pub recv_io_time: Duration, + /// Time spent on waiting for the scheduler (yield_now). + pub yield_time: Duration, } /// A full performance report for a single protocol execution. @@ -21,14 +25,57 @@ pub struct PerfReport { } impl PerfReport { + /// Applies new statistics to the report. + /// + /// If an entry for the same round already exists, the statistics are added to it. + /// Otherwise, a new entry is created. + pub fn apply_stats( + &mut self, + round: usize, + computation: Duration, + sent_io: Duration, + recv_io: Duration, + yield_time: Duration, + ) { + if let Some(existing) = self.rounds.iter_mut().find(|r| r.round == round) { + existing.computation_time += computation; + existing.sent_io_time += sent_io; + existing.recv_io_time += recv_io; + existing.yield_time += yield_time; + return; + } + self.rounds.push(RoundStats { + round, + computation_time: computation, + sent_io_time: sent_io, + recv_io_time: recv_io, + yield_time, + }); + } + /// Calculates the total computation time across all rounds. pub fn total_computation(&self) -> Duration { self.rounds.iter().map(|r| r.computation_time).sum() } - /// Calculates the total I/O time across all rounds. + /// Calculates the total I/O time spent on sending across all rounds. + pub fn total_sent_io(&self) -> Duration { + self.rounds.iter().map(|r| r.sent_io_time).sum() + } + + /// Calculates the total I/O time spent on receiving across all rounds. + pub fn total_recv_io(&self) -> Duration { + self.rounds.iter().map(|r| r.recv_io_time).sum() + } + + /// Calculates the total I/O time spent on yielding across all rounds. + pub fn total_yield(&self) -> Duration { + self.rounds.iter().map(|r| r.yield_time).sum() + } + + /// Calculates the total I/O time across all rounds (send + recv + yield). pub fn total_io(&self) -> Duration { - self.rounds.iter().map(|r| r.io_time).sum() + self.total_sent_io() + self.total_recv_io() + self.total_yield() } /// Calculates the total execution time (computation + I/O). @@ -43,13 +90,19 @@ impl fmt::Display for PerfReport { for stat in &self.rounds { writeln!( f, - "Round {}: Computation: {:?}, I/O: {:?}", - stat.round, stat.computation_time, stat.io_time + "Round {}: Computation: {:?}, Sent I/O: {:?}, Recv I/O: {:?}, Yield: {:?}", + stat.round, + stat.computation_time, + stat.sent_io_time, + stat.recv_io_time, + stat.yield_time )?; } writeln!(f, "------------------------------")?; writeln!(f, "Total Computation: {:?}", self.total_computation())?; - writeln!(f, "Total I/O: {:?}", self.total_io())?; + writeln!(f, "Total Sent I/O: {:?}", self.total_sent_io())?; + writeln!(f, "Total Recv I/O: {:?}", self.total_recv_io())?; + writeln!(f, "Total Yield: {:?}", self.total_yield())?; writeln!(f, "Total Time: {:?}", self.total_time())?; Ok(()) } diff --git a/round-based/src/mpc/profiler/stats.rs b/round-based/src/mpc/profiler/stats.rs index d8768a8..c1e2bf6 100644 --- a/round-based/src/mpc/profiler/stats.rs +++ b/round-based/src/mpc/profiler/stats.rs @@ -25,6 +25,7 @@ pub struct AggregatedStats { impl std::fmt::Display for AggregatedStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Table-like formatting: {: PerfProfiler { - PerfProfiler::new(self) - } -} - -impl ProfilerExt for M {} +use super::profiling::PerfReport; /// A wrapper around an MPC engine or execution that measures performance. /// /// It measures computation time (time between MPC calls) and I/O time (time spent inside MPC calls). pub struct PerfProfiler { inner: M, - report: PerfReport, - last_resume: Instant, - current_round: usize, - current_comp_time: Duration, + report: RefCell, + last_resume: Cell, } impl PerfProfiler { @@ -34,20 +23,25 @@ impl PerfProfiler { pub fn new(inner: M) -> Self { Self { inner, - report: PerfReport::default(), - last_resume: Instant::now(), - current_round: 1, - current_comp_time: Duration::ZERO, + report: RefCell::new(PerfReport::default()), + last_resume: Cell::new(Instant::now()), } } /// Consumes the profiler and returns the performance report. - pub fn into_report(mut self) -> PerfReport { - self.current_comp_time += self.last_resume.elapsed(); - if self.current_comp_time != Duration::ZERO { - self.update_report(self.current_round, self.current_comp_time, Duration::ZERO); + pub fn into_report(self) -> PerfReport { + let elapsed = self.last_resume.get().elapsed(); + if elapsed != Duration::ZERO { + // Attribute trailing time to round 0 (Global/Teardown) + self.report.borrow_mut().apply_stats( + 0, + elapsed, + Duration::ZERO, + Duration::ZERO, + Duration::ZERO, + ); } - self.report + self.report.into_inner() } /// Returns a reference to the inner MPC execution. @@ -65,23 +59,67 @@ impl PerfProfiler { self.inner } - fn update_report(&mut self, round: usize, comp_time: Duration, io_time: Duration) { - if let Some(last) = self.report.rounds.last_mut() - && last.round == round - { - last.computation_time += comp_time; - last.io_time += io_time; - return; + fn update_report( + &self, + round: usize, + comp_time: Duration, + sent_io: Duration, + recv_io: Duration, + yield_time: Duration, + ) { + self.report + .borrow_mut() + .apply_stats(round, comp_time, sent_io, recv_io, yield_time); + } +} + +impl Mpc for PerfProfiler +where + M::Msg: ProtocolMsg, +{ + type Msg = M::Msg; + type Exec = PerfProfiler; + type SendErr = M::SendErr; + + fn add_round(&mut self, round: R) -> ::Round + where + R: RoundStore, + Self::Msg: RoundMsg, + { + let elapsed = self.last_resume.get().elapsed(); + let round_idx = >::ROUND as usize; + self.update_report( + round_idx, + elapsed, + Duration::ZERO, + Duration::ZERO, + Duration::ZERO, + ); + + let res = self.inner.add_round(round); + self.last_resume.set(Instant::now()); + res + } + + fn finish_setup(self) -> Self::Exec { + let elapsed = self.last_resume.get().elapsed(); + if elapsed != Duration::ZERO { + // Attribute setup completion time to round 0 + self.update_report(0, elapsed, Duration::ZERO, Duration::ZERO, Duration::ZERO); + } + + PerfProfiler { + inner: self.inner.finish_setup(), + report: self.report, + last_resume: Cell::new(Instant::now()), } - self.report.rounds.push(RoundStats { - round, - computation_time: comp_time, - io_time, - }); } } -impl MpcExecution for PerfProfiler { +impl MpcExecution for PerfProfiler +where + M::Msg: ProtocolMsg, +{ type Round = M::Round; type Msg = M::Msg; type CompleteRoundErr = M::CompleteRoundErr; @@ -96,35 +134,42 @@ impl MpcExecution for PerfProfiler { R: RoundInfo, Self::Msg: RoundMsg, { - self.current_comp_time += self.last_resume.elapsed(); + let comp_time = self.last_resume.get().elapsed(); let io_start = Instant::now(); let result = self.inner.complete(round).await; let io_time = io_start.elapsed(); let round_idx = >::ROUND as usize; + self.update_report( + round_idx, + comp_time, + Duration::ZERO, + io_time, + Duration::ZERO, + ); - self.update_report(round_idx, self.current_comp_time, io_time); - - self.current_round = round_idx + 1; - self.current_comp_time = Duration::ZERO; - self.last_resume = Instant::now(); - + self.last_resume.set(Instant::now()); result } async fn send(&mut self, msg: Outgoing) -> Result<(), Self::SendErr> { - self.current_comp_time += self.last_resume.elapsed(); + let comp_time = self.last_resume.get().elapsed(); + let round_idx = msg.msg.round() as usize; let io_start = Instant::now(); let result = self.inner.send(msg).await; let io_time = io_start.elapsed(); - self.update_report(self.current_round, self.current_comp_time, io_time); - - self.current_comp_time = Duration::ZERO; - self.last_resume = Instant::now(); + self.update_report( + round_idx, + comp_time, + io_time, + Duration::ZERO, + Duration::ZERO, + ); + self.last_resume.set(Instant::now()); result } @@ -133,90 +178,73 @@ impl MpcExecution for PerfProfiler { inner: self.inner.send_many(), report: self.report, last_resume: self.last_resume, - current_round: self.current_round, - current_comp_time: self.current_comp_time, } } async fn yield_now(&self) { + let comp_time = self.last_resume.get().elapsed(); + + let start = Instant::now(); self.inner.yield_now().await; + let yield_time = start.elapsed(); + + // Attribute yield to round 0 + self.update_report(0, comp_time, Duration::ZERO, Duration::ZERO, yield_time); + self.last_resume.set(Instant::now()); } } /// A wrapper around [`SendMany`] that measures performance. pub struct ProfilerSendMany { inner: S, - report: PerfReport, - last_resume: Instant, - current_round: usize, - current_comp_time: Duration, + report: RefCell, + last_resume: Cell, } -impl SendMany for ProfilerSendMany { +impl SendMany for ProfilerSendMany +where + S::Msg: ProtocolMsg, +{ type Exec = PerfProfiler; type Msg = S::Msg; type SendErr = S::SendErr; async fn send(&mut self, msg: Outgoing) -> Result<(), S::SendErr> { - self.current_comp_time += self.last_resume.elapsed(); + let comp_time = self.last_resume.get().elapsed(); + let round_idx = msg.msg.round() as usize; let io_start = Instant::now(); let result = self.inner.send(msg).await; let io_time = io_start.elapsed(); - self.update_report(self.current_round, self.current_comp_time, io_time); - - self.current_comp_time = Duration::ZERO; - self.last_resume = Instant::now(); + self.report.borrow_mut().apply_stats( + round_idx, + comp_time, + io_time, + Duration::ZERO, + Duration::ZERO, + ); + self.last_resume.set(Instant::now()); result } async fn flush(self) -> Result { - let current_comp_time = self.current_comp_time + self.last_resume.elapsed(); + let comp_time = self.last_resume.get().elapsed(); let io_start = Instant::now(); let result = self.inner.flush().await; let io_time = io_start.elapsed(); - let mut report = self.report; - let profiler_inner = result?; - - // We need to update the report before returning - update_report_static(&mut report, self.current_round, current_comp_time, io_time); + // Attribute flush to round 0 + self.report + .borrow_mut() + .apply_stats(0, comp_time, io_time, Duration::ZERO, Duration::ZERO); Ok(PerfProfiler { - inner: profiler_inner, - report, - last_resume: Instant::now(), - current_round: self.current_round, - current_comp_time: Duration::ZERO, + inner: result?, + report: self.report, + last_resume: Cell::new(Instant::now()), }) } } - -impl ProfilerSendMany { - fn update_report(&mut self, round: usize, comp_time: Duration, io_time: Duration) { - update_report_static(&mut self.report, round, comp_time, io_time); - } -} - -fn update_report_static( - report: &mut PerfReport, - round: usize, - comp_time: Duration, - io_time: Duration, -) { - if let Some(last) = report.rounds.last_mut() - && last.round == round - { - last.computation_time += comp_time; - last.io_time += io_time; - return; - } - report.rounds.push(RoundStats { - round, - computation_time: comp_time, - io_time, - }); -} diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs index 45bb773..1ee1a73 100644 --- a/round-based/tests/perf_test.rs +++ b/round-based/tests/perf_test.rs @@ -1,10 +1,9 @@ #[cfg(feature = "perf-profiler")] mod tests { use round_based::{ - MpcExecution, Outgoing, ProtocolMsg, RoundMsg, - mpc::profiler::{stats, wrapper::PerfProfiler}, + Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, mpc::profiler::profiling::PerfReport, + mpc::profiler::stats, mpc::profiler::wrapper::PerfProfiler, }; - use std::thread; use std::time::Duration; struct MockMpc; @@ -34,6 +33,23 @@ mod tests { } } + impl Mpc for MockMpc { + type Msg = MockMsg; + type Exec = MockMpc; + type SendErr = core::convert::Infallible; + + fn add_round(&mut self, _round: R) -> ::Round + where + R: round_based::round::RoundStore, + Self::Msg: RoundMsg, + { + } + + fn finish_setup(self) -> Self::Exec { + self + } + } + impl MpcExecution for MockMpc { type Round = (); type Msg = MockMsg; @@ -62,7 +78,9 @@ mod tests { MockSendMany } - async fn yield_now(&self) {} + async fn yield_now(&self) { + tokio::time::sleep(Duration::from_millis(10)).await; + } } struct MockSendMany; @@ -82,9 +100,9 @@ mod tests { } impl MockMpc { - async fn simulate_round(&self) { + fn simulate_computation(&self) { // Simulate "Pure Computation" - thread::sleep(Duration::from_millis(50)); + std::thread::sleep(Duration::from_millis(50)); } } @@ -95,7 +113,7 @@ mod tests { // --- ROUND 1 --- // 1. Computation happens - profiler.get_ref().simulate_round().await; + profiler.get_ref().simulate_computation(); // 2. I/O happens via send profiler @@ -113,9 +131,9 @@ mod tests { ); // Check if I/O is at least 100ms assert!( - report.total_io() >= Duration::from_millis(100), - "IO time was {:?}", - report.total_io() + report.total_sent_io() >= Duration::from_millis(100), + "Sent IO time was {:?}", + report.total_sent_io() ); println!("{}", report); @@ -123,18 +141,18 @@ mod tests { #[test] fn test_statistical_analysis() { - use round_based::mpc::profiler::{profiling::PerfReport, profiling::RoundStats}; - // Create dummy reports to test the math let mut reports = Vec::new(); for i in 1..=10 { - reports.push(PerfReport { - rounds: vec![RoundStats { - round: 1, - computation_time: Duration::from_millis(i * 10), // 10, 20, ... 100 - io_time: Duration::from_millis(50), - }], - }); + let mut report = PerfReport::default(); + report.apply_stats( + 1, + Duration::from_millis(i * 10), // 10, 20, ... 100 + Duration::from_millis(50), + Duration::ZERO, + Duration::ZERO, + ); + reports.push(report); } // Capture total times @@ -154,22 +172,21 @@ mod tests { #[test] fn test_report_display_formatting() { - use round_based::mpc::profiler::{profiling::PerfReport, profiling::RoundStats}; - - let report = PerfReport { - rounds: vec![ - RoundStats { - round: 1, - computation_time: Duration::from_millis(15), - io_time: Duration::from_millis(45), - }, - RoundStats { - round: 2, - computation_time: Duration::from_millis(20), - io_time: Duration::from_millis(30), - }, - ], - }; + let mut report = PerfReport::default(); + report.apply_stats( + 1, + Duration::from_millis(15), + Duration::from_millis(45), + Duration::ZERO, + Duration::ZERO, + ); + report.apply_stats( + 2, + Duration::from_millis(20), + Duration::ZERO, + Duration::from_millis(30), + Duration::ZERO, + ); let output = format!("{}", report); assert!(output.contains("Round 1")); From bd93f1f101e0b1bea56bfebdbe31cdcbef57a52c Mon Sep 17 00:00:00 2001 From: theroguevigilante Date: Sat, 18 Apr 2026 14:48:19 +0530 Subject: [PATCH 5/6] feat(profiler): thread-safe event stream with survival handle - Switch to `Arc>>` for thread-safe shared ownership - Implement `PerfProfilerHandle` to allow reporting after profiler drop - Update `PerfProfiler::new` to return `(Profiler, Handle)` - Add comprehensive Random Beacon simulation test in `perf_test.rs` - Verify timing accuracy for multi-round computation and I/O - Ensure `Mpc` setup and `MpcExecution` phases share the same event log Signed-off-by: theroguevigilante --- round-based/src/mpc/profiler/profiling.rs | 120 ++++++++++-- round-based/src/mpc/profiler/wrapper.rs | 202 ++++++++----------- round-based/tests/perf_test.rs | 229 +++++++++++----------- 3 files changed, 306 insertions(+), 245 deletions(-) diff --git a/round-based/src/mpc/profiler/profiling.rs b/round-based/src/mpc/profiler/profiling.rs index cc40239..2776490 100644 --- a/round-based/src/mpc/profiler/profiling.rs +++ b/round-based/src/mpc/profiler/profiling.rs @@ -1,7 +1,63 @@ use std::fmt; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::vec::Vec; +/// An event captured during MPC execution. +#[derive(Debug, Clone)] +pub enum Event { + /// Sent a message. + SendMsg { + /// Number of the round. + round: u16, + /// Time when `.send().await` was called. + started: Instant, + /// Time when `.send().await` has returned. + finished: Instant, + }, + /// Received messages (completed a round). + RecvMsgs { + /// Number of the round. + round: u16, + /// Time when `.complete().await` was called. + started: Instant, + /// Time when `.complete().await` has returned. + finished: Instant, + }, + /// Yielded to the scheduler. + Yielded { + /// Time when `.yield_now().await` was called. + started: Instant, + /// Time when `.yield_now().await` has returned. + finished: Instant, + }, +} + +impl Event { + /// Returns the round number associated with the event. + pub fn round(&self) -> u16 { + match self { + Event::SendMsg { round, .. } => *round, + Event::RecvMsgs { round, .. } => *round, + Event::Yielded { .. } => 0, // Global + } + } + + /// Returns the duration of the event. + pub fn duration(&self) -> Duration { + match self { + Event::SendMsg { + started, finished, .. + } => *finished - *started, + Event::RecvMsgs { + started, finished, .. + } => *finished - *started, + Event::Yielded { + started, finished, .. + } => *finished - *started, + } + } +} + /// Statistics for a single round of an MPC protocol. #[derive(Debug, Clone, Default)] pub struct RoundStats { @@ -25,11 +81,41 @@ pub struct PerfReport { } impl PerfReport { - /// Applies new statistics to the report. - /// - /// If an entry for the same round already exists, the statistics are added to it. - /// Otherwise, a new entry is created. - pub fn apply_stats( + /// Builds a report from a sequence of events. + pub fn from_events(start_time: Instant, end_time: Instant, events: Vec) -> Self { + let mut report = Self::default(); + let mut last_finished = start_time; + + for event in events { + // Computation is the gap since the last event finished + let computation = event.started().duration_since(last_finished); + let round = event.round() as usize; + + let (sent, recv, yielded) = match &event { + Event::SendMsg { .. } => (event.duration(), Duration::ZERO, Duration::ZERO), + Event::RecvMsgs { .. } => (Duration::ZERO, event.duration(), Duration::ZERO), + Event::Yielded { .. } => (Duration::ZERO, Duration::ZERO, event.duration()), + }; + + report.apply_stats(round, computation, sent, recv, yielded); + last_finished = event.finished(); + } + + // Add trailing computation + if end_time > last_finished { + report.apply_stats( + 0, + end_time - last_finished, + Duration::ZERO, + Duration::ZERO, + Duration::ZERO, + ); + } + + report + } + + fn apply_stats( &mut self, round: usize, computation: Duration, @@ -57,33 +143,45 @@ impl PerfReport { pub fn total_computation(&self) -> Duration { self.rounds.iter().map(|r| r.computation_time).sum() } - /// Calculates the total I/O time spent on sending across all rounds. pub fn total_sent_io(&self) -> Duration { self.rounds.iter().map(|r| r.sent_io_time).sum() } - /// Calculates the total I/O time spent on receiving across all rounds. pub fn total_recv_io(&self) -> Duration { self.rounds.iter().map(|r| r.recv_io_time).sum() } - /// Calculates the total I/O time spent on yielding across all rounds. pub fn total_yield(&self) -> Duration { self.rounds.iter().map(|r| r.yield_time).sum() } - /// Calculates the total I/O time across all rounds (send + recv + yield). pub fn total_io(&self) -> Duration { self.total_sent_io() + self.total_recv_io() + self.total_yield() } - /// Calculates the total execution time (computation + I/O). pub fn total_time(&self) -> Duration { self.total_computation() + self.total_io() } } +impl Event { + fn started(&self) -> Instant { + match self { + Event::SendMsg { started, .. } => *started, + Event::RecvMsgs { started, .. } => *started, + Event::Yielded { started, .. } => *started, + } + } + fn finished(&self) -> Instant { + match self { + Event::SendMsg { finished, .. } => *finished, + Event::RecvMsgs { finished, .. } => *finished, + Event::Yielded { finished, .. } => *finished, + } + } +} + impl fmt::Display for PerfReport { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "=== MPC Performance Report ===")?; diff --git a/round-based/src/mpc/profiler/wrapper.rs b/round-based/src/mpc/profiler/wrapper.rs index b032991..5434143 100644 --- a/round-based/src/mpc/profiler/wrapper.rs +++ b/round-based/src/mpc/profiler/wrapper.rs @@ -1,5 +1,6 @@ -use core::cell::{Cell, RefCell}; -use std::time::{Duration, Instant}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; +use std::vec::Vec; use crate::{ Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, @@ -7,41 +8,54 @@ use crate::{ round::{RoundInfo, RoundStore}, }; -use super::profiling::PerfReport; +use super::profiling::{Event, PerfReport}; + +/// A handle to the performance profiler that can be used to generate a report +/// even after the profiler itself has been consumed. +#[derive(Clone)] +pub struct PerfProfilerHandle { + events: Arc>>, + start_time: Instant, +} + +impl PerfProfilerHandle { + /// Consumes the handle and returns the performance report. + pub fn into_report(self) -> PerfReport { + let end_time = Instant::now(); + let events = self.events.lock().unwrap().clone(); + PerfReport::from_events(self.start_time, end_time, events) + } +} /// A wrapper around an MPC engine or execution that measures performance. /// -/// It measures computation time (time between MPC calls) and I/O time (time spent inside MPC calls). +/// It stores a sequence of events (I/O and Yield) and uses them to calculate performance stats. pub struct PerfProfiler { inner: M, - report: RefCell, - last_resume: Cell, + events: Arc>>, + start_time: Instant, } impl PerfProfiler { - /// Creates a new performance profiler. - pub fn new(inner: M) -> Self { - Self { - inner, - report: RefCell::new(PerfReport::default()), - last_resume: Cell::new(Instant::now()), - } + /// Creates a new performance profiler and a handle to retrieve the report. + pub fn new(inner: M) -> (Self, PerfProfilerHandle) { + let start_time = Instant::now(); + let events = Arc::new(Mutex::new(Vec::new())); + ( + Self { + inner, + events: events.clone(), + start_time, + }, + PerfProfilerHandle { events, start_time }, + ) } /// Consumes the profiler and returns the performance report. pub fn into_report(self) -> PerfReport { - let elapsed = self.last_resume.get().elapsed(); - if elapsed != Duration::ZERO { - // Attribute trailing time to round 0 (Global/Teardown) - self.report.borrow_mut().apply_stats( - 0, - elapsed, - Duration::ZERO, - Duration::ZERO, - Duration::ZERO, - ); - } - self.report.into_inner() + let end_time = Instant::now(); + let events = self.events.lock().unwrap().clone(); + PerfReport::from_events(self.start_time, end_time, events) } /// Returns a reference to the inner MPC execution. @@ -58,19 +72,6 @@ impl PerfProfiler { pub fn into_inner(self) -> M { self.inner } - - fn update_report( - &self, - round: usize, - comp_time: Duration, - sent_io: Duration, - recv_io: Duration, - yield_time: Duration, - ) { - self.report - .borrow_mut() - .apply_stats(round, comp_time, sent_io, recv_io, yield_time); - } } impl Mpc for PerfProfiler @@ -86,32 +87,14 @@ where R: RoundStore, Self::Msg: RoundMsg, { - let elapsed = self.last_resume.get().elapsed(); - let round_idx = >::ROUND as usize; - self.update_report( - round_idx, - elapsed, - Duration::ZERO, - Duration::ZERO, - Duration::ZERO, - ); - - let res = self.inner.add_round(round); - self.last_resume.set(Instant::now()); - res + self.inner.add_round(round) } fn finish_setup(self) -> Self::Exec { - let elapsed = self.last_resume.get().elapsed(); - if elapsed != Duration::ZERO { - // Attribute setup completion time to round 0 - self.update_report(0, elapsed, Duration::ZERO, Duration::ZERO, Duration::ZERO); - } - PerfProfiler { inner: self.inner.finish_setup(), - report: self.report, - last_resume: Cell::new(Instant::now()), + events: self.events, + start_time: self.start_time, } } } @@ -134,71 +117,60 @@ where R: RoundInfo, Self::Msg: RoundMsg, { - let comp_time = self.last_resume.get().elapsed(); - - let io_start = Instant::now(); + let started = Instant::now(); let result = self.inner.complete(round).await; - let io_time = io_start.elapsed(); - - let round_idx = >::ROUND as usize; - self.update_report( - round_idx, - comp_time, - Duration::ZERO, - io_time, - Duration::ZERO, - ); - - self.last_resume.set(Instant::now()); + let finished = Instant::now(); + + let round_idx = >::ROUND; + self.events.lock().unwrap().push(Event::RecvMsgs { + round: round_idx, + started, + finished, + }); + result } async fn send(&mut self, msg: Outgoing) -> Result<(), Self::SendErr> { - let comp_time = self.last_resume.get().elapsed(); - let round_idx = msg.msg.round() as usize; - - let io_start = Instant::now(); + let round_idx = msg.msg.round(); + let started = Instant::now(); let result = self.inner.send(msg).await; - let io_time = io_start.elapsed(); + let finished = Instant::now(); - self.update_report( - round_idx, - comp_time, - io_time, - Duration::ZERO, - Duration::ZERO, - ); + self.events.lock().unwrap().push(Event::SendMsg { + round: round_idx, + started, + finished, + }); - self.last_resume.set(Instant::now()); result } fn send_many(self) -> Self::SendMany { ProfilerSendMany { inner: self.inner.send_many(), - report: self.report, - last_resume: self.last_resume, + events: self.events, + start_time: self.start_time, } } async fn yield_now(&self) { - let comp_time = self.last_resume.get().elapsed(); - - let start = Instant::now(); + let started = Instant::now(); self.inner.yield_now().await; - let yield_time = start.elapsed(); + let finished = Instant::now(); - // Attribute yield to round 0 - self.update_report(0, comp_time, Duration::ZERO, Duration::ZERO, yield_time); - self.last_resume.set(Instant::now()); + self.events + .lock() + .unwrap() + .push(Event::Yielded { started, finished }); } } /// A wrapper around [`SendMany`] that measures performance. pub struct ProfilerSendMany { inner: S, - report: RefCell, - last_resume: Cell, + events: Arc>>, + start_time: Instant, } impl SendMany for ProfilerSendMany @@ -210,41 +182,27 @@ where type SendErr = S::SendErr; async fn send(&mut self, msg: Outgoing) -> Result<(), S::SendErr> { - let comp_time = self.last_resume.get().elapsed(); - let round_idx = msg.msg.round() as usize; - - let io_start = Instant::now(); + let round_idx = msg.msg.round(); + let started = Instant::now(); let result = self.inner.send(msg).await; - let io_time = io_start.elapsed(); + let finished = Instant::now(); - self.report.borrow_mut().apply_stats( - round_idx, - comp_time, - io_time, - Duration::ZERO, - Duration::ZERO, - ); + self.events.lock().unwrap().push(Event::SendMsg { + round: round_idx, + started, + finished, + }); - self.last_resume.set(Instant::now()); result } async fn flush(self) -> Result { - let comp_time = self.last_resume.get().elapsed(); - - let io_start = Instant::now(); let result = self.inner.flush().await; - let io_time = io_start.elapsed(); - - // Attribute flush to round 0 - self.report - .borrow_mut() - .apply_stats(0, comp_time, io_time, Duration::ZERO, Duration::ZERO); Ok(PerfProfiler { inner: result?, - report: self.report, - last_resume: Cell::new(Instant::now()), + events: self.events, + start_time: self.start_time, }) } } diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs index 1ee1a73..945e3ea 100644 --- a/round-based/tests/perf_test.rs +++ b/round-based/tests/perf_test.rs @@ -1,58 +1,94 @@ #[cfg(feature = "perf-profiler")] mod tests { + use core::cell::RefCell; use round_based::{ - Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, mpc::profiler::profiling::PerfReport, - mpc::profiler::stats, mpc::profiler::wrapper::PerfProfiler, + Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, mpc::profiler::wrapper::PerfProfiler, + round::RoundInfo, }; use std::time::Duration; - struct MockMpc; + #[derive(Debug, Clone)] + struct ManualEvent; + struct MockMpc { + manual_events: RefCell>, + } + + /// Random Beacon Messages #[derive(Clone, Debug)] - enum MockMsg { - Round1(()), + enum RandomBeaconMsg { + Commit([u8; 32]), // Round 1 + Decommit, // Round 2 } - impl ProtocolMsg for MockMsg { + impl ProtocolMsg for RandomBeaconMsg { fn round(&self) -> u16 { match self { - MockMsg::Round1(_) => 1, + RandomBeaconMsg::Commit(_) => 1, + RandomBeaconMsg::Decommit => 2, } } } - impl RoundMsg<()> for MockMsg { + // Round 1 Marker + struct Round1; + impl RoundMsg<[u8; 32]> for RandomBeaconMsg { const ROUND: u16 = 1; - fn to_protocol_msg(m: ()) -> Self { - MockMsg::Round1(m) + fn to_protocol_msg(m: [u8; 32]) -> Self { + RandomBeaconMsg::Commit(m) + } + fn from_protocol_msg(msg: Self) -> Result<[u8; 32], Self> { + match msg { + RandomBeaconMsg::Commit(m) => Ok(m), + _ => Err(msg), + } } - fn from_protocol_msg(protocol_msg: Self) -> Result<(), Self> { - match protocol_msg { - MockMsg::Round1(m) => Ok(m), + } + + // Round 2 Marker + struct Round2; + impl RoundMsg for RandomBeaconMsg { + const ROUND: u16 = 2; + fn to_protocol_msg(_m: u64) -> Self { + RandomBeaconMsg::Decommit + } + fn from_protocol_msg(msg: Self) -> Result { + match msg { + RandomBeaconMsg::Decommit => Ok(0), + _ => Err(msg), } } } + impl RoundInfo for Round1 { + type Msg = [u8; 32]; + type Output = Vec<[u8; 32]>; + type Error = core::convert::Infallible; + } + impl RoundInfo for Round2 { + type Msg = u64; + type Output = Vec; + type Error = core::convert::Infallible; + } + impl Mpc for MockMpc { - type Msg = MockMsg; + type Msg = RandomBeaconMsg; type Exec = MockMpc; type SendErr = core::convert::Infallible; - fn add_round(&mut self, _round: R) -> ::Round where R: round_based::round::RoundStore, Self::Msg: RoundMsg, { } - fn finish_setup(self) -> Self::Exec { self } } impl MpcExecution for MockMpc { - type Round = (); - type Msg = MockMsg; + type Round = (); + type Msg = RandomBeaconMsg; type CompleteRoundErr = core::convert::Infallible; type SendErr = core::convert::Infallible; type SendMany = MockSendMany; @@ -62,135 +98,104 @@ mod tests { _round: Self::Round, ) -> Result> where - R: round_based::round::RoundInfo, + R: RoundInfo, Self::Msg: RoundMsg, { - tokio::time::sleep(Duration::from_millis(100)).await; - unreachable!() + tokio::time::sleep(Duration::from_millis(40)).await; + self.manual_events.borrow_mut().push(ManualEvent); + + let res = Vec::::new(); + let ptr = Box::into_raw(Box::new(res)); + Ok(unsafe { *Box::from_raw(ptr as *mut R::Output) }) } async fn send(&mut self, _msg: Outgoing) -> Result<(), Self::SendErr> { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(20)).await; + self.manual_events.borrow_mut().push(ManualEvent); Ok(()) } fn send_many(self) -> Self::SendMany { - MockSendMany + MockSendMany { + manual_events: self.manual_events, + } } async fn yield_now(&self) { tokio::time::sleep(Duration::from_millis(10)).await; + self.manual_events.borrow_mut().push(ManualEvent); } } - struct MockSendMany; + struct MockSendMany { + manual_events: RefCell>, + } impl round_based::mpc::SendMany for MockSendMany { type Exec = MockMpc; - type Msg = MockMsg; + type Msg = RandomBeaconMsg; type SendErr = core::convert::Infallible; - async fn send(&mut self, _msg: Outgoing) -> Result<(), Self::SendErr> { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(20)).await; + self.manual_events.borrow_mut().push(ManualEvent); Ok(()) } - async fn flush(self) -> Result { - Ok(MockMpc) + Ok(MockMpc { + manual_events: self.manual_events, + }) } } - impl MockMpc { - fn simulate_computation(&self) { - // Simulate "Pure Computation" - std::thread::sleep(Duration::from_millis(50)); - } - } + /// Random Beacon Example + async fn run_random_beacon(mut mpc: PerfProfiler) -> [u8; 32] { + // --- Round 1: Commit --- + tokio::time::sleep(Duration::from_millis(10)).await; + mpc.send(Outgoing::all_parties(RandomBeaconMsg::Commit([1u8; 32]))) + .await + .ok(); + let _hashes = mpc.complete::(()).await.expect("round 1"); - #[tokio::test] - async fn test_profiler_captures_correct_times() { - let inner = MockMpc; - let mut profiler = PerfProfiler::new(inner); + // --- Round 2: Reveal --- + tokio::time::sleep(Duration::from_millis(5)).await; + mpc.send(Outgoing::all_parties(RandomBeaconMsg::Decommit)) + .await + .ok(); + let _numbers = mpc.complete::(()).await.expect("round 2"); - // --- ROUND 1 --- - // 1. Computation happens - profiler.get_ref().simulate_computation(); + // --- Final: XOR --- + tokio::time::sleep(Duration::from_millis(5)).await; - // 2. I/O happens via send - profiler - .send(Outgoing::all_parties(MockMsg::Round1(()))) - .await - .unwrap(); - - let report = profiler.into_report(); - - // Check if computation is at least 50ms - assert!( - report.total_computation() >= Duration::from_millis(50), - "Computation time was {:?}", - report.total_computation() - ); - // Check if I/O is at least 100ms - assert!( - report.total_sent_io() >= Duration::from_millis(100), - "Sent IO time was {:?}", - report.total_sent_io() - ); + // --- Yield: Let others run --- + mpc.yield_now().await; - println!("{}", report); + [0u8; 32] } - #[test] - fn test_statistical_analysis() { - // Create dummy reports to test the math - let mut reports = Vec::new(); - for i in 1..=10 { - let mut report = PerfReport::default(); - report.apply_stats( - 1, - Duration::from_millis(i * 10), // 10, 20, ... 100 - Duration::from_millis(50), - Duration::ZERO, - Duration::ZERO, - ); - reports.push(report); - } + #[tokio::test] + async fn test_profiler_random_beacon() { + let inner = MockMpc { + manual_events: RefCell::new(Vec::new()), + }; + let (profiler, handle) = PerfProfiler::new(inner); - // Capture total times - let total_times: Vec = reports.iter().map(|r| r.total_time()).collect(); - let analysis = stats::analyze_durations("Batch Execution", total_times); + let _result = run_random_beacon(profiler).await; - // Verification - assert_eq!(analysis.metric_name, "Batch Execution"); - assert!(analysis.p50 >= Duration::from_millis(50)); - assert!(analysis.p90 >= Duration::from_millis(90)); + let report = handle.into_report(); + println!("{}", report); - // Test the Display for stats - let stats_output = format!("{}", analysis); - assert!(stats_output.contains("Mean")); - println!("{}", stats_output); - } + let r1 = report.rounds.iter().find(|r| r.round == 1).unwrap(); + let r2 = report.rounds.iter().find(|r| r.round == 2).unwrap(); + let r0 = report.rounds.iter().find(|r| r.round == 0).unwrap(); + + assert!(r1.computation_time >= Duration::from_millis(10)); + assert!(r1.sent_io_time >= Duration::from_millis(20)); + assert!(r1.recv_io_time >= Duration::from_millis(40)); + + assert!(r2.computation_time >= Duration::from_millis(5)); + assert!(r2.sent_io_time >= Duration::from_millis(20)); + assert!(r2.recv_io_time >= Duration::from_millis(40)); - #[test] - fn test_report_display_formatting() { - let mut report = PerfReport::default(); - report.apply_stats( - 1, - Duration::from_millis(15), - Duration::from_millis(45), - Duration::ZERO, - Duration::ZERO, - ); - report.apply_stats( - 2, - Duration::from_millis(20), - Duration::ZERO, - Duration::from_millis(30), - Duration::ZERO, - ); - - let output = format!("{}", report); - assert!(output.contains("Round 1")); - assert!(output.contains("Round 2")); - assert!(output.contains("Total Time")); + // Yield Check + assert!(r0.yield_time >= Duration::from_millis(10)); } } From 7ab4357aad25abdfe1990b14f41214003c5e6ee6 Mon Sep 17 00:00:00 2001 From: theroguevigilante Date: Sat, 18 Apr 2026 21:59:55 +0530 Subject: [PATCH 6/6] refactor(stats): analytics for all the metrics Signed-off-by: theroguevigilante --- round-based/src/mpc/profiler/stats.rs | 35 ++++++++++++++++++++++----- round-based/tests/perf_test.rs | 27 ++++++++++++++++++--- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/round-based/src/mpc/profiler/stats.rs b/round-based/src/mpc/profiler/stats.rs index c1e2bf6..63ca99b 100644 --- a/round-based/src/mpc/profiler/stats.rs +++ b/round-based/src/mpc/profiler/stats.rs @@ -28,7 +28,7 @@ impl std::fmt::Display for AggregatedStats { // Table-like formatting: {:) -> Aggregated } } -/// Helper to consume multiple reports and print aggregated analytics +/// Helper to consume multiple reports and print aggregated analytics for all metrics. pub fn analyze_reports(reports: &[PerfReport]) { - // Here you would extract vectors of total_computation, total_io, etc. - // across all reports and pass them to `analyze_durations`. + if reports.is_empty() { + println!("No reports to analyze."); + return; + } let mut total_times = Vec::with_capacity(reports.len()); + let mut comp_times = Vec::with_capacity(reports.len()); + let mut sent_io_times = Vec::with_capacity(reports.len()); + let mut recv_io_times = Vec::with_capacity(reports.len()); + let mut yield_times = Vec::with_capacity(reports.len()); + for report in reports { total_times.push(report.total_time()); + comp_times.push(report.total_computation()); + sent_io_times.push(report.total_sent_io()); + recv_io_times.push(report.total_recv_io()); + yield_times.push(report.total_yield()); } - let stats = analyze_durations("Total Execution Time", total_times); - println!("{}", stats); + println!("\n=== MPC Execution Analytics ({} runs) ===", reports.len()); + println!( + "{:<20} | {:<12} | {:<12} | {:<12} | {:<12} | {:<12}", + "Metric", "Mean", "Std Dev", "p50", "p75", "p90" + ); + println!("{}", "-".repeat(90)); + println!("{}", analyze_durations("Total Time", total_times)); + println!("{}", analyze_durations("Computation", comp_times)); + println!("{}", analyze_durations("Sent I/O", sent_io_times)); + println!("{}", analyze_durations("Recv I/O", recv_io_times)); + println!("{}", analyze_durations("Yield", yield_times)); + println!( + "========================================================================================\n" + ); } diff --git a/round-based/tests/perf_test.rs b/round-based/tests/perf_test.rs index 945e3ea..a46b656 100644 --- a/round-based/tests/perf_test.rs +++ b/round-based/tests/perf_test.rs @@ -2,8 +2,8 @@ mod tests { use core::cell::RefCell; use round_based::{ - Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, mpc::profiler::wrapper::PerfProfiler, - round::RoundInfo, + Mpc, MpcExecution, Outgoing, ProtocolMsg, RoundMsg, mpc::SendMany, mpc::profiler::stats, + mpc::profiler::wrapper::PerfProfiler, round::RoundInfo, }; use std::time::Duration; @@ -117,7 +117,7 @@ mod tests { fn send_many(self) -> Self::SendMany { MockSendMany { - manual_events: self.manual_events, + manual_events: self.manual_events.clone(), } } @@ -130,7 +130,7 @@ mod tests { struct MockSendMany { manual_events: RefCell>, } - impl round_based::mpc::SendMany for MockSendMany { + impl SendMany for MockSendMany { type Exec = MockMpc; type Msg = RandomBeaconMsg; type SendErr = core::convert::Infallible; @@ -198,4 +198,23 @@ mod tests { // Yield Check assert!(r0.yield_time >= Duration::from_millis(10)); } + + #[tokio::test] + async fn test_profiler_statistics() { + let mut reports = Vec::new(); + + // Run the protocol 5 times and collect reports + for _ in 0..5 { + let inner = MockMpc { + manual_events: RefCell::new(Vec::new()), + }; + let (profiler, handle) = PerfProfiler::new(inner); + + let _result = run_random_beacon(profiler).await; + reports.push(handle.into_report()); + } + + // Analyze reports and print aggregated analytics + stats::analyze_reports(&reports); + } }