Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Sources/URLSessionHTTPClient/URLSessionHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ public final class URLSessionHTTPClient: HTTPClient, IdleTimerEntryProvider {
public mutating func read<Return: ~Copyable, Failure>(
body: (inout UniqueArray<UInt8>) async throws(Failure) -> Return
) async throws(AsyncStreaming.EitherError<any Error, Failure>) -> Return where Failure: Error {
let data: Data?
let data: DispatchData?
do {
data = try await self.actual.data(maximumCount: nil)
data = try await self.actual.data()
} catch {
throw .first(error)
}
Expand All @@ -95,7 +95,9 @@ public final class URLSessionHTTPClient: HTTPClient, IdleTimerEntryProvider {
var buffer = self.buffer.take()!
if let data, !data.isEmpty {
buffer.reserveCapacity(data.count)
buffer.append(copying: data.span)
for region in data.regions {
buffer.append(copying: region)
}
}

let result: Return
Expand Down
69 changes: 42 additions & 27 deletions Sources/URLSessionHTTPClient/URLSessionTaskDelegateBridge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import HTTPTypesFoundation
import Synchronization

@available(anyAppleOS 26.0, *)
final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDelegate {
final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionTaskDelegate {
private enum Callback: Sendable {
case response(URLResponse)
case redirection(
Expand Down Expand Up @@ -66,10 +66,10 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
// The client is waiting on URLSession. The client is expecting response body data,
// but URLSession has not received any that it can send out to the client. When
// URLSession does receive an update, the provided continuation must be fired.
case awaitingData(CheckedContinuation<(Bool, Data?), any Error>)
case awaitingData(CheckedContinuation<(Bool, DispatchData?), any Error>)
// URLSession is waiting for the client. URLSession has updated the response body
// with more data/completion/error but the client has not consumed/processed this.
case awaitingConsumption(Data, complete: Bool, error: (any Error)?, suspendedTask: URLSessionTask?)
case awaitingConsumption(DispatchData, complete: Bool, error: (any Error)?, suspendedTask: URLSessionTask?)
}
var state: State = .awaitingResponse
var completionContinuation: CheckedContinuation<Void, Never>? = nil
Expand All @@ -82,6 +82,8 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
self.state.withLock { $0.responseTrailerFields }
}

// Manually specifying the selector without conforming to `URLSessionDataDelegate` to avoid Data bridging
@objc(URLSession:dataTask:didReceiveResponse:completionHandler:)
func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
Expand All @@ -92,7 +94,7 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
defer {
switch state.state {
case .awaitingResponse:
state.state = .awaitingConsumption(Data(), complete: false, error: nil, suspendedTask: nil)
state.state = .awaitingConsumption(.empty, complete: false, error: nil, suspendedTask: nil)
case .awaitingData, .awaitingConsumption:
break
}
Expand All @@ -108,23 +110,27 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
completionHandler(.allow)
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
// Manually specifying the selector without conforming to `URLSessionDataDelegate` to avoid Data bridging
@objc(URLSession:dataTask:didReceiveData:)
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: NSData) {
// URLSession usually vends dispatch_data_t as NSData
let data = DispatchData(data)
let oldState = self.state.withLock { state in
defer {
switch state.state {
case .awaitingData:
state.state = .awaitingConsumption(data, complete: false, error: nil, suspendedTask: nil)
case .awaitingResponse:
// We don't support data before response
state.state = .awaitingConsumption(Data(), complete: true, error: nil, suspendedTask: nil)
case .awaitingConsumption(let existingData, let complete, let error, var suspendedTask):
let newData = existingData + data
if newData.count > Self.highWatermark && suspendedTask == nil {
state.state = .awaitingConsumption(.empty, complete: true, error: nil, suspendedTask: nil)
case .awaitingConsumption(var existingData, let complete, let error, var suspendedTask):
existingData.append(data)
if existingData.count > Self.highWatermark && suspendedTask == nil {
dataTask.suspend()
suspendedTask = dataTask
}
state.state = .awaitingConsumption(
newData,
existingData,
complete: complete,
error: error,
suspendedTask: suspendedTask
Expand All @@ -150,9 +156,9 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
defer {
switch state.state {
case .awaitingData:
state.state = .awaitingConsumption(Data(), complete: true, error: error, suspendedTask: nil)
state.state = .awaitingConsumption(.empty, complete: true, error: error, suspendedTask: nil)
case .awaitingResponse:
state.state = .awaitingConsumption(Data(), complete: true, error: nil, suspendedTask: nil)
state.state = .awaitingConsumption(.empty, complete: true, error: nil, suspendedTask: nil)
case .awaitingConsumption(let existingData, _, _, _):
state.state = .awaitingConsumption(existingData, complete: true, error: error, suspendedTask: nil)
}
Expand Down Expand Up @@ -181,35 +187,26 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
self.continuation.finish()
}

func data(maximumCount: Int?) async throws -> Data? {
func data() async throws -> DispatchData? {
try await withTaskCancellationHandler {
// Keep waiting on continuations until:
// a) data is returned
// b) no more data can be returned (complete)
// c) an error occurred
while true {
let (shouldReturn, result): (Bool, Data?) = try await withCheckedThrowingContinuation { continuation in
let (shouldReturn, result): (Bool, DispatchData?) = try await withCheckedThrowingContinuation { continuation in
self.state.withLock { state in
switch state.state {
case .awaitingConsumption(let existingData, let complete, let error, let suspendedTask):
if !existingData.isEmpty {
let (dataToReturn, remainingData) =
if let maximumCount, existingData.count > maximumCount {
(existingData.prefix(maximumCount), existingData.dropFirst(maximumCount))
} else {
(existingData, Data())
}
let shouldResume = remainingData.count <= Self.highWatermark
state.state = .awaitingConsumption(
remainingData,
.empty,
complete: complete,
error: existingData.isEmpty ? nil : error,
suspendedTask: shouldResume ? nil : suspendedTask
suspendedTask: nil
)
if shouldResume {
suspendedTask?.resume()
}
continuation.resume(returning: (true, dataToReturn))
suspendedTask?.resume()
continuation.resume(returning: (true, existingData))
} else if let error, complete {
continuation.resume(throwing: error)
} else if complete {
Expand Down Expand Up @@ -462,4 +459,22 @@ final class URLSessionTaskDelegateBridge: NSObject, Sendable, URLSessionDataDele
}
}
}

extension DispatchData {
init(_ data: NSData) {
// If the NSData is already dispatch_data_t
if let dispatchData = data as AnyObject as? __DispatchData {
self = dispatchData as DispatchData
} else {
// This doesn't actually make a copy if the data is immutable
nonisolated(unsafe) let data = data.copy() as! NSData
self = unsafe DispatchData(
bytesNoCopy: UnsafeRawBufferPointer(start: data.bytes, count: data.count),
deallocator: .custom(nil) {
unsafe withExtendedLifetime(data) {}
}
)
}
}
}
#endif
Loading