diff --git a/README.md b/README.md index 033804b..1181580 100644 --- a/README.md +++ b/README.md @@ -420,7 +420,7 @@ App 只负责提供 `deviceID`、本地配置文件路径、上传持久化目 | `credentialBase64` | 上传凭证 | 来自配置文件或安全下发渠道 | | `authRefreshBefore` | 认证缓存提前刷新时间 | `60s` | | `requestTimeout` | 单次请求超时 | `10s` | -| `persistRootURL` | 上传快照和 staging 根目录 | App 私有 `Application Support` 子目录 | +| `persistRootURL` | 上传快照根目录 | App 私有 `Application Support` 子目录 | | `endpointsURL` | 运行期公共端点文件 | App 私有 `Application Support/Archebase/archebase-endpoints.json` | | `retryPolicy` | 请求重试策略 | `.recommended` | | `execution` | 上传执行策略 | `.recommended` | @@ -442,7 +442,7 @@ App 只负责提供 `deviceID`、本地配置文件路径、上传持久化目 | `reconcileRemotePartsOnResume` | `true` | 恢复时校验已上传分片状态 | | `cleanupOnTerminalFailure` | `true` | 终态失败时允许 SDK 清理不可恢复状态 | | `credentialRefreshSkew` | `30s` | 上传凭证过期前提前刷新 | -| `persistence` | `.recommended` | 本地快照和 staging 策略 | +| `persistence` | `.recommended` | 本地快照和直读文件策略 | 默认本地持久化策略: @@ -452,7 +452,7 @@ App 只负责提供 `deviceID`、本地配置文件路径、上传持久化目 | `keepCompletedSnapshot` | `false` | 完成后不保留快照 | | `completedSnapshotTTL` | `0s` | 完成快照保留时间 | | `terminalSnapshotTTL` | `3600s` | 失败终态快照保留时间 | -| `copyExternalFileIntoManagedStaging` | `true` | 上传前复制外部文件到 SDK staging 目录,提升恢复稳定性 | +| `copyExternalFileIntoManagedStaging` | `false` | 兼容字段;当前版本不再生成 staging 数据副本,上传直接读取源文件 | ## 10. 发起上传 @@ -609,7 +609,7 @@ let result = try await client.resumeUpload(logicalUploadID: pending.logicalUploa print(result.objectKey) ``` -恢复时 SDK 会校验本地 staging 文件是否仍然存在、大小是否一致、文件指纹是否匹配。如果文件丢失或被修改,会抛出 `DataGatewayClientError.resumeNotPossible`。 +恢复时 SDK 会校验原始文件是否仍然存在、大小是否一致、文件指纹是否匹配。如果文件丢失、移动或被修改,会抛出 `DataGatewayClientError.resumeNotPossible`。 恢复逻辑对 App 的承诺: @@ -624,7 +624,7 @@ print(result.objectKey) |---|---| | `logicalUploadID` | 恢复和取消使用的稳定上传标识。 | | `uploadID` | 最近一次上传会话标识。 | -| `fileURL` | SDK 管理的本地文件 URL,可能是 staging 文件。 | +| `fileURL` | 本地源文件 URL。 | | `fileSize` | 文件大小,单位 bytes。 | | `phase` | 本地记录的上传阶段。 | | `restartCount` | 已自动重建上传会话的次数。 | @@ -842,11 +842,11 @@ func makeClientOrRequireInitialization() async throws -> DataGatewayClient? { ### 17.3 文件选择与安全作用域 -如果文件来自 `UIDocumentPickerViewController` 或其他安全作用域 URL,SDK 会尽量访问安全作用域资源并把文件复制到 SDK staging 目录。推荐保持 `copyExternalFileIntoManagedStaging = true`,这样用户移动或撤销原始文件访问权限后,已进入 staging 的上传仍更容易恢复。 +如果文件来自 `UIDocumentPickerViewController` 或其他安全作用域 URL,SDK 会尽量访问安全作用域资源并记录 bookmark,但不会把文件复制到 SDK staging 目录。上传和恢复都直接依赖原始文件 URL;因此 App 需要确保上传期间和恢复前源文件仍然存在且可访问,不要在任务完成前移动、删除或改写该文件。 ### 17.4 文件大小与内存 -当前版本会在本地读取文件并按分片上传。请根据目标设备内存和网络条件控制单个文件大小。对于非常大的文件、后台长传或高并发上传需求,建议在上线前与 Archebase 支持团队确认版本能力和压测结果。 +当前版本直接读取源文件上传:单对象上传使用文件 body,multipart 上传按分片构造区间流,不再把完整文件 `readAll` 到内存。SDK 仍会读取首 1 MiB 计算恢复指纹,并按分片小块读取计算分片 MD5。对于非常大的文件、后台长传或高并发上传需求,建议在上线前与 Archebase 支持团队确认版本能力和压测结果。 ## 18. 完整示例 @@ -1213,7 +1213,7 @@ public enum DataGatewayClientError: Error, Sendable, Equatable { | 上传立即失败 `zeroByteFile` | 用户选择的文件是否为空。 | | 上传立即失败 `invalidLocalFile` | 文件是否仍存在,App 是否有访问权限。 | | 上传失败 `rawTagConflict` | `UploadRequest.rawTags` 是否覆盖了设备配置 tags 的同名 key。 | -| 恢复失败 `resumeNotPossible` | staging 文件是否被删除,原始文件是否被修改,用户是否清理过 App 数据。 | +| 恢复失败 `resumeNotPossible` | 原始文件是否被删除、移动或修改,用户是否撤销文件访问权限或清理过 App 数据。 | | 频繁 `authenticationFailed` | 设备是否需要重新初始化,凭证是否已被管理员轮换或撤销。 | | 频繁 `ossFailed` 或 `retryExhausted` | 网络质量、代理、防火墙、系统时间和对象存储可用性。 | | `uploadRestartExceeded` | 让用户重新上传,并保留 `logicalUploadID` 与 SDK 日志联系支持。 | diff --git a/Sources/DGWOss/OssMultipartClient.swift b/Sources/DGWOss/OssMultipartClient.swift index c93b51c..b0aa6de 100644 --- a/Sources/DGWOss/OssMultipartClient.swift +++ b/Sources/DGWOss/OssMultipartClient.swift @@ -221,6 +221,81 @@ package struct OssPutObjectOutput: Sendable, Equatable { } } +package struct OssUploadBody: @unchecked Sendable { + package enum Kind: Sendable, Equatable { + case data + case file + case stream + } + + fileprivate enum Storage { + case data(Data) + case file(URL, sizeBytes: Int64) + case stream(@Sendable () throws -> InputStream, sizeBytes: Int64) + } + + fileprivate let storage: Storage + fileprivate let contentMD5Base64: String? + + package var sizeBytes: Int64 { + switch self.storage { + case .data(let data): + return Int64(data.count) + case .file(_, let sizeBytes), .stream(_, let sizeBytes): + return sizeBytes + } + } + + package var kind: Kind { + switch self.storage { + case .data: + return .data + case .file: + return .file + case .stream: + return .stream + } + } + + package static func data(_ data: Data) -> OssUploadBody { + OssUploadBody(storage: .data(data), contentMD5Base64: nil) + } + + package static func file( + _ fileURL: URL, + sizeBytes: Int64, + contentMD5Base64: String? = nil + ) -> OssUploadBody { + OssUploadBody(storage: .file(fileURL, sizeBytes: sizeBytes), contentMD5Base64: contentMD5Base64) + } + + package static func stream( + sizeBytes: Int64, + contentMD5Base64: String? = nil, + makeStream: @escaping @Sendable () throws -> InputStream + ) -> OssUploadBody { + OssUploadBody(storage: .stream(makeStream, sizeBytes: sizeBytes), contentMD5Base64: contentMD5Base64) + } + + package func byteStream() throws -> ByteStream { + switch self.storage { + case .data(let data): + return .data(data) + case .file(let fileURL, _): + return .file(fileURL) + case .stream(let makeStream, _): + return try .stream(makeStream()) + } + } + + fileprivate func addIntegrityHeaders(to request: inout some RequestModel) { + guard let contentMD5Base64 else { + return + } + request.addHeader("Content-MD5", contentMD5Base64) + } +} + package struct OssCompleteMultipartUploadOutput: Sendable, Equatable { package let etag: String? @@ -724,12 +799,12 @@ package protocol OssMultipartClientProtocol: Sendable { objectKey: String, multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor func putObject( objectKey: String, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor func completeMultipartUpload( @@ -784,16 +859,18 @@ package struct OssMultipartClient: OssMultipartClientProtocol { objectKey: String, multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { do { - let request = UploadPartRequest( + var request = UploadPartRequest( bucket: self.configuration.bucket, key: objectKey, partNumber: partNumber, uploadId: multipartUploadID, - body: .data(body) + body: try body.byteStream() ) + request.addHeader("Content-Length", body.sizeBytes.description) + body.addIntegrityHeaders(to: &request) let result = try await self.sdkClient.uploadPart(request) guard let etag = result.etag?.nilIfBlank else { throw OssOperationError.invalidResponse("UploadPart response missing ETag") @@ -801,7 +878,7 @@ package struct OssMultipartClient: OssMultipartClientProtocol { return UploadedPartDescriptor( partNumber: partNumber, etag: etag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: nil, hashCRC64: nil ) @@ -812,14 +889,16 @@ package struct OssMultipartClient: OssMultipartClientProtocol { package func putObject( objectKey: String, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { do { - let request = PutObjectRequest( + var request = PutObjectRequest( bucket: self.configuration.bucket, key: objectKey, - body: .data(body) + body: try body.byteStream() ) + request.addHeader("Content-Length", body.sizeBytes.description) + body.addIntegrityHeaders(to: &request) let result = try await self.sdkClient.putObject(request) guard let etag = result.etag?.nilIfBlank else { throw OssOperationError.invalidResponse("PutObject response missing ETag") @@ -827,7 +906,7 @@ package struct OssMultipartClient: OssMultipartClientProtocol { return UploadedPartDescriptor( partNumber: 1, etag: etag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: nil, hashCRC64: nil ) @@ -1066,7 +1145,7 @@ package actor OssUploadSession { package func uploadPart( multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { try await self.executeDataPlaneOperation { try await self.performUploadPart( @@ -1077,7 +1156,7 @@ package actor OssUploadSession { } } - package func putObject(body: Data) async throws -> UploadedPartDescriptor { + package func putObject(body: OssUploadBody) async throws -> UploadedPartDescriptor { try await self.executeDataPlaneOperation { try await self.performPutObject(body: body) } @@ -1150,7 +1229,7 @@ package actor OssUploadSession { private func performUploadPart( multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { try await self.client.uploadPart( objectKey: self.context.objectKey, @@ -1160,7 +1239,7 @@ package actor OssUploadSession { ) } - private func performPutObject(body: Data) async throws -> UploadedPartDescriptor { + private func performPutObject(body: OssUploadBody) async throws -> UploadedPartDescriptor { try await self.client.putObject( objectKey: self.context.objectKey, body: body diff --git a/Sources/DataGatewayClient/FilePreparation.swift b/Sources/DataGatewayClient/FilePreparation.swift index 04a80e7..3ed6a6a 100644 --- a/Sources/DataGatewayClient/FilePreparation.swift +++ b/Sources/DataGatewayClient/FilePreparation.swift @@ -104,12 +104,13 @@ public struct DataGatewayClientObservability: Sendable { public static let disabled = DataGatewayClientObservability() } -/// Persistence knobs that affect local snapshot retention and staging behavior. +/// Persistence knobs that affect local snapshot retention and direct-file upload behavior. public struct LocalPersistencePolicy: Sendable, Equatable { public var keepTerminalSnapshot: Bool public var keepCompletedSnapshot: Bool public var completedSnapshotTTL: Duration public var terminalSnapshotTTL: Duration + /// Compatibility flag retained for callers that already set it; new uploads always read the source file directly. public var copyExternalFileIntoManagedStaging: Bool public init( @@ -132,7 +133,7 @@ public struct LocalPersistencePolicy: Sendable, Equatable { keepCompletedSnapshot: false, completedSnapshotTTL: .seconds(0), terminalSnapshotTTL: .seconds(3600), - copyExternalFileIntoManagedStaging: true + copyExternalFileIntoManagedStaging: false ) } @@ -530,25 +531,85 @@ package final class DeviceInitRuntimeResources: @unchecked Sendable { } package protocol SecurityScopedFileAccessing: Sendable { - func access(_ fileURL: URL, operation: @Sendable () throws -> Result) rethrows -> Result + func access(_ fileURL: URL, operation: @Sendable () throws -> Result) throws -> Result + func access(_ fileURL: URL, operation: @Sendable () async throws -> Result) async throws -> Result + func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) throws -> Result + ) throws -> Result + func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) async throws -> Result + ) async throws -> Result func bookmarkData(for fileURL: URL) throws -> Data } package struct SecurityScopedFileAccessor: SecurityScopedFileAccessing { package init() {} - package func access(_ fileURL: URL, operation: @Sendable () throws -> Result) rethrows -> Result { - let started = fileURL.startAccessingSecurityScopedResource() + package func access(_ fileURL: URL, operation: @Sendable () throws -> Result) throws -> Result { + try self.access(fileURL, bookmarkData: nil) { _ in + try operation() + } + } + + package func access(_ fileURL: URL, operation: @Sendable () async throws -> Result) async throws -> Result { + try await self.access(fileURL, bookmarkData: nil) { _ in + try await operation() + } + } + + package func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) throws -> Result + ) throws -> Result { + let scopedURL = self.resolveSecurityScopedURL(fileURL: fileURL, bookmarkData: bookmarkData) + let started = scopedURL.startAccessingSecurityScopedResource() + defer { + if started { + scopedURL.stopAccessingSecurityScopedResource() + } + } + return try operation(scopedURL) + } + + package func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) async throws -> Result + ) async throws -> Result { + let scopedURL = self.resolveSecurityScopedURL(fileURL: fileURL, bookmarkData: bookmarkData) + let started = scopedURL.startAccessingSecurityScopedResource() defer { if started { - fileURL.stopAccessingSecurityScopedResource() + scopedURL.stopAccessingSecurityScopedResource() } } - return try operation() + return try await operation(scopedURL) } package func bookmarkData(for fileURL: URL) throws -> Data { - try fileURL.bookmarkData(options: [.minimalBookmark], includingResourceValuesForKeys: nil, relativeTo: nil) + try fileURL.bookmarkData(options: [.minimalBookmark, .withSecurityScope], includingResourceValuesForKeys: nil, relativeTo: nil) + } + + private func resolveSecurityScopedURL(fileURL: URL, bookmarkData: Data?) -> URL { + guard let bookmarkData else { + return fileURL + } + + var bookmarkDataIsStale = false + guard let resolvedURL = try? URL( + resolvingBookmarkData: bookmarkData, + options: [.withSecurityScope], + relativeTo: nil, + bookmarkDataIsStale: &bookmarkDataIsStale + ) else { + return fileURL + } + return resolvedURL.standardizedFileURL } } @@ -556,7 +617,8 @@ package protocol FileSystemProviding: Sendable { func fileExists(at url: URL) -> Bool func attributes(at url: URL) throws -> [FileAttributeKey: Any] func read(prefixFrom url: URL, maxLength: Int) throws -> Data - func readAll(from url: URL) throws -> Data + func readRange(from url: URL, offset: UInt64, maxLength: Int) throws -> Data + func inputStream(from url: URL, offset: UInt64, length: UInt64) throws -> InputStream func createDirectory(at url: URL) throws func copyItem(at sourceURL: URL, to destinationURL: URL) throws } @@ -578,8 +640,15 @@ package struct LocalFileSystem: FileSystemProviding { return try handle.read(upToCount: maxLength) ?? Data() } - package func readAll(from url: URL) throws -> Data { - try Data(contentsOf: url) + package func readRange(from url: URL, offset: UInt64, maxLength: Int) throws -> Data { + let handle = try FileHandle(forReadingFrom: url) + defer { try? handle.close() } + try handle.seek(toOffset: offset) + return try handle.read(upToCount: maxLength) ?? Data() + } + + package func inputStream(from url: URL, offset: UInt64, length: UInt64) throws -> InputStream { + FileRangeInputStream(fileURL: url, offset: offset, length: length) } package func createDirectory(at url: URL) throws { @@ -594,6 +663,123 @@ package struct LocalFileSystem: FileSystemProviding { } } +private final class FileRangeInputStream: InputStream, @unchecked Sendable { + private let fileURL: URL + private let offset: UInt64 + private let length: UInt64 + private var handle: FileHandle? + private var remaining: UInt64 + private var statusValue: Stream.Status = .notOpen + private var errorValue: (any Error)? + private var delegateValue: StreamDelegate? + + init(fileURL: URL, offset: UInt64, length: UInt64) { + self.fileURL = fileURL + self.offset = offset + self.length = length + self.remaining = length + super.init(data: Data()) + } + + override var streamStatus: Stream.Status { + self.statusValue + } + + override var streamError: (any Error)? { + self.errorValue + } + + override var delegate: StreamDelegate? { + get { + self.delegateValue + } + set { + self.delegateValue = newValue + } + } + + override var hasBytesAvailable: Bool { + self.statusValue == .open && self.remaining > 0 + } + + override func open() { + do { + let handle = try FileHandle(forReadingFrom: self.fileURL) + try handle.seek(toOffset: self.offset) + self.handle = handle + self.remaining = self.length + self.errorValue = nil + self.statusValue = self.remaining == 0 ? .atEnd : .open + } catch { + self.errorValue = error + self.statusValue = .error + } + } + + override func close() { + try? self.handle?.close() + self.handle = nil + if self.statusValue != .error { + self.statusValue = .closed + } + } + + override func read(_ buffer: UnsafeMutablePointer, maxLength len: Int) -> Int { + guard self.statusValue == .open else { + return self.statusValue == .atEnd ? 0 : -1 + } + guard self.remaining > 0 else { + self.statusValue = .atEnd + return 0 + } + guard let handle else { + self.errorValue = CocoaError(.fileReadUnknown) + self.statusValue = .error + return -1 + } + + do { + let readLength = min(len, Int(min(self.remaining, UInt64(Int.max)))) + guard readLength > 0 else { + self.statusValue = .atEnd + return 0 + } + let data = try handle.read(upToCount: readLength) ?? Data() + guard !data.isEmpty else { + self.statusValue = .atEnd + return 0 + } + data.copyBytes(to: buffer, count: data.count) + self.remaining -= UInt64(data.count) + if self.remaining == 0 { + self.statusValue = .atEnd + } + return data.count + } catch { + self.errorValue = error + self.statusValue = .error + return -1 + } + } + + override func schedule(in aRunLoop: RunLoop, forMode mode: RunLoop.Mode) { + _ = (aRunLoop, mode) + } + + override func remove(from aRunLoop: RunLoop, forMode mode: RunLoop.Mode) { + _ = (aRunLoop, mode) + } + + override func getBuffer( + _ buffer: UnsafeMutablePointer?>, + length len: UnsafeMutablePointer + ) -> Bool { + buffer.pointee = nil + len.pointee = 0 + return false + } +} + package struct PreparedLocalFile: Sendable, Equatable { package let sourceFileURL: URL package let managedFileURL: URL @@ -616,8 +802,19 @@ package struct PreparedLocalFile: Sendable, Equatable { } } +package struct LocalFileMD5Checksum: Sendable, Equatable { + package let hex: String + package let base64: String + + package init(hex: String, base64: String) { + self.hex = hex + self.base64 = base64 + } +} + package struct FileStagingCoordinator: Sendable { private static let modifiedAtComparisonTolerance: TimeInterval = 1 + private static let md5ChunkSize = 1024 * 1024 private let stagingRoot: URL private let fileSystem: any FileSystemProviding @@ -659,9 +856,8 @@ package struct FileStagingCoordinator: Sendable { firstChunkMD5Hex: Self.md5Hex(firstChunk) ) - let managedFileURL = persistence.copyExternalFileIntoManagedStaging - ? try self.copyToManagedStaging(sourceURL) - : sourceURL + _ = persistence.copyExternalFileIntoManagedStaging + let managedFileURL = sourceURL return PreparedLocalFile( sourceFileURL: sourceURL, @@ -675,33 +871,60 @@ package struct FileStagingCoordinator: Sendable { package func validatePreparedFile( managedFileURL: URL, + bookmarkData: Data? = nil, expectedFingerprint: LocalFileFingerprint ) throws { - guard self.fileSystem.fileExists(at: managedFileURL) else { - throw DataGatewayClientError.resumeNotPossible("managed file missing: \(managedFileURL.path)") - } + try self.securityScopedAccessor.access(managedFileURL, bookmarkData: bookmarkData) { accessibleURL in + guard self.fileSystem.fileExists(at: accessibleURL) else { + throw DataGatewayClientError.resumeNotPossible("source file missing: \(managedFileURL.path)") + } - let attributes = try self.fileSystem.attributes(at: managedFileURL) - let actualFingerprint = LocalFileFingerprint( - size: try Self.resolveFileSize(from: attributes), - modifiedAt: attributes[.modificationDate] as? Date, - firstChunkMD5Hex: Self.md5Hex(try self.fileSystem.read(prefixFrom: managedFileURL, maxLength: 1024 * 1024)) - ) + let attributes = try self.fileSystem.attributes(at: accessibleURL) + let actualFingerprint = LocalFileFingerprint( + size: try Self.resolveFileSize(from: attributes), + modifiedAt: attributes[.modificationDate] as? Date, + firstChunkMD5Hex: Self.md5Hex(try self.fileSystem.read(prefixFrom: accessibleURL, maxLength: 1024 * 1024)) + ) - guard Self.fingerprintsMatch(actual: actualFingerprint, expected: expectedFingerprint) else { - throw DataGatewayClientError.resumeNotPossible("local file fingerprint changed") + guard Self.fingerprintsMatch(actual: actualFingerprint, expected: expectedFingerprint) else { + throw DataGatewayClientError.resumeNotPossible("local file fingerprint changed") + } } } - package func readAll(from fileURL: URL) throws -> Data { - try self.fileSystem.readAll(from: fileURL) + package func accessPreparedFile( + fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) async throws -> Result + ) async throws -> Result { + try await self.securityScopedAccessor.access(fileURL, bookmarkData: bookmarkData, operation: operation) } - private func copyToManagedStaging(_ sourceURL: URL) throws -> URL { - try self.fileSystem.createDirectory(at: self.stagingRoot) - let destinationURL = self.stagingRoot.appendingPathComponent(UUID().uuidString).appendingPathExtension(sourceURL.pathExtension) - try self.fileSystem.copyItem(at: sourceURL, to: destinationURL) - return destinationURL + package func inputStream(from fileURL: URL, offset: UInt64, length: UInt64) throws -> InputStream { + try self.fileSystem.inputStream(from: fileURL, offset: offset, length: length) + } + + package func md5Checksum(from fileURL: URL, offset: UInt64, length: UInt64) throws -> LocalFileMD5Checksum { + var hasher = Insecure.MD5() + var cursor = offset + var remaining = length + + while remaining > 0 { + let chunkLength = min(Self.md5ChunkSize, Int(min(remaining, UInt64(Self.md5ChunkSize)))) + let chunk = try self.fileSystem.readRange(from: fileURL, offset: cursor, maxLength: chunkLength) + guard !chunk.isEmpty else { + throw DataGatewayClientError.invalidLocalFile("unexpected EOF while reading local file range") + } + hasher.update(data: chunk) + cursor += UInt64(chunk.count) + remaining -= UInt64(chunk.count) + } + + let digest = hasher.finalize() + return LocalFileMD5Checksum( + hex: Self.md5Hex(digest), + base64: Data(digest).base64EncodedString() + ) } private static func resolveFileSize(from attributes: [FileAttributeKey: Any]) throws -> UInt64 { @@ -713,6 +936,10 @@ package struct FileStagingCoordinator: Sendable { private static func md5Hex(_ data: Data) -> String { let digest = Insecure.MD5.hash(data: data) + return Self.md5Hex(digest) + } + + private static func md5Hex(_ digest: Insecure.MD5.Digest) -> String { return digest.map { String(format: "%02X", $0) }.joined() } @@ -951,9 +1178,9 @@ package protocol UploadCoordinatorMultipartSessionProtocol: Sendable { func uploadPart( multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor - func putObject(body: Data) async throws -> UploadedPartDescriptor + func putObject(body: OssUploadBody) async throws -> UploadedPartDescriptor func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] func headObjectETag() async throws -> String func completeMultipartUpload( @@ -1083,6 +1310,7 @@ public actor UploadCoordinator { await self.emitLog(operation: "resume", logicalUploadID: logicalUploadID, phase: "resolving", message: "resuming persisted upload") try self.dependencies.fileCoordinator.validatePreparedFile( managedFileURL: state.managedFileURL, + bookmarkData: state.fileURLBookmarkData, expectedFingerprint: state.fileFingerprint ) @@ -1310,27 +1538,13 @@ public actor UploadCoordinator { throw DataGatewayClientError.uploadRestartExceeded } - await onEvent?(.completingBusinessUpload(uploadID: resumedState.uploadID)) - _ = try await self.dependencies.gatewayClient.completeUpload( - uploadID: resumedState.uploadID, - fileSize: Int64(resumedState.fileSize), - rawTags: resumedState.rawTags, - completedPartCount: Int32(resumedState.uploadedParts.count), - ossObjectEtag: remoteETag, - partSizeBytes: Int64(resumedState.partSizeBytes) - ) - - resumedState.phase = .businessCompleting - resumedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.moveToCompleted(resumedState) - - let result = UploadResult( - logicalUploadID: resumedState.logicalUploadID, - uploadID: resumedState.uploadID, - bucket: resumedState.bucket, - objectKey: resumedState.objectKey, - fileSize: resumedState.fileSize, - ossObjectETag: remoteETag + let result = try await self.completeBusinessUpload( + state: &resumedState, + uploadCompletion: DataPlaneUploadCompletion( + completedPartCount: Int32(resumedState.uploadedParts.count), + ossObjectETag: remoteETag + ), + onEvent: onEvent ) await onEvent?(.completed(result)) return result @@ -1419,19 +1633,39 @@ public actor UploadCoordinator { ) async throws -> DataPlaneUploadCompletion { try await self.refreshUploadSessionIfNeeded(session: session, state: &state, onEvent: onEvent) - let body = try self.dependencies.fileCoordinator.readAll(from: state.managedFileURL) - await onEvent?(.uploadingPart(partNumber: 1, sentBytes: UInt64(body.count), totalBytes: state.fileSize)) - await self.emitMetric("upload_part", dimensions: ["upload_id": state.uploadID, "part_number": "1"]) + let bodySize = state.fileSize + let uploadID = state.uploadID + let totalBytes = state.fileSize + let upload = try await self.dependencies.fileCoordinator.accessPreparedFile( + fileURL: state.managedFileURL, + bookmarkData: state.fileURLBookmarkData + ) { accessibleFileURL in + let checksum = try self.dependencies.fileCoordinator.md5Checksum( + from: accessibleFileURL, + offset: 0, + length: bodySize + ) + let body = OssUploadBody.file( + accessibleFileURL, + sizeBytes: try Self.int64Size(bodySize), + contentMD5Base64: checksum.base64 + ) + await onEvent?(.uploadingPart(partNumber: 1, sentBytes: bodySize, totalBytes: totalBytes)) + await self.emitMetric("upload_part", dimensions: ["upload_id": uploadID, "part_number": "1"]) - let descriptor = try await session.putObject(body: body) + let descriptor = try await session.putObject(body: body) + return (descriptor, checksum.hex) + } + let descriptor = upload.0 + let md5Hex = upload.1 state.multipartUploadID = nil state.uploadedParts = [ PersistedUploadedPart( partNumber: 1, etag: descriptor.etag, offsetStart: 0, - partSize: UInt64(body.count), - md5Hex: Self.md5Hex(body) + partSize: bodySize, + md5Hex: md5Hex ), ] state.phase = .multipartCompleted @@ -1459,8 +1693,11 @@ public actor UploadCoordinator { try await self.dependencies.stateStore.saveActive(state) } - let partSize = Int(state.partSizeBytes) - let parts = try self.loadFileParts(from: state.managedFileURL, partSize: partSize) + let partSize = state.partSizeBytes + guard partSize > 0 else { + throw DataGatewayClientError.invalidConfiguration("partSizeBytes must be greater than 0") + } + let partCount = Int((state.fileSize + partSize - 1) / partSize) var persistedPartsByNumber = Dictionary(uniqueKeysWithValues: state.uploadedParts.map { ($0.partNumber, $0) }) var uploadedDescriptors = state.uploadedParts .sorted(by: { $0.partNumber < $1.partNumber }) @@ -1474,28 +1711,53 @@ public actor UploadCoordinator { ) } - for (index, part) in parts.enumerated() { + for index in 0 ..< partCount { let partNumber = index + 1 if persistedPartsByNumber[partNumber] != nil { continue } + let offsetStart = UInt64(index) * partSize + let currentPartSize = min(partSize, state.fileSize - offsetStart) try await self.refreshUploadSessionIfNeeded(session: session, state: &state, onEvent: onEvent) - await onEvent?(.uploadingPart(partNumber: partNumber, sentBytes: UInt64(part.count), totalBytes: state.fileSize)) + await onEvent?(.uploadingPart(partNumber: partNumber, sentBytes: currentPartSize, totalBytes: state.fileSize)) await self.emitMetric("upload_part", dimensions: ["upload_id": state.uploadID, "part_number": String(partNumber)]) - let descriptor = try await session.uploadPart( - multipartUploadID: multipartUploadID, - partNumber: partNumber, - body: part - ) + let upload = try await self.dependencies.fileCoordinator.accessPreparedFile( + fileURL: state.managedFileURL, + bookmarkData: state.fileURLBookmarkData + ) { accessibleFileURL in + let checksum = try self.dependencies.fileCoordinator.md5Checksum( + from: accessibleFileURL, + offset: offsetStart, + length: currentPartSize + ) + let fileCoordinator = self.dependencies.fileCoordinator + let descriptor = try await session.uploadPart( + multipartUploadID: multipartUploadID, + partNumber: partNumber, + body: .stream( + sizeBytes: try Self.int64Size(currentPartSize), + contentMD5Base64: checksum.base64 + ) { + try fileCoordinator.inputStream( + from: accessibleFileURL, + offset: offsetStart, + length: currentPartSize + ) + } + ) + return (descriptor, checksum.hex) + } + let descriptor = upload.0 + let md5Hex = upload.1 uploadedDescriptors.append(descriptor) persistedPartsByNumber[partNumber] = PersistedUploadedPart( partNumber: partNumber, etag: descriptor.etag, - offsetStart: UInt64(index * partSize), - partSize: UInt64(part.count), - md5Hex: Self.md5Hex(part) + offsetStart: offsetStart, + partSize: currentPartSize, + md5Hex: md5Hex ) state.uploadedParts = persistedPartsByNumber.values.sorted(by: { $0.partNumber < $1.partNumber }) state.phase = .uploading @@ -1559,6 +1821,11 @@ public actor UploadCoordinator { uploadCompletion: DataPlaneUploadCompletion, onEvent: (@Sendable (UploadEvent) async -> Void)? ) async throws -> UploadResult { + try self.dependencies.fileCoordinator.validatePreparedFile( + managedFileURL: state.managedFileURL, + bookmarkData: state.fileURLBookmarkData, + expectedFingerprint: state.fileFingerprint + ) await onEvent?(.completingBusinessUpload(uploadID: state.uploadID)) _ = try await self.dependencies.gatewayClient.completeUpload( uploadID: state.uploadID, @@ -1643,14 +1910,6 @@ public actor UploadCoordinator { return message } - private func loadFileParts(from fileURL: URL, partSize: Int) throws -> [Data] { - let data = try self.dependencies.fileCoordinator.readAll(from: fileURL) - return stride(from: 0, to: data.count, by: partSize).map { offset in - let end = min(offset + partSize, data.count) - return data.subdata(in: offset ..< end) - } - } - private static func makeDate(fromUnix unix: Int64) -> Date? { guard unix > 0 else { return nil @@ -1753,9 +2012,11 @@ public actor UploadCoordinator { return .continueUpload(merged.values.sorted(by: { $0.partNumber < $1.partNumber })) } - private static func md5Hex(_ data: Data) -> String { - let digest = Insecure.MD5.hash(data: data) - return digest.map { String(format: "%02X", $0) }.joined() + private static func int64Size(_ size: UInt64) throws -> Int64 { + guard size <= UInt64(Int64.max) else { + throw DataGatewayClientError.invalidLocalFile("local file size exceeds supported OSS request length") + } + return Int64(size) } } diff --git a/Sources/DataGatewayClient/QiongcheDeviceProvisioner.swift b/Sources/DataGatewayClient/QiongcheDeviceProvisioner.swift index 3f3f2a1..dca962d 100644 --- a/Sources/DataGatewayClient/QiongcheDeviceProvisioner.swift +++ b/Sources/DataGatewayClient/QiongcheDeviceProvisioner.swift @@ -97,9 +97,14 @@ package struct DefaultQiongcheDeviceProvisioner: QiongcheDeviceProvisioning { private extension DataGatewayClientError { var isDeviceAlreadyInitialized: Bool { - guard case .gatewayFailed(_, let detailCode, _) = self else { + guard case .gatewayFailed(let statusCode, let detailCode, let message) = self else { return false } - return detailCode == DeviceInitGatewayDetailCode.alreadyInitialized + if detailCode == DeviceInitGatewayDetailCode.alreadyInitialized { + return true + } + return statusCode == 9 + && message.localizedCaseInsensitiveContains("already") + && message.localizedCaseInsensitiveContains("initialized") } } diff --git a/Sources/DataGatewayClient/TestHarnessSupport.swift b/Sources/DataGatewayClient/TestHarnessSupport.swift index b9db848..b95eab2 100644 --- a/Sources/DataGatewayClient/TestHarnessSupport.swift +++ b/Sources/DataGatewayClient/TestHarnessSupport.swift @@ -388,23 +388,23 @@ package struct LocalStackMockMultipartSession: UploadCoordinatorMultipartSession package func uploadPart( multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { _ = multipartUploadID return UploadedPartDescriptor( partNumber: partNumber, etag: "\"runtime-part-\(partNumber)\"", - size: Int64(body.count), + size: body.sizeBytes, lastModified: nil, hashCRC64: nil ) } - package func putObject(body: Data) async throws -> UploadedPartDescriptor { + package func putObject(body: OssUploadBody) async throws -> UploadedPartDescriptor { UploadedPartDescriptor( partNumber: 1, etag: self.completedETag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: nil, hashCRC64: nil ) diff --git a/Tests/DGWOssTests/OssMultipartClientTests.swift b/Tests/DGWOssTests/OssMultipartClientTests.swift index 5554178..992046a 100644 --- a/Tests/DGWOssTests/OssMultipartClientTests.swift +++ b/Tests/DGWOssTests/OssMultipartClientTests.swift @@ -36,11 +36,11 @@ import Testing objectKey: "objects/demo.bin", multipartUploadID: "upload-1", partNumber: 7, - body: Data("abc".utf8) + body: .data(Data("abc".utf8)) ) _ = try await client.putObject( objectKey: "objects/demo.bin", - body: Data("put".utf8) + body: .data(Data("put".utf8)) ) _ = try await client.completeMultipartUpload( objectKey: "objects/demo.bin", @@ -131,7 +131,7 @@ import Testing objectKey: "objects/demo.bin", multipartUploadID: "upload-1", partNumber: 2, - body: Data("abcd".utf8) + body: .data(Data("abcd".utf8)) ) #expect(uploadedPart == UploadedPartDescriptor( partNumber: 2, @@ -143,7 +143,7 @@ import Testing let putObject = try await client.putObject( objectKey: "objects/demo.bin", - body: Data("put-body".utf8) + body: .data(Data("put-body".utf8)) ) #expect(putObject == UploadedPartDescriptor( partNumber: 1, @@ -175,6 +175,42 @@ import Testing #expect(headETag == "\"etag-head\"") } +@Test func ossMultipartClientAddsContentMD5ForExplicitIntegrityBodies() async throws { + let root = FileManager.default.temporaryDirectory.appendingPathComponent("oss-md5-\(UUID().uuidString)", isDirectory: true) + try FileManager.default.createDirectory(at: root, withIntermediateDirectories: true) + let fileURL = root.appendingPathComponent("body.bin") + try Data("put-body".utf8).write(to: fileURL) + let sdkClient = MockAlibabaOSSSDKClient( + initiateValue: OssInitiateMultipartUploadOutput(uploadID: "upload-1"), + uploadPartValue: OssUploadPartOutput(etag: "\"etag-1\""), + completeValue: OssCompleteMultipartUploadOutput(etag: "\"etag-complete\""), + listValues: [], + headValue: OssHeadObjectOutput(etag: "\"etag-head\"") + ) + let client = try OssMultipartClient(configuration: makeConfiguration(), sdkClient: sdkClient) + + _ = try await client.uploadPart( + objectKey: "objects/demo.bin", + multipartUploadID: "upload-1", + partNumber: 1, + body: .stream(sizeBytes: 3, contentMD5Base64: "part-md5-base64") { + InputStream(data: Data("abc".utf8)) + } + ) + _ = try await client.putObject( + objectKey: "objects/demo.bin", + body: .file(fileURL, sizeBytes: 8, contentMD5Base64: "put-md5-base64") + ) + + let uploadRequests = await sdkClient.uploadPartRequests() + #expect(uploadRequests[0].commonProp.headers?["content-md5"] == "part-md5-base64") + #expect(uploadRequests[0].commonProp.headers?["content-length"] == "3") + + let putRequests = await sdkClient.putRequests() + #expect(putRequests[0].commonProp.headers?["content-md5"] == "put-md5-base64") + #expect(putRequests[0].commonProp.headers?["content-length"] == "8") +} + @Test func listPartsMergesPaginatorPagesInOrder() async throws { let sdkClient = MockAlibabaOSSSDKClient( initiateValue: OssInitiateMultipartUploadOutput(uploadID: nil), @@ -214,7 +250,7 @@ import Testing let client = try OssMultipartClient(configuration: makeConfiguration(), sdkClient: sdkClient) let error = await #expect(throws: OssOperationError.self) { - try await client.putObject(objectKey: "objects/demo.bin", body: Data("body".utf8)) + try await client.putObject(objectKey: "objects/demo.bin", body: .data(Data("body".utf8))) } #expect(error == .invalidResponse("PutObject response missing ETag")) @@ -232,7 +268,7 @@ import Testing let client = try OssMultipartClient(configuration: makeConfiguration(), sdkClient: sdkClient) let error = await #expect(throws: OssOperationError.self) { - try await client.putObject(objectKey: "objects/demo.bin", body: Data("body".utf8)) + try await client.putObject(objectKey: "objects/demo.bin", body: .data(Data("body".utf8))) } #expect(error == .transportFailure(code: URLError.Code.timedOut.rawValue, message: URLError(.timedOut).localizedDescription)) @@ -280,7 +316,7 @@ import Testing let uploadedPart = try await session.uploadPart( multipartUploadID: "multipart-1", partNumber: 1, - body: Data("abc".utf8) + body: .data(Data("abc".utf8)) ) #expect(uploadedPart.etag == "\"etag-refreshed\"") @@ -346,9 +382,9 @@ import Testing let uploadedPart = try await session.uploadPart( multipartUploadID: "multipart-1", partNumber: 2, - body: Data("data".utf8) + body: .data(Data("data".utf8)) ) - let putObject = try await session.putObject(body: Data("blob".utf8)) + let putObject = try await session.putObject(body: .data(Data("blob".utf8))) let completeETag = try await session.completeMultipartUpload( multipartUploadID: "multipart-1", parts: [uploadedPart] @@ -680,13 +716,13 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { objectKey: String, multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { self.recordedUploadPartCalls.append("\(multipartUploadID):\(partNumber)") return UploadedPartDescriptor( partNumber: partNumber, etag: self.uploadPartResult.etag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: self.uploadPartResult.lastModified, hashCRC64: self.uploadPartResult.hashCRC64 ) @@ -694,13 +730,13 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { func putObject( objectKey: String, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { - self.recordedPutObjectCalls.append("\(objectKey):\(body.count)") + self.recordedPutObjectCalls.append("\(objectKey):\(body.sizeBytes)") return UploadedPartDescriptor( partNumber: 1, etag: self.putObjectResult.etag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: self.putObjectResult.lastModified, hashCRC64: self.putObjectResult.hashCRC64 ) @@ -764,14 +800,14 @@ private actor ThrowingMultipartClient: OssMultipartClientProtocol { objectKey: String, multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { throw self.error } func putObject( objectKey: String, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { throw self.error } diff --git a/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift b/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift index bc15925..2526571 100644 --- a/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift @@ -323,12 +323,12 @@ private actor FakeMultipartSession: UploadCoordinatorMultipartSessionProtocol { func initiateMultipartUpload() async throws -> String { "multipart-1" } - func uploadPart(multipartUploadID: String, partNumber: Int, body: Data) async throws -> UploadedPartDescriptor { - UploadedPartDescriptor(partNumber: partNumber, etag: "\"etag-\(partNumber)\"", size: Int64(body.count), lastModified: nil, hashCRC64: nil) + func uploadPart(multipartUploadID: String, partNumber: Int, body: OssUploadBody) async throws -> UploadedPartDescriptor { + UploadedPartDescriptor(partNumber: partNumber, etag: "\"etag-\(partNumber)\"", size: body.sizeBytes, lastModified: nil, hashCRC64: nil) } - func putObject(body: Data) async throws -> UploadedPartDescriptor { - UploadedPartDescriptor(partNumber: 1, etag: "\"etag-object\"", size: Int64(body.count), lastModified: nil, hashCRC64: nil) + func putObject(body: OssUploadBody) async throws -> UploadedPartDescriptor { + UploadedPartDescriptor(partNumber: 1, etag: "\"etag-object\"", size: body.sizeBytes, lastModified: nil, hashCRC64: nil) } func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] { [] } diff --git a/Tests/DataGatewayClientIntegrationTests/FilePreparationTests.swift b/Tests/DataGatewayClientIntegrationTests/FilePreparationTests.swift index 67f530b..8f3ceb2 100644 --- a/Tests/DataGatewayClientIntegrationTests/FilePreparationTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/FilePreparationTests.swift @@ -6,16 +6,102 @@ import Testing @testable import DataGatewayClient final class PassthroughSecurityScopedAccessor: SecurityScopedFileAccessing, @unchecked Sendable { - func access(_ fileURL: URL, operation: @Sendable () throws -> Result) rethrows -> Result where Result : Sendable { + func access(_ fileURL: URL, operation: @Sendable () throws -> Result) throws -> Result where Result: Sendable { _ = fileURL return try operation() } + func access(_ fileURL: URL, operation: @Sendable () async throws -> Result) async throws -> Result where Result: Sendable { + _ = fileURL + return try await operation() + } + + func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) throws -> Result + ) throws -> Result where Result: Sendable { + _ = bookmarkData + return try operation(fileURL) + } + + func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) async throws -> Result + ) async throws -> Result where Result: Sendable { + _ = bookmarkData + return try await operation(fileURL) + } + func bookmarkData(for fileURL: URL) throws -> Data { Data("bookmark:\(fileURL.path)".utf8) } } +final class RecordingSecurityScopedAccessor: SecurityScopedFileAccessing, @unchecked Sendable { + struct AccessRecord: Equatable { + let fileURL: URL + let bookmarkData: Data? + } + + private let lock = NSLock() + private var records: [AccessRecord] = [] + + func access(_ fileURL: URL, operation: @Sendable () throws -> Result) throws -> Result where Result: Sendable { + self.record(fileURL: fileURL, bookmarkData: nil) + return try operation() + } + + func access(_ fileURL: URL, operation: @Sendable () async throws -> Result) async throws -> Result where Result: Sendable { + self.record(fileURL: fileURL, bookmarkData: nil) + return try await operation() + } + + func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) throws -> Result + ) throws -> Result where Result: Sendable { + self.record(fileURL: fileURL, bookmarkData: bookmarkData) + return try operation(fileURL) + } + + func access( + _ fileURL: URL, + bookmarkData: Data?, + operation: @Sendable (_ accessibleURL: URL) async throws -> Result + ) async throws -> Result where Result: Sendable { + self.record(fileURL: fileURL, bookmarkData: bookmarkData) + return try await operation(fileURL) + } + + func bookmarkData(for fileURL: URL) throws -> Data { + Data("bookmark:\(fileURL.path)".utf8) + } + + func accessRecords() -> [AccessRecord] { + self.lock.lock() + defer { self.lock.unlock() } + return self.records + } + + private func record(fileURL: URL, bookmarkData: Data?) { + self.lock.lock() + defer { self.lock.unlock() } + self.records.append(AccessRecord(fileURL: fileURL, bookmarkData: bookmarkData)) + } +} + +final class RecordingStreamDelegate: NSObject, StreamDelegate { + private(set) var events: [Stream.Event] = [] + + func stream(_ aStream: Stream, handle eventCode: Stream.Event) { + _ = aStream + self.events.append(eventCode) + } +} + final class MemoryFileSystem: FileSystemProviding, @unchecked Sendable { struct Entry { let size: UInt64 @@ -63,6 +149,20 @@ final class MemoryFileSystem: FileSystemProviding, @unchecked Sendable { return entry.data } + func readRange(from url: URL, offset: UInt64, maxLength: Int) throws -> Data { + guard let entry = self.storage[url.standardizedFileURL] else { + throw CocoaError(.fileNoSuchFile) + } + let start = min(Int(offset), entry.data.count) + let end = min(start + maxLength, entry.data.count) + return entry.data.subdata(in: start ..< end) + } + + func inputStream(from url: URL, offset: UInt64, length: UInt64) throws -> InputStream { + let data = try self.readRange(from: url, offset: offset, maxLength: Int(length)) + return InputStream(data: data) + } + func createDirectory(at url: URL) throws { _ = url } @@ -425,7 +525,7 @@ private extension Dictionary { #expect(error == .zeroByteFile) } -@Test func externalFileIsCopiedIntoManagedStaging() throws { +@Test func externalFileUsesSourceURLWithoutManagedStagingCopy() throws { let sourceURL = URL(fileURLWithPath: "/external/photo.heic") let stagingRoot = URL(fileURLWithPath: "/sandbox/staging") let data = Data("robot-camera-data".utf8) @@ -445,9 +545,8 @@ private extension Dictionary { ) #expect(prepared.sourceFileURL == sourceURL) - #expect(prepared.managedFileURL != sourceURL) - #expect(prepared.managedFileURL.path.hasPrefix(stagingRoot.path)) - #expect(filesystem.copiedItems().count == 1) + #expect(prepared.managedFileURL == sourceURL) + #expect(filesystem.copiedItems().isEmpty) #expect(prepared.fileSize == UInt64(data.count)) #expect(prepared.fingerprint == LocalFileFingerprint( size: UInt64(data.count), @@ -457,6 +556,27 @@ private extension Dictionary { #expect(prepared.bookmarkData == Data("bookmark:/external/photo.heic".utf8)) } +@Test func fileRangeInputStreamSupportsCFNetworkDelegateOperations() throws { + let root = try filePreparationTemporaryRoot() + let sourceURL = root.appendingPathComponent("range-stream.bin") + try Data("0123456789".utf8).write(to: sourceURL) + let coordinator = FileStagingCoordinator(stagingRoot: root.appendingPathComponent("staging", isDirectory: true)) + let delegate = RecordingStreamDelegate() + let stream = try coordinator.inputStream(from: sourceURL, offset: 2, length: 5) + var buffer = [UInt8](repeating: 0, count: 8) + + stream.delegate = delegate + stream.schedule(in: .current, forMode: .default) + stream.open() + let count = stream.read(&buffer, maxLength: buffer.count) + stream.close() + stream.remove(from: .current, forMode: .default) + stream.delegate = nil + + #expect(count == 5) + #expect(Data(buffer.prefix(count)) == Data("23456".utf8)) +} + @Test func missingManagedFileMakesResumeImpossible() { let managedURL = URL(fileURLWithPath: "/sandbox/staging/missing.bin") let coordinator = FileStagingCoordinator( @@ -476,7 +596,7 @@ private extension Dictionary { ) } - #expect(error == .resumeNotPossible("managed file missing: /sandbox/staging/missing.bin")) + #expect(error == .resumeNotPossible("source file missing: /sandbox/staging/missing.bin")) } @Test func fingerprintMismatchMakesResumeImpossible() throws { @@ -505,6 +625,36 @@ private extension Dictionary { #expect(error == .resumeNotPossible("local file fingerprint changed")) } +@Test func fingerprintValidationUsesBookmarkScopedAccess() throws { + let managedURL = URL(fileURLWithPath: "/external/scoped-demo.bin") + let bookmark = Data("bookmark:/external/scoped-demo.bin".utf8) + let data = Data("robot-data-scoped".utf8) + let accessor = RecordingSecurityScopedAccessor() + let filesystem = MemoryFileSystem(files: [ + managedURL: .file(size: UInt64(data.count), modifiedAt: Date(timeIntervalSince1970: 100), data: data), + ]) + let coordinator = FileStagingCoordinator( + stagingRoot: URL(fileURLWithPath: "/sandbox/staging"), + fileSystem: filesystem, + securityScopedAccessor: accessor + ) + + try coordinator.validatePreparedFile( + managedFileURL: managedURL, + bookmarkData: bookmark, + expectedFingerprint: LocalFileFingerprint( + size: UInt64(data.count), + modifiedAt: Date(timeIntervalSince1970: 100), + firstChunkMD5Hex: "E0156588AFEF4061755164862427C151" + ) + ) + + #expect(accessor.accessRecords().contains(RecordingSecurityScopedAccessor.AccessRecord( + fileURL: managedURL, + bookmarkData: bookmark + ))) +} + @Test func fingerprintValidationToleratesFilesystemModifiedAtPrecisionDrift() throws { let managedURL = URL(fileURLWithPath: "/sandbox/staging/demo-drift.bin") let data = Data("robot-data-drift".utf8) diff --git a/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift b/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift index 8f27bda..40f013a 100644 --- a/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift @@ -571,6 +571,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: clientConfig.persistRootURL) let pending = try await client.listPendingUploads() #expect(!pending.contains(where: { $0.logicalUploadID == result.logicalUploadID })) @@ -645,6 +646,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: clientConfig.persistRootURL) } @Test( @@ -677,6 +679,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: clientConfig.persistRootURL) } @Test( @@ -712,6 +715,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: clientConfig.persistRootURL) } @Test( @@ -761,6 +765,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: clientConfig.persistRootURL) } @Test( @@ -790,6 +795,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: clientConfig.persistRootURL) let pendingAfterResume = try await client.listPendingUploads() #expect(!pendingAfterResume.contains(where: { $0.logicalUploadID == state.logicalUploadID })) } @@ -840,42 +846,33 @@ struct LocalStackHarnessTests { let clientConfig = try uniqueRealClientConfig(from: environment.makeRemoteClientConfig(), label: "device-init") defer { try? FileManager.default.removeItem(at: clientConfig.persistRootURL) } let deviceID = try requiredValueFromEnvironment("DGW_REAL_DEVICE_ID") - let configURL = clientConfig.persistRootURL.appendingPathComponent("archebase-config.json") - let initializer: ArchebaseDeviceInitializer - if publicDNSIntegrationEnabled { - let endpointsURL = clientConfig.persistRootURL.appendingPathComponent(ArchebasePublicEndpoints.endpointsFileName) - initializer = try ArchebaseDeviceInitializer( - config: DeviceInitClientConfig(configURL: configURL, endpointsURL: endpointsURL) - ) - } else { - let initEndpoint = try requiredURLFromEnvironment("DGW_REAL_INIT_ENDPOINT") - let initTLS: TLSMode = initEndpoint.scheme?.lowercased() == "https" ? .tls : .plaintext - initializer = try ArchebaseDeviceInitializer( - config: DeviceInitClientConfig(configURL: configURL, tls: initTLS), - initEndpoint: initEndpoint, - sdkVersion: "aliyun-e2e", - platform: "ios-simulator" - ) - } + let paths = try QiongcheSDKPaths(rootURL: clientConfig.persistRootURL) + let configString = try realQiongcheConfigString(deviceID: deviceID, clientConfig: clientConfig) + let qiongcheSDK = try QiongcheDataGatewaySDK( + rootURL: clientConfig.persistRootURL, + deviceInitTimeout: clientConfig.requestTimeout, + readinessTimeout: clientConfig.requestTimeout + ) - let initializedConfig = try await initializer.initDevice(deviceID: deviceID) + try await qiongcheSDK.saveConfigAndInit(configString: configString) + let initializedConfig = try await ArchebaseConfigStore(configURL: paths.configURL).load() #expect(!initializedConfig.apiKey.isEmpty) - #expect(try await ArchebaseConfigStore(configURL: configURL).load() == initializedConfig) + #expect(try QiongcheSDKStateStore(stateURL: paths.stateURL).load().deviceID == deviceID) - let reinitializedConfig = try await initializer.reinitDevice(deviceID: deviceID) + try await qiongcheSDK.saveConfigAndInit(configString: configString) + let reinitializedConfig = try await ArchebaseConfigStore(configURL: paths.configURL).load() #expect(!reinitializedConfig.apiKey.isEmpty) - #expect(try await ArchebaseConfigStore(configURL: configURL).load() == reinitializedConfig) + #expect(try QiongcheSDKStateStore(stateURL: paths.stateURL).load().deviceID == deviceID) + #expect(await qiongcheSDK.isReadyToUpload()) - let client = try await DataGatewayClient.testFromArchebaseConfig( - authEndpoint: clientConfig.authEndpoint, - gatewayEndpoint: clientConfig.gatewayEndpoint, - configURL: configURL, - persistRootURL: clientConfig.persistRootURL, - tls: clientConfig.tls + let client = try await DataGatewayClient.fromArchebaseConfig( + configURL: paths.configURL, + persistRootURL: paths.persistRootURL, + endpointsURL: paths.endpointsURL ) let fileURL = try writeRealPayload( Data("aliyun-real-device-init-payload-\(UUID().uuidString)".utf8), - under: clientConfig.persistRootURL, + under: paths.persistRootURL, name: "aliyun-real-device-init" ) let result = try await client.upload( @@ -885,6 +882,7 @@ struct LocalStackHarnessTests { #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) + try assertNoStagingDataCopies(under: paths.persistRootURL) } } @@ -926,6 +924,59 @@ private func writeRealPayload(_ payload: Data, under root: URL, name: String) th return fileURL } +private func assertNoStagingDataCopies(under persistRoot: URL) throws { + let stagingRoot = persistRoot + .appendingPathComponent("data-gateway-client", isDirectory: true) + .appendingPathComponent("staging", isDirectory: true) + guard FileManager.default.fileExists(atPath: stagingRoot.path) else { + return + } + + let stagedItems = try FileManager.default.contentsOfDirectory( + at: stagingRoot, + includingPropertiesForKeys: nil + ) + #expect(stagedItems.isEmpty) +} + +private func realQiongcheConfigString( + deviceID: String, + clientConfig: DataGatewayClientConfig +) throws -> String { + let object: [String: Any] = [ + "device_id": deviceID, + "auth": try endpointJSONObject(clientConfig.authEndpoint), + "gateway": try endpointJSONObject(clientConfig.gatewayEndpoint), + "deviceInit": try endpointJSONObject(realDeviceInitEndpoint(clientConfig: clientConfig)), + ] + let data = try JSONSerialization.data(withJSONObject: object, options: [.sortedKeys]) + guard let string = String(data: data, encoding: .utf8) else { + throw DataGatewayClientError.invalidConfiguration("failed to encode real qiongche config") + } + return string +} + +private func endpointJSONObject(_ endpoint: URL) throws -> [String: Any] { + guard let scheme = endpoint.scheme?.lowercased(), !scheme.isEmpty else { + throw LocalStackHarnessError.invalidEndpoint(endpoint.absoluteString) + } + guard let host = endpoint.host(percentEncoded: false) ?? endpoint.host, !host.isEmpty else { + throw LocalStackHarnessError.invalidEndpoint(endpoint.absoluteString) + } + let port = endpoint.port ?? (scheme == "https" ? 443 : 80) + return ["scheme": scheme, "host": host, "port": port] +} + +private func realDeviceInitEndpoint(clientConfig: DataGatewayClientConfig) throws -> URL { + if let endpoint = try optionalURLFromEnvironment("DGW_REAL_INIT_ENDPOINT") { + return endpoint + } + if let endpoint = try optionalURLFromEnvironment("DGW_REAL_DEVICE_INIT_ENDPOINT") { + return endpoint + } + return clientConfig.gatewayEndpoint +} + private func canonicalObjectETag(_ value: String) -> String { let trimmed = value.trimmingCharacters(in: .whitespacesAndNewlines) if trimmed.count >= 2, trimmed.first == "\"", trimmed.last == "\"" { @@ -1143,6 +1194,17 @@ private func requiredURLFromEnvironment(_ key: String) throws -> URL { return url } +private func optionalURLFromEnvironment(_ key: String) throws -> URL? { + guard let value = ProcessInfo.processInfo.environment[key]?.trimmingCharacters(in: .whitespacesAndNewlines), + !value.isEmpty else { + return nil + } + guard let url = normalizedURLFromEnvironmentValue(value) else { + throw LocalStackHarnessError.invalidEndpoint(key) + } + return url +} + private func normalizedURLFromEnvironmentValue(_ value: String) -> URL? { if let url = URL(string: value), url.host?.isEmpty == false { return url diff --git a/Tests/DataGatewayClientIntegrationTests/QiongcheDataGatewaySDKTests.swift b/Tests/DataGatewayClientIntegrationTests/QiongcheDataGatewaySDKTests.swift index 5a78ba8..8aa5955 100644 --- a/Tests/DataGatewayClientIntegrationTests/QiongcheDataGatewaySDKTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/QiongcheDataGatewaySDKTests.swift @@ -171,6 +171,36 @@ import Testing #expect(state.initializedAtUnix == 1_778_840_000) } +@Test func qiongcheSaveConfigAndInitFallsBackToReinitWhenAlreadyInitializedDetailIsMissing() async throws { + let root = try qiongcheTemporaryRoot() + let paths = try QiongcheSDKPaths(rootURL: root) + let reinitResponse = deviceInitResponse(apiKey: "credential-v3", tags: ["device": "robot-reinit-message"]) + let transport = SequencedDeviceInitTransport(outcomes: [ + .failure(.gatewayFailed( + statusCode: 9, + detailCode: nil, + message: "device has already been initialized; use explicit reinit" + )), + .success(reinitResponse), + ]) + let provisioner = DefaultQiongcheDeviceProvisioner(makeTransport: { _, _, _ in + QiongcheDeviceInitTransportHandle(serviceClient: transport, shutdown: {}) + }) + let sdk = try QiongcheDataGatewaySDK( + rootURL: root, + deviceProvisioner: provisioner, + clock: FixedQiongcheSDKClock(date: Date(timeIntervalSince1970: 1_778_840_001)) + ) + + try await sdk.saveConfigAndInit(configString: validQiongcheConfig(deviceID: "robot-002")) + + #expect(await transport.methods() == [.initDevice, .reinitDevice]) + #expect(try await ArchebaseConfigStore(configURL: paths.configURL).load() == (try ArchebaseConfig( + apiKey: "credential-v3", + tags: ["device": "robot-reinit-message"] + ))) +} + @Test func qiongcheSDKActorDefaultInitSucceedsWithTemporaryRoot() throws { _ = try QiongcheDataGatewaySDK(rootURL: qiongcheTemporaryRoot()) } diff --git a/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift b/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift index c3e3a2f..4079344 100644 --- a/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift @@ -1,3 +1,4 @@ +@preconcurrency import AlibabaCloudOSS import DGWOss import DGWControlPlane import DGWProto @@ -14,7 +15,7 @@ import Testing #expect(!persistence.keepCompletedSnapshot) #expect(persistence.completedSnapshotTTL == .seconds(0)) #expect(persistence.terminalSnapshotTTL == .seconds(3600)) - #expect(persistence.copyExternalFileIntoManagedStaging) + #expect(!persistence.copyExternalFileIntoManagedStaging) let execution = UploadExecutionPolicy.recommended #expect(execution.maxRestartCount == 3) @@ -125,7 +126,10 @@ import Testing ) let logs = await logRecorder.events() - #expect(logs.contains(where: { $0.operation == "refresh_credentials" && $0.message == "[REDACTED]" })) + let containsRedactedCredentialLog = logs.contains { + $0.operation == "refresh_credentials" && $0.message == "[REDACTED]" + } + #expect(containsRedactedCredentialLog) let metrics = await metricRecorder.events() let recordedUploadPartMetric = metrics.contains { event in @@ -313,6 +317,7 @@ import Testing ]) #expect(await ossSession.initiateCalls() == 0) #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.putObjectKinds() == [.file]) #expect(await ossSession.uploadCalls().isEmpty) } @@ -454,6 +459,7 @@ import Testing #expect(await ossSession.initiateCalls() == 0) #expect(await ossSession.uploadCalls().isEmpty) #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.putObjectKinds() == [.file]) #expect(await ossSession.completeCalls().isEmpty) let pending = try await stateStore.listPendingUploads() @@ -523,19 +529,65 @@ import Testing #expect(result.ossObjectETag == "\"etag-small-object\"") #expect(await ossSession.initiateCalls() == 0) #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.putObjectKinds() == [.file]) #expect(await ossSession.uploadCalls().isEmpty) } +@Test func singleObjectUploadReadsOriginalFileBodyDirectly() async throws { + let root = FileManager.default.temporaryDirectory.appendingPathComponent("single-object-direct-\(UUID().uuidString)", isDirectory: true) + try FileManager.default.createDirectory(at: root, withIntermediateDirectories: true) + let sourceURL = root.appendingPathComponent("demo-small.bin") + let payload = Data("small-direct-file".utf8) + try payload.write(to: sourceURL) + let fileCoordinator = FileStagingCoordinator(stagingRoot: root.appendingPathComponent("staging", isDirectory: true)) + let stateStore = UploadStateStore( + persistRoot: root.appendingPathComponent("state", isDirectory: true), + fileManager: .default, + clock: FixedUploadCoordinatorStoreClock(now: Date(timeIntervalSince1970: 705)) + ) + let gatewayClient = MockUploadCoordinatorGatewayClient( + createResponse: makeCreateLogicalUploadResponse(uploadID: "upload-small-direct", objectKey: "objects/small-direct.bin", partSizeBytes: 1024), + recoveryResponse: makeContinueRecoveryResponse(currentUploadID: "upload-small-direct"), + reissueResponse: makeReissueResponse(uploadID: "upload-small-direct", credentials: makeCoordinatorUploadCredentials(expireAtUnix: 5_000, tokenSuffix: "fresh", objectKey: "objects/small-direct.bin", partSizeBytes: 1024)), + completeResponse: Archebase_DataGateway_V1_CompleteUploadResponse() + ) + let ossSession = MockOssUploadSession( + multipartUploadID: "multipart-small-direct", + uploadedParts: [], + completedETag: "\"etag-small-direct-object\"" + ) + let client = DataGatewayClient( + uploadCoordinator: UploadCoordinator( + executionPolicy: makeExecutionPolicy(), + dependencies: UploadCoordinatorDependencies( + gatewayClient: gatewayClient, + stateStore: stateStore, + fileCoordinator: fileCoordinator, + ossClientFactory: { _ in ossSession }, + clock: FixedUploadCoordinatorClock(now: Date(timeIntervalSince1970: 705)) + ) + ) + ) + + _ = try await client.upload( + UploadRequest(fileURL: sourceURL, clientHints: ["kind": "small"], rawTags: ["scene": "robot"], displayName: "small") + ) + + #expect(await ossSession.putObjectKinds() == [.file]) + #expect(await ossSession.putObjectBytes() == [payload]) +} + @Test func dataGatewayClientUploadHandlesMultipartFiles() async throws { let sourceURL = URL(fileURLWithPath: "/files/demo-multipart.bin") let payload = Data(repeating: 0x42, count: 24) let fileSystem = MemoryFileSystem(files: [ sourceURL: .file(size: UInt64(payload.count), modifiedAt: Date(timeIntervalSince1970: 140), data: payload), ]) + let securityAccessor = RecordingSecurityScopedAccessor() let fileCoordinator = FileStagingCoordinator( stagingRoot: URL(fileURLWithPath: "/staging"), fileSystem: fileSystem, - securityScopedAccessor: PassthroughSecurityScopedAccessor() + securityScopedAccessor: securityAccessor ) let stateStore = UploadStateStore( persistRoot: FileManager.default.temporaryDirectory.appendingPathComponent("data-gateway-client-e2-multipart-\(UUID().uuidString)"), @@ -582,6 +634,13 @@ import Testing UploadCall(multipartUploadID: "multipart-24", partNumber: 2, size: 8), UploadCall(multipartUploadID: "multipart-24", partNumber: 3, size: 8), ]) + #expect(await ossSession.uploadedBodyBytes() == [ + Data(payload[0 ..< 8]), + Data(payload[8 ..< 16]), + Data(payload[16 ..< 24]), + ]) + let bookmark = Data("bookmark:/files/demo-multipart.bin".utf8) + #expect(securityAccessor.accessRecords().filter { $0.bookmarkData == bookmark }.count >= 4) #expect(await ossSession.completeCalls() == [[1, 2, 3]]) #expect(await gatewayClient.completeInvocations() == [ CompleteInvocation( @@ -765,7 +824,7 @@ import Testing try await client.resumeUpload(logicalUploadID: "logical-resume") } - #expect(error == .resumeNotPossible("managed file missing: /missing/demo.bin")) + #expect(error == .resumeNotPossible("source file missing: /missing/demo.bin")) } @Test func resumeUploadFailsWhenFingerprintChanges() async throws { @@ -2381,6 +2440,7 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto private var refreshChecks = 0 private var refreshIndex = 0 private var putObjectInvocations: [Int] = [] + private var putObjectBodyKinds: [OssUploadBody.Kind] = [] init( multipartUploadID: String, @@ -2426,7 +2486,7 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto func uploadPart( multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { _ = multipartUploadID _ = body @@ -2436,12 +2496,13 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto return descriptor } - func putObject(body: Data) async throws -> UploadedPartDescriptor { - self.putObjectInvocations.append(body.count) + func putObject(body: OssUploadBody) async throws -> UploadedPartDescriptor { + self.putObjectInvocations.append(Int(body.sizeBytes)) + self.putObjectBodyKinds.append(body.kind) return UploadedPartDescriptor( partNumber: 1, etag: self.completedETag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: nil, hashCRC64: nil ) @@ -2472,6 +2533,10 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto func putObjectCalls() -> [Int] { self.putObjectInvocations } + + func putObjectKinds() -> [OssUploadBody.Kind] { + self.putObjectBodyKinds + } } private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { @@ -2485,7 +2550,10 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { private let headObjectError: DataGatewayClientError? private var initiateInvocations = 0 private var uploadInvocations: [UploadCall] = [] + private var uploadBodyBytes: [Data] = [] private var putObjectInvocations: [Int] = [] + private var putObjectBodyKinds: [OssUploadBody.Kind] = [] + private var putObjectBodyBytes: [Data] = [] private var completeInvocations: [[Int]] = [] init( @@ -2524,24 +2592,38 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { func uploadPart( multipartUploadID: String, partNumber: Int, - body: Data + body: OssUploadBody ) async throws -> UploadedPartDescriptor { - self.uploadInvocations.append(UploadCall(multipartUploadID: multipartUploadID, partNumber: partNumber, size: body.count)) + if let bodyBytes = try readUploadBodyBytes(body) { + self.uploadBodyBytes.append(bodyBytes) + } + self.uploadInvocations.append( + UploadCall( + multipartUploadID: multipartUploadID, + partNumber: partNumber, + size: Int(body.sizeBytes), + bodyKind: body.kind + ) + ) guard let descriptor = self.uploadedParts.first(where: { $0.partNumber == partNumber }) else { fatalError("missing uploaded part fixture for partNumber=\(partNumber)") } return descriptor } - func putObject(body: Data) async throws -> UploadedPartDescriptor { - self.putObjectInvocations.append(body.count) + func putObject(body: OssUploadBody) async throws -> UploadedPartDescriptor { + self.putObjectInvocations.append(Int(body.sizeBytes)) + self.putObjectBodyKinds.append(body.kind) + if let bodyBytes = try readUploadBodyBytes(body) { + self.putObjectBodyBytes.append(bodyBytes) + } if let failOnPutObject { throw failOnPutObject } return UploadedPartDescriptor( partNumber: 1, etag: self.completedETag, - size: Int64(body.count), + size: body.sizeBytes, lastModified: nil, hashCRC64: nil ) @@ -2579,10 +2661,22 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { self.uploadInvocations } + func uploadedBodyBytes() -> [Data] { + self.uploadBodyBytes + } + func putObjectCalls() -> [Int] { self.putObjectInvocations } + func putObjectKinds() -> [OssUploadBody.Kind] { + self.putObjectBodyKinds + } + + func putObjectBytes() -> [Data] { + self.putObjectBodyBytes + } + func completeCalls() -> [[Int]] { self.completeInvocations } @@ -2613,6 +2707,54 @@ private struct UploadCall: Equatable, Sendable { let multipartUploadID: String let partNumber: Int let size: Int + let bodyKind: OssUploadBody.Kind + + init( + multipartUploadID: String, + partNumber: Int, + size: Int, + bodyKind: OssUploadBody.Kind = .stream + ) { + self.multipartUploadID = multipartUploadID + self.partNumber = partNumber + self.size = size + self.bodyKind = bodyKind + } +} + +private func readUploadBodyBytes(_ body: OssUploadBody) throws -> Data? { + switch try body.byteStream() { + case .none: + return nil + case .data(let data): + return data + case .file(let fileURL): + guard FileManager.default.fileExists(atPath: fileURL.path) else { + return nil + } + return try Data(contentsOf: fileURL) + case .stream(let stream): + return try readInputStreamBytes(stream) + } +} + +private func readInputStreamBytes(_ stream: InputStream) throws -> Data { + var output = Data() + var buffer = [UInt8](repeating: 0, count: 4096) + + stream.open() + defer { stream.close() } + + while true { + let count = stream.read(&buffer, maxLength: buffer.count) + if count > 0 { + output.append(buffer, count: count) + } else if count == 0 { + return output + } else { + throw stream.streamError ?? CocoaError(.fileReadUnknown) + } + } } private struct FixedUploadCoordinatorClock: UploadCoordinatorClock {