Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions android/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,10 @@ dependencies {
implementation("com.squareup.okhttp3:okhttp:5.3.2")
implementation("androidx.security:security-crypto:1.1.0")
implementation("org.bouncycastle:bcprov-jdk15to18:1.78.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.10.2")

testImplementation("junit:junit:4.13.2")
testImplementation("com.squareup.okhttp3:mockwebserver:5.3.2")
testImplementation("org.json:json:20240303")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.internxt.cloud.documents

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking

/**
* Bridges a binder thread to a suspending body, running it on [Dispatchers.IO].
*
* SAF entry points such as `openDocument` arrive on a binder thread governed by a StrictMode
* policy that forbids blocking network. Some collaborators (e.g. the synchronous OkHttp calls in
* InternxtApiClient) still block, so the body must not run on the caller thread or it trips
* NetworkOnMainThreadException. Plain `runBlocking { }` would do exactly that.
*/
internal fun <T> runBlockingIo(body: suspend CoroutineScope.() -> T): T =
runBlocking(Dispatchers.IO, body)
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ import java.time.Instant
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlin.coroutines.coroutineContext

class InternxtDocumentsProvider : DocumentsProvider() {

Expand All @@ -52,9 +60,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {
Thread(r, "InternxtDocsProvider-loader").apply { isDaemon = true }
}

private val openExecutor = Executors.newCachedThreadPool { r ->
Thread(r, "InternxtDocsProvider-open").apply { isDaemon = true }
}
private val uploadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

private val folderLoads = ConcurrentHashMap<String, FolderLoad>()
private val pendingUploads = ConcurrentHashMap<String, PendingUpload>()
Expand All @@ -64,6 +70,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {
private class FolderLoad {
@Volatile var state: LoadState = LoadState.LOADING
@Volatile var errorMessage: String? = null
@Volatile var isRevalidating: Boolean = false
val rows = mutableListOf<Map<String, Any?>>()
}
private val itemKinds = ConcurrentHashMap<String, ItemKind>()
Expand All @@ -82,6 +89,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {

override fun shutdown() {
if (activeInstance === this) activeInstance = null
uploadScope.cancel()
super.shutdown()
}

Expand Down Expand Up @@ -377,10 +385,59 @@ class InternxtDocumentsProvider : DocumentsProvider() {
val documentId = try { DocumentsContract.getDocumentId(uri) } catch (_: Exception) { null }
Log.d(TAG, "refresh uri=$uri documentId=$documentId")
if (documentId == null) return false
invalidateChildren(documentId)
revalidateChildren(documentId)
return true
}

private fun revalidateChildren(parentDocumentId: String) {
val load = folderLoads[parentDocumentId]
if (load == null) {
notifyChildren(parentDocumentId)
return
}
synchronized(load) {
if (load.state == LoadState.LOADING || load.isRevalidating) return
load.isRevalidating = true
}
val parentUuid = rawUuid(parentDocumentId)
val notifyUri = DocumentsContract.buildChildDocumentsUri(AUTHORITY, parentDocumentId)
loaderExecutor.execute { revalidate(parentUuid, load, notifyUri) }
}

private fun revalidate(parentUuid: String, load: FolderLoad, notifyUri: Uri) {
val api = apiClient(op = "refresh[bg]")
if (api == null) {
synchronized(load) { load.isRevalidating = false }
return
}
try {
val fresh = fetchAllRows(api, parentUuid)
synchronized(load) {
load.rows.clear()
load.rows.addAll(fresh)
load.state = LoadState.DONE
load.errorMessage = null
load.isRevalidating = false
}
context?.contentResolver?.notifyChange(notifyUri, null)
Log.d(TAG, "refresh parent=$parentUuid revalidated rows=${fresh.size}")
} catch (e: InternxtApiException) {
synchronized(load) { load.isRevalidating = false }
Log.w(TAG, "refresh parent=$parentUuid revalidation failed: ${e.javaClass.simpleName}: ${e.message}")
}
}

private fun fetchAllRows(api: InternxtApiClient, parentUuid: String): List<Map<String, Any?>> {
val rows = mutableListOf<Map<String, Any?>>()
streamPages({ offset, size -> api.listFolderFolders(parentUuid, offset, size) }) { page ->
rows.addAll(page.map { DocumentRowBuilder.folderRow(it) })
}
streamPages({ offset, size -> api.listFolderFiles(parentUuid, offset, size) }) { page ->
rows.addAll(page.map { DocumentRowBuilder.fileRow(it) })
}
return rows
}

override fun openDocument(
documentId: String?,
mode: String?,
Expand All @@ -402,24 +459,24 @@ class InternxtDocumentsProvider : DocumentsProvider() {
}
val fileUuid = decoded.uuid

val future = openExecutor.submit<ParcelFileDescriptor> {
openDocumentBlocking(ctx, id, signal, fileUuid)
}
return try {
future.get()
} catch (e: java.util.concurrent.ExecutionException) {
val cause = e.cause
if (cause is FileNotFoundException) throw cause
throw FileNotFoundException("openDocument failed: ${cause?.message ?: e.message}").apply {
if (cause != null) initCause(cause)
runBlockingIo {
signal?.setOnCancelListener { coroutineContext.cancel() }
openDocumentSuspending(ctx, id, fileUuid)
}
} catch (e: FileNotFoundException) {
throw e
} catch (e: CancellationException) {
throw FileNotFoundException("openDocument $id cancelled").apply { initCause(e) }
} catch (e: Exception) {
Log.w(TAG, "openDocument $id failed", e)
throw FileNotFoundException("openDocument failed: ${e.message}").apply { initCause(e) }
}
}

private fun openDocumentBlocking(
private suspend fun openDocumentSuspending(
ctx: Context,
id: String,
signal: CancellationSignal?,
fileUuid: String,
): ParcelFileDescriptor {
val cfg = authManager?.loadAuthConfig() ?: throw FileNotFoundException(NOT_AUTHENTICATED)
Expand All @@ -438,7 +495,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {
if (cacheFile.exists() && cacheFile.length() > 0) {
return openCached(ctx, id, cacheFile)
}
materializeIntoCache(ctx, id, api, cfg.mnemonic, file, cacheFile, signal)
materializeIntoCache(ctx, id, api, cfg.mnemonic, file, cacheFile)
return openCached(ctx, id, cacheFile)
}

Expand All @@ -461,25 +518,22 @@ class InternxtDocumentsProvider : DocumentsProvider() {
)
}

private fun materializeIntoCache(
private suspend fun materializeIntoCache(
ctx: Context,
id: String,
api: InternxtApiClient,
mnemonic: String,
file: FileMetadata,
cacheFile: File,
signal: CancellationSignal?,
) {
val (tempEnc, tempDec) = DocumentCache.tempPaths(ctx, id)
try {
signal?.throwIfCanceled()
val links = api.getDownloadLinks(file.bucket, file.fileId)
EncryptedFileDownloader.download(HttpClients.download, links.shards, tempEnc, signal)
EncryptedFileDownloader.download(HttpClients.download, links.shards, tempEnc)

signal?.throwIfCanceled()
val key = FileKeyDeriver.deriveFileKey(mnemonic, file.bucket, links.index)
val iv = FileKeyDeriver.deriveIv(links.index)
decryptBlocking(tempEnc, tempDec, key.toHex(), iv.toHex())
decryptFile(tempEnc, tempDec, key.toHex(), iv.toHex())

if (!tempDec.renameTo(cacheFile)) {
throw FileNotFoundException("Failed to promote temp file to cache for $id")
Expand All @@ -489,10 +543,9 @@ class InternxtDocumentsProvider : DocumentsProvider() {
} catch (e: Exception) {
tempEnc.delete()
tempDec.delete()
throw when (e) {
is FileNotFoundException -> e
is OperationCanceledException ->
FileNotFoundException("openDocument $id cancelled").apply { initCause(e) }
throw when {
e is FileNotFoundException -> e
isCancellation(e) -> e
else ->
FileNotFoundException("openDocument $id failed: ${e.message}").apply { initCause(e) }
}
Expand Down Expand Up @@ -619,27 +672,28 @@ class InternxtDocumentsProvider : DocumentsProvider() {
val pipe = ParcelFileDescriptor.createReliablePipe()
val readEnd = pipe[0]
val writeEnd = pipe[1]
openExecutor.execute {
runUpload(ctx, token, pending, cfg, readEnd, signal)
val job: Job = uploadScope.launch {
runUpload(ctx, token, pending, cfg, readEnd)
}
signal?.setOnCancelListener { job.cancel() }
return writeEnd
} catch (t: Throwable) {
pendingUploads.remove(token)
throw t
}
}

private fun runUpload(
private suspend fun runUpload(
ctx: Context,
token: String,
pending: PendingUpload,
cfg: AuthConfig,
readEnd: ParcelFileDescriptor,
signal: CancellationSignal?,
) {
val temps = uploadTempsFor(ctx, token)
val uploadJob = coroutineContext[Job]
val uploadSignal = CancellationSignal()
signal?.setOnCancelListener { uploadSignal.cancel() }
uploadSignal.setOnCancelListener { uploadJob?.cancel() }
UploadForegroundService.start(ctx, token, pending.plainName, uploadSignal)

var failure: Throwable? = null
Expand All @@ -648,11 +702,11 @@ class InternxtDocumentsProvider : DocumentsProvider() {
val bucketId = resolveBucket(api, pending.parentUuid)
val crypto = prepareEncryption(cfg.mnemonic, bucketId)
val encrypted = encryptInputToTemp(token, temps, readEnd, crypto, uploadSignal)
val outcome = uploadEncryptedFile(token, api, temps.enc, bucketId, encrypted, uploadSignal)
val outcome = uploadEncryptedFile(token, api, temps.enc, bucketId, encrypted)
finalizeAndRecordFile(api, pending, bucketId, crypto.indexHex, encrypted.size, outcome)
invalidateChildren(DocumentId.encodeFolder(pending.parentUuid))
} catch (t: Throwable) {
if (t !is OperationCanceledException) {
if (!isCancellation(t)) {
Log.w(TAG, "upload failed token=$token: ${t.javaClass.simpleName}: ${t.message}")
}
failure = t
Expand All @@ -665,6 +719,9 @@ class InternxtDocumentsProvider : DocumentsProvider() {
}
}

private fun isCancellation(t: Throwable): Boolean =
t is OperationCanceledException || t is CancellationException

/** `tempEnc` holds the ciphertext PUT to bridge; `plain` holds the pipe's plaintext
* while we hand it to CryptoService (which only accepts file paths). */
private data class UploadTemps(val plain: File, val enc: File)
Expand All @@ -690,7 +747,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {
)
}

private fun encryptInputToTemp(
private suspend fun encryptInputToTemp(
token: String,
temps: UploadTemps,
readEnd: ParcelFileDescriptor,
Expand Down Expand Up @@ -739,13 +796,12 @@ class InternxtDocumentsProvider : DocumentsProvider() {
}
}

private fun uploadEncryptedFile(
private suspend fun uploadEncryptedFile(
token: String,
api: InternxtApiClient,
tempEnc: File,
bucketId: String,
encrypted: EncryptedFileUploader.Encrypted,
signal: CancellationSignal,
): UploadOutcome {
val partsCount = if (encrypted.size >= MULTIPART_THRESHOLD) {
((encrypted.size + MULTIPART_PART_SIZE - 1) / MULTIPART_PART_SIZE).toInt().coerceAtLeast(1)
Expand All @@ -763,14 +819,14 @@ class InternxtDocumentsProvider : DocumentsProvider() {
return when (slot) {
is UploadSlot.Single -> {
EncryptedFileUploader.uploadSingle(
HttpClients.upload, tempEnc, slot.url, signal, onProgress,
HttpClients.upload, tempEnc, slot.url, onProgress,
)
UploadOutcome.Single(slot.uuid, listOf(encrypted.wholeSha256Hex))
}
is UploadSlot.Multipart -> {
val partHashes = EncryptedFileUploader.computePartSha256(tempEnc, MULTIPART_PART_SIZE)
val parts = EncryptedFileUploader.uploadMultipart(
HttpClients.upload, tempEnc, slot.urls, MULTIPART_PART_SIZE, signal, onProgress,
HttpClients.upload, tempEnc, slot.urls, MULTIPART_PART_SIZE, onProgress,
)
UploadOutcome.Multipart(slot.uuid, partHashes, parts, slot.uploadId)
}
Expand Down Expand Up @@ -834,7 +890,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {
private fun notifyServiceOfOutcome(ctx: Context, token: String, failure: Throwable?) {
when {
failure == null -> UploadForegroundService.complete(token)
failure is OperationCanceledException -> UploadForegroundService.complete(token)
isCancellation(failure) -> UploadForegroundService.complete(token)
else -> UploadForegroundService.fail(token, friendlyUploadError(ctx, failure))
}
}
Expand Down Expand Up @@ -902,7 +958,7 @@ class InternxtDocumentsProvider : DocumentsProvider() {
Document.COLUMN_SIZE to null,
)

private fun decryptBlocking(src: File, dst: File, hexKey: String, hexIv: String) {
private suspend fun decryptFile(src: File, dst: File, hexKey: String, hexIv: String) {
awaitCryptoService("Decryption failed") { cb ->
CryptoService.getInstance().decryptFile(
src.absolutePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@ package com.internxt.cloud.documents.crypto

import com.rncrypto.util.OnlyErrorCallback
import java.io.IOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

/**
* Bridges `CryptoService.encryptFile` / `decryptFile` (callback-based) to a blocking call.
* Pass `runInBackground = false` to those methods — the caller is already on a background
* executor thread, so there's no point bouncing onto the rn-crypto thread pool just to
* block on it. Any [Throwable] surfaced by the callback is rethrown as [IOException]
* preserving the original cause.
* Suspends until `CryptoService.encryptFile` / `decryptFile` (callback-based) reports an
* outcome via [OnlyErrorCallback]. No thread is blocked waiting for the callback: the rn-crypto
* callback resumes the coroutine. Any non-null error is surfaced as [IOException] preserving the
* original cause.
*/
internal inline fun awaitCryptoService(
internal suspend inline fun awaitCryptoService(
failureMessage: String,
invoke: (OnlyErrorCallback) -> Unit,
) {
val done = CompletableFuture<Unit>()
invoke { err -> if (err != null) done.completeExceptionally(err) else done.complete(Unit) }
try {
done.get()
} catch (e: ExecutionException) {
throw (e.cause as? IOException) ?: IOException(failureMessage, e.cause)
crossinline invoke: (OnlyErrorCallback) -> Unit,
) = suspendCancellableCoroutine { continuation ->
invoke { err ->
if (err != null) {
continuation.resumeWithException((err as? IOException) ?: IOException(failureMessage, err))
} else {
continuation.resume(Unit)
}
}
}
Loading
Loading