diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift index 32892678..296149ac 100644 --- a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift @@ -321,11 +321,13 @@ extension MultiProducerSingleConsumerAsyncChannel { return $0.enqueueProducer(callbackToken: callbackToken, onProduceMore: onProduceMore) } - switch action { + switch consume action { case .resumeProducer(let onProduceMore): + let onProduceMore = onProduceMore.take() onProduceMore(Result.success(())) case .resumeProducerWithError(let onProduceMore, let error): + let onProduceMore = onProduceMore.take() onProduceMore(Result.failure(error)) case .none: @@ -396,7 +398,7 @@ extension MultiProducerSingleConsumerAsyncChannel { $0.next() } - switch action { + switch consume action { case .returnElement(let element): return element.take() @@ -438,17 +440,15 @@ extension MultiProducerSingleConsumerAsyncChannel { $0.suspendNext(continuation: continuation) } - switch action { - case .resumeConsumerWithElement(let continuation, let element): - continuation.resume(returning: element.take()) - - case .resumeConsumerWithElementAndProducers( - let continuation, - let element, - let producerContinuations - ): - continuation.resume(returning: element.take()) - for producerContinuation in producerContinuations { + switch consume action { + case .resumeConsumerWithElement(let payload): + let element = payload.element.take() + payload.continuation.resume(returning: element) + + case .resumeConsumerWithElementAndProducers(let payload): + let element = payload.element.take() + payload.continuation.resume(returning: element) + for producerContinuation in payload.producerContinuations { switch producerContinuation { case .closure(let onProduceMore): onProduceMore(.success(())) @@ -457,19 +457,13 @@ extension MultiProducerSingleConsumerAsyncChannel { } } - case .resumeConsumerWithFailureAndCallOnTerminations( - let continuation, - let failure, - let onTerminations - ): - switch failure { - case .some(let error): - continuation.resume(throwing: error) - - case .none: - continuation.resume(returning: nil) + case .resumeConsumerWithFailureAndCallOnTerminations(let payload): + if let error = payload.failure { + payload.continuation.resume(throwing: error) + } else { + payload.continuation.resume(returning: nil) } - onTerminations.forEach { $0.1() } + payload.onTerminations.forEach { $0.1() } case .resumeConsumerWithNil(let continuation): continuation.resume(returning: nil) @@ -543,7 +537,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `sourceDeinitialized()`. @usableFromInline - enum SetOnTerminationCallback { + enum SetOnTerminationCallback: Sendable { /// Indicates that `onTermination` should be called. case callOnTermination( @Sendable () -> Void @@ -615,7 +609,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `sourceDeinitialized()`. @usableFromInline - enum SourceDeinitialized { + enum SourceDeinitialized: Sendable { /// Indicates that the consumer should be resumed with the failure, the producers /// should be resumed with an error and `onTermination`s should be called. case resumeConsumerAndCallOnTerminations( @@ -710,7 +704,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `sequenceDeinitialized()`. @usableFromInline - enum ChannelOrSequenceDeinitializedAction { + enum ChannelOrSequenceDeinitializedAction: Sendable { /// Indicates that `onTermination`s should be called. case callOnTerminations(_TinyArray<(UInt64, @Sendable () -> Void)>) /// Indicates that all producers should be failed and `onTermination`s should be called. @@ -875,7 +869,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `iteratorDeinitialized()`. @usableFromInline - enum IteratorDeinitializedAction { + enum IteratorDeinitializedAction: Sendable { /// Indicates that `onTermination`s should be called. case callOnTerminations(_TinyArray<(UInt64, @Sendable () -> Void)>) /// Indicates that all producers should be failed and `onTermination`s should be called. @@ -1047,11 +1041,11 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `enqueueProducer()`. @usableFromInline - enum EnqueueProducerAction { + enum EnqueueProducerAction: ~Copyable, Sendable { /// Indicates that the producer should be notified to produce more. - case resumeProducer((Result) -> Void) + case resumeProducer(Disconnected<(Result) -> Void>) /// Indicates that the producer should be notified about an error. - case resumeProducerWithError((Result) -> Void, Error) + case resumeProducerWithError(Disconnected<(Result) -> Void>, Error) } @inlinable @@ -1066,12 +1060,12 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { channeling.cancelledAsyncProducers.remove(at: index) self = .init(state: .channeling(channeling)) - return .resumeProducerWithError(onProduceMore, CancellationError()) + return .resumeProducerWithError(.init(value: onProduceMore), CancellationError()) } else if channeling.hasOutstandingDemand { // We hit an edge case here where we wrote but the consuming thread got interleaved self = .init(state: .channeling(channeling)) - return .resumeProducer(onProduceMore) + return .resumeProducer(.init(value: onProduceMore)) } else { channeling.suspendedProducers.append((callbackToken, .closure(onProduceMore))) self = .init(state: .channeling(channeling)) @@ -1084,20 +1078,26 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { // It can happen that the source got finished or the consumption fully finishes. self = .init(state: .sourceFinished(sourceFinished)) - return .resumeProducerWithError(onProduceMore, MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + return .resumeProducerWithError( + .init(value: onProduceMore), + MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + ) case .finished(let finished): // Since we are unlocking between sending elements and suspending the send // It can happen that the source got finished or the consumption fully finishes. self = .init(state: .finished(finished)) - return .resumeProducerWithError(onProduceMore, MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()) + return .resumeProducerWithError( + .init(value: onProduceMore), + MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError() + ) } } /// Actions returned by `enqueueContinuation()`. @usableFromInline - enum EnqueueContinuationAction { + enum EnqueueContinuationAction: Sendable { /// Indicates that the producer should be notified to produce more. case resumeProducer(UnsafeContinuation) /// Indicates that the producer should be notified about an error. @@ -1147,7 +1147,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `cancelProducer()`. @usableFromInline - enum CancelProducerAction { + enum CancelProducerAction: Sendable { /// Indicates that the producer should be notified about cancellation. case resumeProducerWithCancellationError(_MultiProducerSingleConsumerSuspendedProducer) } @@ -1190,7 +1190,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `finish()`. @usableFromInline - enum FinishAction { + enum FinishAction: Sendable { /// Indicates that `onTermination`s should be called. case callOnTerminations(_TinyArray<(UInt64, @Sendable () -> Void)>) /// Indicates that the consumer should be resumed with the failure, the producers @@ -1266,12 +1266,12 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `next()`. @usableFromInline - enum NextAction { + enum NextAction: ~Copyable, Sendable { /// Indicates that the element should be returned to the caller. - case returnElement(SendableConsumeOnceBox) + case returnElement(Disconnected) /// Indicates that the element should be returned to the caller and that all producers should be called. case returnElementAndResumeProducers( - SendableConsumeOnceBox, + Disconnected, _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> ) /// Indicates that the `Failure` should be returned to the caller and that `onTermination`s should be called. @@ -1310,14 +1310,14 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { // We don't have any new demand, so we can just return the element. self = .init(state: .channeling(channeling)) - return .returnElement(element) + return .returnElement(Disconnected(value: element.take())) } // There is demand and we have to resume our producers let producers = _TinyArray(channeling.suspendedProducers.lazy.map { $0.1 }) channeling.suspendedProducers.removeAll(keepingCapacity: true) self = .init(state: .channeling(channeling)) - return .returnElementAndResumeProducers(element, producers) + return .returnElementAndResumeProducers(Disconnected(value: element.take()), producers) case .sourceFinished(var sourceFinished): // Check if we have an element left in the buffer and return it @@ -1340,7 +1340,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { } self = .init(state: .sourceFinished(sourceFinished)) - return .returnElement(element) + return .returnElement(Disconnected(value: element.take())) case .finished(let finished): self = .init(state: .finished(finished)) @@ -1351,21 +1351,72 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `suspendNext()`. @usableFromInline - enum SuspendNextAction: ~Copyable { + enum SuspendNextAction: ~Copyable, Sendable { + /// Payload for `.resumeConsumerWithElement`. + /// + /// - Note: Wrapped in a struct to work around a SILGen crash when + /// destructively pattern-matching an enum case with multiple payloads + /// that include a `~Copyable` generic wrapper type (`Disconnected`). + @usableFromInline + struct ResumeElement: ~Copyable, Sendable { + @usableFromInline let continuation: UnsafeContinuation + @usableFromInline var element: Disconnected + + @inlinable + init( + continuation: UnsafeContinuation, + element: consuming Disconnected + ) { + self.continuation = continuation + self.element = element + } + } + /// Payload for `.resumeConsumerWithElementAndProducers`. + /// + /// - Note: See `ResumeElement` for the reason behind this wrapper. + @usableFromInline + struct ResumeElementAndProducers: ~Copyable, Sendable { + @usableFromInline let continuation: UnsafeContinuation + @usableFromInline var element: Disconnected + @usableFromInline let producerContinuations: _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> + + @inlinable + init( + continuation: UnsafeContinuation, + element: consuming Disconnected, + producerContinuations: _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> + ) { + self.continuation = continuation + self.element = element + self.producerContinuations = producerContinuations + } + } + /// Payload for `.resumeConsumerWithFailureAndCallOnTerminations`. + /// + /// - Note: See `ResumeElement` for the reason behind this wrapper. + @usableFromInline + struct ResumeFailure: ~Copyable, Sendable { + @usableFromInline let continuation: UnsafeContinuation + @usableFromInline let failure: Failure? + @usableFromInline let onTerminations: _TinyArray<(UInt64, @Sendable () -> Void)> + + @inlinable + init( + continuation: UnsafeContinuation, + failure: Failure?, + onTerminations: _TinyArray<(UInt64, @Sendable () -> Void)> + ) { + self.continuation = continuation + self.failure = failure + self.onTerminations = onTerminations + } + } /// Indicates that the consumer should be resumed. - case resumeConsumerWithElement(UnsafeContinuation, SendableConsumeOnceBox) + case resumeConsumerWithElement(ResumeElement) /// Indicates that the consumer and all producers should be resumed. - case resumeConsumerWithElementAndProducers( - UnsafeContinuation, - SendableConsumeOnceBox, - _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> - ) + case resumeConsumerWithElementAndProducers(ResumeElementAndProducers) /// Indicates that the consumer should be resumed with the failure and that `onTermination`s should be called. - case resumeConsumerWithFailureAndCallOnTerminations( - UnsafeContinuation, - Failure?, - _TinyArray<(UInt64, @Sendable () -> Void)> - ) + case resumeConsumerWithFailureAndCallOnTerminations(ResumeFailure) /// Indicates that the consumer should be resumed with `nil`. case resumeConsumerWithNil(UnsafeContinuation) } @@ -1396,7 +1447,9 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { // We don't have any new demand, so we can just return the element. self = .init(state: .channeling(channeling)) - return .resumeConsumerWithElement(continuation, element) + return .resumeConsumerWithElement( + .init(continuation: continuation, element: Disconnected(value: element.take())) + ) } // There is demand and we have to resume our producers let producers = _TinyArray(channeling.suspendedProducers.lazy.map { $0.1 }) @@ -1404,9 +1457,11 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { self = .init(state: .channeling(channeling)) return .resumeConsumerWithElementAndProducers( - continuation, - element, - producers + .init( + continuation: continuation, + element: Disconnected(value: element.take()), + producerContinuations: producers + ) ) case .sourceFinished(var sourceFinished): @@ -1424,14 +1479,18 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { ) return .resumeConsumerWithFailureAndCallOnTerminations( - continuation, - sourceFinished.failure, - sourceFinished.onTerminations + .init( + continuation: continuation, + failure: sourceFinished.failure, + onTerminations: sourceFinished.onTerminations + ) ) } self = .init(state: .sourceFinished(sourceFinished)) - return .resumeConsumerWithElement(continuation, element) + return .resumeConsumerWithElement( + .init(continuation: continuation, element: Disconnected(value: element.take())) + ) case .finished(let finished): self = .init(state: .finished(finished)) @@ -1442,7 +1501,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage { /// Actions returned by `cancelNext()`. @usableFromInline - enum CancelNextAction { + enum CancelNextAction: Sendable { /// Indicates that the continuation should be resumed with nil, the producers should be finished and call onTerminations. case resumeConsumerWithNilAndCallOnTerminations( UnsafeContinuation, @@ -1715,11 +1774,14 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine { @available(AsyncAlgorithms 1.1, *) @usableFromInline -enum _MultiProducerSingleConsumerSuspendedProducer { +// TODO: This type is unchecked Sendable since the closure is actually sending +// We can't yet use Disconnected here since we need to have a UniqueDeque to store them +enum _MultiProducerSingleConsumerSuspendedProducer: @unchecked Sendable { case closure((Result) -> Void) case continuation(UnsafeContinuation) } +// TODO: This should be replaced with Disconnected once we have a UniqueDeque @usableFromInline struct SendableConsumeOnceBox { @usableFromInline diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift index 6cdbb8be..ba4c0163 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift @@ -14,25 +14,33 @@ public import ContainersPreview import BasicContainers +/// An error indicating that the reader produced more elements than the specified collection limit. +/// +/// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer +/// contains more elements than the allowed limit. +public struct AsyncReaderLeftOverElementsError: Error, Hashable { + public init() {} +} + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { - /// Collects elements from the reader up to a specified limit and processes them with a body function. + /// Collects elements from the reader up to a specified limit and processes them. /// - /// This method continuously reads elements from the async reader, accumulating them in a buffer - /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches - /// the specified limit. Once collection completes, it passes the accumulated elements to the - /// provided body function as a `Span` for processing. + /// This method continuously reads elements from the async reader, accumulating them in an + /// internal buffer until either it reaches the end of the stream or the specified limit. + /// Once collection completes, it passes the accumulated elements to the provided body + /// closure as an `InputSpan` for processing. /// /// - Parameters: /// - limit: The maximum number of elements to collect. This prevents unbounded memory /// growth when reading from potentially infinite streams. - /// - body: A closure that receives a `Span` containing all collected elements and returns - /// a result of type `Result`. The method calls this closure once after collecting all - /// elements successfully. + /// - body: A closure that receives an `InputSpan` containing all collected elements and returns + /// a result of type `Result`. /// /// - Returns: The value returned by the body closure after processing the collected elements. /// - /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation + /// - Throws: An `EitherError` wrapping either a read failure (which itself may be an + /// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit), /// or a `Failure` from the body closure. /// /// ## Example @@ -44,153 +52,41 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop /// // Process all collected elements /// } /// ``` - /// - /// ## Memory Considerations - /// - /// Since this method buffers all elements in memory before processing, it should be used - /// with caution on large datasets. The `limit` parameter serves as a safety mechanism - /// to prevent excessive memory usage. public mutating func collect( upTo limit: Int, body: (consuming InputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { + ) async throws(EitherError, Failure>) -> Result { // TODO: In the future we might want to use a temporary allocation instead // but those don't support async closures yet. - var buffer = UniqueArray() - buffer.reserveCapacity(limit) + var collectedBuffer = UniqueArray() + collectedBuffer.reserveCapacity(limit) var shouldContinue = true do { while shouldContinue { - try await self.read( - maximumCount: limit - buffer.count - ) { (span: consuming InputSpan) in - guard span.count > 0 else { + try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in + guard buffer.count > 0 else { shouldContinue = false return } - precondition(span.count <= limit - buffer.count) - while let element = span.popFirst() { - buffer.append(element) + if limit - collectedBuffer.count < buffer.count { + throw AsyncReaderLeftOverElementsError() + } + var consumer = buffer.consumeAll() + while let element = consumer.next() { + collectedBuffer.append(element) } } } } catch { - switch error { - case .first(let error): - throw .first(error) - case .second: - fatalError() - } + throw .first(error) } do { - var consumer = buffer.consumeAll() + var consumer = collectedBuffer.consumeAll() return try await body(consumer.drainNext()) } catch { throw .second(error) } } - - /// Collects elements from the reader up to a specified limit and processes them with a body function. - /// - /// This method continuously reads elements from the async reader, accumulating them in a buffer - /// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches - /// the specified limit. Once collection completes, it passes the accumulated elements to the - /// provided body function as a `Span` for processing. - /// - /// - Parameters: - /// - limit: The maximum number of elements to collect. This prevents unbounded memory - /// growth when reading from potentially infinite streams. - /// - body: A closure that receives a `Span` containing all collected elements and returns - /// a result of type `Result`. The method calls this closure once after collecting all - /// elements successfully. - /// - /// - Returns: The value returned by the body closure after processing the collected elements. - /// - /// ## Example - /// - /// ```swift - /// var reader: SomeAsyncReader = ... - /// - /// let processedData = try await reader.collect(upTo: 1000) { span in - /// // Process all collected elements - /// } - /// ``` - /// - /// ## Memory Considerations - /// - /// Since this method buffers all elements in memory before processing, it should be used - /// with caution on large datasets. The `limit` parameter serves as a safety mechanism - /// to prevent excessive memory usage. - public mutating func collect( - upTo limit: Int, - body: (consuming InputSpan) async -> Result - ) async -> Result where ReadFailure == Never { - // TODO: In the future we might want to use a temporary allocation instead - // but those don't support async closures yet. - var buffer = UniqueArray() - buffer.reserveCapacity(limit) - var shouldContinue = true - while limit - buffer.count > 0 && shouldContinue { - // This force-try is safe since neither read nor the closure are throwing - try! await self.read( - maximumCount: limit - buffer.count - ) { (span: consuming InputSpan) in - precondition(span.count <= limit - buffer.count) - guard span.count > 0 else { - // This means the underlying reader is finished and we can return - shouldContinue = false - return - } - while let element = span.popFirst() { - buffer.append(element) - } - } - } - var consumer = buffer.consumeAll() - return await body(consumer.drainNext()) - } - - /// Collects elements from the reader into an output span until the span is full. - /// - /// This method continuously reads elements from the async reader and appends them to the - /// provided output span until the span reaches its capacity. This provides an efficient - /// way to fill a pre-allocated buffer with elements from the reader. - /// - /// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues - /// reading until this span is full. - /// - /// - Throws: An error of type `ReadFailure` if any read operation fails. - /// - /// ## Example - /// - /// ```swift - /// var reader: SomeAsyncReader = ... - /// var buffer = [Int](repeating: 0, count: 100) - /// - /// try await buffer.withOutputSpan { outputSpan in - /// try await reader.collect(into: &outputSpan) - /// } - /// ``` - public mutating func collect( - into outputSpan: inout OutputSpan - ) async throws(ReadFailure) { - while !outputSpan.isFull { - do { - try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan) in - while let element = span.popFirst() { - outputSpan.append(element) - } - } - } catch { - switch error { - case .first(let error): - throw error - case .second: - fatalError() - } - } - } - } } #endif diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift index 42ab37dc..e2fd1de2 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift @@ -16,14 +16,14 @@ public import ContainersPreview // swift-format-ignore: AmbiguousTrailingClosureOverload @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) extension AsyncReader where Self: ~Copyable, Self: ~Escapable { - /// Iterates over all chunks from the reader, executing the provided body for each span. + /// Iterates over all chunks from the reader, executing the provided body for each buffer. /// /// This method continuously reads chunks from the async reader until the stream ends, - /// executing the provided closure for each span of elements read. The iteration terminates - /// when the reader produces an empty span, indicating the end of the stream. + /// executing the provided closure for each buffer of elements read. The iteration terminates + /// when the reader produces an empty buffer, indicating the end of the stream. /// - /// - Parameter body: An asynchronous closure that processes each span of elements read - /// from the stream. The closure receives a `Span` for each read operation. + /// - Parameter body: An asynchronous closure that processes each buffer of elements read + /// from the stream. /// /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation /// or a `Failure` from the body closure. @@ -33,14 +33,12 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable { /// ```swift /// var fileReader: FileAsyncReader = ... /// - /// // Process each chunk of data from the file - /// try await fileReader.forEach { chunk in - /// print("Processing \(chunk.count) elements") - /// // Process the chunk + /// try await fileReader.forEachBuffer { buffer in + /// print("Processing \(buffer.count) elements") /// } /// ``` - public consuming func forEachChunk( - body: (consuming InputSpan) async throws(Failure) -> Void + public consuming func forEachBuffer( + body: (inout Buffer) async throws(Failure) -> Void ) async throws(EitherError) { var shouldContinue = true while shouldContinue { @@ -50,37 +48,34 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable { return } - try await body(next) + try await body(&next) } } } - /// Iterates over all chunks from the reader, executing the provided body for each span. + /// Iterates over all chunks from a non-failing reader, executing the provided body for each buffer. /// /// This method continuously reads chunks from the async reader until the stream ends, - /// executing the provided closure for each span of elements read. The iteration terminates - /// when the reader produces an empty span, indicating the end of the stream. + /// executing the provided closure for each buffer of elements read. The iteration terminates + /// when the reader produces an empty buffer, indicating the end of the stream. /// - /// - Parameter body: An asynchronous closure that processes each span of elements read - /// from the stream. The closure receives a `Span` for each read operation. + /// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is `Never`. /// - /// - Throws: An error of type `Failure` from the body closure. Since this reader never fails, - /// only the body closure can throw errors. + /// - Parameter body: An asynchronous closure that processes each buffer of elements read + /// from the stream. /// /// ## Example /// /// ```swift /// var fileReader: FileAsyncReader = ... /// - /// // Process each chunk of data from the file - /// try await fileReader.forEach { chunk in - /// print("Processing \(chunk.count) elements") - /// // Process the chunk + /// await fileReader.forEachBuffer { buffer in + /// print("Processing \(buffer.count) elements") /// } /// ``` @inlinable - public consuming func forEachChunk( - body: (consuming InputSpan) async -> Void + public consuming func forEachBuffer( + body: (inout Buffer) async -> Void ) async where ReadFailure == Never { var shouldContinue = true while shouldContinue { @@ -91,7 +86,7 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable { return } - await body(next) + await body(&next) } } catch { fatalError() diff --git a/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift b/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift index 549a36b1..7f951e34 100644 --- a/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift +++ b/Sources/AsyncStreaming/AsyncReader/AsyncReader.swift @@ -12,58 +12,47 @@ #if UnstableAsyncStreaming && compiler(>=6.4) public import ContainersPreview -/// Reads elements asynchronously from a source. +/// Reads elements asynchronously from a source using callee-managed buffering. /// -/// Adopt ``AsyncReader`` when you need to provide callee-managed buffering, -/// where the reader controls the buffer and passes a span of elements -/// to the caller through the `body` closure. +/// Adopt ``AsyncReader`` when you need callee-managed buffering, +/// where the reader controls the buffer and passes it to the caller +/// through the `body` closure. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *) public protocol AsyncReader: ~Copyable, ~Escapable { /// The type of elements this reader reads. associatedtype ReadElement: ~Copyable + /// The container type the reader uses to pass elements to the caller. + associatedtype Buffer: RangeReplaceableContainer & ~Copyable + /// The error type that reading operations throw. associatedtype ReadFailure: Error /// Reads elements from the underlying source and passes them to the provided body closure. /// - /// This method asynchronously reads a span of elements from the source, - /// then passes them to `body` for processing. + /// This method asynchronously reads elements from the source into a buffer, + /// then passes the buffer to `body` for processing. When the buffer is empty, + /// the stream has ended. /// /// ```swift /// var fileReader: FileAsyncReader = ... /// - /// // Read data from a file asynchronously and process it. - /// let result = try await fileReader.read { data in - /// guard data.count > 0 else { - /// return + /// let result = try await fileReader.read { buffer in + /// guard buffer.count > 0 else { + /// return 0 /// } - /// return data + /// return buffer.count /// } /// ``` /// - /// - Parameter maximumCount: The maximum count of items you're ready - /// to process. Must be greater than zero. - /// - Parameter body: A closure that processes a span of read elements - /// and returns a value of type `Return`. When the span is empty, - /// it indicates the end of the stream. + /// - Parameter body: A closure that receives a mutable reference to the buffer + /// of read elements and returns a value of type `Return`. When the buffer + /// is empty, it indicates the end of the stream. /// - Returns: The value the body closure returns after processing the read elements. /// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation /// or a `Failure` from the body closure. mutating func read( - maximumCount: Int, - body: (consuming InputSpan) async throws(Failure) -> Return + body: (inout Buffer) async throws(Failure) -> Return ) async throws(EitherError) -> Return - -} - -@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *) -extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { - /// Reads elements with no upper bound on span size. - public mutating func read( - body: (consuming InputSpan) async throws(Failure) -> Return - ) async throws(EitherError) -> Return { - try await read(maximumCount: .max, body: body) - } } #endif diff --git a/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift b/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift index 6a14b427..999d33b9 100644 --- a/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift +++ b/Sources/AsyncStreaming/AsyncWriter/AsyncWriter.swift @@ -10,26 +10,31 @@ //===----------------------------------------------------------------------===// #if UnstableAsyncStreaming && compiler(>=6.4) -/// Writes elements asynchronously to a destination using a provided buffer. +public import ContainersPreview + +/// Writes elements asynchronously to a destination using callee-managed buffering. /// -/// Adopt ``AsyncWriter`` when you need to provide callee-managed buffering, -/// where the writer supplies an output span buffer that the caller fills +/// Adopt ``AsyncWriter`` when you need callee-managed buffering, +/// where the writer supplies a buffer that the caller fills /// with elements to write. @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) public protocol AsyncWriter: ~Copyable, ~Escapable { /// The type of elements this writer writes. associatedtype WriteElement: ~Copyable + /// The container type the writer uses to receive elements from the caller. + associatedtype Buffer: RangeReplaceableContainer & ~Copyable + /// The error type that writing operations throw. associatedtype WriteFailure: Error /// Provides a buffer for writing elements to the destination. /// - /// The writer supplies an output span that `body` uses to append elements. + /// The writer supplies a buffer that `body` uses to append elements. /// The writer manages the buffer allocation and handles the writing /// operation once `body` completes. /// - /// - Parameter body: A closure that receives an `OutputSpan` for appending elements + /// - Parameter body: A closure that receives a buffer for appending elements /// to write. The closure returns a result of type `Return`. /// /// - Returns: The value the body closure returns. @@ -42,15 +47,15 @@ public protocol AsyncWriter: ~Copyable, ~Escapable { /// ```swift /// var writer: SomeAsyncWriter = ... /// - /// try await writer.write { outputSpan in + /// try await writer.write { buffer in /// for item in items { - /// outputSpan.append(item) + /// buffer.append(item) /// } - /// return outputSpan.count + /// return buffer.count /// } /// ``` mutating func write( - _ body: (inout OutputSpan) async throws(Failure) -> Return + _ body: (inout Buffer) async throws(Failure) -> Return ) async throws(EitherError) -> Return } #endif diff --git a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift index c6d2b51f..c2f982bb 100644 --- a/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift +++ b/Sources/AsyncStreaming/CallerAsyncReader/CallerAsyncReader.swift @@ -10,10 +10,12 @@ //===----------------------------------------------------------------------===// #if UnstableAsyncStreaming && compiler(>=6.4) +public import ContainersPreview + /// Reads elements asynchronously into a caller-provided buffer. /// /// Adopt ``CallerAsyncReader`` when you need caller-managed buffering, -/// where the caller supplies an output span that the reader fills +/// where the caller supplies a buffer that the reader fills /// with elements. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *) public protocol CallerAsyncReader: ~Copyable, ~Escapable { @@ -28,10 +30,10 @@ public protocol CallerAsyncReader: ~Copyable, ~Escapab /// This method appends elements into `buffer`. When the read operation /// reaches the end of the source, it appends no elements. /// - /// - Parameter buffer: The output span to fill with read elements. + /// - Parameter buffer: The buffer to fill with read elements. /// - Throws: A `ReadFailure` from the underlying read operation. - mutating func read( - into buffer: inout OutputSpan - ) async throws(ReadFailure) + mutating func read & ~Copyable>( + into buffer: inout Buffer + ) async throws(ReadFailure) where Buffer.Element: ~Copyable } #endif diff --git a/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift b/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift index 3b88fcd2..36d55309 100644 --- a/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift +++ b/Sources/AsyncStreaming/CallerAsyncWriter/CallerAsyncWriter.swift @@ -12,10 +12,10 @@ #if UnstableAsyncStreaming && compiler(>=6.4) public import ContainersPreview -/// Writes elements asynchronously from a caller-provided span. +/// Writes elements asynchronously from a caller-provided buffer. /// /// Adopt ``CallerAsyncWriter`` when you need caller-managed buffering, -/// where the caller provides a span of elements for the writer +/// where the caller provides a buffer of elements for the writer /// to consume. @available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *) public protocol CallerAsyncWriter: ~Copyable, ~Escapable { @@ -25,27 +25,25 @@ public protocol CallerAsyncWriter: ~Copyable, ~Escap /// The error type that writing operations throw. associatedtype WriteFailure: Error - /// Writes a span of elements to the underlying destination. + /// Writes elements from the provided buffer to the underlying destination. /// - /// This method asynchronously writes all elements from the provided span to whatever destination - /// the writer represents. The operation may require multiple write calls to complete if the - /// writer cannot accept all elements at once. + /// This method asynchronously writes all elements from the provided buffer to the destination + /// the writer represents. /// - /// - Parameter span: The span of elements to write. If not all elements can be written, `span` will be non-empty after `write` returns + /// - Parameter buffer: The buffer of elements to write. /// - /// - Throws: A `WriteFailure` from the underlying write operation + /// - Throws: A `WriteFailure` from the underlying write operation. /// /// ## Example /// /// ```swift /// var fileWriter: FileAsyncWriter = ... - /// let dataBuffer: [UInt8] = [1, 2, 3, 4, 5] + /// var data = UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) /// - /// // Write the entire span to a file asynchronously - /// try await fileWriter.write(dataBuffer.span) + /// try await fileWriter.write(buffer: &data) /// ``` - mutating func write( - span: borrowing InputSpan - ) async throws(WriteFailure) + mutating func write & ~Copyable>( + buffer: inout Buffer + ) async throws(WriteFailure) where Buffer.Element: ~Copyable } #endif diff --git a/Sources/AsyncStreaming/EitherError.swift b/Sources/AsyncStreaming/EitherError.swift index 2f7ccc60..a3ccebcd 100644 --- a/Sources/AsyncStreaming/EitherError.swift +++ b/Sources/AsyncStreaming/EitherError.swift @@ -17,20 +17,16 @@ @frozen public enum EitherError: Error { /// An error of the first type. - /// - /// The associated value contains the specific error instance of type `First`. case first(First) /// An error of the second type. - /// - /// The associated value contains the specific error instance of type `Second`. case second(Second) - /// Throws the underlying error, unwrapping this either error. + /// Throws the underlying error, unwrapping this ``EitherError``. /// - /// This method extracts and throws the error in the either error, + /// This method extracts and throws the contained error, /// whether it's the first or second type. Use this when you need to propagate - /// the original error without the either error wrapper. + /// the original error without the ``EitherError`` wrapper. /// /// - Throws: The underlying error, either of type `First` or `Second`. /// @@ -38,10 +34,9 @@ public enum EitherError: Error { /// /// ```swift /// do { - /// // Some operation that returns EitherError /// let result = try await operation() /// } catch let eitherError as EitherError { - /// try eitherError.unwrap() // Throws the original error + /// try eitherError.unwrap() /// } /// ``` public func unwrap() throws { diff --git a/Sources/AsyncStreaming/NNNN-async-streaming.md b/Sources/AsyncStreaming/NNNN-async-streaming.md index ccf1a336..d21c89fc 100644 --- a/Sources/AsyncStreaming/NNNN-async-streaming.md +++ b/Sources/AsyncStreaming/NNNN-async-streaming.md @@ -138,32 +138,32 @@ the 2×2 possibility matrix: { Caller Buffers, Callee Buffers } × { Read, Write } -These protocols use Swift's `InputSpan` and `OutputSpan` types to enforce that -buffers are not stored after the operation completes, enabling safe reuse. For -the callee-owned variants, a closure-scoped API ensures the compiler prevents -buffer escape. +These protocols use generic buffer types constrained to `RangeReplaceableContainer` +from the Swift Collections library. For the callee-owned variants, a closure-scoped +API with `inout` access ensures the caller can process elements without taking +ownership of the buffer, enabling safe reuse. The bite-sized pseudocode: ``` protocol AsyncReader { - // Callee provides a full buffer; caller drains it - func read(body: (buffer) throws -> R) throws -> R + // Callee provides a full buffer; caller processes it + func read(body: (inout buffer) throws -> R) throws -> R } protocol CallerAsyncReader { // Caller provides an empty buffer; callee fills it - func read(into buffer: buffer) throws + func read(into buffer: inout buffer) throws } -protocol AsyncWriter { +protocol CallerAsyncWriter { // Caller provides a full buffer; callee drains it - func write(span: buffer) throws + func write(buffer: inout buffer) throws } -protocol CalleeAsyncWriter { +protocol AsyncWriter { // Callee provides an empty buffer; caller fills it - func write(body: (buffer) throws -> R) throws -> R + func write(body: (inout buffer) throws -> R) throws -> R } ``` @@ -175,7 +175,7 @@ question "which one(s) should my type conform to?". We believe that applying of situations: > If you are not sure, pick callee-owned (`AsyncReader`) for read streams and -> caller-owned (`AsyncWriter`) for write streams. +> caller-owned (`CallerAsyncWriter`) for write streams. The intuition: data produced by the callee flows toward the caller on the read side, so the callee is the natural owner of the buffer. On the write side, data @@ -184,8 +184,8 @@ owner. The "other" pair exists for cases where the default imposes unnecessary overhead. By documenting this rule and following it in our own types, we expect developers -to naturally reach for `AsyncReader` and `AsyncWriter`, leaving -`CallerAsyncReader` and `CalleeAsyncWriter` for the specialized situations that +to naturally reach for `AsyncReader` and `CallerAsyncWriter`, leaving +`CallerAsyncReader` and `AsyncWriter` for the specialized situations that truly need them. ### Bridging with `AsyncSequence` @@ -272,45 +272,33 @@ extensions allow ergonomic use when one side cannot fail. ### Callee-owned async reader (preferred read type) -The callee-owned reader controls the buffer and passes a span of elements to the -caller through a scoped closure. This is the preferred protocol for read -streams. +The callee-owned reader controls the buffer and passes a mutable reference to it +through a scoped closure. This is the preferred protocol for read streams. ```swift -public protocol AsyncReader: ~Copyable, ~Escapable { +public protocol AsyncReader: ~Copyable, ~Escapable { /// The type of elements that can be read. - associatedtype Element: ~Copyable + associatedtype ReadElement: ~Copyable + + /// The container type the reader uses to pass elements to the caller. + associatedtype Buffer: RangeReplaceableContainer & ~Copyable /// The type of error thrown during reading. - associatedtype Failure: Error + associatedtype ReadFailure: Error /// Reads elements from the source and passes them to the body closure. /// - /// The reader fills an internal buffer from its source and passes a span - /// of the read elements to `body`. When the span is empty, the stream + /// The reader fills an internal buffer from its source and passes a mutable + /// reference to it to `body`. When the buffer is empty, the stream /// has ended. /// - /// - Parameter maximumCount: The maximum number of elements the caller is - /// ready to process. Must be greater than zero. /// - Parameter body: A closure that processes the read elements. /// - Returns: The value returned by the body closure. - /// - Throws: An `EitherError` containing either a `Failure` from the read - /// operation or a `ConsumerFailure` from the body closure. - mutating func read( - maximumCount: Int, - body: (inout InputSpan) async throws(ConsumerFailure) -> Return - ) async throws(EitherError) -> Return -} - -extension AsyncReader { - /// Reads elements with no upper bound on span size. - /// - /// This convenience calls `read(maximumCount: .max, body:)`. - mutating func read( - body: (inout InputSpan) async throws(ConsumerFailure) -> Return - ) async throws(EitherError) -> Return { - try await read(maximumCount: .max, body: body) - } + /// - Throws: An `EitherError` containing either a `ReadFailure` from the read + /// operation or a `Failure` from the body closure. + mutating func read( + body: (inout Buffer) async throws(Failure) -> Return + ) async throws(EitherError) -> Return } ``` @@ -320,50 +308,48 @@ The caller provides a buffer that the reader fills with elements from the source. ```swift -public protocol CallerAsyncReader: ~Copyable, ~Escapable { +public protocol CallerAsyncReader: ~Copyable, ~Escapable { /// The type of elements that can be read. - associatedtype Element: ~Copyable + associatedtype ReadElement: ~Copyable /// The type of error thrown during reading. - associatedtype Failure: Error + associatedtype ReadFailure: Error /// Reads elements from the source into the provided buffer. /// /// Appends elements into `buffer`. When the read operation reaches the /// end of the source, no elements are appended. /// - /// - Parameter buffer: The output span to fill with read elements. - /// - Throws: A `Failure` from the underlying read operation. - mutating func read( - into buffer: inout OutputSpan - ) async throws(Failure) + /// - Parameter buffer: The buffer to fill with read elements. + /// - Throws: A `ReadFailure` from the underlying read operation. + mutating func read>( + into buffer: inout Buffer + ) async throws(ReadFailure) where Buffer.Element: ~Copyable } ``` ### Caller-owned async writer (preferred write type) -The caller provides a span of elements for the writer to consume. This is the +The caller provides a buffer of elements for the writer to consume. This is the preferred protocol for write streams. ```swift -public protocol AsyncWriter: ~Copyable, ~Escapable { +public protocol CallerAsyncWriter: ~Copyable, ~Escapable { /// The type of elements that can be written. associatedtype WriteElement: ~Copyable /// The type of error thrown during writing. associatedtype WriteFailure: Error - /// Writes a span of elements to the destination. + /// Writes elements from the provided buffer to the destination. /// - /// Asynchronously writes all elements from the provided span. If the - /// writer cannot accept all elements at once, `span` will be non-empty - /// after `write` returns. + /// Asynchronously writes all elements from the provided buffer. /// - /// - Parameter span: The span of elements to write. + /// - Parameter buffer: The buffer of elements to write. /// - Throws: A `WriteFailure` from the underlying write operation. - mutating func write( - span: borrowing InputSpan - ) async throws(WriteFailure) + mutating func write & ~Copyable>( + buffer: inout Buffer + ) async throws(WriteFailure) where Buffer.Element: ~Copyable } ``` @@ -372,26 +358,29 @@ public protocol AsyncWriter: ~Copyable, ~Escapable { The writer provides a buffer that the caller fills with elements to write. ```swift -public protocol CalleeAsyncWriter: ~Copyable, ~Escapable { +public protocol AsyncWriter: ~Copyable, ~Escapable { /// The type of elements that can be written. associatedtype WriteElement: ~Copyable + /// The container type the writer uses to receive elements from the caller. + associatedtype Buffer: RangeReplaceableContainer & ~Copyable + /// The type of error thrown during writing. associatedtype WriteFailure: Error /// Provides a buffer for writing elements to the destination. /// - /// The writer supplies an output span that the `body` closure fills with + /// The writer supplies a buffer that the `body` closure fills with /// elements. After the closure returns, the writer handles the actual /// write operation. /// - /// - Parameter body: A closure that receives an `OutputSpan` to fill. + /// - Parameter body: A closure that receives a mutable buffer to fill. /// - Returns: The value returned by the body closure. /// - Throws: An `EitherError` containing either a `WriteFailure` from the - /// write operation or a `ProducerFailure` from the body closure. - mutating func write( - _ body: (inout OutputSpan) async throws(ProducerFailure) -> Return - ) async throws(EitherError) -> Return + /// write operation or a `Failure` from the body closure. + mutating func write( + _ body: (inout Buffer) async throws(Failure) -> Return + ) async throws(EitherError) -> Return } ``` @@ -404,21 +393,21 @@ not support noncopyable elements. #### `AsyncReader` to `AsyncSequence` ```swift -extension AsyncReader where Self: ~Copyable, Self: ~Escapable, Element: Copyable { +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: Copyable { /// Returns an `AsyncSequence` that yields the elements of this reader /// one at a time. /// /// The returned sequence calls `read` on the underlying reader and - /// yields the elements from each span individually. + /// yields the elements from each buffer individually. public var asyncSequence: AsyncReaderSequence { get } } /// An `AsyncSequence` adapter over an `AsyncReader`. public struct AsyncReaderSequence< Reader: AsyncReader & ~Copyable & ~Escapable ->: AsyncSequence where Reader.Element: Copyable { - public typealias Element = Reader.Element - public typealias Failure = Reader.Failure +>: AsyncSequence where Reader.ReadElement: Copyable { + public typealias Element = Reader.ReadElement + public typealias Failure = Reader.ReadFailure public struct AsyncIterator: AsyncIteratorProtocol { public mutating func next() async throws(Failure) -> Element? @@ -435,7 +424,7 @@ extension AsyncSequence { /// Returns an `AsyncReader` that reads elements from this sequence. /// /// Each call to `read` on the returned reader advances the sequence's - /// iterator and passes available elements to the body closure as a span. + /// iterator and passes available elements to the body closure as a buffer. public var asyncReader: AsyncSequenceReader { get } } @@ -443,13 +432,12 @@ extension AsyncSequence { public struct AsyncSequenceReader< Base: AsyncSequence >: AsyncReader { - public typealias Element = Base.Element - public typealias Failure = Base.Failure + public typealias ReadElement = Base.Element + public typealias ReadFailure = Base.Failure - public mutating func read( - maximumCount: Int?, - body: (inout InputSpan) async throws(ConsumerFailure) -> Return - ) async throws(EitherError) -> Return + public mutating func read( + body: (inout Buffer) async throws(Failure) -> Return + ) async throws(EitherError) -> Return } ``` @@ -494,20 +482,55 @@ patterns, for example: These extensions are straightforward to add in a follow-up proposal once the core protocols are established. +### Iteration and collection helpers + +Two common patterns emerge immediately when working with `AsyncReader`: iterating +over all chunks until the stream ends, and collecting elements into a buffer up +to a specified limit. We envision convenience extensions for both: + +```swift +extension AsyncReader where Self: ~Copyable, Self: ~Escapable { + /// Iterates over all chunks, executing `body` for each buffer until the + /// stream ends. + public consuming func forEachBuffer( + body: (inout Buffer) async throws(Failure) -> Void + ) async throws(EitherError) +} + +extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable { + /// Collects elements up to `limit`, then passes the accumulated + /// elements to `body` as an `InputSpan`. + public mutating func collect( + upTo limit: Int, + body: (consuming InputSpan) async throws(Failure) -> Result + ) async throws(EitherError, Failure>) -> Result +} +``` + +`forEachBuffer` provides a simple way to consume an entire stream without manually +looping over `read` calls and checking for empty buffers. `collect` accumulates +elements from multiple reads into a single buffer before processing, which is +useful when an algorithm needs all data in contiguous memory (for example, +parsing a complete message frame). + +These helpers are intentionally excluded from this proposal because their error +handling semantics (particularly `collect`'s nested `EitherError` and the +`AsyncReaderLeftOverElementsError` overflow behavior) benefit from real-world +usage feedback before stabilization. + ### Owned buffer transfer protocols -The four protocols in this proposal all use `InputSpan` and `OutputSpan` to -provide scoped, non-escaping access to buffers. This is optimal for -high-throughput streaming where buffer reuse matters — the compiler enforces -that the buffer cannot be stored after the closure returns, enabling safe reuse -without copies. +The four protocols in this proposal all use generic `RangeReplaceableContainer` +buffers with closure-scoped `inout` access. This is optimal for high-throughput +streaming where buffer reuse matters — the caller processes elements in-place +without taking ownership, enabling safe reuse without copies. However, there are important message-oriented I/O patterns where the caller needs to take ownership of the data with an independent lifetime. Consider an HTTP/2 proxy: the proxy decodes a DATA frame, which internally allocates a -buffer to hold the frame payload. With span-based protocols, the proxy receives -a borrowed view and must copy the data if it needs to store it for retry or -fan-out to multiple downstream connections. +buffer to hold the frame payload. With the closure-scoped protocols, the proxy +receives a mutable reference and must copy the data if it needs to store it for +retry or fan-out to multiple downstream connections. An owned buffer transfer protocol would let the decoder hand over its internally-allocated buffer by value. The proxy can store it, retry a failed @@ -524,7 +547,7 @@ protocol OwnedAsyncReader: ~Copyable, ~Escapable { ``` This differs fundamentally from the caller/callee distinction in this proposal. -The span-based protocols are about **who manages a reusable buffer during a +The closure-scoped protocols are about **who manages a reusable buffer during a streaming operation**. Owned buffer transfer is about **transferring independently-allocated data with a lifecycle decoupled from the stream**. This is the distinction between stream-oriented I/O (TCP byte streams, file reads) @@ -788,8 +811,8 @@ Swift's structured concurrency model avoids the cancellation and buffer lifetime problems that fragment Rust's ecosystem: structured concurrency guarantees that child tasks complete before the parent scope exits, and cancellation is cooperative. The closure-scoped design of the callee-owned protocols provides -compiler-enforced safety — `InputSpan` and `OutputSpan` are `~Escapable`, so the -compiler guarantees at the type level that buffers cannot outlive their scope. +compiler-enforced safety — the `Buffer` associated type is `~Copyable`, and the +`inout` closure pattern ensures buffers cannot be stored beyond the closure scope. This is stronger than Go's convention-based "must not retain p" and .NET's runtime-only enforcement after `AdvanceTo`. diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift index 36fa922c..4641fedd 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+collectTests.swift @@ -20,10 +20,10 @@ import Testing struct AsyncReaderCollectTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectAllElements() async { + func collectAllElements() async throws { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) - let result = await reader.collect(upTo: 10) { span in + let result = try await reader.collect(upTo: 10) { span in return Array(span) } @@ -32,10 +32,10 @@ struct AsyncReaderCollectTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectWithExactLimit() async { + func collectWithExactLimit() async throws { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) - let result = await reader.collect(upTo: 5) { span in + let result = try await reader.collect(upTo: 5) { span in return Array(span) } @@ -44,10 +44,10 @@ struct AsyncReaderCollectTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectEmptyReader() async { + func collectEmptyReader() async throws { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) - let result = await reader.collect(upTo: 10) { span in + let result = try await reader.collect(upTo: 10) { span in return span.count } @@ -56,10 +56,10 @@ struct AsyncReaderCollectTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectProcessesAllElements() async { + func collectProcessesAllElements() async throws { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) - let result = await reader.collect(upTo: 10) { span in + let result = try await reader.collect(upTo: 10) { span in var sum = 0 for i in span.indices { sum += span[i] @@ -72,29 +72,17 @@ struct AsyncReaderCollectTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectIntoOutputSpan() async { - // TODO: Cannot test this yet since we can't get `InputSpan`s available in async contexts - // var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) - // var buffer = RigidArray.init(capacity: 5) - // - // await buffer.append(count: 5) { outputSpan in - // await reader.collect(into: &outputSpan) - // } - // - // #expect(buffer.count == 5) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func collectWithNeverFailingReader() async { + func collectThrowsLeftOverElements() async throws { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) - // This tests the Never overload - let result = await reader.collect(upTo: 10) { span in - return span.count + let expectedError = EitherError, Never>.first( + .second(AsyncReaderLeftOverElementsError()) + ) + await #expect(throws: expectedError) { + try await reader.collect(upTo: 1) { span in + return span.count + } } - - #expect(result == 3) } } diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift index ae31157a..06bc2489 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReader+forEachTests.swift @@ -17,15 +17,15 @@ import ContainersPreview import Testing @Suite -struct AsyncReaderforEachChunkTests { +struct AsyncReaderforEachBufferTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkIteratesAllSpans() async throws { + func forEachBufferIteratesAllSpans() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) var elementCount = 0 - await reader.forEachChunk { span in - elementCount += span.count + await reader.forEachBuffer { buffer in + elementCount += buffer.count } #expect(elementCount == 5) @@ -33,13 +33,13 @@ struct AsyncReaderforEachChunkTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkProcessesElements() async throws { + func forEachBufferProcessesElements() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [10, 20, 30])) var sum = 0 - await reader.forEachChunk { span in - for i in span.indices { - sum += span[i] + await reader.forEachBuffer { buffer in + for i in buffer.indices { + sum += buffer[i] } } @@ -48,11 +48,11 @@ struct AsyncReaderforEachChunkTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkWithEmptyReader() async throws { + func forEachBufferWithEmptyReader() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 0, copying: [])) var callCount = 0 - await reader.forEachChunk { span in + await reader.forEachBuffer { buffer in callCount += 1 } @@ -61,7 +61,7 @@ struct AsyncReaderforEachChunkTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkWithThrowingBody() async { + func forEachBufferWithThrowingBody() async { enum TestError: Error { case failed } @@ -69,7 +69,7 @@ struct AsyncReaderforEachChunkTests { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) do { - try await reader.forEachChunk { (span) throws(TestError) -> Void in + try await reader.forEachBuffer { (_) throws(TestError) -> Void in throw TestError.failed } Issue.record("Expected error to be thrown") @@ -80,7 +80,7 @@ struct AsyncReaderforEachChunkTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkWithNeverFailingReader() async { + func forEachBufferWithNeverFailingReader() async { enum TestError: Error { case failed } @@ -89,8 +89,8 @@ struct AsyncReaderforEachChunkTests { var count = 0 do { - try await reader.forEachChunk { (span) throws(TestError) -> Void in - count += span.count + try await reader.forEachBuffer { (buffer) throws(TestError) -> Void in + count += buffer.count } } catch { Issue.record("No error should be thrown from reader") @@ -101,42 +101,19 @@ struct AsyncReaderforEachChunkTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkWithAsyncWork() async throws { + func forEachBufferWithAsyncWork() async throws { let reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) var results: [Int] = [] - await reader.forEachChunk { span in + await reader.forEachBuffer { buffer in await Task.yield() - for i in span.indices { - results.append(span[i]) + for i in buffer.indices { + results.append(buffer[i]) } } #expect(results == [1, 2, 3]) } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func forEachChunkMultipleSpans() async { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 6, copying: [1, 2, 3, 4, 5, 6])) - var spanCounts: [Int] = [] - - // Force reading in smaller chunks - while true { - let hasMore = try! await reader.read(maximumCount: 2) { span in - if span.count > 0 { - spanCounts.append(span.count) - return true - } - return false - } - if !hasMore { - break - } - } - - #expect(spanCounts == [2, 2, 2]) - } } #endif diff --git a/Tests/AsyncStreamingTests/AsyncReader/AsyncReaderTests.swift b/Tests/AsyncStreamingTests/AsyncReader/AsyncReaderTests.swift index 02de47f7..d0e2a4e1 100644 --- a/Tests/AsyncStreamingTests/AsyncReader/AsyncReaderTests.swift +++ b/Tests/AsyncStreamingTests/AsyncReader/AsyncReaderTests.swift @@ -19,64 +19,39 @@ import Testing struct AsyncReaderTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readWithMaximumCount() async { + func read() async { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) - let result = try! await reader.read(maximumCount: 3) { span in - return Array(span) + let result = try! await reader.read { buffer in + return buffer.clone() } - #expect(result == [1, 2, 3]) + #expect(result.count == 5) + #expect(result[0] == 1) + #expect(result[1] == 2) + #expect(result[2] == 3) + #expect(result[3] == 4) + #expect(result[4] == 5) } @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readWithoutMaximumCount() async { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5])) - - let result = try! await reader.read(maximumCount: .max) { span in - return Array(span) - } - - #expect(result == [1, 2, 3, 4, 5]) - } - - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readEmptySpanAtEnd() async { + func readEmptyAtEnd() async { var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 3, copying: [1, 2, 3])) // Read all data - _ = try! await reader.read { span in - return Array(span) + let first = try! await reader.read { buffer in + return buffer.count } - // Next read should return empty span - let result = try! await reader.read { span in - return span.count - } - - #expect(result == 0) - } + #expect(first == 3) - @Test - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func readMultipleChunks() async { - var reader = UniqueArrayAsyncReader(storage: UniqueArray(capacity: 6, copying: [1, 2, 3, 4, 5, 6])) - var chunks: [[Int]] = [] - - while true { - let chunk = try! await reader.read(maximumCount: 2) { span in - return Array(span) - } - print(chunk) - if chunk.isEmpty { - break - } - chunks.append(chunk) + // Next read should return empty span + let second = try! await reader.read { buffer in + return buffer.count } - #expect(chunks == [[1, 2], [3, 4], [5, 6]]) + #expect(second == 0) } } #endif diff --git a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift index c59baa47..81ffd90a 100644 --- a/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift +++ b/Tests/AsyncStreamingTests/AsyncWriter/AsyncWriterTests.swift @@ -16,63 +16,88 @@ import ContainersPreview import Testing @Suite -struct CallerAsyncWriterTests { +struct AsyncWriterTests { @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeSpan() async { - var writer = UniqueArrayCallerAsyncWriter() - var data = UniqueArray() - for i in 1...5 { - data.append(i) - } + func writeElements() async { + var writer = UniqueArrayAsyncWriter() - var consumer = data.consumeAll() - try! await writer.write(span: consumer.drainNext()) + try! await writer.write { buffer in + buffer.append(1) + buffer.append(2) + buffer.append(3) + } - #expect(writer.storage.count == 5) + #expect(writer.storage.count == 3) + #expect(writer.storage[0] == 1) + #expect(writer.storage[1] == 2) + #expect(writer.storage[2] == 3) } @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeEmptySpan() async { - var writer = UniqueArrayCallerAsyncWriter() + func writeEmptyBuffer() async { + var writer = UniqueArrayAsyncWriter() - try! await writer.write(span: InputSpan()) + try! await writer.write { _ in } #expect(writer.storage.count == 0) } @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeLargeSpan() async { - var writer = UniqueArrayCallerAsyncWriter(capacity: 100) - var data = UniqueArray() - for i in 1...50 { - data.append(i) - } + func writeReturnsValue() async { + var writer = UniqueArrayAsyncWriter() - var consumer = data.consumeAll() - try! await writer.write(span: consumer.drainNext()) + let count = try! await writer.write { buffer in + buffer.append(10) + buffer.append(20) + return buffer.count + } - #expect(writer.storage.count == 50) + #expect(count == 2) + #expect(writer.storage.count == 2) } @Test @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - func writeSpanExceedingCapacity() async { - var writer = UniqueArrayCallerAsyncWriter(capacity: 5) - var data = UniqueArray() - for i in 1...10 { - data.append(i) + func writeWithThrowingBody() async { + enum TestError: Error, Equatable { + case failed } - var consumer = data.consumeAll() + var writer = UniqueArrayAsyncWriter() + do { - try await writer.write(span: consumer.drainNext()) - Issue.record("Expected WriterCapacityError") + try await writer.write { (_) throws(TestError) -> Void in + throw TestError.failed + } + Issue.record("Expected error to be thrown") } catch { - // Expected WriterCapacityError + #expect(error == EitherError.second(TestError.failed)) + } + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func writeMultipleTimes() async { + var writer = UniqueArrayAsyncWriter() + + try! await writer.write { buffer in + buffer.append(1) + buffer.append(2) } + + try! await writer.write { buffer in + buffer.append(3) + buffer.append(4) + } + + #expect(writer.storage.count == 4) + #expect(writer.storage[0] == 1) + #expect(writer.storage[1] == 2) + #expect(writer.storage[2] == 3) + #expect(writer.storage[3] == 4) } } #endif diff --git a/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReaderTests.swift b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReaderTests.swift new file mode 100644 index 00000000..0350b062 --- /dev/null +++ b/Tests/AsyncStreamingTests/CallerAsyncReader/CallerAsyncReaderTests.swift @@ -0,0 +1,93 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct CallerAsyncReaderTests { + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func readIntoBuffer() async { + var reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) + ) + var buffer = UniqueArray(minimumCapacity: 10) + + await reader.read(into: &buffer) + + #expect(buffer.count == 5) + #expect(buffer[0] == 1) + #expect(buffer[1] == 2) + #expect(buffer[2] == 3) + #expect(buffer[3] == 4) + #expect(buffer[4] == 5) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func readIntoBufferAtEnd() async { + var reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: 0, copying: []) + ) + var buffer = UniqueArray(minimumCapacity: 10) + + await reader.read(into: &buffer) + + #expect(buffer.count == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func readIntoBufferRespectsCapacity() async { + var reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: 10, copying: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + ) + var buffer = UniqueArray(minimumCapacity: 3) + + await reader.read(into: &buffer) + + #expect(buffer.count == 3) + #expect(buffer[0] == 1) + #expect(buffer[1] == 2) + #expect(buffer[2] == 3) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func readMultipleTimes() async { + var reader = UniqueArrayCallerAsyncReader( + storage: UniqueArray(capacity: 6, copying: [1, 2, 3, 4, 5, 6]) + ) + + var buffer1 = UniqueArray(minimumCapacity: 3) + await reader.read(into: &buffer1) + #expect(buffer1.count == 3) + #expect(buffer1[0] == 1) + #expect(buffer1[1] == 2) + #expect(buffer1[2] == 3) + + var buffer2 = UniqueArray(minimumCapacity: 3) + await reader.read(into: &buffer2) + #expect(buffer2.count == 3) + #expect(buffer2[0] == 4) + #expect(buffer2[1] == 5) + #expect(buffer2[2] == 6) + + var buffer3 = UniqueArray(minimumCapacity: 3) + await reader.read(into: &buffer3) + #expect(buffer3.count == 0) + } +} +#endif diff --git a/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterTests.swift b/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterTests.swift new file mode 100644 index 00000000..49066011 --- /dev/null +++ b/Tests/AsyncStreamingTests/CallerAsyncWriter/CallerAsyncWriterTests.swift @@ -0,0 +1,53 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview +import Testing + +@Suite +struct CallerAsyncWriterTests { + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func writeBuffer() async { + var writer = UniqueArrayCallerAsyncWriter() + var data = UniqueArray(capacity: 5, copying: [1, 2, 3, 4, 5]) + + await writer.write(buffer: &data) + + #expect(writer.storage.count == 5) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func writeEmptyBuffer() async { + var writer = UniqueArrayCallerAsyncWriter() + var data = UniqueArray() + + await writer.write(buffer: &data) + + #expect(writer.storage.count == 0) + } + + @Test + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + func writeLargeBuffer() async { + var writer = UniqueArrayCallerAsyncWriter(capacity: 100) + var data = UniqueArray(capacity: 5, copying: Array(1...50)) + + await writer.write(buffer: &data) + + #expect(writer.storage.count == 50) + } +} +#endif diff --git a/Tests/AsyncStreamingTests/EitherErrorTests.swift b/Tests/AsyncStreamingTests/EitherErrorTests.swift new file mode 100644 index 00000000..b44e776e --- /dev/null +++ b/Tests/AsyncStreamingTests/EitherErrorTests.swift @@ -0,0 +1,68 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2026 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if UnstableAsyncStreaming && compiler(>=6.4) +import AsyncStreaming +import Testing + +@Suite +struct EitherErrorTests { + enum FirstError: Error, Equatable, Hashable { + case one + case two + } + + enum SecondError: Error, Equatable, Hashable { + case alpha + case beta + } + + @Test + func unwrapFirst() throws { + let error: EitherError = .first(.one) + + #expect(throws: FirstError.one) { + try error.unwrap() + } + } + + @Test + func unwrapSecond() throws { + let error: EitherError = .second(.alpha) + + #expect(throws: SecondError.alpha) { + try error.unwrap() + } + } + + @Test + func equatable() { + let a: EitherError = .first(.one) + let b: EitherError = .first(.one) + let c: EitherError = .first(.two) + let d: EitherError = .second(.alpha) + + #expect(a == b) + #expect(a != c) + #expect(a != d) + } + + @Test + func hashable() { + let a: EitherError = .first(.one) + let b: EitherError = .first(.one) + let c: EitherError = .second(.alpha) + + #expect(a.hashValue == b.hashValue) + #expect(a.hashValue != c.hashValue) + } +} +#endif diff --git a/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift b/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift index 6e0da3d1..82125873 100644 --- a/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift +++ b/Tests/AsyncStreamingTests/Helpers/UniqueArrayAsyncReader.swift @@ -17,34 +17,18 @@ import ContainersPreview @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) struct UniqueArrayAsyncReader: ~Copyable, AsyncReader { typealias ReadElement = Int + typealias Buffer = UniqueArray typealias ReadFailure = Never var storage: UniqueArray mutating func read( - maximumCount: Int, - body: (consuming InputSpan) async throws(Failure) -> Return + body: (inout UniqueArray) async throws(Failure) -> Return ) async throws(EitherError) -> Return { do { - guard storage.count > 0 else { - return try await body(InputSpan()) - } - - let count = min(maximumCount, storage.count) - - // Use the callback-based consume which correctly updates storage's count. - var chunk = UniqueArray(minimumCapacity: count) - self.storage.consume( - 0..=6.4) +import AsyncStreaming +import BasicContainers +import ContainersPreview + +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) +struct UniqueArrayAsyncWriter: ~Copyable, AsyncWriter { + typealias WriteElement = Int + typealias Buffer = UniqueArray + typealias WriteFailure = Never + + var storage: UniqueArray + + init(capacity: Int = 100) { + self.storage = UniqueArray(minimumCapacity: capacity) + } + + mutating func write( + _ body: (inout UniqueArray) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + var buffer = UniqueArray(minimumCapacity: 64) + do { + let result = try await body(&buffer) + self.storage.append(copying: buffer) + return result + } catch { + throw .second(error) + } + } +} +#endif diff --git a/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift b/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift index 3e48c977..e08c2f4d 100644 --- a/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift +++ b/Tests/AsyncStreamingTests/Helpers/UniqueArrayCallerAsyncReader.swift @@ -22,9 +22,9 @@ struct UniqueArrayCallerAsyncReader: ~Copyable, CallerAsyncReader { var storage: UniqueArray var position: Int = 0 - mutating func read( - into buffer: inout OutputSpan - ) async throws(Never) { + mutating func read & ~Copyable>( + into buffer: inout Buffer + ) async throws(ReadFailure) where Buffer.Element: ~Copyable { guard position < storage.count else { return } let count = min(buffer.freeCapacity, storage.count - position) for i in 0.. @@ -27,15 +27,11 @@ struct UniqueArrayCallerAsyncWriter: ~Copyable, CallerAsyncWriter { self.storage = UniqueArray(minimumCapacity: capacity) } - mutating func write( - span: borrowing InputSpan - ) async throws(WriterCapacityError) { - guard span.count <= storage.freeCapacity else { - throw WriterCapacityError() - } - for i in span.indices { - storage.append(span[i]) - } + mutating func write & ~Copyable>( + buffer: inout Buffer + ) async throws(WriteFailure) where Buffer.Element: ~Copyable { + self.storage.reserveCapacity(buffer.count) + self.storage.append(copying: buffer) } } #endif