Skip to content

Commit 76a42f2

Browse files
committed
Pipeline added
1 parent f54bcc4 commit 76a42f2

2 files changed

Lines changed: 146 additions & 8 deletions

File tree

src/LogExpert.Core/Classes/Log/LogfileReader.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace LogExpert.Core.Classes.Log;
1414

15-
public class LogfileReader : IAutoLogLineColumnizerCallback, IDisposable
15+
public partial class LogfileReader : IAutoLogLineColumnizerCallback, IDisposable
1616
{
1717
#region Fields
1818

@@ -1621,20 +1621,14 @@ private ILogStreamReader GetLogStreamReader (Stream stream, EncodingOptions enco
16211621
return IsXmlMode ? new XmlBlockSplitter(new XmlLogReader(reader), XmlLogConfig) : reader;
16221622
}
16231623

1624-
public enum ReaderType
1625-
{
1626-
Legacy,
1627-
System,
1628-
Channel
1629-
}
1630-
16311624
private ILogStreamReader CreateLogStreamReader (Stream stream, EncodingOptions encodingOptions)
16321625
{
16331626
return _readerType switch
16341627
{
16351628
ReaderType.Legacy => new PositionAwareStreamReaderLegacy(stream, encodingOptions, _maximumLineLength),
16361629
ReaderType.System => new PositionAwareStreamReaderSystem(stream, encodingOptions, _maximumLineLength),
16371630
ReaderType.Channel => new PositionAwareStreamReaderChannel(stream, encodingOptions, _maximumLineLength),
1631+
ReaderType.Pipeline => new PositionAwareStreamReaderPipeline(stream, encodingOptions, _maximumLineLength),
16381632
_ => throw new ArgumentOutOfRangeException(nameof(ReaderType), _readerType, null)
16391633
};
16401634
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
using System.IO.Pipelines;
2+
using System.Text;
3+
4+
using LogExpert.Core.Entities;
5+
6+
namespace LogExpert.Core.Classes.Log;
7+
8+
public class PositionAwareStreamReaderPipeline : LogStreamReaderBase
9+
{
10+
private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB
11+
private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB
12+
private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments
13+
14+
private static readonly Encoding[] _preambleEncodings =
15+
[
16+
Encoding.UTF8,
17+
Encoding.Unicode,
18+
Encoding.BigEndianUnicode,
19+
Encoding.UTF32
20+
];
21+
22+
private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true);
23+
private readonly int _maximumLineLength;
24+
private readonly Lock _reconfigureLock = new();
25+
private readonly Stream _stream;
26+
private readonly Encoding _encoding;
27+
private readonly int _byteBufferSize;
28+
private readonly int _charBufferSize;
29+
private readonly long _preambleLength;
30+
31+
private readonly PipeReader _pipeReader;
32+
private readonly PipeWriter _pipeWriter;
33+
34+
//private Channel<LineSegment> _channel;
35+
private CancellationTokenSource _cts;
36+
private Task _producerTask;
37+
private bool _isDisposed;
38+
private long _position;
39+
40+
41+
public PositionAwareStreamReaderPipeline (Stream stream, EncodingOptions encodingOptions, int maximumLineLength)
42+
{
43+
ArgumentNullException.ThrowIfNull(stream);
44+
45+
if (!stream.CanRead)
46+
{
47+
throw new ArgumentException("Stream must support reading.", nameof(stream));
48+
}
49+
50+
if (!stream.CanSeek)
51+
{
52+
throw new ArgumentException("Stream must support seeking.", nameof(stream));
53+
}
54+
55+
if (maximumLineLength <= 0)
56+
{
57+
maximumLineLength = 1024;
58+
}
59+
60+
_maximumLineLength = maximumLineLength;
61+
_byteBufferSize = DEFAULT_BYTE_BUFFER_SIZE;
62+
var (length, detectedEncoding) = DetectPreambleLength(stream);
63+
_preambleLength = length;
64+
_encoding = DetermineEncoding(encodingOptions, detectedEncoding);
65+
66+
67+
_pipeReader = PipeReader.Create(stream, _streamPipeReaderOptions);
68+
69+
70+
_stream = stream;
71+
72+
_charBufferSize = Math.Max(_encoding.GetMaxCharCount(_byteBufferSize), _maximumLineLength + 2);
73+
74+
//RestartPipelineInternal(0);
75+
}
76+
77+
public override long Position { get; set; }
78+
79+
public override bool IsBufferComplete { get; }
80+
81+
public override Encoding Encoding { get; }
82+
83+
public override bool IsDisposed { get; protected set; }
84+
85+
public override int ReadChar ()
86+
{
87+
throw new NotImplementedException();
88+
}
89+
90+
public override string ReadLine ()
91+
{
92+
throw new NotImplementedException();
93+
}
94+
95+
protected override void Dispose (bool disposing)
96+
{
97+
throw new NotImplementedException();
98+
}
99+
100+
private static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding)
101+
{
102+
return options?.Encoding != null
103+
? options.Encoding
104+
: detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default;
105+
}
106+
107+
private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream)
108+
{
109+
if (!stream.CanSeek)
110+
{
111+
return (0, null);
112+
}
113+
114+
var originalPos = stream.Position;
115+
var buffer = new byte[4];
116+
_ = stream.Seek(0, SeekOrigin.Begin);
117+
var readBytes = stream.Read(buffer, 0, buffer.Length);
118+
_ = stream.Seek(originalPos, SeekOrigin.Begin);
119+
120+
if (readBytes >= 2)
121+
{
122+
foreach (var encoding in _preambleEncodings)
123+
{
124+
var preamble = encoding.GetPreamble();
125+
var fail = false;
126+
for (var i = 0; i < readBytes && i < preamble.Length; ++i)
127+
{
128+
if (buffer[i] != preamble[i])
129+
{
130+
fail = true;
131+
break;
132+
}
133+
}
134+
135+
if (!fail)
136+
{
137+
return (preamble.Length, encoding);
138+
}
139+
}
140+
}
141+
142+
return (0, null);
143+
}
144+
}

0 commit comments

Comments
 (0)