@@ -224,36 +224,34 @@ impl<P: StreamProcessor> StreamingPipeline<P> {
224224 Ok ( ( ) )
225225 }
226226
227- /// Process gzip compressed input to uncompressed output (decompression only)
228- fn process_gzip_to_none < R : Read , W : Write > (
227+ /// Decompress input, process content, and write uncompressed output.
228+ fn decompress_and_process < R : Read , W : Write > (
229229 & mut self ,
230- input : R ,
230+ mut decoder : R ,
231231 mut output : W ,
232+ codec_name : & str ,
232233 ) -> Result < ( ) , Report < TrustedServerError > > {
233- use flate2:: read:: GzDecoder ;
234-
235- // Decompress input
236- let mut decoder = GzDecoder :: new ( input) ;
237234 let mut decompressed = Vec :: new ( ) ;
238235 decoder
239236 . read_to_end ( & mut decompressed)
240237 . change_context ( TrustedServerError :: Proxy {
241- message : "Failed to decompress gzip" . to_string ( ) ,
238+ message : format ! ( "Failed to decompress {codec_name}" ) ,
242239 } ) ?;
243240
244- log:: info!( "Decompressed size: {} bytes" , decompressed. len( ) ) ;
241+ log:: info!(
242+ "{codec_name} decompressed size: {} bytes" ,
243+ decompressed. len( )
244+ ) ;
245245
246- // Process the decompressed content
247246 let processed = self
248247 . processor
249248 . process_chunk ( & decompressed, true )
250249 . change_context ( TrustedServerError :: Proxy {
251250 message : "Failed to process content" . to_string ( ) ,
252251 } ) ?;
253252
254- log:: info!( "Processed size: {} bytes" , processed. len( ) ) ;
253+ log:: info!( "{codec_name} processed size: {} bytes" , processed. len( ) ) ;
255254
256- // Write uncompressed output
257255 output
258256 . write_all ( & processed)
259257 . change_context ( TrustedServerError :: Proxy {
@@ -263,6 +261,17 @@ impl<P: StreamProcessor> StreamingPipeline<P> {
263261 Ok ( ( ) )
264262 }
265263
264+ /// Process gzip compressed input to uncompressed output (decompression only)
265+ fn process_gzip_to_none < R : Read , W : Write > (
266+ & mut self ,
267+ input : R ,
268+ output : W ,
269+ ) -> Result < ( ) , Report < TrustedServerError > > {
270+ use flate2:: read:: GzDecoder ;
271+
272+ self . decompress_and_process ( GzDecoder :: new ( input) , output, "gzip" )
273+ }
274+
266275 /// Process deflate compressed stream
267276 fn process_deflate_to_deflate < R : Read , W : Write > (
268277 & mut self ,
@@ -283,42 +292,11 @@ impl<P: StreamProcessor> StreamingPipeline<P> {
283292 fn process_deflate_to_none < R : Read , W : Write > (
284293 & mut self ,
285294 input : R ,
286- mut output : W ,
295+ output : W ,
287296 ) -> Result < ( ) , Report < TrustedServerError > > {
288297 use flate2:: read:: ZlibDecoder ;
289298
290- // Decompress input
291- let mut decoder = ZlibDecoder :: new ( input) ;
292- let mut decompressed = Vec :: new ( ) ;
293- decoder
294- . read_to_end ( & mut decompressed)
295- . change_context ( TrustedServerError :: Proxy {
296- message : "Failed to decompress deflate" . to_string ( ) ,
297- } ) ?;
298-
299- log:: info!(
300- "Deflate->None decompressed size: {} bytes" ,
301- decompressed. len( )
302- ) ;
303-
304- // Process the decompressed content
305- let processed = self
306- . processor
307- . process_chunk ( & decompressed, true )
308- . change_context ( TrustedServerError :: Proxy {
309- message : "Failed to process content" . to_string ( ) ,
310- } ) ?;
311-
312- log:: info!( "Deflate->None processed size: {} bytes" , processed. len( ) ) ;
313-
314- // Write uncompressed output
315- output
316- . write_all ( & processed)
317- . change_context ( TrustedServerError :: Proxy {
318- message : "Failed to write output" . to_string ( ) ,
319- } ) ?;
320-
321- Ok ( ( ) )
299+ self . decompress_and_process ( ZlibDecoder :: new ( input) , output, "deflate" )
322300 }
323301
324302 /// Process brotli compressed stream
@@ -346,42 +324,11 @@ impl<P: StreamProcessor> StreamingPipeline<P> {
346324 fn process_brotli_to_none < R : Read , W : Write > (
347325 & mut self ,
348326 input : R ,
349- mut output : W ,
327+ output : W ,
350328 ) -> Result < ( ) , Report < TrustedServerError > > {
351329 use brotli:: Decompressor ;
352330
353- // Decompress input
354- let mut decoder = Decompressor :: new ( input, 4096 ) ;
355- let mut decompressed = Vec :: new ( ) ;
356- decoder
357- . read_to_end ( & mut decompressed)
358- . change_context ( TrustedServerError :: Proxy {
359- message : "Failed to decompress brotli" . to_string ( ) ,
360- } ) ?;
361-
362- log:: info!(
363- "Brotli->None decompressed size: {} bytes" ,
364- decompressed. len( )
365- ) ;
366-
367- // Process the decompressed content
368- let processed = self
369- . processor
370- . process_chunk ( & decompressed, true )
371- . change_context ( TrustedServerError :: Proxy {
372- message : "Failed to process content" . to_string ( ) ,
373- } ) ?;
374-
375- log:: info!( "Brotli->None processed size: {} bytes" , processed. len( ) ) ;
376-
377- // Write uncompressed output
378- output
379- . write_all ( & processed)
380- . change_context ( TrustedServerError :: Proxy {
381- message : "Failed to write output" . to_string ( ) ,
382- } ) ?;
383-
384- Ok ( ( ) )
331+ self . decompress_and_process ( Decompressor :: new ( input, 4096 ) , output, "brotli" )
385332 }
386333
387334 /// Generic processing through compression layers
0 commit comments