Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/consumable array buffer writer pooling #59

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions src/Bedrock.Framework/Protocols/ConsumableArrayBufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,27 @@

namespace Bedrock.Framework.Protocols
{
/// <summary>
/// This should be replaced with the framework version if https://github.com/dotnet/runtime/issues/1248 is implemented
/// </summary>
/// <typeparam name="T"></typeparam>
internal class ConsumableArrayBufferWriter<T> : IBufferWriter<T>
internal class ConsumableArrayBufferWriter : IBufferWriter<byte>, IDisposable
{
private T[] _buffer;
private byte[] _buffer;
private int _index;
private int _consumedCount;

private const int DefaultInitialBufferSize = 256;

/// <summary>
/// Creates an instance of an <see cref="ConsumableArrayBufferWriter{T}"/>, in which data can be written to,
/// Creates an instance of an <see cref="ConsumableArrayBufferWriter{byte}"/>, in which data can be written to,
/// with the default initial capacity.
/// </summary>
public ConsumableArrayBufferWriter()
{
_buffer = Array.Empty<T>();
_buffer = Array.Empty<byte>();
_index = 0;
_consumedCount = 0;
}

/// <summary>
/// Creates an instance of an <see cref="ConsumableArrayBufferWriter{T}"/>, in which data can be written to,
/// Creates an instance of an <see cref="ConsumableArrayBufferWriter{byte}"/>, in which data can be written to,
/// with an initial capacity specified.
/// </summary>
/// <param name="initialCapacity">The minimum capacity with which to initialize the underlying buffer.</param>
Expand All @@ -40,20 +36,20 @@ public ConsumableArrayBufferWriter(int initialCapacity)
if (initialCapacity <= 0)
throw new ArgumentException(nameof(initialCapacity));

_buffer = new T[initialCapacity];
_buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = 0;
_consumedCount = 0;
}

/// <summary>
/// Returns the unconsumed data written to the underlying buffer so far, as a <see cref="ReadOnlyMemory{T}"/>.
/// Returns the unconsumed data written to the underlying buffer so far, as a <see cref="ReadOnlyMemory{byte}"/>.
/// </summary>
public ReadOnlyMemory<T> WrittenMemory => _buffer.AsMemory(_consumedCount.._index);
public ReadOnlyMemory<byte> WrittenMemory => _buffer.AsMemory(_consumedCount.._index);

/// <summary>
/// Returns the unconsumed data written to the underlying buffer so far, as a <see cref="ReadOnlySpan{T}"/>.
/// Returns the unconsumed data written to the underlying buffer so far, as a <see cref="ReadOnlySpan{byte}"/>.
/// </summary>
public ReadOnlySpan<T> WrittenSpan => _buffer.AsSpan(_consumedCount.._index);
public ReadOnlySpan<byte> WrittenSpan => _buffer.AsSpan(_consumedCount.._index);

/// <summary>
/// Returns the amount of unconsumed data written to the underlying buffer so far.
Expand All @@ -74,7 +70,7 @@ public ConsumableArrayBufferWriter(int initialCapacity)
/// Clears the data written to the underlying buffer.
/// </summary>
/// <remarks>
/// You must clear the <see cref="ConsumedArrayBufferWriter{T}"/> before trying to re-use it.
/// You must clear the <see cref="ConsumedArrayBufferWriter{byte}"/> before trying to re-use it.
/// </remarks>
public void Clear()
{
Expand All @@ -85,7 +81,7 @@ public void Clear()
}

/// <summary>
/// Notifies <see cref="IBufferWriter{T}"/> that <paramref name="count"/> amount of data was written to the output <see cref="Span{T}"/>/<see cref="Memory{T}"/>
/// Notifies <see cref="IBufferWriter{byte}"/> that <paramref name="count"/> amount of data was written to the output <see cref="Span{byte}"/>/<see cref="Memory{byte}"/>
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="count"/> is negative.
Expand All @@ -108,7 +104,7 @@ public void Advance(int count)
}

/// <summary>
/// Notifies <see cref="ConsumableArrayBufferWriter{T}"/> that <paramref name="count"/> amount of data was consumed from the output <see cref="Span{T}"/>/<see cref="Memory{T}"/ and can be overwritten>
/// Notifies <see cref="ConsumableArrayBufferWriter{byte}"/> that <paramref name="count"/> amount of data was consumed from the output <see cref="Span{byte}"/>/<see cref="Memory{byte}"/ and can be overwritten>
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="count"/> is negative.
Expand All @@ -132,6 +128,12 @@ public void Consume(int count)
{
_index = 0;
_consumedCount = 0;
if (Capacity >= DefaultInitialBufferSize * 2)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to return the buffer on consumed regardless of the size no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's the Default size why return it? The moment anyone wrote anything we would ask for it back, and if they've finished with it they should dispose...

{
// No point holding on to a large buffer
ArrayPool<byte>.Shared.Return(_buffer);
_buffer = Array.Empty<byte>();
}
}
else
{
Expand All @@ -140,45 +142,45 @@ public void Consume(int count)
}

