Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions crates/misc/component-async-tests/tests/scenario/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use {
wasmtime::{
Engine, Store,
component::{
GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker, ResourceTable,
VecBuffer,
Accessor, GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker,
ResourceTable, VecBuffer,
},
},
wasmtime_wasi::p2::WasiCtxBuilder,
Expand Down Expand Up @@ -49,15 +49,15 @@ pub async fn async_watch_streams() -> Result<()> {
let instance = linker.instantiate_async(&mut store, &component).await?;

// Test watching and then dropping the read end of a stream.
let (mut tx, rx) = instance.stream::<u8>(&mut store)?;
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
})
.await?;

// Test dropping and then watching the read end of a stream.
let (mut tx, rx) = instance.stream::<u8>(&mut store)?;
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
rx.close_with(store);
Expand All @@ -66,15 +66,15 @@ pub async fn async_watch_streams() -> Result<()> {
.await?;

// Test watching and then dropping the write end of a stream.
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
})
.await?;

// Test dropping and then watching the write end of a stream.
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
tx.close_with(store);
Expand All @@ -83,15 +83,15 @@ pub async fn async_watch_streams() -> Result<()> {
.await?;

// Test watching and then dropping the read end of a future.
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
})
.await?;

// Test dropping and then watching the read end of a future.
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
rx.close_with(store);
Expand All @@ -100,15 +100,15 @@ pub async fn async_watch_streams() -> Result<()> {
.await?;

// Test watching and then dropping the write end of a future.
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
})
.await?;

// Test dropping and then watching the write end of a future.
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
tx.close_with(store);
Expand All @@ -117,8 +117,11 @@ pub async fn async_watch_streams() -> Result<()> {
.await?;

enum Event<'a> {
Write(Option<GuardedStreamWriter<'a, u8, Ctx>>),
Read(Option<GuardedStreamReader<'a, u8, Ctx>>, Option<u8>),
Write(Option<GuardedStreamWriter<u8, &'a Accessor<Ctx>>>),
Read(
Option<GuardedStreamReader<u8, &'a Accessor<Ctx>>>,
Option<u8>,
),
}

// Test watching, then writing to, then dropping, then writing again to the
Expand Down Expand Up @@ -212,9 +215,9 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
let instance = linker.instantiate_async(&mut store, &component).await?;

enum StreamEvent<'a> {
FirstWrite(Option<GuardedStreamWriter<'a, u8, Ctx>>),
FirstRead(Option<GuardedStreamReader<'a, u8, Ctx>>, Vec<u8>),
SecondWrite(Option<GuardedStreamWriter<'a, u8, Ctx>>),
FirstWrite(Option<GuardedStreamWriter<u8, &'a Accessor<Ctx>>>),
FirstRead(Option<GuardedStreamReader<u8, &'a Accessor<Ctx>>>, Vec<u8>),
SecondWrite(Option<GuardedStreamWriter<u8, &'a Accessor<Ctx>>>),
GuestCompleted,
}

Expand Down
13 changes: 8 additions & 5 deletions crates/misc/component-async-tests/tests/scenario/transmit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,18 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re

enum Event<'a, Test: TransmitTest> {
Result(Test::Result),
ControlWriteA(Option<GuardedStreamWriter<'a, Control, Ctx>>),
ControlWriteB(Option<GuardedStreamWriter<'a, Control, Ctx>>),
ControlWriteC(Option<GuardedStreamWriter<'a, Control, Ctx>>),
ControlWriteA(Option<GuardedStreamWriter<Control, &'a Accessor<Ctx>>>),
ControlWriteB(Option<GuardedStreamWriter<Control, &'a Accessor<Ctx>>>),
ControlWriteC(Option<GuardedStreamWriter<Control, &'a Accessor<Ctx>>>),
ControlWriteD,
WriteA,
WriteB(bool),
ReadC(Option<GuardedStreamReader<'a, String, Ctx>>, Option<String>),
ReadC(
Option<GuardedStreamReader<String, &'a Accessor<Ctx>>>,
Option<String>,
),
ReadD(Option<String>),
ReadNone(Option<GuardedStreamReader<'a, String, Ctx>>),
ReadNone(Option<GuardedStreamReader<String, &'a Accessor<Ctx>>>),
}

let (control_tx, control_rx) = instance.stream(&mut store)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/wasi/src/p3/sockets/host/types/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl HostTcpSocketWithStore for WasiSockets {
) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
store.with(|mut view| {
let instance = view.instance();
let (data_tx, data_rx) = instance
let (mut data_tx, data_rx) = instance
.stream(&mut view)
.context("failed to create stream")?;
let TcpSocket { tcp_state, .. } = get_socket_mut(view.get().table, &socket)?;
Expand All @@ -411,7 +411,7 @@ impl HostTcpSocketWithStore for WasiSockets {
}
prev => {
*tcp_state = prev;
let (result_tx, result_rx) = instance
let (mut result_tx, result_rx) = instance
.future(&mut view, || Err(ErrorCode::InvalidState))
.context("failed to create future")?;
result_tx.close(&mut view);
Expand Down
Loading