Skip to content

Commit

Permalink
B2MD: Don't try to reclaim if continuing to parse (apple#1733)
Browse files Browse the repository at this point in the history
Motivation:

B2MD called out to the decoder's `shouldReclaimBytes` after every
parsing attempt, even if the parser said `.continue`.

That's quite pointless because we won't add any bytes into the buffer
before we're trying the parser again.

Modifications:

Only ask the decoder if we should reclaim bytes if the decoder actually
says `.needMoreData`.

Result:

Faster, better, more sensible.
  • Loading branch information
weissi authored Jan 29, 2021
1 parent ef9e98a commit c5a214f
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 13 deletions.
10 changes: 8 additions & 2 deletions Sources/NIO/Codec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,13 @@ extension ByteToMessageHandler {
case .nothingAvailable:
return .didProcess(.needMoreData)
case .available(var buffer):
var possiblyReclaimBytes = false
var decoder: Decoder? = nil
swap(&decoder, &self.decoder)
assert(decoder != nil) // self.decoder only `nil` if we're being re-entered, but .available means we're not
defer {
swap(&decoder, &self.decoder)
if buffer.readableBytes > 0 {
if buffer.readableBytes > 0 && possiblyReclaimBytes {
// we asserted above that the decoder we just swapped back in was non-nil so now `self.decoder` must
// be non-nil.
if self.decoder!.shouldReclaimBytes(buffer: buffer) {
Expand All @@ -526,7 +527,12 @@ extension ByteToMessageHandler {
}
self.buffer.finishProcessing(remainder: &buffer)
}
return .didProcess(try body(&decoder!, &buffer))
let decodeResult = try body(&decoder!, &buffer)

// If we .continue, there's no point in trying to reclaim bytes because we'll loop again. If we need more
// data on the other hand, we should try to reclaim some of those bytes.
possiblyReclaimBytes = decodeResult == .needMoreData
return .didProcess(decodeResult)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO

final class ByteToMessageDecoderDecodeManySmallsBenchmark: Benchmark {
private let iterations: Int
private let buffer: ByteBuffer
private let channel: EmbeddedChannel

init(iterations: Int, bufferSize: Int) {
self.iterations = iterations
self.buffer = ByteBuffer(repeating: 0, count: bufferSize)
self.channel = EmbeddedChannel(handler: ByteToMessageHandler(Decoder()))
}

func setUp() throws {
//try self.channel.connect(to: .init(ipAddress: "1.2.3.4", port: 5)).wait()
}

func tearDown() {
precondition(try! self.channel.finish().isClean)
}

func run() -> Int {
// for _ in 1...self.iterations {
// try! self.channel.writeInbound(self.buffer)
// }
// return Int(self.buffer.readableBytes)
return 7
}

struct Decoder: ByteToMessageDecoder {
typealias InboundOut = Never

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
if buffer.readSlice(length: 16) == nil {
return .needMoreData
} else {
return .continue
}
}
}
}
3 changes: 3 additions & 0 deletions Sources/NIOPerformanceTester/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -767,3 +767,6 @@ try measureAndPrint(desc: "circular_buffer_into_byte_buffer_1kb", benchmark: Cir
try measureAndPrint(desc: "circular_buffer_into_byte_buffer_1mb", benchmark: CircularBufferIntoByteBufferBenchmark(iterations: 20, bufferSize: 1024*1024))

try measureAndPrint(desc: "byte_buffer_view_iterator_1mb", benchmark: ByteBufferViewIteratorBenchmark(iterations: 20, bufferSize: 1024*1024))

try measureAndPrint(desc: "byte_to_message_decoder_decode_many_small",
benchmark: ByteToMessageDecoderDecodeManySmallsBenchmark(iterations: 1_000, bufferSize: 16384))
1 change: 1 addition & 0 deletions Tests/NIOTests/CodecTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extension ByteToMessageDecoderTest {
("testDecoderIsNotQuadratic", testDecoderIsNotQuadratic),
("testMemoryIsReclaimedIfMostIsConsumed", testMemoryIsReclaimedIfMostIsConsumed),
("testMemoryIsReclaimedIfLotsIsAvailable", testMemoryIsReclaimedIfLotsIsAvailable),
("testWeDoNotCallShouldReclaimMemoryAsLongAsWeContinue", testWeDoNotCallShouldReclaimMemoryAsLongAsWeContinue),
("testDecoderReentranceChannelRead", testDecoderReentranceChannelRead),
("testTrivialDecoderDoesSensibleStuffWhenCloseInRead", testTrivialDecoderDoesSensibleStuffWhenCloseInRead),
("testLeftOversMakeDecodeLastCalled", testLeftOversMakeDecodeLastCalled),
Expand Down
91 changes: 80 additions & 11 deletions Tests/NIOTests/CodecTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,24 @@ public final class ByteToMessageDecoderTest: XCTestCase {
XCTAssertEqual(testDecoderIsNotQuadratic_reallocs, 3)
}

func testMemoryIsReclaimedIfMostIsConsumed() throws {
func testMemoryIsReclaimedIfMostIsConsumed() {
let channel = EmbeddedChannel()
defer {
XCTAssertNoThrow(try channel.finish())
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

let decoder = ByteToMessageHandler(LargeChunkDecoder())
_ = try channel.pipeline.addHandler(decoder).wait()
XCTAssertNoThrow(try channel.pipeline.addHandler(decoder).wait())

// We're going to send in 513 bytes. This will cause a chunk to be passed on, and will leave
// a 512-byte empty region in a byte buffer with a capacity of 1024 bytes. Since 512 empty
// bytes are exactly 50% of the buffers capacity and not one tiny bit more, the empty space
// will not be reclaimed.
var buffer = channel.allocator.buffer(capacity: 513)
buffer.writeBytes(Array(repeating: 0x04, count: 513))
XCTAssertTrue(try channel.writeInbound(buffer).isFull)
XCTAssertNoThrow(XCTAssertTrue(try channel.writeInbound(buffer).isFull))
XCTAssertNoThrow(XCTAssertEqual(ByteBuffer(repeating: 0x04, count: 512), try channel.readInbound()))
XCTAssertNoThrow(XCTAssertNil(try channel.readInbound()))

XCTAssertEqual(decoder.cumulationBuffer!.capacity, 1024)
XCTAssertEqual(decoder.cumulationBuffer!.readableBytes, 1)
Expand All @@ -230,7 +232,9 @@ public final class ByteToMessageDecoderTest: XCTestCase {
// reclaimed: While the capacity is more than 1024 bytes (2048 bytes), the reader index is
// now at 1024. This means the buffer is exactly 50% consumed and not a tiny bit more, which
// means no space will be reclaimed.
XCTAssertTrue(try channel.writeInbound(buffer).isFull)
XCTAssertNoThrow(XCTAssertTrue(try channel.writeInbound(buffer).isFull))
XCTAssertNoThrow(XCTAssertEqual(ByteBuffer(repeating: 0x04, count: 512), try channel.readInbound()))
XCTAssertNoThrow(XCTAssertNil(try channel.readInbound()))

XCTAssertEqual(decoder.cumulationBuffer!.capacity, 2048)
XCTAssertEqual(decoder.cumulationBuffer!.readableBytes, 2)
Expand All @@ -243,37 +247,102 @@ public final class ByteToMessageDecoderTest: XCTestCase {
// (3 * 512 bytes). This means that 75% of the buffer's capacity can now be reclaimed, which
// will lead to a reclaim. The resulting buffer will have a capacity of 2048 bytes (based
// on its previous growth), with 3 readable bytes remaining.
XCTAssertTrue(try channel.writeInbound(buffer).isFull)
XCTAssertNoThrow(XCTAssertTrue(try channel.writeInbound(buffer).isFull))
XCTAssertNoThrow(XCTAssertEqual(ByteBuffer(repeating: 0x04, count: 512), try channel.readInbound()))
XCTAssertNoThrow(XCTAssertNil(try channel.readInbound()))

XCTAssertEqual(decoder.cumulationBuffer!.capacity, 2048)
XCTAssertEqual(decoder.cumulationBuffer!.readableBytes, 3)
XCTAssertEqual(decoder.cumulationBuffer!.readerIndex, 0)
}

func testMemoryIsReclaimedIfLotsIsAvailable() throws {
func testMemoryIsReclaimedIfLotsIsAvailable() {
let channel = EmbeddedChannel()
defer {
XCTAssertNoThrow(try channel.finish())
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

let decoder = ByteToMessageHandler(OnceDecoder())
_ = try channel.pipeline.addHandler(decoder).wait()
XCTAssertNoThrow(try channel.pipeline.addHandler(decoder).wait())

// We're going to send in 5119 bytes. This will be held.
var buffer = channel.allocator.buffer(capacity: 5119)
buffer.writeBytes(Array(repeating: 0x04, count: 5119))
XCTAssertTrue(try channel.writeInbound(buffer).isEmpty)
XCTAssertNoThrow(XCTAssertTrue(try channel.writeInbound(buffer).isEmpty))

XCTAssertEqual(decoder.cumulationBuffer!.readableBytes, 5119)
XCTAssertEqual(decoder.cumulationBuffer!.readerIndex, 0)

// Now we're going to send in one more byte. This will cause a chunk to be passed on,
// shrinking the held memory to 3072 bytes. However, memory will be reclaimed.
XCTAssertTrue(try channel.writeInbound(buffer.getSlice(at: 0, length: 1)).isFull)
XCTAssertNoThrow(XCTAssertTrue(try channel.writeInbound(buffer.getSlice(at: 0, length: 1)).isFull))
XCTAssertNoThrow(XCTAssertEqual(ByteBuffer(repeating: 0x04, count: 2048), try channel.readInbound()))
XCTAssertNoThrow(XCTAssertNil(try channel.readInbound()))

XCTAssertEqual(decoder.cumulationBuffer!.readableBytes, 3072)
XCTAssertEqual(decoder.cumulationBuffer!.readerIndex, 0)
}

func testWeDoNotCallShouldReclaimMemoryAsLongAsWeContinue() {
class Decoder: ByteToMessageDecoder {
typealias InboundOut = ByteBuffer

var numberOfDecodeCalls = 0
var numberOfShouldReclaimCalls = 0

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
self.numberOfDecodeCalls += 1

guard buffer.readSlice(length: [2048, 1024, 512, 256,
128, 64, .max][self.numberOfDecodeCalls - 1]) != nil else {
return .needMoreData
}

XCTAssertEqual(0, self.numberOfShouldReclaimCalls)
return .continue
}

func shouldReclaimBytes(buffer: ByteBuffer) -> Bool {
XCTAssertEqual(7, self.numberOfDecodeCalls)
XCTAssertEqual(64, buffer.readableBytes)
self.numberOfShouldReclaimCalls += 1

return false
}

func decodeLast(context: ChannelHandlerContext,
buffer: inout ByteBuffer,
seenEOF: Bool) throws -> DecodingState {
XCTAssertEqual(64, buffer.readableBytes)
XCTAssertTrue(seenEOF)
return .needMoreData
}
}

let decoder = Decoder()
let channel = EmbeddedChannel(handler: ByteToMessageHandler(decoder))
defer {
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

let buffer = ByteBuffer(repeating: 0, count: 4096)
XCTAssertEqual(4096, buffer.storageCapacity)

// We're sending 4096 bytes. The decoder will do:
// 1. read 2048 -> .continue
// 2. read 1024 -> .continue
// 3. read 512 -> .continue
// 4. read 256 -> .continue
// 5. read 128 -> .continue
// 6. read 64 -> .continue
// 7. read Int.max -> .needMoreData
//
// So we're expecting 7 decode calls but only 1 call to shouldReclaimBytes (at the end).
XCTAssertNoThrow(try channel.writeInbound(buffer))
XCTAssertEqual(7, decoder.numberOfDecodeCalls)
XCTAssertEqual(1, decoder.numberOfShouldReclaimCalls)
}

func testDecoderReentranceChannelRead() throws {
let channel = EmbeddedChannel()
defer {
Expand Down

0 comments on commit c5a214f

Please sign in to comment.