@@ -258,28 +258,105 @@ public Flux<DataBuffer> openUFileSteam(String pickCode) {
258258 Assert .hasText (pickCode , "'pickCode' must has text." );
259259 final String uFileDownUrl = openUFileDownUrl (pickCode );
260260 return streamFileWithRestTemplate (uFileDownUrl );
261- // return Flux.create(sink -> {
262- // Schedulers.boundedElastic().schedule(() -> {
263- // try {
264- // ResponseEntity<byte[]> response = restTemplate.getForEntity(
265- // uFileDownUrl, byte[].class);
266- // if (response.getStatusCode() == HttpStatus.OK &&
267- // response.getBody() != null) {
268- // // 一次性发送所有数据(适合小文件)
269- // DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(
270- // response.getBody().length);
271- // dataBuffer.write(response.getBody());
272- // sink.next(dataBuffer);
273- // sink.complete();
274- // } else {
275- // sink.error(new RuntimeException(
276- // "请求失败,状态码: " + response.getStatusCode()));
277- // }
278- // } catch (Exception e) {
279- // sink.error(e);
280- // }
281- // });
282- // });
261+ }
262+
263+ @ Override
264+ public Flux <DataBuffer > openUFileSteamWithRange (String pickCode , long start , long end ) {
265+ Assert .hasText (pickCode , "'pickCode' must has text." );
266+ final String uFileDownUrl = openUFileDownUrl (pickCode );
267+ return streamFileWithRestTemplateWithRange (uFileDownUrl , start , end , 8192 );
268+ }
269+
270+ private Flux <DataBuffer > streamFileWithRestTemplateWithRange (String uFileDownUrl , Long start , Long end , int chunkSize ) {
271+ if (chunkSize <= 0 ) {
272+ chunkSize = 8192 ; // 默认8KB
273+ }
274+
275+ final int finalChunkSize = chunkSize ;
276+
277+ // 设置 Range 请求头
278+ String rangeHeader ;
279+ if (start != null && end != null ) {
280+ rangeHeader = String .format ("bytes=%d-%d" , start , end );
281+ } else if (start != null ) {
282+ rangeHeader = String .format ("bytes=%d-" , start );
283+ } else {
284+ rangeHeader = "bytes=0-" ; // 默认从头开始
285+ }
286+
287+ return Flux .create (sink -> {
288+ // 使用 boundedElastic 调度器执行阻塞操作
289+ Schedulers .boundedElastic ().schedule (() -> {
290+ try {
291+ String newUrl = uFileDownUrl ;
292+ if (StringUtils .isNoneBlank (newUrl )) {
293+ newUrl = URLDecoder .decode (newUrl , StandardCharsets .UTF_8 );
294+ }
295+ log .debug ("Open stream range from {} to {} for url: {}" , start , end , newUrl );
296+ restTemplate .execute (newUrl , HttpMethod .GET , new RequestCallback () {
297+ @ Override
298+ public void doWithRequest (ClientHttpRequest request ) throws IOException {
299+ request .getHeaders ().set (HttpHeaders .RANGE , rangeHeader );
300+ request .getHeaders ().addAll (headers );
301+ }
302+ },
303+ (ResponseExtractor <Void >) response -> {
304+ if (response .getStatusCode () != HttpStatus .PARTIAL_CONTENT ) {
305+ sink .error (new RuntimeException (
306+ "请求失败,状态码: " + response .getStatusCode ()));
307+ return null ;
308+ }
309+
310+ try (InputStream inputStream = response .getBody ()) {
311+ byte [] buffer = new byte [finalChunkSize ];
312+ int bytesRead ;
313+ AtomicBoolean isCanceled = new AtomicBoolean (false );
314+
315+ sink .onCancel (() -> {
316+ isCanceled .set (true );
317+ try {
318+ inputStream .close ();
319+ } catch (IOException e ) {
320+ // 忽略关闭异常
321+ }
322+ });
323+
324+ while (!isCanceled .get () &&
325+ (bytesRead = inputStream .read (buffer )) != -1 ) {
326+
327+ // 创建 DataBuffer
328+ DataBuffer dataBuffer = dataBufferFactory .allocateBuffer (bytesRead );
329+ dataBuffer .write (buffer , 0 , bytesRead );
330+
331+ // 发布数据块
332+ sink .next (dataBuffer );
333+
334+ // 检查是否取消
335+ if (sink .isCancelled ()) {
336+ log .debug ("Close stream range from {} to {} for url: {}" , start , end , uFileDownUrl );
337+ break ;
338+ }
339+ }
340+
341+ if (!isCanceled .get ()) {
342+ sink .complete ();
343+ }
344+ } catch (Exception e ) {
345+ if (!sink .isCancelled ()) {
346+ sink .error (e );
347+ }
348+ }
349+ return null ;
350+ });
351+ } catch (Exception e ) {
352+ if (!sink .isCancelled ()) {
353+ sink .error (e );
354+ }
355+ } finally {
356+ headers .remove (HttpHeaders .RANGE );
357+ }
358+ });
359+ }, FluxSink .OverflowStrategy .BUFFER );
283360 }
284361
285362
0 commit comments