@@ -157,6 +157,7 @@ protected override void Dispose (bool disposing)
157157 {
158158 segment . Dispose ( ) ;
159159 }
160+
160161 _lineQueue . Dispose ( ) ;
161162 }
162163
@@ -200,6 +201,13 @@ private void RestartPipeline (long newPosition)
200201 }
201202 }
202203
204+ /// <summary>
205+ /// Cancels the current pipeline operation and releases associated resources. This method should be called while
206+ /// holding the appropriate lock to ensure thread safety.
207+ /// </summary>
208+ /// <remarks>This method cancels any ongoing producer task, marks the internal queue as complete to
209+ /// unblock waiting consumers, and disposes of pipeline resources. It is intended for internal use and must be
210+ /// invoked only when the pipeline is in a valid state for cancellation.</remarks>
203211 private void CancelPipelineLocked ( )
204212 {
205213 if ( _cts == null )
@@ -226,7 +234,7 @@ private void CancelPipelineLocked ()
226234 }
227235 finally
228236 {
229- _cts ? . Dispose ( ) ;
237+ _cts . Dispose ( ) ;
230238 _cts = null ;
231239 }
232240
@@ -444,15 +452,12 @@ private void EnqueueLine (LineSegment segment)
444452 }
445453
446454 // If still no space, force process current content as truncated line
447- if ( charsInBuffer >= _charBufferSize - 100 )
455+ if ( charsInBuffer >= _charBufferSize - 100 && charsInBuffer > 0 )
448456 {
449- if ( charsInBuffer > 0 )
450- {
451- var segment = CreateSegment ( charBuffer , 0 , charsInBuffer , 0 , byteOffset ) ;
452- byteOffset += segment . ByteLength ;
453- EnqueueLine ( segment ) ;
454- charsInBuffer = 0 ;
455- }
457+ var segment = CreateSegment ( charBuffer , 0 , charsInBuffer , 0 , byteOffset ) ;
458+ byteOffset += segment . ByteLength ;
459+ EnqueueLine ( segment ) ;
460+ charsInBuffer = 0 ;
456461 }
457462
458463 charsAvailable = _charBufferSize - charsInBuffer ;
0 commit comments