diff --git a/Sources/DGWOss/OssMultipartClient.swift b/Sources/DGWOss/OssMultipartClient.swift index d13604c..c93b51c 100644 --- a/Sources/DGWOss/OssMultipartClient.swift +++ b/Sources/DGWOss/OssMultipartClient.swift @@ -129,6 +129,7 @@ package enum OssOperationError: Error, Sendable, Equatable { case invalidResponse(String) case clientFailure(code: String, message: String) case serverFailure(statusCode: Int, code: String, message: String, requestID: String, ec: String?) + case transportFailure(code: Int, message: String) case unexpected(String) } @@ -212,6 +213,14 @@ package struct OssUploadPartOutput: Sendable, Equatable { } } +package struct OssPutObjectOutput: Sendable, Equatable { + package let etag: String? + + package init(etag: String?) { + self.etag = etag + } +} + package struct OssCompleteMultipartUploadOutput: Sendable, Equatable { package let etag: String? @@ -275,6 +284,10 @@ package protocol AlibabaOSSSDKClientProtocol: Sendable { _ request: UploadPartRequest ) async throws -> OssUploadPartOutput + func putObject( + _ request: PutObjectRequest + ) async throws -> OssPutObjectOutput + func completeMultipartUpload( _ request: CompleteMultipartUploadRequest ) async throws -> OssCompleteMultipartUploadOutput @@ -313,6 +326,13 @@ package struct AlibabaOSSSDKClientAdapter: AlibabaOSSSDKClientProtocol { return OssUploadPartOutput(etag: result.etag) } + package func putObject( + _ request: PutObjectRequest + ) async throws -> OssPutObjectOutput { + let result = try await self.client.putObject(request) + return OssPutObjectOutput(etag: result.etag) + } + package func completeMultipartUpload( _ request: CompleteMultipartUploadRequest ) async throws -> OssCompleteMultipartUploadOutput { @@ -517,6 +537,8 @@ package enum OSSDataPlaneErrorMapper { return DataGatewayClientError.ossFailed(httpStatus: nil, ossCode: code, message: message) case .serverFailure(let statusCode, let code, let message, _, _): return DataGatewayClientError.ossFailed(httpStatus: statusCode, ossCode: code, message: message) + case .transportFailure(let code, let message): + return DataGatewayClientError.ossFailed(httpStatus: nil, ossCode: code.description, message: message) } } @@ -545,6 +567,13 @@ package enum OSSDataPlaneErrorMapper { return DataPlaneRetryClassification(action: .retry, httpStatus: statusCode, ossCode: code, message: message) } return DataPlaneRetryClassification(action: .fail, httpStatus: statusCode, ossCode: code, message: message) + case .transportFailure(let code, let message): + return DataPlaneRetryClassification( + action: Self.isRetriableTransportCode(code) ? .retry : .fail, + httpStatus: nil, + ossCode: code.description, + message: message + ) } } @@ -581,7 +610,12 @@ package enum OSSDataPlaneErrorMapper { } private static func isRetriableURLFailure(_ error: URLError) -> Bool { - switch error.code { + self.isRetriableTransportCode(error.code.rawValue) + } + + private static func isRetriableTransportCode(_ code: Int) -> Bool { + let urlErrorCode = URLError.Code(rawValue: code) + switch urlErrorCode { case .timedOut, .networkConnectionLost, .notConnectedToInternet, @@ -693,6 +727,11 @@ package protocol OssMultipartClientProtocol: Sendable { body: Data ) async throws -> UploadedPartDescriptor + func putObject( + objectKey: String, + body: Data + ) async throws -> UploadedPartDescriptor + func completeMultipartUpload( objectKey: String, multipartUploadID: String, @@ -771,6 +810,32 @@ package struct OssMultipartClient: OssMultipartClientProtocol { } } + package func putObject( + objectKey: String, + body: Data + ) async throws -> UploadedPartDescriptor { + do { + let request = PutObjectRequest( + bucket: self.configuration.bucket, + key: objectKey, + body: .data(body) + ) + let result = try await self.sdkClient.putObject(request) + guard let etag = result.etag?.nilIfBlank else { + throw OssOperationError.invalidResponse("PutObject response missing ETag") + } + return UploadedPartDescriptor( + partNumber: 1, + etag: etag, + size: Int64(body.count), + lastModified: nil, + hashCRC64: nil + ) + } catch { + throw Self.mapError(error) + } + } + package func completeMultipartUpload( objectKey: String, multipartUploadID: String, @@ -886,6 +951,9 @@ package struct OssMultipartClient: OssMultipartClientProtocol { ec: serverError.ec.nilIfBlank ) } + if let urlError = error as? URLError { + return .transportFailure(code: urlError.code.rawValue, message: urlError.localizedDescription) + } return .unexpected(String(describing: error)) } } @@ -1009,6 +1077,12 @@ package actor OssUploadSession { } } + package func putObject(body: Data) async throws -> UploadedPartDescriptor { + try await self.executeDataPlaneOperation { + try await self.performPutObject(body: body) + } + } + package func completeMultipartUpload( multipartUploadID: String, parts: [UploadedPartDescriptor] @@ -1086,6 +1160,13 @@ package actor OssUploadSession { ) } + private func performPutObject(body: Data) async throws -> UploadedPartDescriptor { + try await self.client.putObject( + objectKey: self.context.objectKey, + body: body + ) + } + private func performCompleteMultipartUpload( multipartUploadID: String, parts: [UploadedPartDescriptor] diff --git a/Sources/DataGatewayClient/FilePreparation.swift b/Sources/DataGatewayClient/FilePreparation.swift index ec3a89b..04a80e7 100644 --- a/Sources/DataGatewayClient/FilePreparation.swift +++ b/Sources/DataGatewayClient/FilePreparation.swift @@ -898,6 +898,11 @@ package enum ReconcileRemotePartsDecision: Sendable, Equatable { case restartUpload } +private struct DataPlaneUploadCompletion: Sendable, Equatable { + let completedPartCount: Int32 + let ossObjectETag: String +} + package protocol UploadCoordinatorClock: Sendable { func now() async -> Date } @@ -948,6 +953,7 @@ package protocol UploadCoordinatorMultipartSessionProtocol: Sendable { partNumber: Int, body: Data ) async throws -> UploadedPartDescriptor + func putObject(body: Data) async throws -> UploadedPartDescriptor func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] func headObjectETag() async throws -> String func completeMultipartUpload( @@ -1036,86 +1042,29 @@ public actor UploadCoordinator { credentials: createResponse.credentials ) let ossSession = try self.dependencies.ossClientFactory(uploadContext) - await self.emitLog(operation: "upload", uploadID: createResponse.uploadID, logicalUploadID: createResponse.logicalUploadID, phase: "multipart_initiated", message: "created logical upload") - - await onEvent?(.initiatingMultipart(uploadID: createResponse.uploadID)) - let multipartUploadID = try await ossSession.initiateMultipartUpload() - - var multipartState = persistedState - multipartState.multipartUploadID = multipartUploadID - multipartState.phase = .multipartInitiated - multipartState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(multipartState) - - let partSize = Int(uploadContext.partSizeBytes) - let parts = try self.loadFileParts(from: preparedFile.managedFileURL, partSize: partSize) - - var uploadedParts: [PersistedUploadedPart] = [] - var uploadedDescriptors: [UploadedPartDescriptor] = [] - var offset: UInt64 = 0 - - for (index, part) in parts.enumerated() { - try await self.refreshUploadSessionIfNeeded(session: ossSession, state: &multipartState, onEvent: onEvent) - let partNumber = index + 1 - await onEvent?(.uploadingPart(partNumber: partNumber, sentBytes: UInt64(part.count), totalBytes: preparedFile.fileSize)) - await self.emitMetric("upload_part", dimensions: ["upload_id": createResponse.uploadID, "part_number": String(partNumber)]) - - let descriptor = try await ossSession.uploadPart( - multipartUploadID: multipartUploadID, - partNumber: partNumber, - body: part + await self.emitLog(operation: "upload", uploadID: createResponse.uploadID, logicalUploadID: createResponse.logicalUploadID, phase: "session_created", message: "created logical upload") + + var uploadState = persistedState + let uploadCompletion: DataPlaneUploadCompletion + if Self.shouldUseSingleObjectUpload(fileSize: uploadState.fileSize, partSizeBytes: uploadState.partSizeBytes) { + uploadCompletion = try await self.uploadSingleObject( + state: &uploadState, + session: ossSession, + onEvent: onEvent ) - uploadedDescriptors.append(descriptor) - uploadedParts.append( - PersistedUploadedPart( - partNumber: partNumber, - etag: descriptor.etag, - offsetStart: offset, - partSize: UInt64(part.count), - md5Hex: Self.md5Hex(part) - ) + } else { + uploadCompletion = try await self.uploadMultipart( + state: &uploadState, + session: ossSession, + existingMultipartUploadID: nil, + onEvent: onEvent ) - offset += UInt64(part.count) - - multipartState.phase = .uploading - multipartState.uploadedParts = uploadedParts - multipartState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(multipartState) } - try await self.refreshUploadSessionIfNeeded(session: ossSession, state: &multipartState, onEvent: onEvent) - await onEvent?(.completingMultipart(uploadID: createResponse.uploadID)) - let ossObjectETag = try await ossSession.completeMultipartUpload( - multipartUploadID: multipartUploadID, - parts: uploadedDescriptors - ) - - multipartState.phase = .multipartCompleted - multipartState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(multipartState) - - await onEvent?(.completingBusinessUpload(uploadID: createResponse.uploadID)) - _ = try await self.dependencies.gatewayClient.completeUpload( - uploadID: createResponse.uploadID, - fileSize: Int64(preparedFile.fileSize), - rawTags: request.rawTags, - completedPartCount: Int32(uploadedDescriptors.count), - ossObjectEtag: ossObjectETag, - partSizeBytes: createResponse.credentials.partSizeBytes - ) - - let completedAt = await self.dependencies.clock.now() - multipartState.phase = .businessCompleting - multipartState.updatedAt = completedAt - try await self.dependencies.stateStore.moveToCompleted(multipartState) - - let result = UploadResult( - logicalUploadID: createResponse.logicalUploadID, - uploadID: createResponse.uploadID, - bucket: createResponse.credentials.bucket, - objectKey: createResponse.credentials.objectKey, - fileSize: preparedFile.fileSize, - ossObjectETag: ossObjectETag + let result = try await self.completeBusinessUpload( + state: &uploadState, + uploadCompletion: uploadCompletion, + onEvent: onEvent ) await onEvent?(.completed(result)) await self.emitLog(operation: "upload", uploadID: result.uploadID, logicalUploadID: result.logicalUploadID, phase: "completed", message: "upload completed") @@ -1245,6 +1194,31 @@ public actor UploadCoordinator { resumedState.updatedAt = await self.dependencies.clock.now() try await self.dependencies.stateStore.saveActive(resumedState) + if Self.isPersistedSingleObjectCompleted(resumedState) { + return try await self.completePersistedSingleObjectOrRestart( + state: resumedState, + session: ossSession, + onEvent: onEvent + ) + } + + if Self.shouldUseSingleObjectUpload(fileSize: resumedState.fileSize, partSizeBytes: resumedState.partSizeBytes), + resumedState.multipartUploadID?.nilIfBlank == nil, + resumedState.uploadedParts.isEmpty { + let uploadCompletion = try await self.uploadSingleObject( + state: &resumedState, + session: ossSession, + onEvent: onEvent + ) + let result = try await self.completeBusinessUpload( + state: &resumedState, + uploadCompletion: uploadCompletion, + onEvent: onEvent + ) + await onEvent?(.completed(result)) + return result + } + if self.executionPolicy.reconcileRemotePartsOnResume, let existingMultipartUploadID = resumedState.multipartUploadID?.nilIfBlank, resumedState.phase == .multipartInitiated || resumedState.phase == .uploading { @@ -1263,93 +1237,16 @@ public actor UploadCoordinator { } } - let multipartUploadID: String - if let existingMultipartUploadID = resumedState.multipartUploadID?.nilIfBlank { - multipartUploadID = existingMultipartUploadID - } else { - await onEvent?(.initiatingMultipart(uploadID: resumedState.uploadID)) - multipartUploadID = try await ossSession.initiateMultipartUpload() - resumedState.multipartUploadID = multipartUploadID - resumedState.phase = .multipartInitiated - resumedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(resumedState) - } - - let partSize = Int(uploadContext.partSizeBytes) - let parts = try self.loadFileParts(from: resumedState.managedFileURL, partSize: partSize) - var persistedPartsByNumber = Dictionary(uniqueKeysWithValues: resumedState.uploadedParts.map { ($0.partNumber, $0) }) - var uploadedDescriptors = resumedState.uploadedParts - .sorted(by: { $0.partNumber < $1.partNumber }) - .map { - UploadedPartDescriptor( - partNumber: $0.partNumber, - etag: $0.etag, - size: Int64($0.partSize), - lastModified: nil, - hashCRC64: nil - ) - } - - for (index, part) in parts.enumerated() { - let partNumber = index + 1 - if persistedPartsByNumber[partNumber] != nil { - continue - } - - try await self.refreshUploadSessionIfNeeded(session: ossSession, state: &resumedState, onEvent: onEvent) - await onEvent?(.uploadingPart(partNumber: partNumber, sentBytes: UInt64(part.count), totalBytes: resumedState.fileSize)) - let descriptor = try await ossSession.uploadPart( - multipartUploadID: multipartUploadID, - partNumber: partNumber, - body: part - ) - uploadedDescriptors.append(descriptor) - persistedPartsByNumber[partNumber] = PersistedUploadedPart( - partNumber: partNumber, - etag: descriptor.etag, - offsetStart: UInt64(index * partSize), - partSize: UInt64(part.count), - md5Hex: Self.md5Hex(part) - ) - resumedState.uploadedParts = persistedPartsByNumber.values.sorted(by: { $0.partNumber < $1.partNumber }) - resumedState.phase = .uploading - resumedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(resumedState) - } - - uploadedDescriptors.sort(by: { $0.partNumber < $1.partNumber }) - try await self.refreshUploadSessionIfNeeded(session: ossSession, state: &resumedState, onEvent: onEvent) - await onEvent?(.completingMultipart(uploadID: resumedState.uploadID)) - let ossObjectETag = try await ossSession.completeMultipartUpload( - multipartUploadID: multipartUploadID, - parts: uploadedDescriptors + let uploadCompletion = try await self.uploadMultipart( + state: &resumedState, + session: ossSession, + existingMultipartUploadID: resumedState.multipartUploadID?.nilIfBlank, + onEvent: onEvent ) - - resumedState.phase = .multipartCompleted - resumedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(resumedState) - - await onEvent?(.completingBusinessUpload(uploadID: resumedState.uploadID)) - _ = try await self.dependencies.gatewayClient.completeUpload( - uploadID: resumedState.uploadID, - fileSize: Int64(resumedState.fileSize), - rawTags: resumedState.rawTags, - completedPartCount: Int32(uploadedDescriptors.count), - ossObjectEtag: ossObjectETag, - partSizeBytes: Int64(resumedState.partSizeBytes) - ) - - resumedState.phase = .businessCompleting - resumedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.moveToCompleted(resumedState) - - let result = UploadResult( - logicalUploadID: resumedState.logicalUploadID, - uploadID: resumedState.uploadID, - bucket: resumedState.bucket, - objectKey: resumedState.objectKey, - fileSize: resumedState.fileSize, - ossObjectETag: ossObjectETag + let result = try await self.completeBusinessUpload( + state: &resumedState, + uploadCompletion: uploadCompletion, + onEvent: onEvent ) await onEvent?(.completed(result)) return result @@ -1376,6 +1273,24 @@ public actor UploadCoordinator { ) let ossSession = try self.dependencies.ossClientFactory(uploadContext) + var resumedState = state + resumedState.uploadID = refreshedCredentials.uploadID.nilIfBlank ?? uploadID + resumedState.bucket = refreshedCredentials.credentials.bucket + resumedState.endpoint = refreshedCredentials.credentials.endpoint + resumedState.objectKey = refreshedCredentials.credentials.objectKey + resumedState.partSizeBytes = UInt64(refreshedCredentials.credentials.partSizeBytes) + resumedState.lastKnownSTSExpireAt = Self.makeDate(fromUnix: refreshedCredentials.credentials.stsExpireAtUnix) + resumedState.updatedAt = await self.dependencies.clock.now() + try await self.dependencies.stateStore.saveActive(resumedState) + + if Self.isPersistedSingleObjectCompleted(resumedState) { + return try await self.completePersistedSingleObjectOrRestart( + state: resumedState, + session: ossSession, + onEvent: onEvent + ) + } + guard let expectedObjectETag = expectedObjectETag?.trimmingCharacters(in: .whitespacesAndNewlines), !expectedObjectETag.isEmpty else { throw DataGatewayClientError.uploadRestartExceeded } @@ -1391,20 +1306,10 @@ public actor UploadCoordinator { throw error } - if remoteETag != expectedObjectETag { + if !Self.etagsMatch(remoteETag, expectedObjectETag) { throw DataGatewayClientError.uploadRestartExceeded } - var resumedState = state - resumedState.uploadID = refreshedCredentials.uploadID.nilIfBlank ?? uploadID - resumedState.bucket = refreshedCredentials.credentials.bucket - resumedState.endpoint = refreshedCredentials.credentials.endpoint - resumedState.objectKey = refreshedCredentials.credentials.objectKey - resumedState.partSizeBytes = UInt64(refreshedCredentials.credentials.partSizeBytes) - resumedState.lastKnownSTSExpireAt = Self.makeDate(fromUnix: refreshedCredentials.credentials.stsExpireAtUnix) - resumedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(resumedState) - await onEvent?(.completingBusinessUpload(uploadID: resumedState.uploadID)) _ = try await self.dependencies.gatewayClient.completeUpload( uploadID: resumedState.uploadID, @@ -1482,83 +1387,202 @@ public actor UploadCoordinator { ) let ossSession = try self.dependencies.ossClientFactory(uploadContext) - await onEvent?(.initiatingMultipart(uploadID: createResponse.uploadID)) - let multipartUploadID = try await ossSession.initiateMultipartUpload() - restartedState.multipartUploadID = multipartUploadID - restartedState.phase = .multipartInitiated - restartedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(restartedState) + let uploadCompletion: DataPlaneUploadCompletion + if Self.shouldUseSingleObjectUpload(fileSize: restartedState.fileSize, partSizeBytes: restartedState.partSizeBytes) { + uploadCompletion = try await self.uploadSingleObject( + state: &restartedState, + session: ossSession, + onEvent: onEvent + ) + } else { + uploadCompletion = try await self.uploadMultipart( + state: &restartedState, + session: ossSession, + existingMultipartUploadID: nil, + onEvent: onEvent + ) + } - let partSize = Int(uploadContext.partSizeBytes) - let parts = try self.loadFileParts(from: restartedState.managedFileURL, partSize: partSize) - var uploadedParts: [PersistedUploadedPart] = [] - var uploadedDescriptors: [UploadedPartDescriptor] = [] - var offset: UInt64 = 0 + let result = try await self.completeBusinessUpload( + state: &restartedState, + uploadCompletion: uploadCompletion, + onEvent: onEvent + ) + await onEvent?(.completed(result)) + return result + } + + private func uploadSingleObject( + state: inout PersistedUploadState, + session: any UploadCoordinatorMultipartSessionProtocol, + onEvent: (@Sendable (UploadEvent) async -> Void)? + ) async throws -> DataPlaneUploadCompletion { + try await self.refreshUploadSessionIfNeeded(session: session, state: &state, onEvent: onEvent) + + let body = try self.dependencies.fileCoordinator.readAll(from: state.managedFileURL) + await onEvent?(.uploadingPart(partNumber: 1, sentBytes: UInt64(body.count), totalBytes: state.fileSize)) + await self.emitMetric("upload_part", dimensions: ["upload_id": state.uploadID, "part_number": "1"]) + + let descriptor = try await session.putObject(body: body) + state.multipartUploadID = nil + state.uploadedParts = [ + PersistedUploadedPart( + partNumber: 1, + etag: descriptor.etag, + offsetStart: 0, + partSize: UInt64(body.count), + md5Hex: Self.md5Hex(body) + ), + ] + state.phase = .multipartCompleted + state.updatedAt = await self.dependencies.clock.now() + try await self.dependencies.stateStore.saveActive(state) + + return DataPlaneUploadCompletion(completedPartCount: 1, ossObjectETag: descriptor.etag) + } + + private func uploadMultipart( + state: inout PersistedUploadState, + session: any UploadCoordinatorMultipartSessionProtocol, + existingMultipartUploadID: String?, + onEvent: (@Sendable (UploadEvent) async -> Void)? + ) async throws -> DataPlaneUploadCompletion { + let multipartUploadID: String + if let existingMultipartUploadID { + multipartUploadID = existingMultipartUploadID + } else { + await onEvent?(.initiatingMultipart(uploadID: state.uploadID)) + multipartUploadID = try await session.initiateMultipartUpload() + state.multipartUploadID = multipartUploadID + state.phase = .multipartInitiated + state.updatedAt = await self.dependencies.clock.now() + try await self.dependencies.stateStore.saveActive(state) + } + + let partSize = Int(state.partSizeBytes) + let parts = try self.loadFileParts(from: state.managedFileURL, partSize: partSize) + var persistedPartsByNumber = Dictionary(uniqueKeysWithValues: state.uploadedParts.map { ($0.partNumber, $0) }) + var uploadedDescriptors = state.uploadedParts + .sorted(by: { $0.partNumber < $1.partNumber }) + .map { + UploadedPartDescriptor( + partNumber: $0.partNumber, + etag: $0.etag, + size: Int64($0.partSize), + lastModified: nil, + hashCRC64: nil + ) + } for (index, part) in parts.enumerated() { - try await self.refreshUploadSessionIfNeeded(session: ossSession, state: &restartedState, onEvent: onEvent) let partNumber = index + 1 - await onEvent?(.uploadingPart(partNumber: partNumber, sentBytes: UInt64(part.count), totalBytes: restartedState.fileSize)) - let descriptor = try await ossSession.uploadPart( + if persistedPartsByNumber[partNumber] != nil { + continue + } + + try await self.refreshUploadSessionIfNeeded(session: session, state: &state, onEvent: onEvent) + await onEvent?(.uploadingPart(partNumber: partNumber, sentBytes: UInt64(part.count), totalBytes: state.fileSize)) + await self.emitMetric("upload_part", dimensions: ["upload_id": state.uploadID, "part_number": String(partNumber)]) + + let descriptor = try await session.uploadPart( multipartUploadID: multipartUploadID, partNumber: partNumber, body: part ) uploadedDescriptors.append(descriptor) - uploadedParts.append( - PersistedUploadedPart( - partNumber: partNumber, - etag: descriptor.etag, - offsetStart: offset, - partSize: UInt64(part.count), - md5Hex: Self.md5Hex(part) - ) + persistedPartsByNumber[partNumber] = PersistedUploadedPart( + partNumber: partNumber, + etag: descriptor.etag, + offsetStart: UInt64(index * partSize), + partSize: UInt64(part.count), + md5Hex: Self.md5Hex(part) ) - offset += UInt64(part.count) - - restartedState.phase = .uploading - restartedState.uploadedParts = uploadedParts - restartedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(restartedState) + state.uploadedParts = persistedPartsByNumber.values.sorted(by: { $0.partNumber < $1.partNumber }) + state.phase = .uploading + state.updatedAt = await self.dependencies.clock.now() + try await self.dependencies.stateStore.saveActive(state) } - try await self.refreshUploadSessionIfNeeded(session: ossSession, state: &restartedState, onEvent: onEvent) - await onEvent?(.completingMultipart(uploadID: restartedState.uploadID)) - let ossObjectETag = try await ossSession.completeMultipartUpload( + uploadedDescriptors.sort(by: { $0.partNumber < $1.partNumber }) + try await self.refreshUploadSessionIfNeeded(session: session, state: &state, onEvent: onEvent) + await onEvent?(.completingMultipart(uploadID: state.uploadID)) + let ossObjectETag = try await session.completeMultipartUpload( multipartUploadID: multipartUploadID, parts: uploadedDescriptors ) - restartedState.phase = .multipartCompleted - restartedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.saveActive(restartedState) + state.phase = .multipartCompleted + state.updatedAt = await self.dependencies.clock.now() + try await self.dependencies.stateStore.saveActive(state) - await onEvent?(.completingBusinessUpload(uploadID: restartedState.uploadID)) - _ = try await self.dependencies.gatewayClient.completeUpload( - uploadID: restartedState.uploadID, - fileSize: Int64(restartedState.fileSize), - rawTags: restartedState.rawTags, + return DataPlaneUploadCompletion( completedPartCount: Int32(uploadedDescriptors.count), - ossObjectEtag: ossObjectETag, - partSizeBytes: Int64(restartedState.partSizeBytes) + ossObjectETag: ossObjectETag ) + } - restartedState.phase = .businessCompleting - restartedState.updatedAt = await self.dependencies.clock.now() - try await self.dependencies.stateStore.moveToCompleted(restartedState) + private func completePersistedSingleObjectOrRestart( + state: PersistedUploadState, + session: any UploadCoordinatorMultipartSessionProtocol, + onEvent: (@Sendable (UploadEvent) async -> Void)? + ) async throws -> UploadResult { + guard let persistedETag = state.uploadedParts.first?.etag.nilIfBlank else { + return try await self.restartUpload(state: state, onEvent: onEvent) + } - let result = UploadResult( - logicalUploadID: restartedState.logicalUploadID, - uploadID: restartedState.uploadID, - bucket: restartedState.bucket, - objectKey: restartedState.objectKey, - fileSize: restartedState.fileSize, - ossObjectETag: ossObjectETag + let remoteETag: String + do { + remoteETag = try await session.headObjectETag() + } catch let error as DataGatewayClientError { + if Self.isObjectMissing(error) { + return try await self.restartUpload(state: state, onEvent: onEvent) + } + throw error + } + + guard Self.etagsMatch(remoteETag, persistedETag) else { + return try await self.restartUpload(state: state, onEvent: onEvent) + } + + var resumedState = state + let result = try await self.completeBusinessUpload( + state: &resumedState, + uploadCompletion: DataPlaneUploadCompletion(completedPartCount: 1, ossObjectETag: remoteETag), + onEvent: onEvent ) await onEvent?(.completed(result)) return result } + private func completeBusinessUpload( + state: inout PersistedUploadState, + uploadCompletion: DataPlaneUploadCompletion, + onEvent: (@Sendable (UploadEvent) async -> Void)? + ) async throws -> UploadResult { + await onEvent?(.completingBusinessUpload(uploadID: state.uploadID)) + _ = try await self.dependencies.gatewayClient.completeUpload( + uploadID: state.uploadID, + fileSize: Int64(state.fileSize), + rawTags: state.rawTags, + completedPartCount: uploadCompletion.completedPartCount, + ossObjectEtag: uploadCompletion.ossObjectETag, + partSizeBytes: Int64(state.partSizeBytes) + ) + + state.phase = .businessCompleting + state.updatedAt = await self.dependencies.clock.now() + try await self.dependencies.stateStore.moveToCompleted(state) + + return UploadResult( + logicalUploadID: state.logicalUploadID, + uploadID: state.uploadID, + bucket: state.bucket, + objectKey: state.objectKey, + fileSize: state.fileSize, + ossObjectETag: uploadCompletion.ossObjectETag + ) + } + private func refreshUploadSessionIfNeeded( session: any UploadCoordinatorMultipartSessionProtocol, state: inout PersistedUploadState, @@ -1634,6 +1658,36 @@ public actor UploadCoordinator { return Date(timeIntervalSince1970: TimeInterval(unix)) } + private static func shouldUseSingleObjectUpload(fileSize: UInt64, partSizeBytes: UInt64) -> Bool { + fileSize <= partSizeBytes + } + + private static func isPersistedSingleObjectCompleted(_ state: PersistedUploadState) -> Bool { + state.phase == .multipartCompleted + && state.multipartUploadID?.nilIfBlank == nil + && state.uploadedParts.count == 1 + } + + private static func isObjectMissing(_ error: DataGatewayClientError) -> Bool { + guard case .ossFailed(let httpStatus, let ossCode, _) = error else { + return false + } + return httpStatus == 404 || ossCode == "NotFound" || ossCode == "NoSuchKey" + } + + private static func etagsMatch(_ lhs: String, _ rhs: String) -> Bool { + self.canonicalETag(lhs) == self.canonicalETag(rhs) + } + + private static func canonicalETag(_ value: String) -> String { + var trimmed = value.trimmingCharacters(in: .whitespacesAndNewlines) + if trimmed.hasPrefix("\""), trimmed.hasSuffix("\""), trimmed.count >= 2 { + trimmed.removeFirst() + trimmed.removeLast() + } + return trimmed.lowercased() + } + package static func decideResumeAction( state: PersistedUploadState, recovery: Archebase_DataGateway_V1_GetUploadRecoveryResponse diff --git a/Sources/DataGatewayClient/TestHarnessSupport.swift b/Sources/DataGatewayClient/TestHarnessSupport.swift index ba96b64..b9db848 100644 --- a/Sources/DataGatewayClient/TestHarnessSupport.swift +++ b/Sources/DataGatewayClient/TestHarnessSupport.swift @@ -168,6 +168,7 @@ package struct LocalStackTestEnvironment: Sendable { /// Errors raised while validating the Aliyun OSS test harness environment. package enum AliyunOSSHarnessError: Error, Sendable, Equatable { case missingEnvironmentVariable(String) + case invalidEnvironmentVariable(String) } /// Expected object metadata for remote upload assertions. @@ -227,11 +228,12 @@ package struct AliyunOSSTestEnvironment: Sendable { endpointsJSON: try self.publicEndpointsJSON(), endpointsURL: endpointsURL ) - return try DataGatewayClientConfig.recommended( + let config = try DataGatewayClientConfig.recommended( credentialBase64: credentialBase64, persistRootURL: persistRoot, endpointsURL: endpointsURL ) + return try self.applyRemoteRequestTimeoutOverride(to: config) } let authEndpoint = try self.requiredURL(for: "DGW_REAL_AUTH_ENDPOINT") @@ -251,13 +253,33 @@ package struct AliyunOSSTestEnvironment: Sendable { tls = authEndpoint.scheme?.lowercased() == "https" ? .tls : .plaintext } - return DataGatewayClientConfig.testRecommended( + let config = DataGatewayClientConfig.testRecommended( authEndpoint: authEndpoint, gatewayEndpoint: gatewayEndpoint, credentialBase64: credentialBase64, persistRootURL: persistRoot, tls: tls ) + return try self.applyRemoteRequestTimeoutOverride(to: config) + } + + private func applyRemoteRequestTimeoutOverride(to config: DataGatewayClientConfig) throws -> DataGatewayClientConfig { + guard let seconds = try self.remoteRequestTimeoutSeconds() else { + return config + } + var copy = config + copy.requestTimeout = .seconds(seconds) + return copy + } + + private func remoteRequestTimeoutSeconds() throws -> Int64? { + guard let value = self.environment["DGW_REAL_REQUEST_TIMEOUT_SECONDS"]?.trimmedNonEmpty else { + return nil + } + guard let seconds = Int64(value), seconds > 0 else { + throw AliyunOSSHarnessError.invalidEnvironmentVariable("DGW_REAL_REQUEST_TIMEOUT_SECONDS") + } + return seconds } private func requiredValue(for key: String) throws -> String { @@ -378,6 +400,16 @@ package struct LocalStackMockMultipartSession: UploadCoordinatorMultipartSession ) } + package func putObject(body: Data) async throws -> UploadedPartDescriptor { + UploadedPartDescriptor( + partNumber: 1, + etag: self.completedETag, + size: Int64(body.count), + lastModified: nil, + hashCRC64: nil + ) + } + package func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] { _ = multipartUploadID return [] diff --git a/Tests/DGWOssTests/OssMultipartClientTests.swift b/Tests/DGWOssTests/OssMultipartClientTests.swift index 1def9ae..5554178 100644 --- a/Tests/DGWOssTests/OssMultipartClientTests.swift +++ b/Tests/DGWOssTests/OssMultipartClientTests.swift @@ -38,6 +38,10 @@ import Testing partNumber: 7, body: Data("abc".utf8) ) + _ = try await client.putObject( + objectKey: "objects/demo.bin", + body: Data("put".utf8) + ) _ = try await client.completeMultipartUpload( objectKey: "objects/demo.bin", multipartUploadID: "upload-1", @@ -63,6 +67,12 @@ import Testing #expect(uploadRequests[0].partNumber == 7) #expect(try uploadRequests[0].body?.readData() == Data("abc".utf8)) + let putRequests = await sdkClient.putRequests() + #expect(putRequests.count == 1) + #expect(putRequests[0].bucket == "bucket-1") + #expect(putRequests[0].key == "objects/demo.bin") + #expect(try putRequests[0].body?.readData() == Data("put".utf8)) + let completeRequests = await sdkClient.completeRequests() #expect(completeRequests.count == 1) #expect(completeRequests[0].bucket == "bucket-1") @@ -131,6 +141,18 @@ import Testing hashCRC64: nil )) + let putObject = try await client.putObject( + objectKey: "objects/demo.bin", + body: Data("put-body".utf8) + ) + #expect(putObject == UploadedPartDescriptor( + partNumber: 1, + etag: "\"etag-put\"", + size: 8, + lastModified: nil, + hashCRC64: nil + )) + let etag = try await client.completeMultipartUpload( objectKey: "objects/demo.bin", multipartUploadID: "upload-1", @@ -180,6 +202,45 @@ import Testing #expect(parts.map(\.etag) == ["\"etag-1\"", "\"etag-2\""]) } +@Test func putObjectMissingETagFailsInvalidResponse() async throws { + let sdkClient = MockAlibabaOSSSDKClient( + initiateValue: OssInitiateMultipartUploadOutput(uploadID: nil), + uploadPartValue: OssUploadPartOutput(etag: nil), + putValue: OssPutObjectOutput(etag: nil), + completeValue: OssCompleteMultipartUploadOutput(etag: nil), + listValues: [], + headValue: OssHeadObjectOutput(etag: nil) + ) + let client = try OssMultipartClient(configuration: makeConfiguration(), sdkClient: sdkClient) + + let error = await #expect(throws: OssOperationError.self) { + try await client.putObject(objectKey: "objects/demo.bin", body: Data("body".utf8)) + } + + #expect(error == .invalidResponse("PutObject response missing ETag")) +} + +@Test func putObjectURLErrorClassifiesAsRetriableTransportFailure() async throws { + let sdkClient = MockAlibabaOSSSDKClient( + initiateValue: OssInitiateMultipartUploadOutput(uploadID: nil), + uploadPartValue: OssUploadPartOutput(etag: nil), + putError: URLError(.timedOut), + completeValue: OssCompleteMultipartUploadOutput(etag: nil), + listValues: [], + headValue: OssHeadObjectOutput(etag: nil) + ) + let client = try OssMultipartClient(configuration: makeConfiguration(), sdkClient: sdkClient) + + let error = await #expect(throws: OssOperationError.self) { + try await client.putObject(objectKey: "objects/demo.bin", body: Data("body".utf8)) + } + + #expect(error == .transportFailure(code: URLError.Code.timedOut.rawValue, message: URLError(.timedOut).localizedDescription)) + if let error { + #expect(OSSDataPlaneErrorMapper.classify(error).action == .retry) + } +} + @Test func ttlLowTriggersClientRebuild() async throws { let initialClient = RecordingMultipartClient(identifier: "initial") let refreshedClient = RecordingMultipartClient( @@ -248,6 +309,13 @@ import Testing lastModified: nil, hashCRC64: nil ), + putObjectResult: UploadedPartDescriptor( + partNumber: 1, + etag: "\"etag-put\"", + size: 4, + lastModified: nil, + hashCRC64: nil + ), completeResult: "\"etag-complete\"", headObjectETagResult: "\"etag-head\"" ) @@ -280,6 +348,7 @@ import Testing partNumber: 2, body: Data("data".utf8) ) + let putObject = try await session.putObject(body: Data("blob".utf8)) let completeETag = try await session.completeMultipartUpload( multipartUploadID: "multipart-1", parts: [uploadedPart] @@ -287,11 +356,13 @@ import Testing let headETag = try await session.headObjectETag() #expect(uploadedPart.etag == "\"etag-2\"") + #expect(putObject.etag == "\"etag-put\"") #expect(completeETag == "\"etag-complete\"") #expect(headETag == "\"etag-head\"") #expect(await provider.requestedUploadIDs() == ["upload-1"]) #expect(await initialClient.uploadPartCalls().isEmpty) #expect(await refreshedClient.uploadPartCalls() == ["multipart-1:2"]) + #expect(await refreshedClient.putObjectCalls() == ["objects/demo.bin:4"]) #expect(await refreshedClient.completeCalls() == [[2]]) #expect(await refreshedClient.headObjectCalls() == ["objects/demo.bin"]) } @@ -444,12 +515,15 @@ import Testing private actor MockAlibabaOSSSDKClient: AlibabaOSSSDKClientProtocol { private let initiateValue: OssInitiateMultipartUploadOutput private let uploadPartValue: OssUploadPartOutput + private let putValue: OssPutObjectOutput + private let putError: (any Error)? private let completeValue: OssCompleteMultipartUploadOutput private let listValues: [OssListPartsPage] private let headValue: OssHeadObjectOutput private var recordedInitiateRequests: [InitiateMultipartUploadRequest] = [] private var recordedUploadRequests: [UploadPartRequest] = [] + private var recordedPutRequests: [PutObjectRequest] = [] private var recordedCompleteRequests: [CompleteMultipartUploadRequest] = [] private var recordedAbortRequests: [AbortMultipartUploadRequest] = [] private var recordedListRequests: [ListPartsRequest] = [] @@ -458,12 +532,16 @@ private actor MockAlibabaOSSSDKClient: AlibabaOSSSDKClientProtocol { init( initiateValue: OssInitiateMultipartUploadOutput, uploadPartValue: OssUploadPartOutput, + putValue: OssPutObjectOutput = OssPutObjectOutput(etag: "\"etag-put\""), + putError: (any Error)? = nil, completeValue: OssCompleteMultipartUploadOutput, listValues: [OssListPartsPage], headValue: OssHeadObjectOutput ) { self.initiateValue = initiateValue self.uploadPartValue = uploadPartValue + self.putValue = putValue + self.putError = putError self.completeValue = completeValue self.listValues = listValues self.headValue = headValue @@ -483,6 +561,16 @@ private actor MockAlibabaOSSSDKClient: AlibabaOSSSDKClientProtocol { return self.uploadPartValue } + func putObject( + _ request: PutObjectRequest + ) async throws -> OssPutObjectOutput { + self.recordedPutRequests.append(request) + if let putError { + throw putError + } + return self.putValue + } + func completeMultipartUpload( _ request: CompleteMultipartUploadRequest ) async throws -> OssCompleteMultipartUploadOutput { @@ -518,6 +606,10 @@ private actor MockAlibabaOSSSDKClient: AlibabaOSSSDKClientProtocol { self.recordedUploadRequests } + func putRequests() -> [PutObjectRequest] { + self.recordedPutRequests + } + func completeRequests() -> [CompleteMultipartUploadRequest] { self.recordedCompleteRequests } @@ -539,11 +631,13 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { private let identifier: String private let initiateResult: String private let uploadPartResult: UploadedPartDescriptor + private let putObjectResult: UploadedPartDescriptor private let completeResult: String private let listPartsResult: [UploadedPartDescriptor] private let headObjectETagResult: String private var recordedUploadPartCalls: [String] = [] + private var recordedPutObjectCalls: [String] = [] private var recordedCompleteCalls: [[Int]] = [] private var recordedHeadObjectCalls: [String] = [] @@ -557,6 +651,13 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { lastModified: nil, hashCRC64: nil ), + putObjectResult: UploadedPartDescriptor = UploadedPartDescriptor( + partNumber: 1, + etag: "\"etag-default\"", + size: 1, + lastModified: nil, + hashCRC64: nil + ), completeResult: String = "\"etag-default\"", listPartsResult: [UploadedPartDescriptor] = [], headObjectETagResult: String = "\"etag-default\"" @@ -564,6 +665,7 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { self.identifier = identifier self.initiateResult = initiateResult self.uploadPartResult = uploadPartResult + self.putObjectResult = putObjectResult self.completeResult = completeResult self.listPartsResult = listPartsResult self.headObjectETagResult = headObjectETagResult @@ -590,6 +692,20 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { ) } + func putObject( + objectKey: String, + body: Data + ) async throws -> UploadedPartDescriptor { + self.recordedPutObjectCalls.append("\(objectKey):\(body.count)") + return UploadedPartDescriptor( + partNumber: 1, + etag: self.putObjectResult.etag, + size: Int64(body.count), + lastModified: self.putObjectResult.lastModified, + hashCRC64: self.putObjectResult.hashCRC64 + ) + } + func completeMultipartUpload( objectKey: String, multipartUploadID: String, @@ -620,6 +736,10 @@ private actor RecordingMultipartClient: OssMultipartClientProtocol { self.recordedUploadPartCalls } + func putObjectCalls() -> [String] { + self.recordedPutObjectCalls + } + func completeCalls() -> [[Int]] { self.recordedCompleteCalls } @@ -649,6 +769,13 @@ private actor ThrowingMultipartClient: OssMultipartClientProtocol { throw self.error } + func putObject( + objectKey: String, + body: Data + ) async throws -> UploadedPartDescriptor { + throw self.error + } + func completeMultipartUpload( objectKey: String, multipartUploadID: String, diff --git a/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift b/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift index ad2dad1..bc15925 100644 --- a/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift @@ -327,6 +327,10 @@ private actor FakeMultipartSession: UploadCoordinatorMultipartSessionProtocol { UploadedPartDescriptor(partNumber: partNumber, etag: "\"etag-\(partNumber)\"", size: Int64(body.count), lastModified: nil, hashCRC64: nil) } + func putObject(body: Data) async throws -> UploadedPartDescriptor { + UploadedPartDescriptor(partNumber: 1, etag: "\"etag-object\"", size: Int64(body.count), lastModified: nil, hashCRC64: nil) + } + func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] { [] } func headObjectETag() async throws -> String { "\"etag-object\"" } diff --git a/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift b/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift index f9e5d78..8f27bda 100644 --- a/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift @@ -221,6 +221,17 @@ struct LocalStackHarnessTests { #expect(config.tls == .plaintext) } +@Test func aliyunEnvironmentAppliesRemoteRequestTimeoutOverride() throws { + let config = try AliyunOSSTestEnvironment(environment: [ + "DGW_REAL_AUTH_ENDPOINT": "http://example-auth:50051", + "DGW_REAL_GATEWAY_ENDPOINT": "http://example-gateway:50053", + "DGW_REAL_CREDENTIAL_BASE64": "credential-base64", + "DGW_REAL_REQUEST_TIMEOUT_SECONDS": "120", + ]).makeRemoteClientConfig() + + #expect(config.requestTimeout == .seconds(120)) +} + @Test func aliyunEnvironmentRemoteConfigRequiresCredentialBeforeEndpointOverrides() { let environment = AliyunOSSTestEnvironment(environment: [:]) @@ -609,11 +620,8 @@ struct LocalStackHarnessTests { let clientConfig = try uniqueRealClientConfig(from: environment.makeRemoteClientConfig(), label: "events") defer { try? FileManager.default.removeItem(at: clientConfig.persistRootURL) } let client = try DataGatewayClient(config: clientConfig) - let fileURL = try writeRealPayload( - Data("aliyun-real-events-payload-\(UUID().uuidString)".utf8), - under: clientConfig.persistRootURL, - name: "aliyun-real-events" - ) + let payload = Data("aliyun-real-events-payload-\(UUID().uuidString)".utf8) + let fileURL = try writeRealPayload(payload, under: clientConfig.persistRootURL, name: "aliyun-real-events") var events: [UploadEvent] = [] for try await event in await client.uploadEvents( @@ -627,18 +635,45 @@ struct LocalStackHarnessTests { events.append(event) } - #expect(events.contains(.preparing)) - #expect(events.contains(.authenticating)) - #expect(events.contains(.creatingLogicalUpload)) - #expect(events.contains(where: { if case .initiatingMultipart = $0 { true } else { false } })) - #expect(events.contains(where: { if case .uploadingPart = $0 { true } else { false } })) - #expect(events.contains(where: { if case .completingMultipart = $0 { true } else { false } })) - #expect(events.contains(where: { if case .completingBusinessUpload = $0 { true } else { false } })) + assertPutObjectUploadEvents(events) - guard case .completed(let result) = events.last else { + guard let result = completedUploadResult(from: events) else { Issue.record("real Aliyun uploadEvents did not end with completed: \(events)") return } + #expect(result.fileSize == UInt64(payload.count)) + #expect(result.bucket == expectation.bucket) + #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) + #expect(!result.ossObjectETag.isEmpty) +} + +@Test( + .enabled(if: realRuntimeIntegrationEnabled) +) func realAliyunExactPartSizeUploadEventsFlow() async throws { + let environment = AliyunOSSTestEnvironment() + try environment.validate() + let expectation = try environment.remoteUploadExpectation() + let clientConfig = try uniqueRealClientConfig(from: environment.makeRemoteClientConfig(), label: "exact-part-size") + defer { try? FileManager.default.removeItem(at: clientConfig.persistRootURL) } + let client = try DataGatewayClient(config: clientConfig) + let size = realPartSizePayloadSizeBytes() + let fileURL = try writeRealPayload(Data(repeating: 0x45, count: size), under: clientConfig.persistRootURL, name: "aliyun-real-exact-part-size") + + var events: [UploadEvent] = [] + for try await event in await client.uploadEvents( + UploadRequest( + fileURL: fileURL, + clientHints: ["suite": "aliyun-real", "mode": "exact-part-size"], + rawTags: ["suite": "aliyun-real", "runtime": "exact-part-size"], + displayName: "aliyun-real-exact-part-size" + ) + ) { + events.append(event) + } + + let result = try #require(completedUploadResult(from: events)) + assertPutObjectUploadEvents(events) + #expect(result.fileSize == UInt64(size)) #expect(result.bucket == expectation.bucket) #expect(result.objectKey.hasPrefix(expectation.objectPrefix)) #expect(!result.ossObjectETag.isEmpty) @@ -656,8 +691,7 @@ struct LocalStackHarnessTests { let size = realMultipartPayloadSizeBytes() let fileURL = try writeRealPayload(Data(repeating: 0x5A, count: size), under: clientConfig.persistRootURL, name: "aliyun-real-multipart") - var uploadedPartCount = 0 - var completedResult: UploadResult? + var events: [UploadEvent] = [] for try await event in await client.uploadEvents( UploadRequest( fileURL: fileURL, @@ -666,15 +700,13 @@ struct LocalStackHarnessTests { displayName: "aliyun-real-multipart" ) ) { - if case .uploadingPart = event { - uploadedPartCount += 1 - } - if case .completed(let result) = event { - completedResult = result - } + events.append(event) } - let result = try #require(completedResult) + let result = try #require(completedUploadResult(from: events)) + let uploadedPartCount = uploadEventCount(events, matching: isUploadingPartEvent) + #expect(events.contains(where: isInitiatingMultipartEvent)) + #expect(events.contains(where: isCompletingMultipartEvent)) #expect(uploadedPartCount >= 2) #expect(result.fileSize == UInt64(size)) #expect(result.bucket == expectation.bucket) @@ -911,6 +943,74 @@ private func realMultipartPayloadSizeBytes() -> Int { return 67_108_864 + 1024 } +private func realPartSizePayloadSizeBytes() -> Int { + if let value = ProcessInfo.processInfo.environment["DGW_REAL_PART_SIZE_BYTES"], + let parsed = Int(value), + parsed > 0 { + return parsed + } + return 67_108_864 +} + +private func assertPutObjectUploadEvents(_ events: [UploadEvent]) { + #expect(events.contains(.preparing)) + #expect(events.contains(.authenticating)) + #expect(events.contains(.creatingLogicalUpload)) + #expect(uploadEventCount(events, matching: isUploadingPartEvent) == 1) + #expect(events.contains(where: isCompletingBusinessUploadEvent)) + #expect(events.last.map(isCompletedEvent) ?? false) + #expect(!events.contains(where: isInitiatingMultipartEvent)) + #expect(!events.contains(where: isCompletingMultipartEvent)) +} + +private func completedUploadResult(from events: [UploadEvent]) -> UploadResult? { + guard case .completed(let result) = events.last else { + return nil + } + return result +} + +private func uploadEventCount(_ events: [UploadEvent], matching matcher: (UploadEvent) -> Bool) -> Int { + events.reduce(0) { count, event in + matcher(event) ? count + 1 : count + } +} + +private func isInitiatingMultipartEvent(_ event: UploadEvent) -> Bool { + if case .initiatingMultipart = event { + return true + } + return false +} + +private func isUploadingPartEvent(_ event: UploadEvent) -> Bool { + if case .uploadingPart = event { + return true + } + return false +} + +private func isCompletingMultipartEvent(_ event: UploadEvent) -> Bool { + if case .completingMultipart = event { + return true + } + return false +} + +private func isCompletingBusinessUploadEvent(_ event: UploadEvent) -> Bool { + if case .completingBusinessUpload = event { + return true + } + return false +} + +private func isCompletedEvent(_ event: UploadEvent) -> Bool { + if case .completed = event { + return true + } + return false +} + private func hasRealUserAuthorizationHeader() -> Bool { do { _ = try realUserAuthorizationHeader() diff --git a/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift b/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift index 74ce426..c3e3a2f 100644 --- a/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift @@ -311,6 +311,9 @@ import Testing CompleteInvocation(uploadID: "upload-idempotent", fileSize: 12, rawTags: completedRawTags, completedPartCount: 1, ossObjectEtag: "\"etag-object\"", partSizeBytes: 12), CompleteInvocation(uploadID: "upload-idempotent", fileSize: 12, rawTags: completedRawTags, completedPartCount: 1, ossObjectEtag: "\"etag-object\"", partSizeBytes: 12), ]) + #expect(await ossSession.initiateCalls() == 0) + #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.uploadCalls().isEmpty) } @Test func contractOssObjectETagMismatchPropagatesIntegrityFailure() async { @@ -432,9 +435,7 @@ import Testing .preparing, .authenticating, .creatingLogicalUpload, - .initiatingMultipart(uploadID: "upload-1"), .uploadingPart(partNumber: 1, sentBytes: UInt64(payload.count), totalBytes: UInt64(payload.count)), - .completingMultipart(uploadID: "upload-1"), .completingBusinessUpload(uploadID: "upload-1"), .completed(result), ]) @@ -450,7 +451,10 @@ import Testing partSizeBytes: 64 * 1024 * 1024 ), ]) - #expect(await ossSession.uploadCalls() == [UploadCall(multipartUploadID: "multipart-1", partNumber: 1, size: payload.count)]) + #expect(await ossSession.initiateCalls() == 0) + #expect(await ossSession.uploadCalls().isEmpty) + #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.completeCalls().isEmpty) let pending = try await stateStore.listPendingUploads() #expect(pending.isEmpty) @@ -517,6 +521,9 @@ import Testing #expect(result.objectKey == "objects/small.bin") #expect(result.fileSize == UInt64(payload.count)) #expect(result.ossObjectETag == "\"etag-small-object\"") + #expect(await ossSession.initiateCalls() == 0) + #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.uploadCalls().isEmpty) } @Test func dataGatewayClientUploadHandlesMultipartFiles() async throws { @@ -568,11 +575,14 @@ import Testing ) #expect(result.uploadID == "upload-multipart") + #expect(await ossSession.initiateCalls() == 1) + #expect(await ossSession.putObjectCalls().isEmpty) #expect(await ossSession.uploadCalls() == [ UploadCall(multipartUploadID: "multipart-24", partNumber: 1, size: 8), UploadCall(multipartUploadID: "multipart-24", partNumber: 2, size: 8), UploadCall(multipartUploadID: "multipart-24", partNumber: 3, size: 8), ]) + #expect(await ossSession.completeCalls() == [[1, 2, 3]]) #expect(await gatewayClient.completeInvocations() == [ CompleteInvocation( uploadID: "upload-multipart", @@ -640,9 +650,7 @@ import Testing .preparing, .authenticating, .creatingLogicalUpload, - .initiatingMultipart(uploadID: "upload-events"), .uploadingPart(partNumber: 1, sentBytes: UInt64(payload.count), totalBytes: UInt64(payload.count)), - .completingMultipart(uploadID: "upload-events"), .completingBusinessUpload(uploadID: "upload-events"), .completed(UploadResult( logicalUploadID: "logical-1", @@ -683,7 +691,7 @@ import Testing UploadedPartDescriptor(partNumber: 1, etag: "\"etag-events-error-part\"", size: Int64(payload.count), lastModified: nil, hashCRC64: nil), ], completedETag: "\"etag-events-error-object\"", - failOnComplete: DataGatewayClientError.ossFailed(httpStatus: 500, ossCode: "InternalError", message: "complete failed") + failOnPutObject: DataGatewayClientError.ossFailed(httpStatus: 500, ossCode: "InternalError", message: "put failed") ) let client = DataGatewayClient( uploadCoordinator: UploadCoordinator( @@ -707,7 +715,7 @@ import Testing } Issue.record("expected uploadEvents to throw") } catch let error as DataGatewayClientError { - #expect(error == .ossFailed(httpStatus: 500, ossCode: "InternalError", message: "complete failed")) + #expect(error == .ossFailed(httpStatus: 500, ossCode: "InternalError", message: "put failed")) } catch { Issue.record("unexpected error: \(error)") } @@ -716,9 +724,7 @@ import Testing .preparing, .authenticating, .creatingLogicalUpload, - .initiatingMultipart(uploadID: "upload-events-error"), .uploadingPart(partNumber: 1, sentBytes: UInt64(payload.count), totalBytes: UInt64(payload.count)), - .completingMultipart(uploadID: "upload-events-error"), ]) } @@ -1677,6 +1683,220 @@ import Testing #expect(error == .uploadRestartExceeded) } +@Test func putObjectResumeCompletedStateUsesHeadObjectAndCompletesBusinessUpload() async throws { + let managedURL = URL(fileURLWithPath: "/staging/put-resume-complete.bin") + let payload = Data("resume".utf8) + let stateStore = UploadStateStore( + persistRoot: FileManager.default.temporaryDirectory.appendingPathComponent("data-gateway-client-put-complete-\(UUID().uuidString)"), + fileManager: .default, + clock: FixedUploadCoordinatorStoreClock(now: Date(timeIntervalSince1970: 2_700)) + ) + try await stateStore.saveActive( + makePersistedResumeState( + managedFileURL: managedURL, + multipartUploadID: nil, + phase: .multipartCompleted, + fileSize: UInt64(payload.count), + firstChunkMD5Hex: "69F2AFC2390CEC954F7C208B07212D39", + uploadedParts: [ + PersistedUploadedPart(partNumber: 1, etag: "etag-put", offsetStart: 0, partSize: UInt64(payload.count), md5Hex: "69F2AFC2390CEC954F7C208B07212D39"), + ] + ) + ) + + let gatewayClient = MockUploadCoordinatorGatewayClient( + createResponse: makeCreateLogicalUploadResponse(), + recoveryResponse: makeContinueRecoveryResponse(currentUploadID: "upload-put-complete", completedPartCount: 0), + reissueResponse: makeReissueResponse( + uploadID: "upload-put-complete", + credentials: makeCoordinatorUploadCredentials(expireAtUnix: 9_000, tokenSuffix: "put-complete", objectKey: "objects/put-complete.bin", partSizeBytes: 8) + ), + completeResponse: Archebase_DataGateway_V1_CompleteUploadResponse() + ) + let ossSession = MockOssUploadSession( + multipartUploadID: "multipart-unused", + uploadedParts: [], + completedETag: "\"etag-unused\"", + headObjectETag: "\"etag-put\"" + ) + let client = DataGatewayClient( + uploadCoordinator: UploadCoordinator( + executionPolicy: makeExecutionPolicy(), + dependencies: UploadCoordinatorDependencies( + gatewayClient: gatewayClient, + stateStore: stateStore, + fileCoordinator: FileStagingCoordinator( + stagingRoot: URL(fileURLWithPath: "/staging"), + fileSystem: MemoryFileSystem(files: [ + managedURL: .file(size: UInt64(payload.count), modifiedAt: Date(timeIntervalSince1970: 250), data: payload), + ]), + securityScopedAccessor: PassthroughSecurityScopedAccessor() + ), + ossClientFactory: { _ in ossSession }, + clock: FixedUploadCoordinatorClock(now: Date(timeIntervalSince1970: 2_700)) + ) + ) + ) + + let result = try await client.resumeUpload(logicalUploadID: "logical-resume") + + #expect(result.uploadID == "upload-put-complete") + #expect(result.ossObjectETag == "\"etag-put\"") + #expect(await ossSession.initiateCalls() == 0) + #expect(await ossSession.putObjectCalls().isEmpty) + #expect(await ossSession.uploadCalls().isEmpty) + #expect(await gatewayClient.completeInvocations() == [ + CompleteInvocation( + uploadID: "upload-put-complete", + fileSize: Int64(payload.count), + rawTags: ["scene": "robot"], + completedPartCount: 1, + ossObjectEtag: "\"etag-put\"", + partSizeBytes: 8 + ), + ]) +} + +@Test func putObjectResumeMissingObjectRestartsWithPutObject() async throws { + let managedURL = URL(fileURLWithPath: "/staging/put-resume-missing.bin") + let payload = Data("missing".utf8) + let stateStore = UploadStateStore( + persistRoot: FileManager.default.temporaryDirectory.appendingPathComponent("data-gateway-client-put-missing-\(UUID().uuidString)"), + fileManager: .default, + clock: FixedUploadCoordinatorStoreClock(now: Date(timeIntervalSince1970: 2_800)) + ) + try await stateStore.saveActive( + makePersistedResumeState( + managedFileURL: managedURL, + multipartUploadID: nil, + phase: .multipartCompleted, + fileSize: UInt64(payload.count), + firstChunkMD5Hex: "EA21841DA70E6405AF19FABC4FF8BDD9", + uploadedParts: [ + PersistedUploadedPart(partNumber: 1, etag: "\"etag-old\"", offsetStart: 0, partSize: UInt64(payload.count), md5Hex: "EA21841DA70E6405AF19FABC4FF8BDD9"), + ] + ) + ) + + let gatewayClient = MockUploadCoordinatorGatewayClient( + createResponse: makeCreateLogicalUploadResponse(uploadID: "upload-put-restarted", objectKey: "objects/put-restarted.bin", partSizeBytes: 8), + recoveryResponse: makeContinueRecoveryResponse(currentUploadID: "upload-put-missing", completedPartCount: 0), + reissueResponse: makeReissueResponse( + uploadID: "upload-put-missing", + credentials: makeCoordinatorUploadCredentials(expireAtUnix: 9_000, tokenSuffix: "put-missing", objectKey: "objects/put-missing.bin", partSizeBytes: 8) + ), + completeResponse: Archebase_DataGateway_V1_CompleteUploadResponse() + ) + let ossSession = MockOssUploadSession( + multipartUploadID: "multipart-unused", + uploadedParts: [], + completedETag: "\"etag-restarted\"", + headObjectError: DataGatewayClientError.ossFailed(httpStatus: 404, ossCode: "NoSuchKey", message: "not found") + ) + let client = DataGatewayClient( + uploadCoordinator: UploadCoordinator( + executionPolicy: makeExecutionPolicy(), + dependencies: UploadCoordinatorDependencies( + gatewayClient: gatewayClient, + stateStore: stateStore, + fileCoordinator: FileStagingCoordinator( + stagingRoot: URL(fileURLWithPath: "/staging"), + fileSystem: MemoryFileSystem(files: [ + managedURL: .file(size: UInt64(payload.count), modifiedAt: Date(timeIntervalSince1970: 250), data: payload), + ]), + securityScopedAccessor: PassthroughSecurityScopedAccessor() + ), + ossClientFactory: { _ in ossSession }, + clock: FixedUploadCoordinatorClock(now: Date(timeIntervalSince1970: 2_800)) + ) + ) + ) + + let result = try await client.resumeUpload(logicalUploadID: "logical-resume") + + #expect(result.uploadID == "upload-put-restarted") + #expect(result.ossObjectETag == "\"etag-restarted\"") + #expect(await gatewayClient.createRestartInvocations() == [CreateInvocation(clientHints: ["device": "iphone"], restartFromUploadID: "upload-put-missing")]) + #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.initiateCalls() == 0) + #expect(await ossSession.uploadCalls().isEmpty) + #expect(await gatewayClient.completeInvocations() == [ + CompleteInvocation( + uploadID: "upload-put-restarted", + fileSize: Int64(payload.count), + rawTags: ["scene": "robot"], + completedPartCount: 1, + ossObjectEtag: "\"etag-restarted\"", + partSizeBytes: 8 + ), + ]) +} + +@Test func putObjectResumeETagMismatchRestartsWithPutObject() async throws { + let managedURL = URL(fileURLWithPath: "/staging/put-resume-mismatch.bin") + let payload = Data("mismatch".utf8) + let stateStore = UploadStateStore( + persistRoot: FileManager.default.temporaryDirectory.appendingPathComponent("data-gateway-client-put-mismatch-\(UUID().uuidString)"), + fileManager: .default, + clock: FixedUploadCoordinatorStoreClock(now: Date(timeIntervalSince1970: 2_900)) + ) + try await stateStore.saveActive( + makePersistedResumeState( + managedFileURL: managedURL, + multipartUploadID: nil, + phase: .multipartCompleted, + fileSize: UInt64(payload.count), + firstChunkMD5Hex: "1D1C5B76DA944B44DB42C9D0558021C5", + uploadedParts: [ + PersistedUploadedPart(partNumber: 1, etag: "\"etag-old\"", offsetStart: 0, partSize: UInt64(payload.count), md5Hex: "1D1C5B76DA944B44DB42C9D0558021C5"), + ] + ) + ) + + let gatewayClient = MockUploadCoordinatorGatewayClient( + createResponse: makeCreateLogicalUploadResponse(uploadID: "upload-put-mismatch-new", objectKey: "objects/put-mismatch-new.bin", partSizeBytes: 8), + recoveryResponse: makeContinueRecoveryResponse(currentUploadID: "upload-put-mismatch", completedPartCount: 0), + reissueResponse: makeReissueResponse( + uploadID: "upload-put-mismatch", + credentials: makeCoordinatorUploadCredentials(expireAtUnix: 9_000, tokenSuffix: "put-mismatch", objectKey: "objects/put-mismatch.bin", partSizeBytes: 8) + ), + completeResponse: Archebase_DataGateway_V1_CompleteUploadResponse() + ) + let ossSession = MockOssUploadSession( + multipartUploadID: "multipart-unused", + uploadedParts: [], + completedETag: "\"etag-mismatch-new\"", + headObjectETag: "\"etag-other\"" + ) + let client = DataGatewayClient( + uploadCoordinator: UploadCoordinator( + executionPolicy: makeExecutionPolicy(), + dependencies: UploadCoordinatorDependencies( + gatewayClient: gatewayClient, + stateStore: stateStore, + fileCoordinator: FileStagingCoordinator( + stagingRoot: URL(fileURLWithPath: "/staging"), + fileSystem: MemoryFileSystem(files: [ + managedURL: .file(size: UInt64(payload.count), modifiedAt: Date(timeIntervalSince1970: 250), data: payload), + ]), + securityScopedAccessor: PassthroughSecurityScopedAccessor() + ), + ossClientFactory: { _ in ossSession }, + clock: FixedUploadCoordinatorClock(now: Date(timeIntervalSince1970: 2_900)) + ) + ) + ) + + let result = try await client.resumeUpload(logicalUploadID: "logical-resume") + + #expect(result.uploadID == "upload-put-mismatch-new") + #expect(result.ossObjectETag == "\"etag-mismatch-new\"") + #expect(await gatewayClient.createRestartInvocations() == [CreateInvocation(clientHints: ["device": "iphone"], restartFromUploadID: "upload-put-mismatch")]) + #expect(await ossSession.putObjectCalls() == [payload.count]) + #expect(await ossSession.initiateCalls() == 0) + #expect(await ossSession.uploadCalls().isEmpty) +} + @Test func refreshesCredentialsBeforeNextPartWhenTtlIsLow() async throws { let sourceURL = URL(fileURLWithPath: "/files/refresh-next-part.bin") let payload = Data(repeating: 0x80, count: 24) @@ -1757,18 +1977,19 @@ import Testing clock: FixedUploadCoordinatorStoreClock(now: Date(timeIntervalSince1970: 3_100)) ) let gatewayClient = MockUploadCoordinatorGatewayClient( - createResponse: makeCreateLogicalUploadResponse(uploadID: "upload-refresh-complete", objectKey: "objects/refresh-complete.bin", partSizeBytes: 8), + createResponse: makeCreateLogicalUploadResponse(uploadID: "upload-refresh-complete", objectKey: "objects/refresh-complete.bin", partSizeBytes: 4), recoveryResponse: makeContinueRecoveryResponse(currentUploadID: "upload-refresh-complete"), - reissueResponse: makeReissueResponse(uploadID: "upload-refresh-complete", credentials: makeCoordinatorUploadCredentials(expireAtUnix: 5_200, tokenSuffix: "refresh-complete", objectKey: "objects/refresh-complete.bin", partSizeBytes: 8)), + reissueResponse: makeReissueResponse(uploadID: "upload-refresh-complete", credentials: makeCoordinatorUploadCredentials(expireAtUnix: 5_200, tokenSuffix: "refresh-complete", objectKey: "objects/refresh-complete.bin", partSizeBytes: 4)), completeResponse: Archebase_DataGateway_V1_CompleteUploadResponse() ) let ossSession = RefreshAwareMockOssSession( multipartUploadID: "multipart-refresh-complete", uploadDescriptors: [ - UploadedPartDescriptor(partNumber: 1, etag: "\"etag-1\"", size: 8, lastModified: nil, hashCRC64: nil), + UploadedPartDescriptor(partNumber: 1, etag: "\"etag-1\"", size: 4, lastModified: nil, hashCRC64: nil), + UploadedPartDescriptor(partNumber: 2, etag: "\"etag-2\"", size: 4, lastModified: nil, hashCRC64: nil), ], completedETag: "\"etag-refresh-complete\"", - refreshResults: [false, true], + refreshResults: [false, false, true], expirations: [ Date(timeIntervalSince1970: 3_110), Date(timeIntervalSince1970: 5_200), @@ -1796,7 +2017,7 @@ import Testing let events = await eventRecorder.events() #expect(events.contains(.refreshingCredentials(uploadID: "upload-refresh-complete"))) - #expect(await ossSession.refreshCheckCount() == 2) + #expect(await ossSession.refreshCheckCount() == 3) let snapshot = try await stateStore.loadSnapshot(logicalUploadID: "logical-1") #expect(snapshot?.lastKnownSTSExpireAt == Date(timeIntervalSince1970: 5_200)) } @@ -2159,6 +2380,7 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto private let refreshError: DataGatewayClientError? private var refreshChecks = 0 private var refreshIndex = 0 + private var putObjectInvocations: [Int] = [] init( multipartUploadID: String, @@ -2214,6 +2436,17 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto return descriptor } + func putObject(body: Data) async throws -> UploadedPartDescriptor { + self.putObjectInvocations.append(body.count) + return UploadedPartDescriptor( + partNumber: 1, + etag: self.completedETag, + size: Int64(body.count), + lastModified: nil, + hashCRC64: nil + ) + } + func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] { _ = multipartUploadID return [] @@ -2235,6 +2468,10 @@ private actor RefreshAwareMockOssSession: UploadCoordinatorMultipartSessionProto func refreshCheckCount() -> Int { self.refreshChecks } + + func putObjectCalls() -> [Int] { + self.putObjectInvocations + } } private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { @@ -2242,16 +2479,21 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { private let uploadedParts: [UploadedPartDescriptor] private let completedETag: String private let failOnComplete: DataGatewayClientError? + private let failOnPutObject: DataGatewayClientError? private let listedParts: [UploadedPartDescriptor] private let headObjectETagValue: String? private let headObjectError: DataGatewayClientError? + private var initiateInvocations = 0 private var uploadInvocations: [UploadCall] = [] + private var putObjectInvocations: [Int] = [] + private var completeInvocations: [[Int]] = [] init( multipartUploadID: String, uploadedParts: [UploadedPartDescriptor], completedETag: String, failOnComplete: DataGatewayClientError? = nil, + failOnPutObject: DataGatewayClientError? = nil, listedParts: [UploadedPartDescriptor]? = nil, headObjectETag: String? = nil, headObjectError: DataGatewayClientError? = nil @@ -2260,6 +2502,7 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { self.uploadedParts = uploadedParts self.completedETag = completedETag self.failOnComplete = failOnComplete + self.failOnPutObject = failOnPutObject self.listedParts = listedParts ?? uploadedParts self.headObjectETagValue = headObjectETag self.headObjectError = headObjectError @@ -2274,7 +2517,8 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { } func initiateMultipartUpload() async throws -> String { - self.multipartUploadID + self.initiateInvocations += 1 + return self.multipartUploadID } func uploadPart( @@ -2289,6 +2533,20 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { return descriptor } + func putObject(body: Data) async throws -> UploadedPartDescriptor { + self.putObjectInvocations.append(body.count) + if let failOnPutObject { + throw failOnPutObject + } + return UploadedPartDescriptor( + partNumber: 1, + etag: self.completedETag, + size: Int64(body.count), + lastModified: nil, + hashCRC64: nil + ) + } + func listParts(multipartUploadID: String) async throws -> [UploadedPartDescriptor] { _ = multipartUploadID return self.listedParts @@ -2306,16 +2564,28 @@ private actor MockOssUploadSession: UploadCoordinatorMultipartSessionProtocol { parts: [UploadedPartDescriptor] ) async throws -> String { _ = multipartUploadID - _ = parts + self.completeInvocations.append(parts.map(\.partNumber)) if let failOnComplete { throw failOnComplete } return self.completedETag } + func initiateCalls() -> Int { + self.initiateInvocations + } + func uploadCalls() -> [UploadCall] { self.uploadInvocations } + + func putObjectCalls() -> [Int] { + self.putObjectInvocations + } + + func completeCalls() -> [[Int]] { + self.completeInvocations + } } private struct CompleteInvocation: Equatable, Sendable {