From bac9e1d16ebc13f65c54f54bf22677b61da2285f Mon Sep 17 00:00:00 2001 From: Mark Nevill Date: Wed, 21 Sep 2016 14:52:10 +0100 Subject: [PATCH] Make concurrent Server.GracefulStop calls all behave equivalently. --- server.go | 14 ++++--- test/end2end_test.go | 87 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index 2f73ccac1d19..e0bb187ef975 100644 --- a/server.go +++ b/server.go @@ -527,7 +527,7 @@ func (s *Server) removeConn(c io.Closer) { defer s.mu.Unlock() if s.conns != nil { delete(s.conns, c) - s.cv.Signal() + s.cv.Broadcast() } } @@ -828,7 +828,7 @@ func (s *Server) Stop() { st := s.conns s.conns = nil // interrupt GracefulStop if Stop and GracefulStop are called concurrently. - s.cv.Signal() + s.cv.Broadcast() s.mu.Unlock() for lis := range listeners { @@ -852,17 +852,19 @@ func (s *Server) Stop() { func (s *Server) GracefulStop() { s.mu.Lock() defer s.mu.Unlock() - if s.drain == true || s.conns == nil { + if s.conns == nil { return } - s.drain = true for lis := range s.lis { lis.Close() } s.lis = nil s.cancel() - for c := range s.conns { - c.(transport.ServerTransport).Drain() + if !s.drain { + for c := range s.conns { + c.(transport.ServerTransport).Drain() + } + s.drain = true } for len(s.conns) != 0 { s.cv.Wait() diff --git a/test/end2end_test.go b/test/end2end_test.go index c4178ef88d4c..73071293713d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -765,6 +765,93 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) { awaitNewConnLogOutput() } +func TestServerMultipleGoAwayPendingRPC(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + if e.name == "handler-tls" { + continue + } + testServerMultipleGoAwayPendingRPC(t, e) + } +} + +func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", + "grpc: addrConn.resetTransport failed to create client transport: connection error", + ) + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithCancel(context.Background()) + stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + // Finish an RPC to make sure the connection is good. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { + t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) + } + ch1 := make(chan struct{}) + go func() { + te.srv.GracefulStop() + close(ch1) + }() + ch2 := make(chan struct{}) + go func() { + te.srv.GracefulStop() + close(ch2) + }() + // Loop until the server side GoAway signal is propagated to the client. + for { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { + continue + } + break + } + select { + case <-ch1: + t.Fatal("GracefulStop() terminated early") + case <-ch2: + t.Fatal("GracefulStop() terminated early") + default: + } + respParam := []*testpb.ResponseParameters{ + { + Size: proto.Int32(1), + }, + } + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) + if err != nil { + t.Fatal(err) + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParam, + Payload: payload, + } + // The existing RPC should be still good to proceed. + if err := stream.Send(req); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, req, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("%v.CloseSend() = %v, want ", stream, err) + } + <-ch1 + <-ch2 + cancel() + awaitNewConnLogOutput() +} + func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() {