From 5136ddd7d311ef856b76ad3bee63ab7bdcc5f7cf Mon Sep 17 00:00:00 2001 From: Linwei Shang Date: Fri, 15 May 2026 13:42:35 -0400 Subject: [PATCH 1/3] feat(sync-plugin): persist plugin stderr output after a sync step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugin output was captured into in-memory pipes and only drained after exec() returned, so live progress never reached the rolling step view and warnings were lost when the step ended on success. Split the two streams: stdout stays transient (rolling view only), stderr is streamed live and also surfaced after the step closes — plugin authors pick persistence with their choice of print primitive. - Replace MemoryOutputPipe with a line-buffered capture that strips ANSI, try_send's complete lines to the rolling-view channel, and for stderr accumulates them into a Vec returned to the caller. Each stream is capped at 1 MiB with a single truncation note. - Drop the option payload on exec(); plugins now signal persistent output via stderr instead of a return value. - Print plugin stderr verbatim under the canister name in sync_many; no "Warning:" prefix — the plugin owns the wording. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 2 + Cargo.toml | 1 + crates/icp-cli/src/operations/sync.rs | 16 +- crates/icp-sync-plugin/Cargo.toml | 2 + crates/icp-sync-plugin/src/runtime.rs | 222 ++++++++++++++++-- crates/icp-sync-plugin/sync-plugin.wit | 18 +- .../tests/fixtures/test-plugin/src/lib.rs | 11 +- crates/icp/src/canister/sync/assets.rs | 4 +- crates/icp/src/canister/sync/mod.rs | 6 +- crates/icp/src/canister/sync/plugin.rs | 2 +- crates/icp/src/canister/sync/script.rs | 5 +- examples/icp-sync-plugin/plugin/src/lib.rs | 8 +- 12 files changed, 249 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ff5f2665..4ba6d2703 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3905,6 +3905,8 @@ dependencies = [ name = "icp-sync-plugin" version = "0.2.6" dependencies = [ + "async-trait", + "bytes", "camino", "candid", "console 0.16.3", diff --git a/Cargo.toml b/Cargo.toml index 799144480..b160378a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ bigdecimal = "0.4.10" bip32 = "0.5.0" bollard = "0.20.2" byte-unit = "5.1.6" +bytes = "1.11" camino = { version = "1.1.9", features = ["serde1"] } cargo-generate = "0.23.7" camino-tempfile = "1" diff --git a/crates/icp-cli/src/operations/sync.rs b/crates/icp-cli/src/operations/sync.rs index 9431f7939..18c77efcc 100644 --- a/crates/icp-cli/src/operations/sync.rs +++ b/crates/icp-cli/src/operations/sync.rs @@ -41,8 +41,9 @@ async fn sync_canister( proxy: Option, pb: &mut MultiStepProgressBar, pkg_cache: &PackageCache, -) -> Result<(), SynchronizeError> { +) -> Result, SynchronizeError> { let step_count = canister_info.sync.steps.len(); + let mut stderr_lines = Vec::new(); for (i, step) in canister_info.sync.steps.iter().enumerate() { // Indicate to user the current step being executed @@ -72,10 +73,10 @@ async fn sync_canister( // Ensure background receiver drains all messages pb.end_step().await; - sync_result?; + stderr_lines.extend(sync_result?); } - Ok(()) + Ok(stderr_lines) } /// Orchestrates syncing multiple canisters with progress tracking @@ -129,6 +130,15 @@ pub(crate) async fn sync_many( ) .await; + // Print stderr lines the plugin emitted; the rolling buffer + // discards them on success, but they belong on the persistent + // output channel. + if let Ok(lines) = &result { + for line in lines { + eprintln!("[{}] {line}", canister_info.name); + } + } + // Map error to include canister context for deferred printing result.map_err(|error| SyncFailure { canister_name: canister_info.name.clone(), diff --git a/crates/icp-sync-plugin/Cargo.toml b/crates/icp-sync-plugin/Cargo.toml index 480fdc197..2f8ae325c 100644 --- a/crates/icp-sync-plugin/Cargo.toml +++ b/crates/icp-sync-plugin/Cargo.toml @@ -7,6 +7,8 @@ repository.workspace = true publish.workspace = true [dependencies] +async-trait.workspace = true +bytes.workspace = true camino.workspace = true candid.workspace = true console.workspace = true diff --git a/crates/icp-sync-plugin/src/runtime.rs b/crates/icp-sync-plugin/src/runtime.rs index 16674af22..1b96d9e15 100644 --- a/crates/icp-sync-plugin/src/runtime.rs +++ b/crates/icp-sync-plugin/src/runtime.rs @@ -1,6 +1,9 @@ // Host-side Component Model runtime for sync plugins. +use std::pin::Pin; use std::sync::Arc; +use std::sync::Mutex as StdMutex; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::task::{Context as TaskContext, Poll}; use std::time::{Duration, Instant}; const MAX_PLUGIN_OUTPUT: usize = 1024 * 1024; // 1 MiB per stream @@ -9,12 +12,15 @@ const MAX_WASM_STACK: usize = 512 * 1024; // How many seconds of pure wasm compute a plugin may use (host-call latency is excluded). const PLUGIN_COMPUTE_LIMIT_SECS: u64 = 60; +use bytes::Bytes; use camino::{Utf8Component, Utf8PathBuf}; use candid::{Encode, Principal}; use ic_agent::Agent; use snafu::prelude::*; +use tokio::io::{self, AsyncWrite}; use tokio::sync::mpsc::Sender; -use wasmtime_wasi::p2::pipe::MemoryOutputPipe; +use wasmtime_wasi::cli::{IsTerminal, StdoutStream}; +use wasmtime_wasi::p2::{OutputStream, Pollable, StreamError}; use wasmtime_wasi::{DirPerms, FilePerms}; wasmtime::component::bindgen!({ @@ -173,7 +179,7 @@ pub fn run_plugin( identity_principal: Principal, environment: String, stdio: Option>, -) -> Result<(), RunPluginError> { +) -> Result, RunPluginError> { use wasmtime::component::{Component, Linker}; use wasmtime::{Config, Engine, Store}; @@ -236,13 +242,12 @@ pub fn run_plugin( .context(PreopenDirSnafu { dir: host_path })?; } - let stdout_pipe = MemoryOutputPipe::new(MAX_PLUGIN_OUTPUT); - let stderr_pipe = MemoryOutputPipe::new(MAX_PLUGIN_OUTPUT); - if stdio.is_some() { - wasi_builder - .stdout(stdout_pipe.clone()) - .stderr(stderr_pipe.clone()); - } + let persistent_stderr: Arc>> = Arc::default(); + let stdout_capture = LineCapture::new("stdout", stdio.clone(), None); + let stderr_capture = LineCapture::new("stderr", stdio.clone(), Some(persistent_stderr.clone())); + wasi_builder + .stdout(stdout_capture.clone()) + .stderr(stderr_capture.clone()); let epoch_extension = Arc::new(AtomicU64::new(0)); let host_state = HostState { @@ -292,30 +297,172 @@ pub fn run_plugin( proxy_canister_id: proxy.map(|p| p.to_text()), }; - let result = plugin - .call_exec(&mut store, &input) - .context(CallExecSnafu { path: wasm_path })?; + let call_result = plugin.call_exec(&mut store, &input); + + // Flush any partial line and emit the truncation note (if any) before + // we hand control back, so the last line of plugin output isn't lost. + stdout_capture.finalize(); + stderr_capture.finalize(); + + match call_result.context(CallExecSnafu { path: wasm_path })? { + Ok(()) => {} + Err(message) => return PluginFailedSnafu { message }.fail(), + } + + let lines = std::mem::take(&mut *persistent_stderr.lock().unwrap()); + Ok(lines) +} + +// ------------------------------------------------------------------------- +// Plugin stdout/stderr capture +// ------------------------------------------------------------------------- +// +// `LineCapture` implements both `StdoutStream` (so it can be installed on a +// `WasiCtxBuilder`) and `OutputStream` / `AsyncWrite` (so the bytes written +// by the guest flow through the same code path). Each write is split on +// newlines; complete lines have ANSI escapes stripped and are pushed to the +// rolling-view `Sender` via `try_send` (best-effort). For stderr, +// the same lines are also appended to `persistent`, which is drained by +// `run_plugin()` after `exec()` returns. Total accepted bytes are capped at +// `MAX_PLUGIN_OUTPUT` per stream; further bytes are dropped and `finalize` +// emits a single "… N bytes of