Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

164 changes: 30 additions & 134 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+collect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,33 @@
public import ContainersPreview
import BasicContainers

/// An error indicating that the reader produced more elements than the specified collection limit.
///
/// This error occurs when calling ``AsyncReader/collect(upTo:body:)`` and the reader's buffer
/// contains more elements than the allowed limit.
public struct AsyncReaderLeftOverElementsError: Error, Hashable {
public init() {}
}

@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
/// Collects elements from the reader up to a specified limit and processes them with a body function.
/// Collects elements from the reader up to a specified limit and processes them.
///
/// This method continuously reads elements from the async reader, accumulating them in a buffer
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
/// the specified limit. Once collection completes, it passes the accumulated elements to the
/// provided body function as a `Span` for processing.
/// This method continuously reads elements from the async reader, accumulating them in an
/// internal buffer until either it reaches the end of the stream or the specified limit.
/// Once collection completes, it passes the accumulated elements to the provided body
/// closure as an `InputSpan` for processing.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
/// growth when reading from potentially infinite streams.
/// - body: A closure that receives a `Span` containing all collected elements and returns
/// a result of type `Result`. The method calls this closure once after collecting all
/// elements successfully.
/// - body: A closure that receives an `InputSpan` containing all collected elements and returns
/// a result of type `Result`.
///
/// - Returns: The value returned by the body closure after processing the collected elements.
///
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
/// - Throws: An `EitherError` wrapping either a read failure (which itself may be an
/// ``AsyncReaderLeftOverElementsError`` if the reader produces more elements than the limit),
/// or a `Failure` from the body closure.
///
/// ## Example
Expand All @@ -44,153 +52,41 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Cop
/// // Process all collected elements
/// }
/// ```
///
/// ## Memory Considerations
///
/// Since this method buffers all elements in memory before processing, it should be used
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
/// to prevent excessive memory usage.
public mutating func collect<Result, Failure: Error>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Result
) async throws(EitherError<ReadFailure, Failure>) -> Result {
) async throws(EitherError<EitherError<ReadFailure, AsyncReaderLeftOverElementsError>, Failure>) -> Result {
// TODO: In the future we might want to use a temporary allocation instead
// but those don't support async closures yet.
var buffer = UniqueArray<ReadElement>()
buffer.reserveCapacity(limit)
var collectedBuffer = UniqueArray<ReadElement>()
collectedBuffer.reserveCapacity(limit)
var shouldContinue = true
do {
while shouldContinue {
try await self.read(
maximumCount: limit - buffer.count
) { (span: consuming InputSpan<ReadElement>) in
guard span.count > 0 else {
try await self.read { (buffer: inout Buffer) throws(AsyncReaderLeftOverElementsError) -> Void in
guard buffer.count > 0 else {
shouldContinue = false
return
}
precondition(span.count <= limit - buffer.count)
while let element = span.popFirst() {
buffer.append(element)
if limit - collectedBuffer.count < buffer.count {
throw AsyncReaderLeftOverElementsError()
}
var consumer = buffer.consumeAll()
while let element = consumer.next() {
collectedBuffer.append(element)
}
}
}
} catch {
switch error {
case .first(let error):
throw .first(error)
case .second:
fatalError()
}
throw .first(error)
}
do {
var consumer = buffer.consumeAll()
var consumer = collectedBuffer.consumeAll()
return try await body(consumer.drainNext())
} catch {
throw .second(error)
}
}

/// Collects elements from the reader up to a specified limit and processes them with a body function.
///
/// This method continuously reads elements from the async reader, accumulating them in a buffer
/// until either it reaches the end of the stream (indicated by an empty `Span`) or reaches
/// the specified limit. Once collection completes, it passes the accumulated elements to the
/// provided body function as a `Span` for processing.
///
/// - Parameters:
/// - limit: The maximum number of elements to collect. This prevents unbounded memory
/// growth when reading from potentially infinite streams.
/// - body: A closure that receives a `Span` containing all collected elements and returns
/// a result of type `Result`. The method calls this closure once after collecting all
/// elements successfully.
///
/// - Returns: The value returned by the body closure after processing the collected elements.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
///
/// let processedData = try await reader.collect(upTo: 1000) { span in
/// // Process all collected elements
/// }
/// ```
///
/// ## Memory Considerations
///
/// Since this method buffers all elements in memory before processing, it should be used
/// with caution on large datasets. The `limit` parameter serves as a safety mechanism
/// to prevent excessive memory usage.
public mutating func collect<Result>(
upTo limit: Int,
body: (consuming InputSpan<ReadElement>) async -> Result
) async -> Result where ReadFailure == Never {
// TODO: In the future we might want to use a temporary allocation instead
// but those don't support async closures yet.
var buffer = UniqueArray<ReadElement>()
buffer.reserveCapacity(limit)
var shouldContinue = true
while limit - buffer.count > 0 && shouldContinue {
// This force-try is safe since neither read nor the closure are throwing
try! await self.read(
maximumCount: limit - buffer.count
) { (span: consuming InputSpan<ReadElement>) in
precondition(span.count <= limit - buffer.count)
guard span.count > 0 else {
// This means the underlying reader is finished and we can return
shouldContinue = false
return
}
while let element = span.popFirst() {
buffer.append(element)
}
}
}
var consumer = buffer.consumeAll()
return await body(consumer.drainNext())
}

/// Collects elements from the reader into an output span until the span is full.
///
/// This method continuously reads elements from the async reader and appends them to the
/// provided output span until the span reaches its capacity. This provides an efficient
/// way to fill a pre-allocated buffer with elements from the reader.
///
/// - Parameter outputSpan: An `OutputSpan` to append read elements into. The method continues
/// reading until this span is full.
///
/// - Throws: An error of type `ReadFailure` if any read operation fails.
///
/// ## Example
///
/// ```swift
/// var reader: SomeAsyncReader = ...
/// var buffer = [Int](repeating: 0, count: 100)
///
/// try await buffer.withOutputSpan { outputSpan in
/// try await reader.collect(into: &outputSpan)
/// }
/// ```
public mutating func collect(
into outputSpan: inout OutputSpan<ReadElement>
) async throws(ReadFailure) {
while !outputSpan.isFull {
do {
try await self.read(maximumCount: outputSpan.freeCapacity) { (span: consuming InputSpan<ReadElement>) in
while let element = span.popFirst() {
outputSpan.append(element)
}
}
} catch {
switch error {
case .first(let error):
throw error
case .second:
fatalError()
}
}
}
}
}

#endif
47 changes: 21 additions & 26 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader+forEach.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ public import ContainersPreview
// swift-format-ignore: AmbiguousTrailingClosureOverload
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
/// Iterates over all chunks from the reader, executing the provided body for each span.
/// Iterates over all chunks from the reader, executing the provided body for each buffer.
///
/// This method continuously reads chunks from the async reader until the stream ends,
/// executing the provided closure for each span of elements read. The iteration terminates
/// when the reader produces an empty span, indicating the end of the stream.
/// executing the provided closure for each buffer of elements read. The iteration terminates
/// when the reader produces an empty buffer, indicating the end of the stream.
///
/// - Parameter body: An asynchronous closure that processes each span of elements read
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
/// from the stream.
///
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
/// or a `Failure` from the body closure.
Expand All @@ -33,14 +33,12 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// // Process each chunk of data from the file
/// try await fileReader.forEach { chunk in
/// print("Processing \(chunk.count) elements")
/// // Process the chunk
/// try await fileReader.forEachBuffer { buffer in
/// print("Processing \(buffer.count) elements")
/// }
/// ```
public consuming func forEachChunk<Failure: Error>(
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Void
public consuming func forEachBuffer<Failure: Error>(
body: (inout Buffer) async throws(Failure) -> Void
) async throws(EitherError<ReadFailure, Failure>) {
var shouldContinue = true
while shouldContinue {
Expand All @@ -50,37 +48,34 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
return
}

try await body(next)
try await body(&next)
}
}
}

/// Iterates over all chunks from the reader, executing the provided body for each span.
/// Iterates over all chunks from a non-failing reader, executing the provided body for each buffer.
///
/// This method continuously reads chunks from the async reader until the stream ends,
/// executing the provided closure for each span of elements read. The iteration terminates
/// when the reader produces an empty span, indicating the end of the stream.
/// executing the provided closure for each buffer of elements read. The iteration terminates
/// when the reader produces an empty buffer, indicating the end of the stream.
///
/// - Parameter body: An asynchronous closure that processes each span of elements read
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
/// Use this overload when the reader's ``AsyncReader/ReadFailure`` type is `Never`.
///
/// - Throws: An error of type `Failure` from the body closure. Since this reader never fails,
/// only the body closure can throw errors.
/// - Parameter body: An asynchronous closure that processes each buffer of elements read
/// from the stream.
///
/// ## Example
///
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// // Process each chunk of data from the file
/// try await fileReader.forEach { chunk in
/// print("Processing \(chunk.count) elements")
/// // Process the chunk
/// await fileReader.forEachBuffer { buffer in
/// print("Processing \(buffer.count) elements")
/// }
/// ```
@inlinable
public consuming func forEachChunk(
body: (consuming InputSpan<ReadElement>) async -> Void
public consuming func forEachBuffer(
body: (inout Buffer) async -> Void
) async where ReadFailure == Never {
var shouldContinue = true
while shouldContinue {
Expand All @@ -91,7 +86,7 @@ extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
return
}

await body(next)
await body(&next)
}
} catch {
fatalError()
Expand Down
47 changes: 18 additions & 29 deletions Sources/AsyncStreaming/AsyncReader/AsyncReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,47 @@
#if UnstableAsyncStreaming && compiler(>=6.4)
public import ContainersPreview

/// Reads elements asynchronously from a source.
/// Reads elements asynchronously from a source using callee-managed buffering.
///
/// Adopt ``AsyncReader`` when you need to provide callee-managed buffering,
/// where the reader controls the buffer and passes a span of elements
/// to the caller through the `body` closure.
/// Adopt ``AsyncReader`` when you need callee-managed buffering,
/// where the reader controls the buffer and passes it to the caller
/// through the `body` closure.
@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
public protocol AsyncReader<ReadElement, ReadFailure>: ~Copyable, ~Escapable {
/// The type of elements this reader reads.
associatedtype ReadElement: ~Copyable

/// The container type the reader uses to pass elements to the caller.
associatedtype Buffer: RangeReplaceableContainer<ReadElement> & ~Copyable

/// The error type that reading operations throw.
associatedtype ReadFailure: Error

/// Reads elements from the underlying source and passes them to the provided body closure.
///
/// This method asynchronously reads a span of elements from the source,
/// then passes them to `body` for processing.
/// This method asynchronously reads elements from the source into a buffer,
/// then passes the buffer to `body` for processing. When the buffer is empty,
/// the stream has ended.
///
/// ```swift
/// var fileReader: FileAsyncReader = ...
///
/// // Read data from a file asynchronously and process it.
/// let result = try await fileReader.read { data in
/// guard data.count > 0 else {
/// return
/// let result = try await fileReader.read { buffer in
/// guard buffer.count > 0 else {
/// return 0
/// }
/// return data
/// return buffer.count
/// }
/// ```
///
/// - Parameter maximumCount: The maximum count of items you're ready
/// to process. Must be greater than zero.
/// - Parameter body: A closure that processes a span of read elements
/// and returns a value of type `Return`. When the span is empty,
/// it indicates the end of the stream.
/// - Parameter body: A closure that receives a mutable reference to the buffer
/// of read elements and returns a value of type `Return`. When the buffer
/// is empty, it indicates the end of the stream.
/// - Returns: The value the body closure returns after processing the read elements.
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
/// or a `Failure` from the body closure.
mutating func read<Return: ~Copyable, Failure: Error>(
maximumCount: Int,
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
body: (inout Buffer) async throws(Failure) -> Return
) async throws(EitherError<ReadFailure, Failure>) -> Return

}

@available(macOS 10.14.4, iOS 12.2, watchOS 5.2, tvOS 12.2, visionOS 1.0, *)
extension AsyncReader where Self: ~Copyable, Self: ~Escapable, ReadElement: ~Copyable {
/// Reads elements with no upper bound on span size.
public mutating func read<Return: ~Copyable, Failure: Error>(
body: (consuming InputSpan<ReadElement>) async throws(Failure) -> Return
) async throws(EitherError<ReadFailure, Failure>) -> Return {
try await read(maximumCount: .max, body: body)
}
}
#endif
Loading
Loading