/// <summary>
/// Returns a <see cref="Memory{T}"/> to write to that is at least the requested length (specified by <paramref name="sizeHint"/>).
/// Returns a <see cref="Memory{byte}"/> to write to that is at least the requested length (specified by <paramref name="sizeHint"/>).
/// If no <paramref name="sizeHint"/> is provided (or it's equal to <code>0</code>), some non-empty buffer is returned.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="sizeHint"/> is negative.
/// </exception>
/// <remarks>
/// This will never return an empty <see cref="Memory{T}"/>.
/// This will never return an empty <see cref="Memory{byte}"/>.
/// </remarks>
/// <remarks>
/// There is no guarantee that successive calls will return the same buffer or the same-sized buffer.
/// </remarks>
/// <remarks>
/// You must request a new buffer after calling Advance to continue writing more data and cannot write to a previously acquired buffer.
/// </remarks>
public Memory<T> GetMemory(int sizeHint = 0)
public Memory<byte> GetMemory(int sizeHint = 0)
{
CheckAndResizeBuffer(sizeHint);
Debug.Assert(_buffer.Length > _index);
return _buffer.AsMemory(_index);
}

/// <summary>
/// Returns a <see cref="Span{T}"/> to write to that is at least the requested length (specified by <paramref name="sizeHint"/>).
/// Returns a <see cref="Span{byte}"/> to write to that is at least the requested length (specified by <paramref name="sizeHint"/>).
/// If no <paramref name="sizeHint"/> is provided (or it's equal to <code>0</code>), some non-empty buffer is returned.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="sizeHint"/> is negative.
/// </exception>
/// <remarks>
/// This will never return an empty <see cref="Span{T}"/>.
/// This will never return an empty <see cref="Span{byte}"/>.
/// </remarks>
/// <remarks>
/// There is no guarantee that successive calls will return the same buffer or the same-sized buffer.
/// </remarks>
/// <remarks>
/// You must request a new buffer after calling Advance to continue writing more data and cannot write to a previously acquired buffer.
/// </remarks>
public Span<T> GetSpan(int sizeHint = 0)
public Span<byte> GetSpan(int sizeHint = 0)
{
CheckAndResizeBuffer(sizeHint);
Debug.Assert(_buffer.Length > _index);
Expand Down Expand Up @@ -216,8 +218,9 @@ private void CheckAndResizeBuffer(int sizeHint)
}

var newSize = checked(_buffer.Length + growBy);
var destinationArray = new T[newSize];
var destinationArray = ArrayPool<byte>.Shared.Rent(newSize);
Array.Copy(_buffer, _consumedCount, destinationArray, 0, countUnconsumed);
ReturnBuffer();
_buffer = destinationArray;
}
_index = countUnconsumed;
Expand All @@ -226,5 +229,16 @@ private void CheckAndResizeBuffer(int sizeHint)

Debug.Assert(FreeCapacity > 0 && FreeCapacity >= sizeHint);
}

public void Dispose()
{
ReturnBuffer();
_buffer = null; // This will cause a NRE if we use after dispose rather than writing data into a random array
}

private void ReturnBuffer()
{
ArrayPool<byte>.Shared.Return(_buffer);
}
}
}
4 changes: 2 additions & 2 deletions src/Bedrock.Framework/Protocols/MessagePipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class MessagePipeReader : PipeReader
private bool _isThisCompleted;
private bool _isCanceled;
private bool _isCompleted;
private ConsumableArrayBufferWriter<byte> _backlog = new ConsumableArrayBufferWriter<byte>();
private readonly ConsumableArrayBufferWriter _backlog = new ConsumableArrayBufferWriter();
private bool _allExamined;
private bool _advanced = true;
public MessagePipeReader(PipeReader reader, IMessageReader<ReadOnlySequence<byte>> messageReader)
Expand Down Expand Up @@ -80,7 +80,7 @@ public override void Complete(Exception exception = null)
_reader.AdvanceTo(_consumed, _examined);
}
_isThisCompleted = true;
_backlog = null;
_backlog.Dispose();
}

public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
Expand Down
Loading