fix: reclaim host stream/future transmits when the guest drops its end#13515
fix: reclaim host stream/future transmits when the guest drops its end#13515gfx wants to merge 1 commit into
Conversation
7025008 to
ac7f447
Compare
When the guest drops its end of a stream/future while the host consumer/producer is still `HostReady`, the host end was never finalized, so the `TransmitState` and both handles leaked from the concurrent-state table (eventually trapping with "resource table has no free keys"). The `HostReady` arms of `host_drop_reader` and `host_drop_writer` were no-ops; both now `delete_transmit`. `host_drop_writer` only finalizes once the writer is actually `Dropped`. Adds two `component-async-tests` regression tests (one per path) that fail on `main` and pass here. Fixes bytecodealliance#13514. Assisted-by: Claude Code
ac7f447 to
89cc10d
Compare
alexcrichton
left a comment
There was a problem hiding this comment.
Thanks! I've got a question about the conditional drop, but otherwise this all looks reasonable to me
| // A host consumer (e.g. one registered via `StreamReader::pipe`) | ||
| // is only driven on guest writes; it is never re-polled to | ||
| // observe the guest dropping the write end. Reclaim the transmit | ||
| // (state + both handles) so it does not leak. Unlike | ||
| // `host_drop_reader`, the write end is not forced to `Dropped` | ||
| // earlier in this function, so only finalize once the writer is | ||
| // actually gone -- otherwise we would discard a still-live host | ||
| // writer. The consumer is dropped along with the matched value. | ||
| if matches!( | ||
| self.concurrent_state_mut().get_mut(transmit_id)?.write, | ||
| WriteState::Dropped | ||
| ) { | ||
| log::trace!("host_drop_writer: finalize host consumer, delete {transmit_id:?}"); | ||
| self.concurrent_state_mut().delete_transmit(transmit_id)?; | ||
| } |
There was a problem hiding this comment.
I'm not sure I fully understand why this is conditional, can you explain this a bit more? For example if the write state is not dropped, meaning that this isn't executed, then I believe it's only possible to get here by flowing through the WriteState::HostReady, but that means that the host actually owns the writer. This function is only called when the guest drops the writer, so I'm not sure how that's possible.
Are there tests for this conditional branch? Or if this branch goes away and the delete_transmit unconditionally happens, what bad would happen? (e.g. could a test be written to exercise that?)
Fixes #13514.
When the guest drops its end of a stream/future while the host consumer/producer is still
HostReady, the transmit was never reclaimed. This finalizes the stranded host end so theTransmitStateand both handles are freed.Fix
crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs— twoHostReadyarms were no-ops; both now calldelete_transmit(which also drops the host producer/consumer):host_drop_reader(guest dropped the read end → host producer): the read end is alreadyDropped, so finalize unconditionally.host_drop_writer(guest dropped the write end → host consumer): the write end isn't forcedDroppedhere, so finalize only once the writer is actually gone.Tests
Two regression tests in
component-async-tests, driving only the publicStreamReader::pipe/FutureReader::newAPIs:streams::async_host_consumer_drop(+ a minimalhost-consumer-drop-guesttest program) — covershost_drop_writer.streams::async_host_producer_drop(reuses the existingclosed-streamsguest) — covershost_drop_reader.Both fail on
main(leftover concurrent-state entries viaassert_concurrent_state_empty) and pass with this change.Verification
cargo test -p component-async-tests --test test_all→ 82 passed, 0 failed.