Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
<!--
Convention: changes to experimental features live in a dedicated
`## Experimental` subsection under each version. Experimental features
may receive breaking changes between releases without a major version
bump. Currently experimental: sync plugins.
-->

# Unreleased

* feat: `script` sync steps now receive `ICP_CLI_ENVIRONMENT`, `ICP_CLI_NETWORK`, `ICP_CLI_CID` (the current canister's principal), and `ICP_CLI_CID_<NAME>` (every canister's principal) as environment variables.
* fix: `icp canister call` with both `--json` and `-o hex` no longer prints both kinds of output at once.
* fix: `icp` no longer picks up a stale inherited `$PWD` when launched as a subprocess via `chdir(2)` + `execve` (e.g. from a test harness). The logical `$PWD` path is now validated against `getcwd()` by inode before use, preserving symlink-aware project root discovery while ignoring stale values.

## Experimental

* feat(sync-plugin): Plugins can now surface messages that persist after the step completes. Anything the plugin writes to stderr (e.g. `eprintln!` in Rust) is streamed live in the rolling step view AND printed under the canister name once the step ends; stdout remains transient. The `exec()` return signature has changed from `result<option<string>, string>` to `result<_, string>` — plugins that returned a summary string should `eprintln!` it instead.

# v0.2.6

* feat: `icp token/cycles balance` now accept `--of-principal`
* fix: The local wasm cache has moved from `.icp/cache/canisters/` to `.icp/cache/wasms/`. Existing cached files will be re-downloaded automatically on the next run.
* feat: Canister manifests now support a `plugin` sync step type. Plugins are WebAssembly components that run in a sandboxed environment and can drive arbitrary post-deployment logic against the canister being synced. See `crates/icp-sync-plugin/DESIGN.md` for details.
* feat: `icp sync` now accepts `--proxy` to route sync plugin calls to the target canister through a proxy canister.
* fix: `icp canister call` now serializes arguments built via the interactive Candid assist prompt against the method's declared signature, matching the behavior of arguments passed on the command line. Previously, narrower values (e.g. a variant case from a multi-case variant) were encoded with a type table inferred only from the value, which the target canister rejected with errors like "Variant index N larger than length 1".

## Experimental

* feat(sync-plugin): Canister manifests now support a `plugin` sync step type. Plugins are WebAssembly components that run in a sandboxed environment and can drive arbitrary post-deployment logic against the canister being synced. See `crates/icp-sync-plugin/DESIGN.md` for details.
* feat(sync-plugin): `icp sync` now accepts `--proxy` to route sync plugin calls to the target canister through a proxy canister.

# v0.2.5

