-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
Copy pathgrpcserver.go
115 lines (98 loc) · 3.75 KB
/
grpcserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package baseapp
import (
"context"
"fmt"
"strconv"
gogogrpc "github.com/cosmos/gogoproto/grpc"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
errorsmod "cosmossdk.io/errors"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
)
// RegisterGRPCServer registers every service held by the app's gRPC query
// router directly with the given gRPC server.
//
// Each unary method is re-registered with a wrapped handler. The wrapper
// discards the interceptor supplied by the server and instead invokes the
// original method handler with a fixed chain: the grpcrecovery interceptor
// followed by a query interceptor that builds an sdk.Context for the request.
// Stream descriptors and service metadata are passed through unchanged.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
	// Define an interceptor for all gRPC queries: this interceptor will create
	// a new sdk.Context, and pass it into the query handler.
	//
	// The results are named so that the deferred recover below can replace err
	// after the handler has panicked.
	queryInterceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		// Incoming metadata is required; without it the request is rejected.
		md, ok := metadata.FromIncomingContext(grpcCtx)
		if !ok {
			return nil, status.Error(codes.Internal, "unable to retrieve metadata")
		}

		// Get height header from the request context, if present. The header
		// is only honored when it carries exactly one value; in every other
		// case height keeps its zero value.
		var height int64
		if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) == 1 {
			height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
			if err != nil {
				return nil, errorsmod.Wrapf(
					sdkerrors.ErrInvalidRequest,
					"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
			}
			if err := checkNegativeHeight(height); err != nil {
				return nil, err
			}
		}

		// Create the sdk.Context. Passing false as 2nd arg, as we can't
		// actually support proofs with gRPC right now.
		sdkCtx, err := app.CreateQueryContext(height, false)
		if err != nil {
			return nil, err
		}

		// If height was not set in the request, report the height of the
		// context that was actually created.
		if height == 0 {
			height = sdkCtx.BlockHeight()
		}

		// Attach the sdk.Context into the gRPC's context.Context.
		grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

		// Echo the resolved height back in the response headers. Failure is
		// only logged; a scoped variable is used so this best-effort step
		// never touches the named return value.
		heightMD := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
		if setErr := grpc.SetHeader(grpcCtx, heightMD); setErr != nil {
			app.logger.Error("failed to set gRPC header", "err", setErr)
		}

		app.logger.Debug(fmt.Sprintf("gRPC query received of type: %#v", req))

		// Catch an OutOfGasPanic caused in the query handlers and turn it
		// into an error. Any other panic value is re-raised as is.
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			outOfGas, isOutOfGas := r.(storetypes.ErrorOutOfGas)
			if !isOutOfGas {
				panic(r)
			}
			err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "Query gas limit exceeded: %v, out of gas in location: %v", sdkCtx.GasMeter().Limit(), outOfGas.Descriptor)
		}()

		return handler(grpcCtx, req)
	}

	// Neither interceptor is constructed from request-specific arguments, so
	// the chain is built once here and shared by all wrapped handlers instead
	// of being rebuilt on every query.
	chain := grpcmiddleware.ChainUnaryServer(
		grpcrecovery.UnaryServerInterceptor(),
		queryInterceptor,
	)

	// Loop through all services and methods, add the interceptor, and register
	// the service.
	for _, data := range app.GRPCQueryRouter().serviceData {
		desc := data.serviceDesc

		newMethods := make([]grpc.MethodDesc, len(desc.Methods))
		for i, method := range desc.Methods {
			// Per-iteration copy so each closure calls its own handler.
			methodHandler := method.Handler
			newMethods[i] = grpc.MethodDesc{
				MethodName: method.MethodName,
				Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) {
					return methodHandler(srv, ctx, dec, chain)
				},
			}
		}

		newDesc := &grpc.ServiceDesc{
			ServiceName: desc.ServiceName,
			HandlerType: desc.HandlerType,
			Methods:     newMethods,
			Streams:     desc.Streams,
			Metadata:    desc.Metadata,
		}

		server.RegisterService(newDesc, data.handler)
	}
}