From b02ce0a36435e24a3b7286429d16035f62458126 Mon Sep 17 00:00:00 2001 From: Hoikan Date: Sat, 23 May 2026 15:22:49 +0800 Subject: [PATCH] fix(request): propagate download stream errors --- .../src/request/processDownloadProgress.ts | 47 ++++-- .../request/processDownloadProgress.test.ts | 149 ++++++++++++++++++ 2 files changed, 184 insertions(+), 12 deletions(-) create mode 100644 packages/inula-request/tests/unitTest/request/processDownloadProgress.test.ts diff --git a/packages/inula-request/src/request/processDownloadProgress.ts b/packages/inula-request/src/request/processDownloadProgress.ts index 97dbe6f0c..272c44ea9 100644 --- a/packages/inula-request/src/request/processDownloadProgress.ts +++ b/packages/inula-request/src/request/processDownloadProgress.ts @@ -22,27 +22,50 @@ function processDownloadProgress( ) { // 文件下载过程中更新进度 if (onProgress) { - const reader = stream?.getReader(); + if (!stream) { + return stream; + } + + const reader = stream.getReader(); let totalBytesRead = 0; // 跟踪已读取的字节数 + let canceled = false; + return new ReadableStream({ start(controller) { function read() { - reader?.read().then(({ done, value }) => { - if (done) { - controller.close(); - return; - } - - totalBytesRead += value.byteLength; - onProgress!({ loaded: totalBytesRead, total: Number(response.headers.get('Content-Length')) }); - controller.enqueue(value); // 将读取到的数据块添加到新的 ReadableStream 中 - read(); // 递归调用,继续读取 stream 直到结束 - }); + reader + .read() + .then(({ done, value }) => { + if (canceled) { + return; + } + + if (done) { + controller.close(); + return; + } + + totalBytesRead += value.byteLength; + onProgress!({ loaded: totalBytesRead, total: Number(response.headers.get('Content-Length')) }); + controller.enqueue(value); // 将读取到的数据块添加到新的 ReadableStream 中 + read(); // 递归调用,继续读取 stream 直到结束 + }) + .catch(error => { + if (canceled) { + return; + } + + controller.error(error); + }); } read(); // 调用 read 函数以启动从原始 stream 中读取数据的过程 }, + cancel(reason) { + canceled = true; + return reader.cancel(reason); + }, }); } else { return stream; diff --git a/packages/inula-request/tests/unitTest/request/processDownloadProgress.test.ts b/packages/inula-request/tests/unitTest/request/processDownloadProgress.test.ts new file mode 100644 index 000000000..b30249caa --- /dev/null +++ b/packages/inula-request/tests/unitTest/request/processDownloadProgress.test.ts @@ -0,0 +1,149 @@ +/** + * @jest-environment node + */ + +/* + * Copyright (c) 2023 Huawei Technologies Co.,Ltd. + * + * openInula is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +import processDownloadProgress from '../../../src/request/processDownloadProgress'; +import { fetchRequest } from '../../../src/request/fetchRequest'; +import { ReadableStream as WebReadableStream } from 'stream/web'; + +describe('processDownloadProgress', () => { + let originalFetch: typeof fetch; + + beforeEach(() => { + globalThis.ReadableStream = WebReadableStream as typeof ReadableStream; + originalFetch = globalThis.fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('should forward source stream errors to the progress stream reader', async () => { + const sourceError = new Error('source stream failed'); + const sourceStream = new ReadableStream({ + start(controller) { + controller.error(sourceError); + }, + }); + const response = { + headers: { + get: () => '1', + }, + } as unknown as Response; + const progressStream = processDownloadProgress(sourceStream, response, jest.fn())!; + const reader = progressStream.getReader(); + + const result = await Promise.race([ + reader.read().catch(error => error), + new Promise(resolve => setTimeout(() => resolve('timeout'), 20)), + ]); + + expect(result).toBe(sourceError); + }); + + it('should return null when response body stream is null', () => { + const response = { + headers: { + get: () => null, + }, + } as unknown as Response; + + expect(processDownloadProgress(null, response, jest.fn())).toBeNull(); + }); + + it('should pass chunks through and report accumulated download progress', async () => { + const onProgress = jest.fn(); + const sourceStream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2])); + controller.enqueue(new Uint8Array([3])); + controller.close(); + }, + }); + const response = { + headers: { + get: () => '3', + }, + } as unknown as Response; + const progressStream = processDownloadProgress(sourceStream, response, onProgress)!; + + const data = new Uint8Array(await new Response(progressStream).arrayBuffer()); + + expect(Array.from(data)).toEqual([1, 2, 3]); + expect(onProgress).toHaveBeenNthCalledWith(1, { loaded: 2, total: 3 }); + expect(onProgress).toHaveBeenNthCalledWith(2, { loaded: 3, total: 3 }); + }); + + it('should reject response parsing when an aborted source stream errors', async () => { + const controller = new AbortController(); + const abortError = new DOMException('request aborted', 'AbortError'); + const sourceStream = new ReadableStream({ + start(streamController) { + controller.signal.addEventListener('abort', () => { + streamController.error(abortError); + }); + }, + }); + const response = { + headers: { + get: () => null, + }, + } as unknown as Response; + const progressStream = processDownloadProgress(sourceStream, response, jest.fn())!; + const parseMethod = new Response(progressStream).text(); + + controller.abort(); + + await expect(parseMethod).rejects.toBe(abortError); + }); + + it('should reject with CancelError when fetch body aborts while reporting download progress', async () => { + const controller = new AbortController(); + + globalThis.fetch = jest.fn((_url, options: RequestInit) => { + const body = new ReadableStream({ + start(streamController) { + options.signal!.addEventListener('abort', () => { + streamController.error(new DOMException('request aborted', 'AbortError')); + }); + }, + }); + + return Promise.resolve(new Response(body, { status: 200, statusText: 'OK' })); + }) as unknown as typeof fetch; + + const request = fetchRequest({ + url: '/abort', + method: 'GET', + responseType: 'text', + signal: controller.signal, + onDownloadProgress: jest.fn(), + validateStatus: (status: number) => status >= 200 && status < 300, + }); + + // 为了让 fetchRequest 内部的 .then(response => {...}) 先跑完 + // 把 progress stream 接进 parseMethod,再 abort + await Promise.resolve(); + controller.abort(); + + await expect(request).rejects.toMatchObject({ + name: 'CanceledError', + message: 'request canceled', + }); + }); +});