* feat: `icp new --init` no longer requires specifying a project name. If non is provided, the containing folder's name is used as the project name
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ bigdecimal = "0.4.10"
bip32 = "0.5.0"
bollard = "0.20.2"
byte-unit = "5.1.6"
bytes = "1.11"
camino = { version = "1.1.9", features = ["serde1"] }
cargo-generate = "0.23.7"
camino-tempfile = "1"
Expand Down
16 changes: 13 additions & 3 deletions crates/icp-cli/src/operations/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ async fn sync_canister(
proxy: Option<Principal>,
pb: &mut MultiStepProgressBar,
pkg_cache: &PackageCache,
) -> Result<(), SynchronizeError> {
) -> Result<Vec<String>, SynchronizeError> {
let step_count = canister_info.sync.steps.len();
let mut stderr_lines = Vec::new();

for (i, step) in canister_info.sync.steps.iter().enumerate() {
// Indicate to user the current step being executed
Expand Down Expand Up @@ -72,10 +73,10 @@ async fn sync_canister(
// Ensure background receiver drains all messages
pb.end_step().await;

sync_result?;
stderr_lines.extend(sync_result?);
}

Ok(())
Ok(stderr_lines)
}

/// Orchestrates syncing multiple canisters with progress tracking
Expand Down Expand Up @@ -129,6 +130,15 @@ pub(crate) async fn sync_many(
)
.await;

// Print stderr lines the plugin emitted; the rolling buffer
// discards them on success, but they belong on the persistent
// output channel.
if let Ok(lines) = &result {
for line in lines {
eprintln!("[{}] {line}", canister_info.name);
}
}

// Map error to include canister context for deferred printing
result.map_err(|error| SyncFailure {
canister_name: canister_info.name.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/icp-sync-plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ repository.workspace = true
publish.workspace = true

[dependencies]
async-trait.workspace = true
bytes.workspace = true
camino.workspace = true
candid.workspace = true
console.workspace = true
Expand Down
222 changes: 198 additions & 24 deletions crates/icp-sync-plugin/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Host-side Component Model runtime for sync plugins.
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::{Context as TaskContext, Poll};
use std::time::{Duration, Instant};

const MAX_PLUGIN_OUTPUT: usize = 1024 * 1024; // 1 MiB per stream
Expand All @@ -9,12 +12,15 @@ const MAX_WASM_STACK: usize = 512 * 1024;
// How many seconds of pure wasm compute a plugin may use (host-call latency is excluded).
const PLUGIN_COMPUTE_LIMIT_SECS: u64 = 60;

use bytes::Bytes;
use camino::{Utf8Component, Utf8PathBuf};
use candid::{Encode, Principal};
use ic_agent::Agent;
use snafu::prelude::*;
use tokio::io::{self, AsyncWrite};
use tokio::sync::mpsc::Sender;
use wasmtime_wasi::p2::pipe::MemoryOutputPipe;
use wasmtime_wasi::cli::{IsTerminal, StdoutStream};
use wasmtime_wasi::p2::{OutputStream, Pollable, StreamError};
use wasmtime_wasi::{DirPerms, FilePerms};

wasmtime::component::bindgen!({
Expand Down Expand Up @@ -173,7 +179,7 @@ pub fn run_plugin(
identity_principal: Principal,
environment: String,
stdio: Option<Sender<String>>,
) -> Result<(), RunPluginError> {
) -> Result<Vec<String>, RunPluginError> {
use wasmtime::component::{Component, Linker};
use wasmtime::{Config, Engine, Store};

Expand Down Expand Up @@ -236,13 +242,12 @@ pub fn run_plugin(
.context(PreopenDirSnafu { dir: host_path })?;
}

let stdout_pipe = MemoryOutputPipe::new(MAX_PLUGIN_OUTPUT);
let stderr_pipe = MemoryOutputPipe::new(MAX_PLUGIN_OUTPUT);
if stdio.is_some() {
wasi_builder
.stdout(stdout_pipe.clone())
.stderr(stderr_pipe.clone());
}
let persistent_stderr: Arc<StdMutex<Vec<String>>> = Arc::default();
let stdout_capture = LineCapture::new("stdout", stdio.clone(), None);
let stderr_capture = LineCapture::new("stderr", stdio.clone(), Some(persistent_stderr.clone()));
wasi_builder
.stdout(stdout_capture.clone())
.stderr(stderr_capture.clone());
Comment thread
lwshang marked this conversation as resolved.

let epoch_extension = Arc::new(AtomicU64::new(0));
let host_state = HostState {
Expand Down Expand Up @@ -292,30 +297,172 @@ pub fn run_plugin(
proxy_canister_id: proxy.map(|p| p.to_text()),
};

let result = plugin
.call_exec(&mut store, &input)
.context(CallExecSnafu { path: wasm_path })?;
let call_result = plugin.call_exec(&mut store, &input);

// Flush any partial line and emit the truncation note (if any) before
// we hand control back, so the last line of plugin output isn't lost.
stdout_capture.finalize();
stderr_capture.finalize();

match call_result.context(CallExecSnafu { path: wasm_path })? {
Ok(()) => {}
Err(message) => return PluginFailedSnafu { message }.fail(),
}

let lines = std::mem::take(&mut *persistent_stderr.lock().unwrap());
Ok(lines)
Comment thread
lwshang marked this conversation as resolved.
}

// -------------------------------------------------------------------------
// Plugin stdout/stderr capture
// -------------------------------------------------------------------------
//
// `LineCapture` implements both `StdoutStream` (so it can be installed on a
// `WasiCtxBuilder`) and `OutputStream` / `AsyncWrite` (so the bytes written
// by the guest flow through the same code path). Each write is split on
// newlines; complete lines have ANSI escapes stripped and are pushed to the
// rolling-view `Sender<String>` via `try_send` (best-effort). For stderr,
// the same lines are also appended to `persistent`, which is drained by
// `run_plugin()` after `exec()` returns. Total accepted bytes are capped at
// `MAX_PLUGIN_OUTPUT` per stream; further bytes are dropped and `finalize`
// emits a single "… N bytes of <label> truncated" line.

#[derive(Default)]
struct CaptureState {
/// Bytes seen since the last newline, awaiting more input or finalize.
partial: Vec<u8>,
/// Total bytes accepted (i.e. counted toward the cap).
bytes_written: usize,
/// Total bytes dropped after hitting the cap.
bytes_dropped: usize,
}

#[derive(Clone)]
struct LineCapture {
state: Arc<StdMutex<CaptureState>>,
label: &'static str,
forward: Option<Sender<String>>,
persistent: Option<Arc<StdMutex<Vec<String>>>>,
}

impl LineCapture {
fn new(
label: &'static str,
forward: Option<Sender<String>>,
persistent: Option<Arc<StdMutex<Vec<String>>>>,
) -> Self {
Self {
state: Arc::default(),
label,
forward,
persistent,
}
}

if let Some(tx) = &stdio {
for bytes in [stdout_pipe.contents(), stderr_pipe.contents()] {
if !bytes.is_empty() {
let s = console::strip_ansi_codes(&String::from_utf8_lossy(&bytes)).into_owned();
let _ = tx.blocking_send(s);
fn push_bytes(&self, buf: &[u8]) {
let mut to_emit: Vec<String> = Vec::new();
{
let mut st = self.state.lock().unwrap();
let remaining = MAX_PLUGIN_OUTPUT.saturating_sub(st.bytes_written);
let (accepted, dropped) = if buf.len() > remaining {
(&buf[..remaining], buf.len() - remaining)
} else {
(buf, 0)
};
st.bytes_written += accepted.len();
st.bytes_dropped += dropped;
st.partial.extend_from_slice(accepted);
while let Some(pos) = st.partial.iter().position(|&b| b == b'\n') {
let line: Vec<u8> = st.partial.drain(..=pos).collect();
let s = String::from_utf8_lossy(&line);
let trimmed = s.trim_end_matches('\n').trim_end_matches('\r');
to_emit.push(console::strip_ansi_codes(trimmed).into_owned());
}
}
for line in to_emit {
self.emit(line);
}
}

match result {
Ok(Some(msg)) => {
if let Some(tx) = &stdio {
let _ = tx.blocking_send(console::strip_ansi_codes(&msg).into_owned());
fn emit(&self, line: String) {
if let Some(tx) = &self.forward {
let _ = tx.try_send(line.clone());
}
if let Some(p) = &self.persistent {
p.lock().unwrap().push(line);
}
}
Comment thread
lwshang marked this conversation as resolved.

/// Flush any partial line and emit a single truncation note if we dropped
/// bytes past the cap. Called exactly once, after `exec()` returns.
fn finalize(&self) {
let (partial, dropped) = {
let mut st = self.state.lock().unwrap();
(std::mem::take(&mut st.partial), st.bytes_dropped)
};
if !partial.is_empty() {
let s = String::from_utf8_lossy(&partial);
let trimmed = s.trim_end_matches('\n').trim_end_matches('\r');
if !trimmed.is_empty() {
let line = console::strip_ansi_codes(trimmed).into_owned();
self.emit(line);
}
}
Ok(None) => {}
Err(message) => return PluginFailedSnafu { message }.fail(),
if dropped > 0 {
self.emit(format!("… {dropped} bytes of {} truncated", self.label));
}
}
}

impl IsTerminal for LineCapture {
fn is_terminal(&self) -> bool {
false
}
}

impl StdoutStream for LineCapture {
fn p2_stream(&self) -> Box<dyn OutputStream> {
Box::new(self.clone())
}
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
Box::new(self.clone())
}
}

Ok(())
#[async_trait::async_trait]
impl Pollable for LineCapture {
async fn ready(&mut self) {}
}

#[async_trait::async_trait]
impl OutputStream for LineCapture {
fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
self.push_bytes(&bytes);
Ok(())
}
fn flush(&mut self) -> Result<(), StreamError> {
Ok(())
}
fn check_write(&mut self) -> Result<usize, StreamError> {
Ok(usize::MAX)
}
}

impl AsyncWrite for LineCapture {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.push_bytes(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut TaskContext<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut TaskContext<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -448,4 +595,31 @@ mod tests {
let msg = rx.try_recv().expect("expected stdout message on channel");
assert!(msg.contains("stdout from plugin"), "got: {msg}");
}

#[tokio::test(flavor = "multi_thread")]
async fn plugin_stderr_lines_returned_as_persistent_output() {
let Some(wasm_path) = option_env!("TEST_PLUGIN_WASM") else {
return;
};
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(16);
let result = tokio::task::block_in_place(|| {
run_plugin(
wasm_path.into(),
".".into(),
vec![],
vec![],
anon(),
dummy_agent(),
None,
anon(),
"hello".to_string(),
Some(tx),
)
});
let lines = result.expect("plugin should succeed");
assert_eq!(lines, vec!["hello".to_string()]);
// The same line is forwarded to the rolling-view channel.
let live = rx.try_recv().expect("expected stderr line on channel");
assert!(live.contains("hello"), "got: {live}");
}
}
Loading
Loading