From 3ee24b4adef6ec4e0add4b0e8af2eeb34529195f Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 30 Jul 2025 12:52:05 -0700 Subject: [PATCH] Refine `Guard*` APIs and implementation This is a follow-up to #11325 with a number of cosmetic changes about the shape of the API and structure of the internals: * `{Stream,Future}{Reader,Writer}::guard` is now an alternative constructor to `Guard*::new` (import fewer types). * Internally `WithAccessor` and `DropWithStore` are removed in favor of direct `Drop for Guard*` impls. * An `Option` is used to replace `ManuallyDrop` and `unsafe` code. * `{Stream,Future}{Reader,Writer}::close{,_with}` now take `&mut self` instead of `self` to be more composable with `&mut self` arguments during `Drop` for other structures (e.g. build-your-own drop-with-store). * The type parameters on `Guard*` are simplified to just `T`, the future or stream payload, and `A: AsAccessor`. This helps cut down on the complexity of signatures. * `Guard*` types now have `into_{stream,future}` as an alternative to `.into()` which doesn't require type annotations. --- .../tests/scenario/streams.rs | 33 +- .../tests/scenario/transmit.rs | 13 +- crates/wasi/src/p3/sockets/host/types/tcp.rs | 4 +- .../concurrent/futures_and_streams.rs | 565 +++++++++++------- 4 files changed, 392 insertions(+), 223 deletions(-) diff --git a/crates/misc/component-async-tests/tests/scenario/streams.rs b/crates/misc/component-async-tests/tests/scenario/streams.rs index a0d45fce642a..63c2dddab184 100644 --- a/crates/misc/component-async-tests/tests/scenario/streams.rs +++ b/crates/misc/component-async-tests/tests/scenario/streams.rs @@ -15,8 +15,8 @@ use { wasmtime::{ Engine, Store, component::{ - GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker, ResourceTable, - VecBuffer, + Accessor, GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker, + ResourceTable, VecBuffer, }, }, wasmtime_wasi::p2::WasiCtxBuilder, @@ -49,7 +49,7 @@ pub async fn async_watch_streams() -> Result<()> { let instance = linker.instantiate_async(&mut store, &component).await?; // Test watching and then dropping the read end of a stream. - let (mut tx, rx) = instance.stream::(&mut store)?; + let (mut tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 @@ -57,7 +57,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test dropping and then watching the read end of a stream. - let (mut tx, rx) = instance.stream::(&mut store)?; + let (mut tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { rx.close_with(store); @@ -66,7 +66,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test watching and then dropping the write end of a stream. - let (tx, mut rx) = instance.stream::(&mut store)?; + let (mut tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 @@ -74,7 +74,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test dropping and then watching the write end of a stream. - let (tx, mut rx) = instance.stream::(&mut store)?; + let (mut tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { tx.close_with(store); @@ -83,7 +83,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test watching and then dropping the read end of a future. - let (mut tx, rx) = instance.future::(&mut store, || 42)?; + let (mut tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 @@ -91,7 +91,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test dropping and then watching the read end of a future. - let (mut tx, rx) = instance.future::(&mut store, || 42)?; + let (mut tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { rx.close_with(store); @@ -100,7 +100,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test watching and then dropping the write end of a future. - let (tx, mut rx) = instance.future::(&mut store, || 42)?; + let (mut tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 @@ -108,7 +108,7 @@ pub async fn async_watch_streams() -> Result<()> { .await?; // Test dropping and then watching the write end of a future. - let (tx, mut rx) = instance.future::(&mut store, || 42)?; + let (mut tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { tx.close_with(store); @@ -117,8 +117,11 @@ pub async fn async_watch_streams() -> Result<()> { .await?; enum Event<'a> { - Write(Option>), - Read(Option>, Option), + Write(Option>>), + Read( + Option>>, + Option, + ), } // Test watching, then writing to, then dropping, then writing again to the @@ -212,9 +215,9 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { let instance = linker.instantiate_async(&mut store, &component).await?; enum StreamEvent<'a> { - FirstWrite(Option>), - FirstRead(Option>, Vec), - SecondWrite(Option>), + FirstWrite(Option>>), + FirstRead(Option>>, Vec), + SecondWrite(Option>>), GuestCompleted, } diff --git a/crates/misc/component-async-tests/tests/scenario/transmit.rs b/crates/misc/component-async-tests/tests/scenario/transmit.rs index 6f40a9f083a0..2e4ec78d9c0a 100644 --- a/crates/misc/component-async-tests/tests/scenario/transmit.rs +++ b/crates/misc/component-async-tests/tests/scenario/transmit.rs @@ -359,15 +359,18 @@ async fn test_transmit_with(component: &str) -> Re enum Event<'a, Test: TransmitTest> { Result(Test::Result), - ControlWriteA(Option>), - ControlWriteB(Option>), - ControlWriteC(Option>), + ControlWriteA(Option>>), + ControlWriteB(Option>>), + ControlWriteC(Option>>), ControlWriteD, WriteA, WriteB(bool), - ReadC(Option>, Option), + ReadC( + Option>>, + Option, + ), ReadD(Option), - ReadNone(Option>), + ReadNone(Option>>), } let (control_tx, control_rx) = instance.stream(&mut store)?; diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index aca7b959ba91..de33a6d8e57f 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -392,7 +392,7 @@ impl HostTcpSocketWithStore for WasiSockets { ) -> wasmtime::Result<(StreamReader, FutureReader>)> { store.with(|mut view| { let instance = view.instance(); - let (data_tx, data_rx) = instance + let (mut data_tx, data_rx) = instance .stream(&mut view) .context("failed to create stream")?; let TcpSocket { tcp_state, .. } = get_socket_mut(view.get().table, &socket)?; @@ -411,7 +411,7 @@ impl HostTcpSocketWithStore for WasiSockets { } prev => { *tcp_state = prev; - let (result_tx, result_rx) = instance + let (mut result_tx, result_rx) = instance .future(&mut view, || Err(ErrorCode::InvalidState)) .context("failed to create future")?; result_tx.close(&mut view); diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 4f755f355a7b..09452efe992e 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -7,9 +7,7 @@ use crate::component::concurrent::{ConcurrentState, WorkItem}; use crate::component::func::{self, LiftContext, LowerContext, Options}; use crate::component::matching::InstanceType; use crate::component::values::{ErrorContextAny, FutureAny, StreamAny}; -use crate::component::{ - Accessor, AsAccessor, HasData, HasSelf, Instance, Lower, Val, WasmList, WasmStr, -}; +use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr}; use crate::store::{StoreOpaque, StoreToken}; use crate::vm::VMStore; use crate::{AsContextMut, StoreContextMut, ValRaw}; @@ -22,7 +20,7 @@ use std::fmt; use std::future; use std::iter; use std::marker::PhantomData; -use std::mem::{self, ManuallyDrop, MaybeUninit}; +use std::mem::{self, MaybeUninit}; use std::string::{String, ToString}; use std::sync::{Arc, Mutex}; use std::task::{Poll, Waker}; @@ -403,60 +401,6 @@ pub(super) struct FlatAbi { pub(super) align: u32, } -/// Trait representing objects (such as streams, futures, or structs containing -/// them) which require access to the store in order to be disposed of properly. -trait DropWithStore: Sized { - /// Dispose of `self` using the specified store. - fn drop(&mut self, store: impl AsContextMut); - - /// Dispose of `self` using the specified accessor. - fn drop_with(&mut self, accessor: impl AsAccessor) { - accessor.as_accessor().with(|store| self.drop(store)) - } -} - -/// RAII wrapper for `DropWithStore` implementations. -/// -/// This may be used to automatically dispose of the wrapped object when it goes -/// out of scope. -struct WithAccessor<'a, T: DropWithStore, U: 'static, D: HasData + ?Sized = HasSelf> { - accessor: &'a Accessor, - inner: ManuallyDrop, -} - -impl<'a, T: DropWithStore, U, D: HasData + ?Sized> WithAccessor<'a, T, U, D> { - /// Create a new instance wrapping the specified `inner` object. - fn new(accessor: &'a Accessor, inner: T) -> Self { - Self { - accessor, - inner: ManuallyDrop::new(inner), - } - } - - fn into_parts(self) -> (&'a Accessor, T) { - let accessor = self.accessor; - let mut me = ManuallyDrop::new(self); - // SAFETY: We've wrapped `self` in a `ManuallyDrop` and will not use or - // drop it after we've moved the `inner` field out. - let inner = unsafe { ManuallyDrop::take(&mut me.inner) }; - (accessor, inner) - } -} - -impl<'a, T: DropWithStore, U, D: HasData + ?Sized> Drop for WithAccessor<'a, T, U, D> { - fn drop(&mut self) { - // SAFETY: `Drop::drop` is called at most once and after which `self` - // can no longer be used, thus ensuring `self.inner` will no longer be - // used. - // - // Technically we could avoid `unsafe` here and just call - // `self.inner.drop_with` instead, but then `T` would never by dropped. - // As of this writing, we don't use types for `T` which implement `Drop` - // anyway, but that could change later. - _ = unsafe { ManuallyDrop::take(&mut self.inner) }.drop_with(self.accessor); - } -} - /// Represents the writable end of a Component Model `future`. /// /// Note that `FutureWriter` instances must be disposed of using either `write` @@ -489,16 +433,23 @@ impl FutureWriter { /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool + where + T: func::Lower + Send + Sync + 'static, + { + self.guard(accessor).write(value).await + } + + /// Mut-ref signature instead of by-value signature for + /// `GuardedFutureWriter` to more easily call. + async fn write_(&mut self, accessor: impl AsAccessor, value: T) -> bool where T: func::Lower + Send + Sync + 'static, { let accessor = accessor.as_accessor(); - let me = WithAccessor::new(accessor, self); - let result = me - .inner + let result = self .instance - .host_write_async(accessor, me.inner.id, Some(value), TransmitKind::Future) + .host_write_async(accessor, self.id, Some(value), TransmitKind::Future) .await; match result { @@ -521,72 +472,123 @@ impl FutureWriter { } /// Close this `FutureWriter`, writing the default value. - pub fn close(mut self, store: impl AsContextMut) + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. Usage of this future after calling `close` will also cause + /// a panic. + pub fn close(&mut self, mut store: impl AsContextMut) where T: func::Lower + Send + Sync + 'static, { - self.drop(store) + let id = mem::replace(&mut self.id, TableId::new(0)); + let default = self.default; + self.instance + .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default()))) + .unwrap(); } - /// Close this `FutureWriter`, writing the default value. - pub fn close_with(mut self, accessor: impl AsAccessor) + /// Convenience method around [`Self::close`]. + pub fn close_with(&mut self, accessor: impl AsAccessor) where T: func::Lower + Send + Sync + 'static, { - accessor.as_accessor().with(|access| self.drop(access)) + accessor.as_accessor().with(|access| self.close(access)) } -} -impl DropWithStore for FutureWriter { - fn drop(&mut self, mut store: impl AsContextMut) { - // `self` should never be used again, but leave an invalid handle there just in case. - let id = mem::replace(&mut self.id, TableId::new(0)); - let default = self.default; - self.instance - .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default()))) - .unwrap() + /// Returns a [`GuardedFutureWriter`] which will auto-close this future on + /// drop and clean it up from the store. + /// + /// Note that the `accessor` provided must own this future and is + /// additionally transferred to the `GuardedFutureWriter` return value. + pub fn guard(self, accessor: A) -> GuardedFutureWriter + where + T: func::Lower + Send + Sync + 'static, + A: AsAccessor, + { + GuardedFutureWriter::new(accessor, self) } } -/// A `FutureWriter` paired with an `Accessor`. +/// A [`FutureWriter`] paired with an [`Accessor`]. /// -/// This is an RAII wrapper around `FutureWriter` that ensures it is closed when -/// dropped. -pub struct GuardedFutureWriter< - 'a, +/// This is an RAII wrapper around [`FutureWriter`] that ensures it is closed +/// when dropped. This can be created through [`GuardedFutureWriter::new`] or +/// [`FutureWriter::guard`]. +pub struct GuardedFutureWriter +where T: func::Lower + Send + Sync + 'static, - U: 'static, - D: HasData + ?Sized = HasSelf, ->(WithAccessor<'a, FutureWriter, U, D>); + A: AsAccessor, +{ + // This field is `None` to implement the conversion from this guard back to + // `FutureWriter`. When `None` is seen in the destructor it will cause the + // destructor to do nothing. + writer: Option>, + accessor: A, +} -impl<'a, T: func::Lower + Send + Sync + 'static, U: 'static, D: HasData + ?Sized> - GuardedFutureWriter<'a, T, U, D> +impl GuardedFutureWriter +where + T: func::Lower + Send + Sync + 'static, + A: AsAccessor, { - /// Create a new `GuardedFutureWriter` with the specified `accessor` and `writer`. - pub fn new(accessor: &'a Accessor, writer: FutureWriter) -> Self { - Self(WithAccessor::new(accessor, writer)) + /// Create a new `GuardedFutureWriter` with the specified `accessor` and + /// `writer`. + pub fn new(accessor: A, writer: FutureWriter) -> Self { + Self { + writer: Some(writer), + accessor, + } } - /// Wrapper for `FutureWriter::write`. - pub async fn write(self, value: T) -> bool + /// Wrapper for [`FutureWriter::write`]. + pub async fn write(mut self, value: T) -> bool where T: func::Lower + Send + Sync + 'static, { - let (accessor, writer) = self.0.into_parts(); - writer.write(accessor, value).await + self.writer + .as_mut() + .unwrap() + .write_(&self.accessor, value) + .await } - /// Wrapper for `FutureWriter::watch_reader`. + /// Wrapper for [`FutureWriter::watch_reader`] pub async fn watch_reader(&mut self) { - self.0.inner.watch_reader(self.0.accessor).await + self.writer + .as_mut() + .unwrap() + .watch_reader(&self.accessor) + .await + } + + /// Extracts the underlying [`FutureWriter`] from this guard, returning it + /// back. + pub fn into_future(self) -> FutureWriter { + self.into() + } +} + +impl From> for FutureWriter +where + T: func::Lower + Send + Sync + 'static, + A: AsAccessor, +{ + fn from(mut guard: GuardedFutureWriter) -> Self { + guard.writer.take().unwrap() } } -impl<'a, T: func::Lower + Send + Sync + 'static, U: 'static, D: HasData + ?Sized> - From> for FutureWriter +impl Drop for GuardedFutureWriter +where + T: func::Lower + Send + Sync + 'static, + A: AsAccessor, { - fn from(writer: GuardedFutureWriter<'a, T, U, D>) -> Self { - writer.0.into_parts().1 + fn drop(&mut self) { + if let Some(writer) = &mut self.writer { + writer.close_with(&self.accessor) + } } } @@ -624,16 +626,21 @@ impl FutureReader { /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. pub async fn read(self, accessor: impl AsAccessor) -> Option + where + T: func::Lift + Send + 'static, + { + self.guard(accessor).read().await + } + + async fn read_(&mut self, accessor: impl AsAccessor) -> Option where T: func::Lift + Send + 'static, { let accessor = accessor.as_accessor(); - let me = WithAccessor::new(accessor, self); - let result = me - .inner + let result = self .instance - .host_read_async(accessor, me.inner.id, None, TransmitKind::Future) + .host_read_async(accessor, self.id, None, TransmitKind::Future) .await; if let Ok(HostResult { @@ -714,19 +721,14 @@ impl FutureReader { } } - /// Close this `FutureReader`. - pub fn close(mut self, store: impl AsContextMut) { - self.drop(store) - } - - /// Close this `FutureReader`. - pub fn close_with(mut self, accessor: impl AsAccessor) { - accessor.as_accessor().with(|access| self.drop(access)) - } -} - -impl DropWithStore for FutureReader { - fn drop(&mut self, mut store: impl AsContextMut) { + /// Close this `FutureReader`, writing the default value. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. Usage of this future after calling `close` will also cause + /// a panic. + pub fn close(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); self.instance @@ -735,7 +737,24 @@ impl DropWithStore for FutureReader { id, TransmitKind::Future, ) - .unwrap() + .unwrap(); + } + + /// Convenience method around [`Self::close`]. + pub fn close_with(&mut self, accessor: impl AsAccessor) { + accessor.as_accessor().with(|access| self.close(access)) + } + + /// Returns a [`GuardedFutureReader`] which will auto-close this future on + /// drop and clean it up from the store. + /// + /// Note that the `accessor` provided must own this future and is + /// additionally transferred to the `GuardedFutureReader` return value. + pub fn guard(self, accessor: A) -> GuardedFutureReader + where + A: AsAccessor, + { + GuardedFutureReader::new(accessor, self) } } @@ -838,40 +857,75 @@ unsafe impl func::Lift for FutureReader { } } -/// A `FutureReader` paired with an `Accessor`. +/// A [`FutureReader`] paired with an [`Accessor`]. /// -/// This is an RAII wrapper around `FutureReader` that ensures it is closed when -/// dropped. -pub struct GuardedFutureReader<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( - WithAccessor<'a, FutureReader, U, D>, -); +/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed +/// when dropped. This can be created through [`GuardedFutureReader::new`] or +/// [`FutureReader::guard`]. +pub struct GuardedFutureReader +where + A: AsAccessor, +{ + // This field is `None` to implement the conversion from this guard back to + // `FutureReader`. When `None` is seen in the destructor it will cause the + // destructor to do nothing. + reader: Option>, + accessor: A, +} -impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedFutureReader<'a, T, U, D> { +impl GuardedFutureReader +where + A: AsAccessor, +{ /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`. - pub fn new(accessor: &'a Accessor, reader: FutureReader) -> Self { - Self(WithAccessor::new(accessor.as_accessor(), reader)) + pub fn new(accessor: A, reader: FutureReader) -> Self { + Self { + reader: Some(reader), + accessor, + } } - /// Wrapper for `FutureReader::read`. - pub async fn read(self) -> Option + /// Wrapper for [`FutureReader::read`]. + pub async fn read(mut self) -> Option where - T: func::Lift + Send + Sync + 'static, + T: func::Lift + Send + 'static, { - let (accessor, reader) = self.0.into_parts(); - reader.read(accessor).await + self.reader.as_mut().unwrap().read_(&self.accessor).await } - /// Wrapper for `FutureReader::watch_writer`. + /// Wrapper for [`FutureReader::watch_writer`]. pub async fn watch_writer(&mut self) { - self.0.inner.watch_writer(self.0.accessor).await + self.reader + .as_mut() + .unwrap() + .watch_writer(&self.accessor) + .await + } + + /// Extracts the underlying [`FutureReader`] from this guard, returning it + /// back. + pub fn into_future(self) -> FutureReader { + self.into() + } +} + +impl From> for FutureReader +where + A: AsAccessor, +{ + fn from(mut guard: GuardedFutureReader) -> Self { + guard.reader.take().unwrap() } } -impl<'a, T, U: 'static, D: HasData + ?Sized> From> - for FutureReader +impl Drop for GuardedFutureReader +where + A: AsAccessor, { - fn from(reader: GuardedFutureReader<'a, T, U, D>) -> Self { - reader.0.into_parts().1 + fn drop(&mut self) { + if let Some(reader) = &mut self.reader { + reader.close_with(&self.accessor) + } } } @@ -982,75 +1036,131 @@ impl StreamWriter { watch_reader(accessor, self.instance, self.id).await } - /// Close this `StreamWriter`. - pub fn close(mut self, store: impl AsContextMut) { - self.drop(store) - } - - /// Close this `StreamWriter`. - pub fn close_with(mut self, accessor: impl AsAccessor) { - accessor.as_accessor().with(|access| self.drop(access)) - } -} - -impl DropWithStore for StreamWriter { - fn drop(&mut self, mut store: impl AsContextMut) { + /// Close this `StreamWriter`, writing the default value. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. Usage of this future after calling `close` will also cause + /// a panic. + pub fn close(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); self.instance .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>) .unwrap() } + + /// Convenience method around [`Self::close`]. + pub fn close_with(&mut self, accessor: impl AsAccessor) { + accessor.as_accessor().with(|access| self.close(access)) + } + + /// Returns a [`GuardedStreamWriter`] which will auto-close this stream on + /// drop and clean it up from the store. + /// + /// Note that the `accessor` provided must own this future and is + /// additionally transferred to the `GuardedStreamWriter` return value. + pub fn guard(self, accessor: A) -> GuardedStreamWriter + where + A: AsAccessor, + { + GuardedStreamWriter::new(accessor, self) + } } -/// A `StreamWriter` paired with an `Accessor`. +/// A [`StreamWriter`] paired with an [`Accessor`]. /// -/// This is an RAII wrapper around `StreamWriter` that ensures it is closed when -/// dropped. -pub struct GuardedStreamWriter<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( - WithAccessor<'a, StreamWriter, U, D>, -); +/// This is an RAII wrapper around [`StreamWriter`] that ensures it is closed +/// when dropped. This can be created through [`GuardedStreamWriter::new`] or +/// [`StreamWriter::guard`]. +pub struct GuardedStreamWriter +where + A: AsAccessor, +{ + // This field is `None` to implement the conversion from this guard back to + // `StreamWriter`. When `None` is seen in the destructor it will cause the + // destructor to do nothing. + writer: Option>, + accessor: A, +} -impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamWriter<'a, T, U, D> { +impl GuardedStreamWriter +where + A: AsAccessor, +{ /// Create a new `GuardedStreamWriter` with the specified `accessor` and `writer`. - pub fn new(accessor: &'a Accessor, writer: StreamWriter) -> Self { - Self(WithAccessor::new(accessor.as_accessor(), writer)) + pub fn new(accessor: A, writer: StreamWriter) -> Self { + Self { + writer: Some(writer), + accessor, + } } - /// Wrapper for `StreamWriter::is_closed` + /// Wrapper for [`StreamWriter::is_closed`]. pub fn is_closed(&self) -> bool { - self.0.inner.is_closed() + self.writer.as_ref().unwrap().is_closed() } - /// Wrapper for `StreamWriter::write`. + /// Wrapper for [`StreamWriter::write`]. pub async fn write(&mut self, buffer: B) -> B where T: func::Lower + 'static, B: WriteBuffer, { - self.0.inner.write(self.0.accessor, buffer).await + self.writer + .as_mut() + .unwrap() + .write(&self.accessor, buffer) + .await } - /// Wrapper for `StreamWriter::write_all`. + /// Wrapper for [`StreamWriter::write_all`]. pub async fn write_all(&mut self, buffer: B) -> B where T: func::Lower + 'static, B: WriteBuffer, { - self.0.inner.write_all(self.0.accessor, buffer).await + self.writer + .as_mut() + .unwrap() + .write_all(&self.accessor, buffer) + .await } - /// Wrapper for `StreamWriter::watch_reader`. + /// Wrapper for [`StreamWriter::watch_reader`]. pub async fn watch_reader(&mut self) { - self.0.inner.watch_reader(self.0.accessor).await + self.writer + .as_mut() + .unwrap() + .watch_reader(&self.accessor) + .await + } + + /// Extracts the underlying [`StreamWriter`] from this guard, returning it + /// back. + pub fn into_stream(self) -> StreamWriter { + self.into() } } -impl<'a, T, U: 'static, D: HasData + ?Sized> From> - for StreamWriter +impl From> for StreamWriter +where + A: AsAccessor, { - fn from(writer: GuardedStreamWriter<'a, T, U, D>) -> Self { - writer.0.into_parts().1 + fn from(mut guard: GuardedStreamWriter) -> Self { + guard.writer.take().unwrap() + } +} + +impl Drop for GuardedStreamWriter +where + A: AsAccessor, +{ + fn drop(&mut self) { + if let Some(writer) = &mut self.writer { + writer.close_with(&self.accessor) + } } } @@ -1185,19 +1295,14 @@ impl StreamReader { } } - /// Close this `StreamReader`. - pub fn close(mut self, store: impl AsContextMut) { - self.drop(store) - } - - /// Close this `StreamReader`. - pub fn close_with(mut self, accessor: impl AsAccessor) { - accessor.as_accessor().with(|access| self.drop(access)) - } -} - -impl DropWithStore for StreamReader { - fn drop(&mut self, mut store: impl AsContextMut) { + /// Close this `StreamReader`, writing the default value. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. Usage of this future after calling `close` will also cause + /// a panic. + pub fn close(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); self.instance @@ -1208,6 +1313,23 @@ impl DropWithStore for StreamReader { ) .unwrap() } + + /// Convenience method around [`Self::close`]. + pub fn close_with(&mut self, accessor: impl AsAccessor) { + accessor.as_accessor().with(|access| self.close(access)) + } + + /// Returns a [`GuardedStreamReader`] which will auto-close this stream on + /// drop and clean it up from the store. + /// + /// Note that the `accessor` provided must own this future and is + /// additionally transferred to the `GuardedStreamReader` return value. + pub fn guard(self, accessor: A) -> GuardedStreamReader + where + A: AsAccessor, + { + GuardedStreamReader::new(accessor, self) + } } impl fmt::Debug for StreamReader { @@ -1309,23 +1431,38 @@ unsafe impl func::Lift for StreamReader { } } -/// A `StreamReader` paired with an `Accessor`. +/// A [`StreamReader`] paired with an [`Accessor`]. /// -/// This is an RAII wrapper around `StreamReader` that ensures it is closed when -/// dropped. -pub struct GuardedStreamReader<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( - WithAccessor<'a, StreamReader, U, D>, -); +/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed +/// when dropped. This can be created through [`GuardedStreamReader::new`] or +/// [`StreamReader::guard`]. +pub struct GuardedStreamReader +where + A: AsAccessor, +{ + // This field is `None` to implement the conversion from this guard back to + // `StreamReader`. When `None` is seen in the destructor it will cause the + // destructor to do nothing. + reader: Option>, + accessor: A, +} -impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamReader<'a, T, U, D> { - /// Create a new `GuardedStreamReader` with the specified `accessor` and `reader`. - pub fn new(accessor: &'a Accessor, reader: StreamReader) -> Self { - Self(WithAccessor::new(accessor.as_accessor(), reader)) +impl GuardedStreamReader +where + A: AsAccessor, +{ + /// Create a new `GuardedStreamReader` with the specified `accessor` and + /// `reader`. + pub fn new(accessor: A, reader: StreamReader) -> Self { + Self { + reader: Some(reader), + accessor, + } } /// Wrapper for `StreamReader::is_closed` pub fn is_closed(&self) -> bool { - self.0.inner.is_closed() + self.reader.as_ref().unwrap().is_closed() } /// Wrapper for `StreamReader::read`. @@ -1334,20 +1471,46 @@ impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamReader<'a, T, U, D> { T: func::Lift + 'static, B: ReadBuffer + Send + 'static, { - self.0.inner.read(self.0.accessor, buffer).await + self.reader + .as_mut() + .unwrap() + .read(&self.accessor, buffer) + .await } /// Wrapper for `StreamReader::watch_writer`. pub async fn watch_writer(&mut self) { - self.0.inner.watch_writer(self.0.accessor).await + self.reader + .as_mut() + .unwrap() + .watch_writer(&self.accessor) + .await + } + + /// Extracts the underlying [`StreamReader`] from this guard, returning it + /// back. + pub fn into_stream(self) -> StreamReader { + self.into() + } +} + +impl From> for StreamReader +where + A: AsAccessor, +{ + fn from(mut guard: GuardedStreamReader) -> Self { + guard.reader.take().unwrap() } } -impl<'a, T, U: 'static, D: HasData + ?Sized> From> - for StreamReader +impl Drop for GuardedStreamReader +where + A: AsAccessor, { - fn from(reader: GuardedStreamReader<'a, T, U, D>) -> Self { - reader.0.into_parts().1 + fn drop(&mut self) { + if let Some(reader) = &mut self.reader { + reader.close_with(&self.accessor) + } } }