From 570eae071248af60c84b41ed16763bd7bd8f6e2a Mon Sep 17 00:00:00 2001 From: Francis Terrero Date: Sat, 6 Jun 2026 17:51:19 -0400 Subject: [PATCH 1/3] refactor(android): migrate DocumentsProvider IO to kotlinx coroutines Replace CompletableFuture.get(), Call.execute() and the manual AtomicReference + setOnCancelListener cancellation plumbing with kotlinx-coroutines across the native DocumentsProvider stack: crypto, download and upload now suspend via Call.await()/CallAwait and CryptoServiceAwait, and openDocument bridges to coroutines through runBlockingIo (BlockingIo) instead of a plain runBlocking, surfacing cancellation as FileNotFoundException. Adds kotlinx-coroutines-android and kotlinx-coroutines-test to android/app/build.gradle and JVM unit tests for BlockingIo, CryptoServiceAwait, EncryptedFileDownloader and EncryptedFileUploader. NOTE: a separate, unrelated change by the maintainer lives in the same file (InternxtDocumentsProvider.kt) and could not be split out because interactive staging (git add -p) is unavailable and editing source is out of scope for the git-manager: refresh() now triggers a background revalidateChildren (new revalidate/fetchAllRows helpers and the FolderLoad.isRevalidating flag) instead of invalidateChildren. It is bundled here intentionally; see progress/git_android_coroutines_migration.md. Refs: PB-6282 --- android/app/build.gradle | 2 + .../internxt/cloud/documents/BlockingIo.kt | 16 +++ .../documents/InternxtDocumentsProvider.kt | 132 +++++++++++++----- .../documents/crypto/CryptoServiceAwait.kt | 32 ++--- .../download/EncryptedFileDownloader.kt | 36 ++--- .../cloud/documents/http/CallAwait.kt | 28 ++++ .../documents/upload/EncryptedFileUploader.kt | 50 ++----- .../cloud/documents/BlockingIoTest.kt | 33 +++++ .../crypto/CryptoServiceAwaitTest.kt | 50 +++++++ .../download/EncryptedFileDownloaderTest.kt | 92 ++++++++++++ .../upload/EncryptedFileUploaderTest.kt | 49 ++++++- 11 files changed, 398 insertions(+), 122 deletions(-) create mode 100644 android/app/src/main/java/com/internxt/cloud/documents/BlockingIo.kt create mode 100644 android/app/src/main/java/com/internxt/cloud/documents/http/CallAwait.kt create mode 100644 android/app/src/test/java/com/internxt/cloud/documents/BlockingIoTest.kt create mode 100644 android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt create mode 100644 android/app/src/test/java/com/internxt/cloud/documents/download/EncryptedFileDownloaderTest.kt diff --git a/android/app/build.gradle b/android/app/build.gradle index 926452939..5f236d540 100644 --- a/android/app/build.gradle +++ b/android/app/build.gradle @@ -200,8 +200,10 @@ dependencies { implementation("com.squareup.okhttp3:okhttp:5.3.2") implementation("androidx.security:security-crypto:1.1.0") implementation("org.bouncycastle:bcprov-jdk15to18:1.78.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.10.2") testImplementation("junit:junit:4.13.2") testImplementation("com.squareup.okhttp3:mockwebserver:5.3.2") testImplementation("org.json:json:20240303") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2") } diff --git a/android/app/src/main/java/com/internxt/cloud/documents/BlockingIo.kt b/android/app/src/main/java/com/internxt/cloud/documents/BlockingIo.kt new file mode 100644 index 000000000..698c7a384 --- /dev/null +++ b/android/app/src/main/java/com/internxt/cloud/documents/BlockingIo.kt @@ -0,0 +1,16 @@ +package com.internxt.cloud.documents + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking + +/** + * Bridges a binder thread to a suspending body, running it on [Dispatchers.IO]. + * + * SAF entry points such as `openDocument` arrive on a binder thread governed by a StrictMode + * policy that forbids blocking network. Some collaborators (e.g. the synchronous OkHttp calls in + * InternxtApiClient) still block, so the body must not run on the caller thread or it trips + * NetworkOnMainThreadException. Plain `runBlocking { }` would do exactly that. + */ +internal fun runBlockingIo(body: suspend CoroutineScope.() -> T): T = + runBlocking(Dispatchers.IO, body) diff --git a/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt b/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt index 452c3046e..8cad697ce 100644 --- a/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt +++ b/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt @@ -43,6 +43,14 @@ import java.time.Instant import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import kotlin.coroutines.coroutineContext class InternxtDocumentsProvider : DocumentsProvider() { @@ -52,9 +60,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { Thread(r, "InternxtDocsProvider-loader").apply { isDaemon = true } } - private val openExecutor = Executors.newCachedThreadPool { r -> - Thread(r, "InternxtDocsProvider-open").apply { isDaemon = true } - } + private val uploadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) private val folderLoads = ConcurrentHashMap() private val pendingUploads = ConcurrentHashMap() @@ -64,6 +70,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { private class FolderLoad { @Volatile var state: LoadState = LoadState.LOADING @Volatile var errorMessage: String? = null + @Volatile var isRevalidating: Boolean = false val rows = mutableListOf>() } private val itemKinds = ConcurrentHashMap() @@ -82,6 +89,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { override fun shutdown() { if (activeInstance === this) activeInstance = null + uploadScope.cancel() super.shutdown() } @@ -377,10 +385,59 @@ class InternxtDocumentsProvider : DocumentsProvider() { val documentId = try { DocumentsContract.getDocumentId(uri) } catch (_: Exception) { null } Log.d(TAG, "refresh uri=$uri documentId=$documentId") if (documentId == null) return false - invalidateChildren(documentId) + revalidateChildren(documentId) return true } + private fun revalidateChildren(parentDocumentId: String) { + val load = folderLoads[parentDocumentId] + if (load == null) { + notifyChildren(parentDocumentId) + return + } + synchronized(load) { + if (load.state == LoadState.LOADING || load.isRevalidating) return + load.isRevalidating = true + } + val parentUuid = rawUuid(parentDocumentId) + val notifyUri = DocumentsContract.buildChildDocumentsUri(AUTHORITY, parentDocumentId) + loaderExecutor.execute { revalidate(parentUuid, load, notifyUri) } + } + + private fun revalidate(parentUuid: String, load: FolderLoad, notifyUri: Uri) { + val api = apiClient(op = "refresh[bg]") + if (api == null) { + synchronized(load) { load.isRevalidating = false } + return + } + try { + val fresh = fetchAllRows(api, parentUuid) + synchronized(load) { + load.rows.clear() + load.rows.addAll(fresh) + load.state = LoadState.DONE + load.errorMessage = null + load.isRevalidating = false + } + context?.contentResolver?.notifyChange(notifyUri, null) + Log.d(TAG, "refresh parent=$parentUuid revalidated rows=${fresh.size}") + } catch (e: InternxtApiException) { + synchronized(load) { load.isRevalidating = false } + Log.w(TAG, "refresh parent=$parentUuid revalidation failed: ${e.javaClass.simpleName}: ${e.message}") + } + } + + private fun fetchAllRows(api: InternxtApiClient, parentUuid: String): List> { + val rows = mutableListOf>() + streamPages({ offset, size -> api.listFolderFolders(parentUuid, offset, size) }) { page -> + rows.addAll(page.map { DocumentRowBuilder.folderRow(it) }) + } + streamPages({ offset, size -> api.listFolderFiles(parentUuid, offset, size) }) { page -> + rows.addAll(page.map { DocumentRowBuilder.fileRow(it) }) + } + return rows + } + override fun openDocument( documentId: String?, mode: String?, @@ -402,24 +459,24 @@ class InternxtDocumentsProvider : DocumentsProvider() { } val fileUuid = decoded.uuid - val future = openExecutor.submit { - openDocumentBlocking(ctx, id, signal, fileUuid) - } return try { - future.get() - } catch (e: java.util.concurrent.ExecutionException) { - val cause = e.cause - if (cause is FileNotFoundException) throw cause - throw FileNotFoundException("openDocument failed: ${cause?.message ?: e.message}").apply { - if (cause != null) initCause(cause) + runBlockingIo { + signal?.setOnCancelListener { coroutineContext.cancel() } + openDocumentSuspending(ctx, id, fileUuid) } + } catch (e: FileNotFoundException) { + throw e + } catch (e: CancellationException) { + throw FileNotFoundException("openDocument $id cancelled").apply { initCause(e) } + } catch (e: Exception) { + Log.w(TAG, "openDocument $id failed", e) + throw FileNotFoundException("openDocument failed: ${e.message}").apply { initCause(e) } } } - private fun openDocumentBlocking( + private suspend fun openDocumentSuspending( ctx: Context, id: String, - signal: CancellationSignal?, fileUuid: String, ): ParcelFileDescriptor { val cfg = authManager?.loadAuthConfig() ?: throw FileNotFoundException(NOT_AUTHENTICATED) @@ -438,7 +495,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { if (cacheFile.exists() && cacheFile.length() > 0) { return openCached(ctx, id, cacheFile) } - materializeIntoCache(ctx, id, api, cfg.mnemonic, file, cacheFile, signal) + materializeIntoCache(ctx, id, api, cfg.mnemonic, file, cacheFile) return openCached(ctx, id, cacheFile) } @@ -461,25 +518,22 @@ class InternxtDocumentsProvider : DocumentsProvider() { ) } - private fun materializeIntoCache( + private suspend fun materializeIntoCache( ctx: Context, id: String, api: InternxtApiClient, mnemonic: String, file: FileMetadata, cacheFile: File, - signal: CancellationSignal?, ) { val (tempEnc, tempDec) = DocumentCache.tempPaths(ctx, id) try { - signal?.throwIfCanceled() val links = api.getDownloadLinks(file.bucket, file.fileId) - EncryptedFileDownloader.download(HttpClients.download, links.shards, tempEnc, signal) + EncryptedFileDownloader.download(HttpClients.download, links.shards, tempEnc) - signal?.throwIfCanceled() val key = FileKeyDeriver.deriveFileKey(mnemonic, file.bucket, links.index) val iv = FileKeyDeriver.deriveIv(links.index) - decryptBlocking(tempEnc, tempDec, key.toHex(), iv.toHex()) + decryptFile(tempEnc, tempDec, key.toHex(), iv.toHex()) if (!tempDec.renameTo(cacheFile)) { throw FileNotFoundException("Failed to promote temp file to cache for $id") @@ -491,8 +545,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { tempDec.delete() throw when (e) { is FileNotFoundException -> e - is OperationCanceledException -> - FileNotFoundException("openDocument $id cancelled").apply { initCause(e) } + is CancellationException -> e else -> FileNotFoundException("openDocument $id failed: ${e.message}").apply { initCause(e) } } @@ -619,9 +672,10 @@ class InternxtDocumentsProvider : DocumentsProvider() { val pipe = ParcelFileDescriptor.createReliablePipe() val readEnd = pipe[0] val writeEnd = pipe[1] - openExecutor.execute { - runUpload(ctx, token, pending, cfg, readEnd, signal) + val job: Job = uploadScope.launch { + runUpload(ctx, token, pending, cfg, readEnd) } + signal?.setOnCancelListener { job.cancel() } return writeEnd } catch (t: Throwable) { pendingUploads.remove(token) @@ -629,17 +683,17 @@ class InternxtDocumentsProvider : DocumentsProvider() { } } - private fun runUpload( + private suspend fun runUpload( ctx: Context, token: String, pending: PendingUpload, cfg: AuthConfig, readEnd: ParcelFileDescriptor, - signal: CancellationSignal?, ) { val temps = uploadTempsFor(ctx, token) + val uploadJob = coroutineContext[Job] val uploadSignal = CancellationSignal() - signal?.setOnCancelListener { uploadSignal.cancel() } + uploadSignal.setOnCancelListener { uploadJob?.cancel() } UploadForegroundService.start(ctx, token, pending.plainName, uploadSignal) var failure: Throwable? = null @@ -648,11 +702,11 @@ class InternxtDocumentsProvider : DocumentsProvider() { val bucketId = resolveBucket(api, pending.parentUuid) val crypto = prepareEncryption(cfg.mnemonic, bucketId) val encrypted = encryptInputToTemp(token, temps, readEnd, crypto, uploadSignal) - val outcome = uploadEncryptedFile(token, api, temps.enc, bucketId, encrypted, uploadSignal) + val outcome = uploadEncryptedFile(token, api, temps.enc, bucketId, encrypted) finalizeAndRecordFile(api, pending, bucketId, crypto.indexHex, encrypted.size, outcome) invalidateChildren(DocumentId.encodeFolder(pending.parentUuid)) } catch (t: Throwable) { - if (t !is OperationCanceledException) { + if (!isCancellation(t)) { Log.w(TAG, "upload failed token=$token: ${t.javaClass.simpleName}: ${t.message}") } failure = t @@ -665,6 +719,9 @@ class InternxtDocumentsProvider : DocumentsProvider() { } } + private fun isCancellation(t: Throwable): Boolean = + t is OperationCanceledException || t is CancellationException + /** `tempEnc` holds the ciphertext PUT to bridge; `plain` holds the pipe's plaintext * while we hand it to CryptoService (which only accepts file paths). */ private data class UploadTemps(val plain: File, val enc: File) @@ -690,7 +747,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { ) } - private fun encryptInputToTemp( + private suspend fun encryptInputToTemp( token: String, temps: UploadTemps, readEnd: ParcelFileDescriptor, @@ -739,13 +796,12 @@ class InternxtDocumentsProvider : DocumentsProvider() { } } - private fun uploadEncryptedFile( + private suspend fun uploadEncryptedFile( token: String, api: InternxtApiClient, tempEnc: File, bucketId: String, encrypted: EncryptedFileUploader.Encrypted, - signal: CancellationSignal, ): UploadOutcome { val partsCount = if (encrypted.size >= MULTIPART_THRESHOLD) { ((encrypted.size + MULTIPART_PART_SIZE - 1) / MULTIPART_PART_SIZE).toInt().coerceAtLeast(1) @@ -763,14 +819,14 @@ class InternxtDocumentsProvider : DocumentsProvider() { return when (slot) { is UploadSlot.Single -> { EncryptedFileUploader.uploadSingle( - HttpClients.upload, tempEnc, slot.url, signal, onProgress, + HttpClients.upload, tempEnc, slot.url, onProgress, ) UploadOutcome.Single(slot.uuid, listOf(encrypted.wholeSha256Hex)) } is UploadSlot.Multipart -> { val partHashes = EncryptedFileUploader.computePartSha256(tempEnc, MULTIPART_PART_SIZE) val parts = EncryptedFileUploader.uploadMultipart( - HttpClients.upload, tempEnc, slot.urls, MULTIPART_PART_SIZE, signal, onProgress, + HttpClients.upload, tempEnc, slot.urls, MULTIPART_PART_SIZE, onProgress, ) UploadOutcome.Multipart(slot.uuid, partHashes, parts, slot.uploadId) } @@ -834,7 +890,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { private fun notifyServiceOfOutcome(ctx: Context, token: String, failure: Throwable?) { when { failure == null -> UploadForegroundService.complete(token) - failure is OperationCanceledException -> UploadForegroundService.complete(token) + isCancellation(failure) -> UploadForegroundService.complete(token) else -> UploadForegroundService.fail(token, friendlyUploadError(ctx, failure)) } } @@ -902,7 +958,7 @@ class InternxtDocumentsProvider : DocumentsProvider() { Document.COLUMN_SIZE to null, ) - private fun decryptBlocking(src: File, dst: File, hexKey: String, hexIv: String) { + private suspend fun decryptFile(src: File, dst: File, hexKey: String, hexIv: String) { awaitCryptoService("Decryption failed") { cb -> CryptoService.getInstance().decryptFile( src.absolutePath, diff --git a/android/app/src/main/java/com/internxt/cloud/documents/crypto/CryptoServiceAwait.kt b/android/app/src/main/java/com/internxt/cloud/documents/crypto/CryptoServiceAwait.kt index 5b040f1ab..f29fd3b26 100644 --- a/android/app/src/main/java/com/internxt/cloud/documents/crypto/CryptoServiceAwait.kt +++ b/android/app/src/main/java/com/internxt/cloud/documents/crypto/CryptoServiceAwait.kt @@ -2,25 +2,25 @@ package com.internxt.cloud.documents.crypto import com.rncrypto.util.OnlyErrorCallback import java.io.IOException -import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutionException +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlinx.coroutines.suspendCancellableCoroutine /** - * Bridges `CryptoService.encryptFile` / `decryptFile` (callback-based) to a blocking call. - * Pass `runInBackground = false` to those methods — the caller is already on a background - * executor thread, so there's no point bouncing onto the rn-crypto thread pool just to - * block on it. Any [Throwable] surfaced by the callback is rethrown as [IOException] - * preserving the original cause. + * Suspends until `CryptoService.encryptFile` / `decryptFile` (callback-based) reports an + * outcome via [OnlyErrorCallback]. No thread is blocked waiting for the callback: the rn-crypto + * callback resumes the coroutine. Any non-null error is surfaced as [IOException] preserving the + * original cause. */ -internal inline fun awaitCryptoService( +internal suspend inline fun awaitCryptoService( failureMessage: String, - invoke: (OnlyErrorCallback) -> Unit, -) { - val done = CompletableFuture() - invoke { err -> if (err != null) done.completeExceptionally(err) else done.complete(Unit) } - try { - done.get() - } catch (e: ExecutionException) { - throw (e.cause as? IOException) ?: IOException(failureMessage, e.cause) + crossinline invoke: (OnlyErrorCallback) -> Unit, +) = suspendCancellableCoroutine { continuation -> + invoke { err -> + if (err != null) { + continuation.resumeWithException((err as? IOException) ?: IOException(failureMessage, err)) + } else { + continuation.resume(Unit) + } } } diff --git a/android/app/src/main/java/com/internxt/cloud/documents/download/EncryptedFileDownloader.kt b/android/app/src/main/java/com/internxt/cloud/documents/download/EncryptedFileDownloader.kt index 281647ea9..7a1334720 100644 --- a/android/app/src/main/java/com/internxt/cloud/documents/download/EncryptedFileDownloader.kt +++ b/android/app/src/main/java/com/internxt/cloud/documents/download/EncryptedFileDownloader.kt @@ -1,37 +1,31 @@ package com.internxt.cloud.documents.download -import android.os.CancellationSignal -import android.os.OperationCanceledException import com.internxt.cloud.documents.api.model.Shard -import okhttp3.Call +import com.internxt.cloud.documents.http.await import okhttp3.OkHttpClient import okhttp3.Request import java.io.File import java.io.FileOutputStream import java.io.IOException -import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.ensureActive +import kotlin.coroutines.coroutineContext object EncryptedFileDownloader { private const val COPY_BUFFER_SIZE = 16 * 1024 - @Throws(IOException::class, OperationCanceledException::class) - fun download( + suspend fun download( client: OkHttpClient, shards: List, target: File, - signal: CancellationSignal? ) { require(shards.isNotEmpty()) { "No shards to download" } prepareTarget(target) - val activeCall = AtomicReference(null) - signal?.setOnCancelListener { activeCall.get()?.cancel() } - FileOutputStream(target, /* append = */ true).use { out -> shards.sortedBy { it.index }.forEach { shard -> - signal?.throwIfCanceled() - downloadShard(client, shard, out, activeCall, signal) + coroutineContext.ensureActive() + downloadShard(client, shard, out) } out.flush() } @@ -44,27 +38,13 @@ object EncryptedFileDownloader { } } - private fun downloadShard( + private suspend fun downloadShard( client: OkHttpClient, shard: Shard, out: FileOutputStream, - activeCall: AtomicReference, - signal: CancellationSignal? ) { val request = Request.Builder().url(shard.url).get().build() - val call = client.newCall(request) - activeCall.set(call) - - try { - call.execute().use { response -> writeShardResponse(shard, response, out) } - } catch (e: IOException) { - if (signal != null && signal.isCanceled) { - throw OperationCanceledException("Download cancelled").apply { initCause(e) } - } - throw e - } finally { - activeCall.set(null) - } + client.newCall(request).await().use { response -> writeShardResponse(shard, response, out) } } private fun writeShardResponse(shard: Shard, response: okhttp3.Response, out: FileOutputStream) { diff --git a/android/app/src/main/java/com/internxt/cloud/documents/http/CallAwait.kt b/android/app/src/main/java/com/internxt/cloud/documents/http/CallAwait.kt new file mode 100644 index 000000000..89007652c --- /dev/null +++ b/android/app/src/main/java/com/internxt/cloud/documents/http/CallAwait.kt @@ -0,0 +1,28 @@ +package com.internxt.cloud.documents.http + +import okhttp3.Call +import okhttp3.Callback +import okhttp3.Response +import java.io.IOException +import kotlin.coroutines.resumeWithException +import kotlinx.coroutines.suspendCancellableCoroutine + +suspend fun Call.await(): Response = suspendCancellableCoroutine { continuation -> + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + continuation.resume(response) { response.closeQuietly() } + } + + override fun onFailure(call: Call, e: IOException) { + continuation.resumeWithException(e) + } + }) + continuation.invokeOnCancellation { cancel() } +} + +private fun Response.closeQuietly() { + try { + close() + } catch (_: Throwable) { + } +} diff --git a/android/app/src/main/java/com/internxt/cloud/documents/upload/EncryptedFileUploader.kt b/android/app/src/main/java/com/internxt/cloud/documents/upload/EncryptedFileUploader.kt index 8a69a4b1a..7d75e4be5 100644 --- a/android/app/src/main/java/com/internxt/cloud/documents/upload/EncryptedFileUploader.kt +++ b/android/app/src/main/java/com/internxt/cloud/documents/upload/EncryptedFileUploader.kt @@ -1,13 +1,11 @@ package com.internxt.cloud.documents.upload -import android.os.CancellationSignal -import android.os.OperationCanceledException import com.internxt.cloud.documents.api.model.UploadedPart import com.internxt.cloud.documents.crypto.Ripemd160 import com.internxt.cloud.documents.crypto.awaitCryptoService import com.internxt.cloud.documents.crypto.toHex +import com.internxt.cloud.documents.http.await import com.rncrypto.util.CryptoService -import okhttp3.Call import okhttp3.MediaType.Companion.toMediaType import okhttp3.OkHttpClient import okhttp3.Request @@ -19,7 +17,8 @@ import java.io.IOException import java.io.InputStream import java.io.RandomAccessFile import java.security.MessageDigest -import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.ensureActive +import kotlin.coroutines.coroutineContext object EncryptedFileUploader { @@ -37,7 +36,7 @@ object EncryptedFileUploader { * package does not return a digest, so SHA-256 over the ciphertext is computed here * by streaming the produced file back through MessageDigest. */ - fun encryptFile(plain: File, tempEnc: File, key: ByteArray, iv: ByteArray): Encrypted { + suspend fun encryptFile(plain: File, tempEnc: File, key: ByteArray, iv: ByteArray): Encrypted { prepareTarget(tempEnc) awaitCryptoService("Encryption failed") { cb -> CryptoService.getInstance().encryptFile( @@ -91,42 +90,36 @@ object EncryptedFileUploader { return hashes } - fun uploadSingle( + suspend fun uploadSingle( client: OkHttpClient, tempEnc: File, url: String, - signal: CancellationSignal?, onProgress: ((Long) -> Unit)? = null, ) { - val active = AtomicReference(null) - signal?.setOnCancelListener { active.get()?.cancel() } val body = fileRangeBody(tempEnc, 0L, tempEnc.length(), onProgress, baseSent = 0L) val request = Request.Builder().url(url).put(body).build() - runCall(client, request, signal, active) { /* ETag not needed for single */ } + runCall(client, request) { /* ETag not needed for single */ } } - fun uploadMultipart( + suspend fun uploadMultipart( client: OkHttpClient, tempEnc: File, urls: List, partSize: Long, - signal: CancellationSignal?, onProgress: ((Long) -> Unit)? = null, ): List { require(urls.isNotEmpty()) { "urls cannot be empty" } val total = tempEnc.length() - val active = AtomicReference(null) - signal?.setOnCancelListener { active.get()?.cancel() } val parts = ArrayList(urls.size) var offset = 0L urls.forEachIndexed { index, url -> - signal?.throwIfCanceled() + coroutineContext.ensureActive() val length = (total - offset).coerceAtMost(partSize) require(length > 0) { "Computed empty part for index $index" } val body = fileRangeBody(tempEnc, offset, length, onProgress, baseSent = offset) val request = Request.Builder().url(url).put(body).build() - val etag = runCall(client, request, signal, active) { response -> + val etag = runCall(client, request) { response -> response.header("ETag") ?: response.header("Etag") ?: throw IOException("Part ${index + 1} missing ETag in response") } @@ -146,30 +139,15 @@ object EncryptedFileUploader { return Ripemd160.digest(hexDecode(concatenated)).toHex() } - private inline fun runCall( + private suspend fun runCall( client: OkHttpClient, request: Request, - signal: CancellationSignal?, - active: AtomicReference, onResponse: (okhttp3.Response) -> T, - ): T { - val call = client.newCall(request) - active.set(call) - try { - return call.execute().use { response -> - if (!response.isSuccessful) { - throw IOException("PUT failed HTTP ${response.code}") - } - onResponse(response) - } - } catch (e: IOException) { - if (signal != null && signal.isCanceled) { - throw OperationCanceledException("Upload cancelled").apply { initCause(e) } - } - throw e - } finally { - active.set(null) + ): T = client.newCall(request).await().use { response -> + if (!response.isSuccessful) { + throw IOException("PUT failed HTTP ${response.code}") } + onResponse(response) } private fun fileRangeBody( diff --git a/android/app/src/test/java/com/internxt/cloud/documents/BlockingIoTest.kt b/android/app/src/test/java/com/internxt/cloud/documents/BlockingIoTest.kt new file mode 100644 index 000000000..7fac8c704 --- /dev/null +++ b/android/app/src/test/java/com/internxt/cloud/documents/BlockingIoTest.kt @@ -0,0 +1,33 @@ +package com.internxt.cloud.documents + +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotEquals +import org.junit.Assert.assertTrue +import org.junit.Test + +class BlockingIoTest { + + @Test + fun `when bridging from the caller thread, then the body runs on a different IO thread`() { + val callerThread = Thread.currentThread() + + val bodyThread = runBlockingIo { Thread.currentThread() } + + assertNotEquals( + "body must not run on the caller (binder) thread, otherwise blocking network trips StrictMode", + callerThread, + bodyThread, + ) + assertTrue( + "expected an IO dispatcher thread but ran on ${bodyThread.name}", + bodyThread.name.contains("DefaultDispatcher") || bodyThread.name.contains("IO"), + ) + } + + @Test + fun `when the body returns a value, then it is propagated to the caller`() { + val result = runBlockingIo { 42 } + + assertEquals(42, result) + } +} diff --git a/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt b/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt new file mode 100644 index 000000000..3c9d9ea3b --- /dev/null +++ b/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt @@ -0,0 +1,50 @@ +package com.internxt.cloud.documents.crypto + +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Assert.fail +import org.junit.Test +import java.io.IOException + +class CryptoServiceAwaitTest { + + @Test + fun `when the callback reports no error, then it resumes without throwing`() = runTest { + awaitCryptoService("should not surface") { cb -> cb.onComplete(null) } + } + + @Test + fun `when the callback reports an IOException, then it surfaces it as an IOException`() = runTest { + val original = IOException("boom") + + val thrown = runCatching { + awaitCryptoService("crypto failed") { cb -> cb.onComplete(original) } + }.exceptionOrNull() + + val io = thrown as? IOException ?: return@runTest fail("expected IOException but got $thrown") + assertEquals("boom", io.message) + } + + @Test + fun `when the callback reports a non-IO error, then it wraps it in an IOException keeping the cause`() = runTest { + val original = IllegalStateException("nope") + + val thrown = runCatching { + awaitCryptoService("crypto failed") { cb -> cb.onComplete(original) } + }.exceptionOrNull() + + val io = thrown as? IOException ?: return@runTest fail("expected IOException but got $thrown") + assertEquals("crypto failed", io.message) + assertTrue("original cause not preserved in chain", io.hasCauseWithMessage("nope")) + } + + private fun Throwable.hasCauseWithMessage(message: String): Boolean { + var current: Throwable? = cause + while (current != null) { + if (current.message == message) return true + current = current.cause + } + return false + } +} diff --git a/android/app/src/test/java/com/internxt/cloud/documents/download/EncryptedFileDownloaderTest.kt b/android/app/src/test/java/com/internxt/cloud/documents/download/EncryptedFileDownloaderTest.kt new file mode 100644 index 000000000..be34ab61b --- /dev/null +++ b/android/app/src/test/java/com/internxt/cloud/documents/download/EncryptedFileDownloaderTest.kt @@ -0,0 +1,92 @@ +package com.internxt.cloud.documents.download + +import com.internxt.cloud.documents.api.model.Shard +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import okhttp3.OkHttpClient +import okhttp3.mockwebserver.Dispatcher +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import okhttp3.mockwebserver.RecordedRequest +import org.junit.After +import org.junit.Assert.assertArrayEquals +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test +import java.io.File +import java.nio.file.Files +import kotlin.coroutines.cancellation.CancellationException + +class EncryptedFileDownloaderTest { + + private lateinit var server: MockWebServer + private lateinit var client: OkHttpClient + private lateinit var tempDir: File + + @Before + fun setUp() { + server = MockWebServer().apply { start() } + client = OkHttpClient() + tempDir = Files.createTempDirectory("downloader-test").toFile() + } + + @After + fun tearDown() { + server.shutdown() + tempDir.deleteRecursively() + } + + private fun shard(index: Int, path: String, size: Int): Shard = + Shard(index = index, size = size.toLong(), hash = "h$index", url = server.url(path).toString()) + + @Test + fun `when shards are served out of order, then bytes are written in index order`() = runTest { + val part0 = ByteArray(2_000) { it.toByte() } + val part1 = ByteArray(1_500) { (it + 7).toByte() } + server.enqueue(MockResponse().setResponseCode(200).setBody(okio.Buffer().write(part0))) + server.enqueue(MockResponse().setResponseCode(200).setBody(okio.Buffer().write(part1))) + val target = File(tempDir, "out.bin") + + val shards = listOf(shard(1, "/p1", part1.size), shard(0, "/p0", part0.size)) + EncryptedFileDownloader.download(client, shards, target) + + assertArrayEquals(part0 + part1, target.readBytes()) + } + + @Test + fun `when the coroutine is cancelled mid transfer, then it raises cancellation not a network error`() { + val requestReceived = CompletableDeferred() + server.dispatcher = object : Dispatcher() { + override fun dispatch(request: RecordedRequest): MockResponse { + requestReceived.complete(Unit) + Thread.sleep(2_000) + return MockResponse().setResponseCode(200).setBody("late") + } + } + val target = File(tempDir, "cancel.bin") + val shards = listOf(shard(0, "/slow", 4)) + val thrown = CompletableDeferred() + + runBlocking { + val job = launch(Dispatchers.IO) { + try { + EncryptedFileDownloader.download(client, shards, target) + } catch (t: Throwable) { + thrown.complete(t) + throw t + } + } + requestReceived.await() + job.cancel() + job.join() + } + + assertTrue( + "expected cancellation but got ${thrown.getCompleted().javaClass.name}", + thrown.getCompleted() is CancellationException, + ) + } +} diff --git a/android/app/src/test/java/com/internxt/cloud/documents/upload/EncryptedFileUploaderTest.kt b/android/app/src/test/java/com/internxt/cloud/documents/upload/EncryptedFileUploaderTest.kt index 0f1c155b0..2fcbee3b5 100644 --- a/android/app/src/test/java/com/internxt/cloud/documents/upload/EncryptedFileUploaderTest.kt +++ b/android/app/src/test/java/com/internxt/cloud/documents/upload/EncryptedFileUploaderTest.kt @@ -2,13 +2,20 @@ package com.internxt.cloud.documents.upload import com.internxt.cloud.documents.crypto.Ripemd160 import com.internxt.cloud.documents.crypto.toHex +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import okhttp3.OkHttpClient +import okhttp3.mockwebserver.Dispatcher import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import okhttp3.mockwebserver.RecordedRequest import org.junit.After import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test import java.io.File @@ -17,6 +24,7 @@ import java.security.MessageDigest import javax.crypto.Cipher import javax.crypto.spec.IvParameterSpec import javax.crypto.spec.SecretKeySpec +import kotlin.coroutines.cancellation.CancellationException /** * Encryption itself is exercised by the `@internxt/rn-crypto` package's own test suite @@ -60,7 +68,7 @@ class EncryptedFileUploaderTest { File(tempDir, name).apply { writeBytes(controlEncrypt(plain)) } @Test - fun uploadSinglePutsTempFileContentsAndReportsCorrectHash() { + fun uploadSinglePutsTempFileContentsAndReportsCorrectHash() = runTest { val plain = ByteArray(8_000) { it.toByte() } val tempEnc = writeEncrypted("single.enc", plain) val encBytes = tempEnc.readBytes() @@ -68,7 +76,7 @@ class EncryptedFileUploaderTest { server.enqueue(MockResponse().setResponseCode(200)) val url = server.url("/upload").toString() - EncryptedFileUploader.uploadSingle(client, tempEnc, url, signal = null) + EncryptedFileUploader.uploadSingle(client, tempEnc, url) val recorded = server.takeRequest() assertEquals("PUT", recorded.method) @@ -81,7 +89,7 @@ class EncryptedFileUploaderTest { } @Test - fun uploadMultipartCollectsEtagsAndPartHashesMatchSlices() { + fun uploadMultipartCollectsEtagsAndPartHashesMatchSlices() = runTest { val partSize = 4_000L val plain = ByteArray(13_000) { it.toByte() } val tempEnc = writeEncrypted("multi.enc", plain) @@ -97,7 +105,6 @@ class EncryptedFileUploaderTest { tempEnc = tempEnc, urls = urls, partSize = partSize, - signal = null, ) assertEquals(listOf(1, 2, 3, 4), parts.map { it.partNumber }) @@ -131,6 +138,40 @@ class EncryptedFileUploaderTest { assertEquals(expectedHash, computed) } + @Test + fun `when the upload coroutine is cancelled mid PUT, then it raises cancellation not a failure`() { + val requestReceived = CompletableDeferred() + server.dispatcher = object : Dispatcher() { + override fun dispatch(request: RecordedRequest): MockResponse { + requestReceived.complete(Unit) + Thread.sleep(2_000) + return MockResponse().setResponseCode(200) + } + } + val tempEnc = writeEncrypted("cancel.enc", ByteArray(8_000) { it.toByte() }) + val url = server.url("/slow").toString() + val thrown = CompletableDeferred() + + runBlocking { + val job = launch(Dispatchers.IO) { + try { + EncryptedFileUploader.uploadSingle(client, tempEnc, url) + } catch (t: Throwable) { + thrown.complete(t) + throw t + } + } + requestReceived.await() + job.cancel() + job.join() + } + + assertTrue( + "expected cancellation but got ${thrown.getCompleted().javaClass.name}", + thrown.getCompleted() is CancellationException, + ) + } + private fun hexDecode(hex: String): ByteArray { val out = ByteArray(hex.length / 2) for (i in out.indices) { From fff0e9cdc829a4a6806b39b3876aff771e48fac4 Mon Sep 17 00:00:00 2001 From: Francis Terrero Date: Sat, 6 Jun 2026 18:10:35 -0400 Subject: [PATCH 2/3] refactor(test): replace hardcoded error message with constant in CryptoServiceAwaitTest --- .../cloud/documents/crypto/CryptoServiceAwaitTest.kt | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt b/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt index 3c9d9ea3b..95038f5c9 100644 --- a/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt +++ b/android/app/src/test/java/com/internxt/cloud/documents/crypto/CryptoServiceAwaitTest.kt @@ -19,7 +19,7 @@ class CryptoServiceAwaitTest { val original = IOException("boom") val thrown = runCatching { - awaitCryptoService("crypto failed") { cb -> cb.onComplete(original) } + awaitCryptoService(CRYPTO_FAILED_MESSAGE) { cb -> cb.onComplete(original) } }.exceptionOrNull() val io = thrown as? IOException ?: return@runTest fail("expected IOException but got $thrown") @@ -31,11 +31,11 @@ class CryptoServiceAwaitTest { val original = IllegalStateException("nope") val thrown = runCatching { - awaitCryptoService("crypto failed") { cb -> cb.onComplete(original) } + awaitCryptoService(CRYPTO_FAILED_MESSAGE) { cb -> cb.onComplete(original) } }.exceptionOrNull() val io = thrown as? IOException ?: return@runTest fail("expected IOException but got $thrown") - assertEquals("crypto failed", io.message) + assertEquals(CRYPTO_FAILED_MESSAGE, io.message) assertTrue("original cause not preserved in chain", io.hasCauseWithMessage("nope")) } @@ -47,4 +47,8 @@ class CryptoServiceAwaitTest { } return false } + + companion object { + private const val CRYPTO_FAILED_MESSAGE = "crypto failed" + } } From fabfeba8d74b133d65f0055b5315f55274965f76 Mon Sep 17 00:00:00 2001 From: Francis Terrero Date: Thu, 11 Jun 2026 23:25:45 -0400 Subject: [PATCH 3/3] fix: improve exception handling in openDocument method --- .../internxt/cloud/documents/InternxtDocumentsProvider.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt b/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt index 8cad697ce..305099177 100644 --- a/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt +++ b/android/app/src/main/java/com/internxt/cloud/documents/InternxtDocumentsProvider.kt @@ -543,9 +543,9 @@ class InternxtDocumentsProvider : DocumentsProvider() { } catch (e: Exception) { tempEnc.delete() tempDec.delete() - throw when (e) { - is FileNotFoundException -> e - is CancellationException -> e + throw when { + e is FileNotFoundException -> e + isCancellation(e) -> e else -> FileNotFoundException("openDocument $id failed: ${e.message}").apply { initCause(e) } }