Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RemovableChannelHandler conformance to MessageToByteHandler #1993

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
86 changes: 69 additions & 17 deletions Sources/NIOCore/Codec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ extension ByteToMessageDecoderError {
/// ### Implementing ByteToMessageDecoder
///
/// A type that implements `ByteToMessageDecoder` may implement two methods: decode and decodeLast. Implementations
/// must implement decode: if they do not implement decodeLast, a default implementation will be used that
/// must implement decode: if they do not implement decodeLast, a default implementation will be used that
/// simply calls decode.
///
/// `decode` is the main decoding method, and is the one that will be called most often. `decode` is invoked
Expand Down Expand Up @@ -238,7 +238,7 @@ extension ByteToMessageDecoder {
public func wrapInboundOut(_ value: InboundOut) -> NIOAny {
return NIOAny(value)
}

public mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
while try self.decode(context: context, buffer: &buffer) == .continue {}
return .needMoreData
Expand Down Expand Up @@ -703,20 +703,7 @@ extension ByteToMessageHandler: ChannelOutboundHandler, _ChannelOutboundHandler
}
}

/// A protocol for straightforward encoders which encode custom messages to `ByteBuffer`s.
/// To add a `MessageToByteEncoder` to a `ChannelPipeline`, use
/// `channel.pipeline.addHandler(MessageToByteHandler(myEncoder)`.
public protocol MessageToByteEncoder {
associatedtype OutboundIn

/// Called once there is data to encode.
///
/// - parameters:
/// - data: The data to encode into a `ByteBuffer`.
/// - out: The `ByteBuffer` into which we want to encode.
func encode(data: OutboundIn, out: inout ByteBuffer) throws
}

// MARK: ByteToMessageHandler: RemovableChannelHandler
extension ByteToMessageHandler: RemovableChannelHandler {
public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
precondition(self.removalState == .notBeingRemoved)
Expand All @@ -740,6 +727,45 @@ extension ByteToMessageHandler: RemovableChannelHandler {
}
}

/// A protocol for straightforward encoders which encode custom messages to `ByteBuffer`s.
/// To add a `MessageToByteEncoder` to a `ChannelPipeline`, use
/// `channel.pipeline.addHandler(MessageToByteHandler(myEncoder)`.
public protocol MessageToByteEncoder {
associatedtype OutboundIn

/// Called once there is data to encode.
///
/// - parameters:
/// - data: The data to encode into a `ByteBuffer`.
/// - out: The `ByteBuffer` into which we want to encode.
func encode(data: OutboundIn, out: inout ByteBuffer) throws

/// Called once there is data to encode and the `MessageToByteDecoder` is about to leave
///
/// - parameters:
/// - data: The data to encode into a `ByteBuffer`.
/// - out: The `ByteBuffer` into which we want to encode.
func encodeLast(data: OutboundIn, out: inout ByteBuffer) throws

/// Called once this `MessageToByteEncoder` is removed from the `ChannelPipeline`.
mutating func encoderRemoved()

/// Called when this `MessageToByteEncoder` is added to the `ChannelPipeline`.
mutating func encoderAdded()
}

extension MessageToByteEncoder {

public mutating func encoderRemoved() {
}

public mutating func encoderAdded() {
}

public func encodeLast(out: inout ByteBuffer) throws {
}
}

/// A handler which turns a given `MessageToByteEncoder` into a `ChannelOutboundHandler` that can then be added to a
/// `ChannelPipeline`.
public final class MessageToByteHandler<Encoder: MessageToByteEncoder>: ChannelOutboundHandler {
Expand All @@ -760,10 +786,19 @@ public final class MessageToByteHandler<Encoder: MessageToByteEncoder>: ChannelO
return false
}
}

var isOperational: Bool {
switch self {
case .operational:
return true
case .notInChannelYet, .error, .done:
return false
}
}
AlexisQapa marked this conversation as resolved.
Show resolved Hide resolved
}

private var state: State = .notInChannelYet
private let encoder: Encoder
private var encoder: Encoder
private var buffer: ByteBuffer? = nil

public init(_ encoder: Encoder) {
Expand All @@ -777,11 +812,13 @@ extension MessageToByteHandler {
"illegal state when adding to Channel: \(self.state)")
self.state = .operational
self.buffer = context.channel.allocator.buffer(capacity: 256)
self.encoder.encoderAdded()
}

public func handlerRemoved(context: ChannelHandlerContext) {
self.state = .done
self.buffer = nil
self.encoder.encoderRemoved()
}

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
Expand Down Expand Up @@ -812,3 +849,18 @@ extension MessageToByteHandler {
}
}
}

// MARK: ByteToMessageHandler: RemovableChannelHandler
extension MessageToByteHandler: RemovableChannelHandler {
public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
do {
AlexisQapa marked this conversation as resolved.
Show resolved Hide resolved
self.buffer!.clear()
try self.encoder.encodeLast(out: &self.buffer!)
context.write(self.wrapOutboundOut(self.buffer!), promise: nil)
} catch {
self.state = .error(error)
context.fireErrorCaught(error)
}
context.leavePipeline(removalToken: removalToken)
}
AlexisQapa marked this conversation as resolved.
Show resolved Hide resolved
}