Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions crates/misc/component-async-tests/tests/scenario/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,123 @@ pub async fn async_closed_stream() -> Result<()> {
.await?
}

mod host_consumer_drop {
wasmtime::component::bindgen!({
path: "wit",
world: "host-consumer-drop-guest",
exports: { default: store | async },
});
}

// Regression test: a host *consumer* registered via `StreamReader::pipe` must be
// finalized when the guest drops the writable end *after* the consumer is
// attached. The guest hands the host the readable end, keeps the writable end,
// writes one byte once the consumer reads, then drops the writer. That reaches
// `host_drop_writer` with the read side in `ReadState::HostReady`, which must
// reclaim the transmit rather than leaving it stranded.
#[tokio::test]
pub async fn async_host_consumer_drop() -> Result<()> {
let engine = Engine::new(&config())?;

let component = make_component(
&engine,
&[test_programs_artifacts::ASYNC_HOST_CONSUMER_DROP_COMPONENT],
)
.await?;

let mut linker = Linker::new(&engine);

wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

let mut store = Store::new(
&engine,
Ctx {
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
table: ResourceTable::default(),
continue_: false,
},
);

let instance = linker.instantiate_async(&mut store, &component).await?;
let guest = host_consumer_drop::HostConsumerDropGuest::new(&mut store, &instance)?;
store
.run_concurrent(async move |accessor| {
let stream = guest
.local_local_host_consumer_drop()
.call_get(accessor)
.await?;

let (tx, mut rx) = mpsc::channel(1);
accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)))?;
assert_eq!(rx.next().await, Some(42));
assert!(rx.next().await.is_none());

wasmtime::error::Ok(())
})
.await??;

// The host consumer and both transmit handles must be gone now that the
// guest dropped its end.
store.assert_concurrent_state_empty();

Ok(())
}

// Regression test: the symmetric host *producer* case. The host hands the guest
// a `future` via `FutureReader::new` and the guest drops the read end without
// reading it. That reaches `host_drop_reader` with the write side in
// `WriteState::HostReady`, which must reclaim the transmit. The guest's
// `read_future` reads `rx` but drops `rx_ignored`.
#[tokio::test]
pub async fn async_host_producer_drop() -> Result<()> {
let engine = Engine::new(&config())?;

let component = make_component(
&engine,
&[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],
)
.await?;

let mut linker = Linker::new(&engine);

wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

let mut store = Store::new(
&engine,
Ctx {
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
table: ResourceTable::default(),
continue_: false,
},
);

let instance = linker.instantiate_async(&mut store, &component).await?;

let value = 42_u8;
let (tx, rx) = oneshot::channel();
let rx = FutureReader::new(&mut store, OneshotProducer::new(rx))?;
let (_, rx_ignored) = oneshot::channel();
let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored))?;

let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;

store
.run_concurrent(async move |accessor| {
_ = tx.send(value);
closed_streams
.local_local_closed()
.call_read_future(accessor, rx, value, rx_ignored)
.await
})
.await??;

// The host producer behind `rx_ignored` and both transmit handles must be
// gone now that the guest dropped the read end without reading.
store.assert_concurrent_state_empty();

Ok(())
}

