forked from grpc/grpc-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rls: Implementation of the RLS client. (grpc#3446)
- Loading branch information
Showing
3 changed files
with
372 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* | ||
* Copyright 2020 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
package rls | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"google.golang.org/grpc" | ||
rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1" | ||
) | ||
|
||
// For gRPC services using RLS, the value of target_type in the | ||
// RouteLookupServiceRequest will be set to this. | ||
const grpcTargetType = "grpc" | ||
|
||
// rlsClient is a simple wrapper around a RouteLookupService client which | ||
// provides non-blocking semantics on top of a blocking unary RPC call. | ||
// | ||
// The RLS LB policy creates a new rlsClient object with the following values: | ||
// * a grpc.ClientConn to the RLS server using appropriate credentials from the | ||
// parent channel | ||
// * dialTarget corresponding to the original user dial target, e.g. | ||
// "firestore.googleapis.com". | ||
// | ||
// The RLS LB policy uses an adaptive throttler to perform client side | ||
// throttling and asks this client to make an RPC call only after checking with | ||
// the throttler. | ||
type rlsClient struct { | ||
cc *grpc.ClientConn | ||
stub rlspb.RouteLookupServiceClient | ||
// origDialTarget is the original dial target of the user and sent in each | ||
// RouteLookup RPC made to the RLS server. | ||
origDialTarget string | ||
// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB | ||
// policy receives this value in its service config. | ||
rpcTimeout time.Duration | ||
} | ||
|
||
func newRLSClient(cc *grpc.ClientConn, dialTarget string, rpcTimeout time.Duration) *rlsClient { | ||
return &rlsClient{ | ||
cc: cc, | ||
stub: rlspb.NewRouteLookupServiceClient(cc), | ||
origDialTarget: dialTarget, | ||
rpcTimeout: rpcTimeout, | ||
} | ||
} | ||
|
||
type lookupCallback func(target, headerData string, err error) | ||
|
||
// lookup starts a RouteLookup RPC in a separate goroutine and returns the | ||
// results (and error, if any) in the provided callback. | ||
func (c *rlsClient) lookup(path string, keyMap map[string]string, cb lookupCallback) { | ||
go func() { | ||
ctx, cancel := context.WithTimeout(context.Background(), c.rpcTimeout) | ||
resp, err := c.stub.RouteLookup(ctx, &rlspb.RouteLookupRequest{ | ||
Server: c.origDialTarget, | ||
Path: path, | ||
TargetType: grpcTargetType, | ||
KeyMap: keyMap, | ||
}) | ||
cb(resp.GetTarget(), resp.GetHeaderData(), err) | ||
cancel() | ||
}() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
/* | ||
* | ||
* Copyright 2020 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
package rls | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/golang/protobuf/proto" | ||
"github.com/google/go-cmp/cmp" | ||
"google.golang.org/grpc" | ||
rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1" | ||
"google.golang.org/grpc/balancer/rls/internal/testutils/fakeserver" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
const ( | ||
defaultDialTarget = "dummy" | ||
defaultRPCTimeout = 5 * time.Second | ||
defaultTestTimeout = 1 * time.Second | ||
) | ||
|
||
func setup(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) { | ||
t.Helper() | ||
|
||
server, sCleanup, err := fakeserver.Start() | ||
if err != nil { | ||
t.Fatalf("Failed to start fake RLS server: %v", err) | ||
} | ||
|
||
cc, cCleanup, err := server.ClientConn() | ||
if err != nil { | ||
t.Fatalf("Failed to get a ClientConn to the RLS server: %v", err) | ||
} | ||
|
||
return server, cc, func() { | ||
sCleanup() | ||
cCleanup() | ||
} | ||
} | ||
|
||
// TestLookupFailure verifies the case where the RLS server returns an error. | ||
func TestLookupFailure(t *testing.T) { | ||
server, cc, cleanup := setup(t) | ||
defer cleanup() | ||
|
||
// We setup the fake server to return an error. | ||
server.ResponseChan <- fakeserver.Response{Err: errors.New("rls failure")} | ||
|
||
rlsClient := newRLSClient(cc, defaultDialTarget, defaultRPCTimeout) | ||
|
||
errCh := make(chan error) | ||
rlsClient.lookup("", nil, func(target, headerData string, err error) { | ||
if err == nil { | ||
errCh <- errors.New("rlsClient.lookup() succeeded, should have failed") | ||
return | ||
} | ||
if target != "" || headerData != "" { | ||
errCh <- fmt.Errorf("rlsClient.lookup() = (%s, %s), should be empty strings", target, headerData) | ||
return | ||
} | ||
errCh <- nil | ||
}) | ||
|
||
timer := time.NewTimer(defaultTestTimeout) | ||
select { | ||
case <-timer.C: | ||
t.Fatal("Timeout when expecting a routeLookup callback") | ||
case err := <-errCh: | ||
timer.Stop() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
} | ||
|
||
// TestLookupDeadlineExceeded tests the case where the RPC deadline associated | ||
// with the lookup expires. | ||
func TestLookupDeadlineExceeded(t *testing.T) { | ||
_, cc, cleanup := setup(t) | ||
defer cleanup() | ||
|
||
// Give the Lookup RPC a small deadline, but don't setup the fake server to | ||
// return anything. So the Lookup call will block and eventuall expire. | ||
rlsClient := newRLSClient(cc, defaultDialTarget, 100*time.Millisecond) | ||
|
||
errCh := make(chan error) | ||
rlsClient.lookup("", nil, func(target, headerData string, err error) { | ||
if st, ok := status.FromError(err); !ok || st.Code() != codes.DeadlineExceeded { | ||
errCh <- fmt.Errorf("rlsClient.lookup() returned error: %v, want %v", err, codes.DeadlineExceeded) | ||
return | ||
} | ||
errCh <- nil | ||
}) | ||
|
||
timer := time.NewTimer(defaultTestTimeout) | ||
select { | ||
case <-timer.C: | ||
t.Fatal("Timeout when expecting a routeLookup callback") | ||
case err := <-errCh: | ||
timer.Stop() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
} | ||
|
||
// TestLookupSuccess verifies the successful Lookup API case. | ||
func TestLookupSuccess(t *testing.T) { | ||
server, cc, cleanup := setup(t) | ||
defer cleanup() | ||
|
||
const ( | ||
defaultTestTimeout = 1 * time.Second | ||
rlsReqPath = "/service/method" | ||
rlsRespTarget = "us_east_1.firestore.googleapis.com" | ||
rlsHeaderData = "headerData" | ||
) | ||
|
||
rlsReqKeyMap := map[string]string{ | ||
"k1": "v1", | ||
"k2": "v2", | ||
} | ||
wantLookupRequest := &rlspb.RouteLookupRequest{ | ||
Server: defaultDialTarget, | ||
Path: rlsReqPath, | ||
TargetType: "grpc", | ||
KeyMap: rlsReqKeyMap, | ||
} | ||
|
||
rlsClient := newRLSClient(cc, defaultDialTarget, defaultRPCTimeout) | ||
|
||
errCh := make(chan error) | ||
rlsClient.lookup(rlsReqPath, rlsReqKeyMap, func(t, hd string, err error) { | ||
if err != nil { | ||
errCh <- fmt.Errorf("rlsClient.Lookup() failed: %v", err) | ||
return | ||
} | ||
if t != rlsRespTarget || hd != rlsHeaderData { | ||
errCh <- fmt.Errorf("rlsClient.lookup() = (%s, %s), want (%s, %s)", t, hd, rlsRespTarget, rlsHeaderData) | ||
return | ||
} | ||
errCh <- nil | ||
}) | ||
|
||
// Make sure that the fake server received the expected RouteLookupRequest | ||
// proto. | ||
timer := time.NewTimer(defaultTestTimeout) | ||
select { | ||
case gotLookupRequest := <-server.RequestChan: | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
if diff := cmp.Diff(wantLookupRequest, gotLookupRequest, cmp.Comparer(proto.Equal)); diff != "" { | ||
t.Fatalf("RouteLookupRequest diff (-want, +got):\n%s", diff) | ||
} | ||
case <-timer.C: | ||
t.Fatalf("Timed out wile waiting for a RouteLookupRequest") | ||
} | ||
|
||
// We setup the fake server to return this response when it receives a | ||
// request. | ||
server.ResponseChan <- fakeserver.Response{ | ||
Resp: &rlspb.RouteLookupResponse{ | ||
Target: rlsRespTarget, | ||
HeaderData: rlsHeaderData, | ||
}, | ||
} | ||
|
||
timer = time.NewTimer(defaultTestTimeout) | ||
select { | ||
case <-timer.C: | ||
t.Fatal("Timeout when expecting a routeLookup callback") | ||
case err := <-errCh: | ||
timer.Stop() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* | ||
* Copyright 2020 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// Package fakeserver provides a fake implementation of the RouteLookupService, | ||
// to be used in unit tests. | ||
package fakeserver | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"google.golang.org/grpc" | ||
rlsgrpc "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1" | ||
rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1" | ||
) | ||
|
||
const defaultDialTimeout = 5 * time.Second | ||
|
||
// Response wraps the response protobuf (xds/LRS) and error that the Server | ||
// should send out to the client through a call to stream.Send() | ||
type Response struct { | ||
Resp *rlspb.RouteLookupResponse | ||
Err error | ||
} | ||
|
||
// Server is a fake implementation of RLS. It exposes channels to send/receive | ||
// RLS requests and responses. | ||
type Server struct { | ||
RequestChan chan *rlspb.RouteLookupRequest | ||
ResponseChan chan Response | ||
Address string | ||
} | ||
|
||
// Start makes a new Server and gets it to start listening on a local port for | ||
// gRPC requests. The returned cancel function should be invoked by the caller | ||
// upon completion of the test. | ||
func Start() (*Server, func(), error) { | ||
lis, err := net.Listen("tcp", "localhost:0") | ||
if err != nil { | ||
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err) | ||
} | ||
|
||
s := &Server{ | ||
// Give the channels a buffer size of 1 so that we can setup | ||
// expectations for one lookup call, without blocking. | ||
RequestChan: make(chan *rlspb.RouteLookupRequest, 1), | ||
ResponseChan: make(chan Response, 1), | ||
Address: lis.Addr().String(), | ||
} | ||
|
||
server := grpc.NewServer() | ||
rlsgrpc.RegisterRouteLookupServiceServer(server, s) | ||
go server.Serve(lis) | ||
|
||
return s, func() { server.Stop() }, nil | ||
} | ||
|
||
// RouteLookup implements the RouteLookupService. | ||
func (s *Server) RouteLookup(ctx context.Context, req *rlspb.RouteLookupRequest) (*rlspb.RouteLookupResponse, error) { | ||
s.RequestChan <- req | ||
resp := <-s.ResponseChan | ||
return resp.Resp, resp.Err | ||
} | ||
|
||
// ClientConn returns a grpc.ClientConn connected to the fakeServer. | ||
func (s *Server) ClientConn() (*grpc.ClientConn, func(), error) { | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) | ||
defer cancel() | ||
|
||
cc, err := grpc.DialContext(ctx, s.Address, grpc.WithInsecure(), grpc.WithBlock()) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("grpc.DialContext(%s) failed: %v", s.Address, err) | ||
} | ||
return cc, func() { cc.Close() }, nil | ||
} |