Skip to content

Commit

Permalink
Default to zero byte reads when we use PipeReader.Create (dotnet#43276)
Browse files Browse the repository at this point in the history
* Default to zero byte reads when we use PipeReader.Create
- This will avoid allocating buffers until there's data available to read.
  • Loading branch information
davidfowl authored Aug 15, 2022
1 parent 4faa91c commit 6d90a04
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/Http/Http/src/Features/RequestBodyPipeFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public class RequestBodyPipeFeature : IRequestBodyPipeFeature
private Stream? _streamInstanceWhenWrapped;
private readonly HttpContext _context;

// We want to use zero byte reads for the request body
private static readonly StreamPipeReaderOptions _defaultReaderOptions = new(useZeroByteReads: true);

/// <summary>
/// Initializes a new instance of <see cref="IRequestBodyPipeFeature"/>.
/// </summary>
Expand All @@ -36,7 +39,7 @@ public PipeReader Reader
!ReferenceEquals(_streamInstanceWhenWrapped, _context.Request.Body))
{
_streamInstanceWhenWrapped = _context.Request.Body;
_internalPipeReader = PipeReader.Create(_context.Request.Body);
_internalPipeReader = PipeReader.Create(_context.Request.Body, _defaultReaderOptions);

_context.Response.OnCompleted((self) =>
{
Expand Down
25 changes: 25 additions & 0 deletions src/Http/Http/test/Features/RequestBodyPipeFeatureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using Moq;

namespace Microsoft.AspNetCore.Http.Features;

Expand Down Expand Up @@ -37,6 +38,30 @@ public async Task RequestBodyGetsDataFromSecondStream()
Assert.Equal(expectedString, GetStringFromReadResult(data));
}

[Fact]
public async Task RequestBodyDoesZeroByteRead()
{
var context = new DefaultHttpContext();
var mockStream = new Mock<Stream>();

var bufferLengths = new List<int>();

mockStream.Setup(s => s.CanRead).Returns(true);
mockStream.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>())).Returns<Memory<byte>, CancellationToken>((buffer, token) =>
{
bufferLengths.Add(buffer.Length);
return ValueTask.FromResult(0);
});

context.Request.Body = mockStream.Object;
var feature = new RequestBodyPipeFeature(context);
var data = await feature.Reader.ReadAsync();

Assert.Equal(2, bufferLengths.Count);
Assert.Equal(0, bufferLengths[0]);
Assert.Equal(4096, bufferLengths[1]);
}

private static string GetStringFromReadResult(ReadResult data)
{
return Encoding.ASCII.GetString(data.Buffer.ToArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ PipeReader IRequestBodyPipeFeature.Reader
if (!ReferenceEquals(_requestStreamInternal, RequestBody))
{
_requestStreamInternal = RequestBody;
RequestBodyPipeReader = PipeReader.Create(RequestBody, new StreamPipeReaderOptions(_context.MemoryPool, _context.MemoryPool.GetMinimumSegmentSize(), _context.MemoryPool.GetMinimumAllocSize()));
RequestBodyPipeReader = PipeReader.Create(RequestBody, new StreamPipeReaderOptions(_context.MemoryPool, _context.MemoryPool.GetMinimumSegmentSize(), _context.MemoryPool.GetMinimumAllocSize(), useZeroByteReads: true));

OnCompleted((self) =>
{
Expand Down
20 changes: 16 additions & 4 deletions src/Servers/Kestrel/Core/src/Middleware/Internal/LoggingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,40 @@ public override Task FlushAsync(CancellationToken cancellationToken)
public override int Read(byte[] buffer, int offset, int count)
{
int read = _inner.Read(buffer, offset, count);
Log("Read", new ReadOnlySpan<byte>(buffer, offset, read));
if (count > 0)
{
Log("Read", new ReadOnlySpan<byte>(buffer, offset, read));
}
return read;
}

public override int Read(Span<byte> destination)
{
int read = _inner.Read(destination);
Log("Read", destination.Slice(0, read));
if (!destination.IsEmpty)
{
Log("Read", destination.Slice(0, read));
}
return read;
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = await _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
Log("ReadAsync", new ReadOnlySpan<byte>(buffer, offset, read));
if (count > 0)
{
Log("ReadAsync", new ReadOnlySpan<byte>(buffer, offset, read));
}
return read;
}

public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
int read = await _inner.ReadAsync(destination, cancellationToken);
Log("ReadAsync", destination.Span.Slice(0, read));
if (!destination.IsEmpty)
{
Log("ReadAsync", destination.Span.Slice(0, read));
}
return read;
}

Expand Down
5 changes: 4 additions & 1 deletion src/Servers/Kestrel/Core/src/Middleware/LoggingDuplexPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal;

internal sealed class LoggingDuplexPipe : DuplexPipeStreamAdapter<LoggingStream>
{
private static readonly StreamPipeReaderOptions _defaultReaderOptions = new(useZeroByteReads: true);
private static readonly StreamPipeWriterOptions _defaultWriterOptions = new();

public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) :
base(transport, stream => new LoggingStream(stream, logger))
base(transport, _defaultReaderOptions, _defaultWriterOptions, stream => new LoggingStream(stream, logger))
{
}
}
31 changes: 31 additions & 0 deletions src/Servers/Kestrel/test/InMemory.FunctionalTests/RequestTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.TestTransport;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests;
Expand Down Expand Up @@ -80,6 +81,36 @@ public async Task PipesAreNotPersistedAcrossRequests()
}
}

[Fact]
public async Task RequestBodyPipeReaderDoesZeroByteReads()
{
await using (var server = new TestServer(async context =>
{
var bufferLengths = new List<int>();

var mockStream = new Mock<Stream>();

mockStream.Setup(s => s.CanRead).Returns(true);
mockStream.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>())).Returns<Memory<byte>, CancellationToken>((buffer, token) =>
{
bufferLengths.Add(buffer.Length);
return ValueTask.FromResult(0);
});

context.Request.Body = mockStream.Object;
var data = await context.Request.BodyReader.ReadAsync();

Assert.Equal(2, bufferLengths.Count);
Assert.Equal(0, bufferLengths[0]);
Assert.Equal(4096, bufferLengths[1]);

await context.Response.WriteAsync("hello, world");
}, new TestServiceContext(LoggerFactory)))
{
Assert.Equal("hello, world", await server.HttpClientSlim.GetStringAsync($"http://localhost:{server.Port}/"));
}
}

[Fact]
public async Task RequestBodyReadAsyncCanBeCancelled()
{
Expand Down

0 comments on commit 6d90a04

Please sign in to comment.