#[tokio::test]
pub async fn async_cross_instance_source() -> Result<()> {
let engine = Engine::new(&config())?;
Expand Down
3 changes: 2 additions & 1 deletion crates/misc/component-async-tests/tests/test_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use scenario::round_trip_many::{
async_round_trip_many_synchronous, async_round_trip_many_wait,
};
use scenario::streams::{
async_closed_stream, async_closed_streams, async_cross_instance_source, async_short_reads,
async_closed_stream, async_closed_streams, async_cross_instance_source,
async_host_consumer_drop, async_short_reads,
};
use scenario::transmit::{
async_cancel_callee, async_cancel_caller, async_cancel_transmit, async_intertask_communication,
Expand Down
12 changes: 12 additions & 0 deletions crates/misc/component-async-tests/wit/test.wit
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ interface closed-stream {
get: func() -> stream<u8>;
}

interface host-consumer-drop {
// Returns the readable end of a fresh stream while the guest keeps the
// writable end. Once a consumer is attached the guest writes one byte and
// then drops the writer, so the host consumer observes a clean close while
// its read side is still `HostReady`.
get: async func() -> stream<u8>;
}

interface short-reads {
resource thing {
constructor(s: string);
Expand Down Expand Up @@ -369,6 +377,10 @@ world closed-stream-guest {
export closed-stream;
}

world host-consumer-drop-guest {
export host-consumer-drop;
}

world short-reads-guest {
export short-reads;
}
32 changes: 32 additions & 0 deletions crates/test-programs/src/bin/async_host_consumer_drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
mod bindings {
wit_bindgen::generate!({
path: "../misc/component-async-tests/wit",
world: "host-consumer-drop-guest",
async: true,
});

use super::Component;
export!(Component);
}

use {bindings::exports::local::local::host_consumer_drop::Guest, wit_bindgen::StreamReader};

struct Component;

impl Guest for Component {
async fn get() -> StreamReader<u8> {
let (mut tx, rx) = bindings::wit_stream::new();
// Keep the writable end and hand the readable end to the host. The host
// attaches a consumer (read side -> `HostReady`); the write below blocks
// until that consumer reads, after which we drop the writer. Dropping it
// while the consumer is still `HostReady` is the path that used to leak.
wit_bindgen::spawn(async move {
assert!(tx.write_one(42).await.is_none());
drop(tx);
});
rx
}
}

// Unused function; required since this file is built as a `bin`:
fn main() {}
Original file line number Diff line number Diff line change
Expand Up @@ -2471,7 +2471,18 @@ impl StoreOpaque {
)?;
}

WriteState::HostReady { .. } => {}
WriteState::HostReady { .. } => {
// A host producer (e.g. one installed via `FutureReader::new`)
// is only driven when a reader pulls from it; it is never
// re-polled to observe the guest dropping the read end. The read
// end is already `Dropped` (set at the top of this function), so
// the produced value can never be consumed. Reclaim the transmit
// (state + both handles) here; otherwise it would leak for the
// lifetime of the instance. The producer is dropped along with
// the matched `HostReady` value.
log::trace!("host_drop_reader: finalize host producer, delete {transmit_id:?}");
state.delete_transmit(transmit_id)?;
}

WriteState::Open => {
state.update_event(
Expand Down Expand Up @@ -2570,7 +2581,23 @@ impl StoreOpaque {
)?;
}

ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
// A host consumer (e.g. one registered via `StreamReader::pipe`)
// is only driven on guest writes; it is never re-polled to
// observe the guest dropping the write end. Reclaim the transmit
// (state + both handles) so it does not leak. Unlike
// `host_drop_reader`, the write end is not forced to `Dropped`
// earlier in this function, so only finalize once the writer is
// actually gone -- otherwise we would discard a still-live host
// writer. The consumer is dropped along with the matched value.
if matches!(
self.concurrent_state_mut().get_mut(transmit_id)?.write,
WriteState::Dropped
) {
log::trace!("host_drop_writer: finalize host consumer, delete {transmit_id:?}");
self.concurrent_state_mut().delete_transmit(transmit_id)?;
}
Comment on lines +2585 to +2599
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I fully understand why this is conditional, can you explain this a bit more? For example if the write state is not dropped, meaning that this isn't executed, then I believe it's only possible to get here by flowing through the WriteState::HostReady, but that means that the host actually owns the writer. This function is only called when the guest drops the writer, so I'm not sure how that's possible.

Are there tests for this conditional branch? Or if this branch goes away and the delete_transmit unconditionally happens, what bad would happen? (e.g. could a test be written to exercise that?)

}

// If the read state is open, then there are no registered readers of the stream/future
ReadState::Open => {
Expand Down
Loading