-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathParsedBufferConsumer.cs
80 lines (70 loc) · 2.89 KB
/
ParsedBufferConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using LogGrokCore.Data.Index;
using LogGrokCore.Data.Monikers;
namespace LogGrokCore.Data
{
/// <summary>
/// Drains parsed line buffers from a bounded queue on a background task,
/// registering every line in the <see cref="LineIndex"/> and the
/// <see cref="Indexer"/> and handing each buffer back to the
/// <see cref="StringPool"/> once it has been processed.
/// </summary>
public sealed class ParsedBufferConsumer
{
    // Bounded to 4 pending buffers: AddParsedBuffer blocks while the queue is full.
    private readonly BlockingCollection<(long startOffset, int lineCount, string buffer)> _queue
        = new(4);

    private readonly LineIndex _lineIndex;
    private readonly Indexer _indexer;
    private readonly LogMetaInformation _logMetaInformation;
    private readonly StringPool _stringPool;

    // Written by CompleteAdding before the queue is marked complete; read by the
    // consumer only after the queue has been drained.
    private long? _totalBytesRead;

    /// <summary>
    /// Stores the collaborators and immediately starts the background consumer.
    /// </summary>
    /// <param name="lineIndex">Receives the offset of every consumed line.</param>
    /// <param name="indexer">Receives an index key and line number for every consumed line.</param>
    /// <param name="logMetaInformation">Supplies the number of indexed fields per line.</param>
    /// <param name="stringPool">Pool that processed buffers are returned to.</param>
    public ParsedBufferConsumer(
        LineIndex lineIndex,
        Indexer indexer,
        LogMetaInformation logMetaInformation,
        StringPool stringPool)
    {
        _lineIndex = lineIndex;
        _indexer = indexer;
        _logMetaInformation = logMetaInformation;
        _stringPool = stringPool;

        // The consumer blocks on the queue until CompleteAdding is called, so it is
        // flagged as long-running instead of occupying a regular thread-pool worker.
        Task.Factory.StartNew(ConsumeBuffers, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Queues a parsed buffer for indexing. Blocks while the queue is at capacity.
    /// </summary>
    /// <param name="bufferStartOffset">Offset added to each line's buffer-relative offset.</param>
    /// <param name="lineCount">Number of line records contained in <paramref name="parsedBuffer"/>.</param>
    /// <param name="parsedBuffer">Buffer holding the line records; returned to the pool after consumption.</param>
    public void AddParsedBuffer(long bufferStartOffset, int lineCount, string parsedBuffer)
    {
        _queue.Add((bufferStartOffset, lineCount, parsedBuffer));
    }

    /// <summary>
    /// Records the total number of bytes read and marks the queue as complete, which
    /// lets the consumer finish once the remaining buffers are processed.
    /// </summary>
    /// <param name="totalBytesRead">Total byte count used to finish the line index.</param>
    public void CompleteAdding(long totalBytesRead)
    {
        // Order matters: the total must be stored before the queue is completed,
        // because the consumer reads it as soon as the queue runs dry.
        _totalBytesRead = totalBytesRead;
        _queue.CompleteAdding();
    }

    /// <summary>
    /// Consumer loop: walks the line records of every queued buffer, then finalizes
    /// the line index and the indexer once the queue has been completed and drained.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The queue completed without a total byte count having been recorded.
    /// </exception>
    private unsafe void ConsumeBuffers()
    {
        // Absolute offset of the most recently indexed line; stays 0 if no line is seen.
        long lastLineOffset = 0;
        var componentsCount = _logMetaInformation.IndexedFieldNumbers.Length;

#pragma warning disable CS8619
        foreach (var (bufferStartOffset, lineCount, buffer) in _queue.GetConsumingEnumerable())
#pragma warning restore CS8619
        {
            // Position (in chars) of the current line record inside the buffer.
            var metaOffset = 0;
            fixed (char* start = buffer)
            {
                for (var idx = 0; idx < lineCount; idx++)
                {
                    var lineMetaInformation = LineMetaInformation.Get(start + metaOffset, componentsCount);
                    lastLineOffset = bufferStartOffset + lineMetaInformation.LineOffsetFromBufferStart;

                    var lineNum = _lineIndex.Add(lastLineOffset);
                    _indexer.Add(new IndexKey(buffer, metaOffset, componentsCount), lineNum);

                    // Advance to the next record in the same buffer.
                    metaOffset += lineMetaInformation.TotalSizeWithPayloadCharsAligned;
                }
            }

            _stringPool.Return(buffer);
        }

        if (_totalBytesRead is not { } fileSize)
        {
            throw new InvalidOperationException(
                "The buffer queue completed before the total byte count was recorded.");
        }

        // Distance from the last line's offset to the total byte count
        // (presumably the length of the final line - confirm against LineIndex.Finish).
        _lineIndex.Finish((int) (fileSize - lastLineOffset));
        _indexer.Finish();
    }
}
}