Skip to content

Commit

Permalink
Simplify chain interceptors (grpc-ecosystem#421)
Browse files Browse the repository at this point in the history
The four (client/server, unary/stream) interceptors have to wrap a slice
of interceptors in functions which satisfy the handler interface, but
which are closures over the other parameters an interceptor is expected
to have.

The previous approach accomplished this goal with recursion. This had
two drawbacks: first, the code was difficult to understand, as most
recursion attempts to fully encode state in function parameters, but
here state was necessarily also encoded in the closure; and second,
the recursive base-case meant that even the innermost interceptor was
not calling the bare handler, it was calling a wrapped handler.

This new approach instead iteratively constructs wrappers from the
inside out. It results in fewer lines of code, with fewer variables
held in each closure. Hopefully this results in higher readability.
  • Loading branch information
aarongable authored May 26, 2021
1 parent 1778d41 commit 6465c16
Showing 1 changed file with 78 additions and 40 deletions.
118 changes: 78 additions & 40 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,36 @@ import (
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
n := len(interceptors)

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
chainer := func(currentInter grpc.UnaryServerInterceptor, currentHandler grpc.UnaryHandler) grpc.UnaryHandler {
return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
return currentInter(currentCtx, currentReq, info, currentHandler)
}
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(ctx, req)
}
}

chainedHandler := handler
for i := n - 1; i >= 0; i-- {
chainedHandler = chainer(interceptors[i], chainedHandler)
}
// The degenerate case, just return the single wrapped interceptor directly.
if n == 1 {
return interceptors[0]
}

return chainedHandler(ctx, req)
// Return a function which satisfies the interceptor interface, and which is
// a closure over the given list of interceptors to be chained.
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
currHandler := handler
// Iterate backwards through all interceptors except the first (outermost).
// Wrap each one in a function which satisfies the handler interface, but
// is also a closure over the `info` and `handler` parameters. Then pass
// each pseudo-handler to the next outer interceptor as the handler to be called.
for i := n - 1; i > 0; i-- {
// Rebind to loop-local vars so they can be closed over.
innerHandler, i := currHandler, i
currHandler = func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
return interceptors[i](currentCtx, currentReq, info, innerHandler)
}
}
// Finally return the result of calling the outermost interceptor with the
// outermost pseudo-handler created above as its handler.
return interceptors[0](ctx, req, info, currHandler)
}
}

Expand All @@ -47,19 +64,26 @@ func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnarySer
func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
n := len(interceptors)

return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
chainer := func(currentInter grpc.StreamServerInterceptor, currentHandler grpc.StreamHandler) grpc.StreamHandler {
return func(currentSrv interface{}, currentStream grpc.ServerStream) error {
return currentInter(currentSrv, currentStream, info, currentHandler)
}
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, stream)
}
}

chainedHandler := handler
for i := n - 1; i >= 0; i-- {
chainedHandler = chainer(interceptors[i], chainedHandler)
}
if n == 1 {
return interceptors[0]
}

return chainedHandler(srv, ss)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
currHandler := handler
for i := n - 1; i > 0; i-- {
innerHandler, i := currHandler, i
currHandler = func(currentSrv interface{}, currentStream grpc.ServerStream) error {
return interceptors[i](currentSrv, currentStream, info, innerHandler)
}
}
return interceptors[0](srv, stream, info, currHandler)
}
}

Expand All @@ -70,19 +94,26 @@ func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.Stream
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(interceptors)

return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
chainer := func(currentInter grpc.UnaryClientInterceptor, currentInvoker grpc.UnaryInvoker) grpc.UnaryInvoker {
return func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
return currentInter(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentInvoker, currentOpts...)
}
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
}

chainedInvoker := invoker
for i := n - 1; i >= 0; i-- {
chainedInvoker = chainer(interceptors[i], chainedInvoker)
}
if n == 1 {
return interceptors[0]
}

return chainedInvoker(ctx, method, req, reply, cc, opts...)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
currInvoker := invoker
for i := n - 1; i > 0; i-- {
innerInvoker, i := currInvoker, i
currInvoker = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
return interceptors[i](currentCtx, currentMethod, currentReq, currentRepl, currentConn, innerInvoker, currentOpts...)
}
}
return interceptors[0](ctx, method, req, reply, cc, currInvoker, opts...)
}
}

Expand All @@ -93,19 +124,26 @@ func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryCli
func ChainStreamClient(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor {
n := len(interceptors)

return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
chainer := func(currentInter grpc.StreamClientInterceptor, currentStreamer grpc.Streamer) grpc.Streamer {
return func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) {
return currentInter(currentCtx, currentDesc, currentConn, currentMethod, currentStreamer, currentOpts...)
}
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, opts...)
}
}

chainedStreamer := streamer
for i := n - 1; i >= 0; i-- {
chainedStreamer = chainer(interceptors[i], chainedStreamer)
}
if n == 1 {
return interceptors[0]
}

return chainedStreamer(ctx, desc, cc, method, opts...)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
currStreamer := streamer
for i := n - 1; i > 0; i-- {
innerStreamer, i := currStreamer, i
currStreamer = func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) {
return interceptors[i](currentCtx, currentDesc, currentConn, currentMethod, innerStreamer, currentOpts...)
}
}
return interceptors[0](ctx, desc, cc, method, currStreamer, opts...)
}
}

Expand Down

0 comments on commit 6465c16

Please sign in to comment.