Skip to content

Commit

Permalink
xds: serving mode changes outlined in gRFC A36 (grpc#4328)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Apr 26, 2021
1 parent 9572fd6 commit 52a707c
Show file tree
Hide file tree
Showing 9 changed files with 639 additions and 62 deletions.
5 changes: 5 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
79 changes: 59 additions & 20 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,22 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32

// Server transports are tracked in a map which is keyed on listener
// address. For regular gRPC traffic, connections are accepted in Serve()
// through a call to Accept(), and we use the actual listener address as key
// when we add it to the map. But for connections received through
// ServeHTTP(), we do not have a listener and hence use this dummy value.
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

func init() {
internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
return srv.opts.creds
}
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
}

var statusOK = status.New(codes.OK, "")
Expand Down Expand Up @@ -107,9 +117,12 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions

mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
mu sync.Mutex // guards following
lis map[net.Listener]bool
// conns contains all active server transports. It is a map keyed on a
// listener address with the value being the set of active transports
// belonging to that listener.
conns map[string]map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
Expand Down Expand Up @@ -519,7 +532,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
Expand Down Expand Up @@ -778,15 +791,15 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
Expand Down Expand Up @@ -814,15 +827,24 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}

rawConn.SetDeadline(time.Time{})
if !s.addConn(st) {
if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(st)
s.removeConn(lisAddr, st)
}()
}

func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
}
s.mu.Unlock()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
Expand Down Expand Up @@ -924,10 +946,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(st) {
if !s.addConn(listenerAddressForServeHTTP, st) {
return
}
defer s.removeConn(st)
defer s.removeConn(listenerAddressForServeHTTP, st)
s.serveStreams(st)
}

Expand Down Expand Up @@ -955,7 +977,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}

func (s *Server) addConn(st transport.ServerTransport) bool {
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
Expand All @@ -967,15 +989,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
s.conns[st] = true

if s.conns[addr] == nil {
// Create a map entry if this is the first connection on this listener.
s.conns[addr] = make(map[transport.ServerTransport]bool)
}
s.conns[addr][st] = true
return true
}

func (s *Server) removeConn(st transport.ServerTransport) {
func (s *Server) removeConn(addr string, st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, st)

conns := s.conns[addr]
if conns != nil {
delete(conns, st)
if len(conns) == 0 {
// If the last connection for this address is being removed, also
// remove the map entry corresponding to the address. This is used
// in GracefulStop() when waiting for all connections to be closed.
delete(s.conns, addr)
}
s.cv.Broadcast()
}
}
Expand Down Expand Up @@ -1639,7 +1674,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
st := s.conns
conns := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
Expand All @@ -1648,8 +1683,10 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
for c := range st {
c.Close()
for _, cs := range conns {
for st := range cs {
st.Close()
}
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
Expand Down Expand Up @@ -1686,8 +1723,10 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
for st := range s.conns {
st.Drain()
for _, conns := range s.conns {
for st := range conns {
st.Drain()
}
}
s.drain = true
}
Expand Down
101 changes: 79 additions & 22 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,42 @@ var (
backoffFunc = bs.Backoff
)

// ServingMode indicates the current mode of operation of the server.
//
// This API exactly mirrors the one in the public xds package. We have to
// redefine it here to avoid a cyclic dependency.
type ServingMode int

const (
// ServingModeStarting indicates that the serving is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing
)

func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
}

// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)

func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p))
}
Expand All @@ -70,6 +106,8 @@ type ListenerWrapperParams struct {
XDSCredsInUse bool
// XDSClient provides the functionality from the xdsClient required here.
XDSClient XDSClientInterface
// ModeCallback is the callback to invoke when the serving mode changes.
ModeCallback ServingModeCallback
}

// NewListenerWrapper creates a new listenerWrapper with params. It returns a
Expand All @@ -83,6 +121,7 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
name: params.ListenerResourceName,
xdsCredsInUse: params.XDSCredsInUse,
xdsC: params.XDSClient,
modeCallback: params.ModeCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),

closed: grpcsync.NewEvent(),
Expand Down Expand Up @@ -111,12 +150,11 @@ type listenerWrapper struct {
net.Listener
logger *internalgrpclog.PrefixLogger

// TODO: Maintain serving state of this listener.

name string
xdsCredsInUse bool
xdsC XDSClientInterface
cancelWatch func()
modeCallback ServingModeCallback

// Set to true if the listener is bound to the IP_ANY address (which is
// "0.0.0.0" for IPv4 and "::" for IPv6).
Expand All @@ -138,11 +176,14 @@ type listenerWrapper struct {
// updates received in the callback if this event has fired.
closed *grpcsync.Event

// Filter chains received as part of the last good update. The reason for
// using an rw lock here is that this field will be read by all connections
// during their server-side handshake (in the hot path), but writes to this
// happen rarely (when we get a Listener resource update).
mu sync.RWMutex
// mu guards access to the current serving mode and the filter chains. The
// reason for using an rw lock here is that these fields are read in
// Accept() for all incoming connections, but writes happen rarely (when we
// get a Listener resource update).
mu sync.RWMutex
// Current serving mode.
mode ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager
}

Expand Down Expand Up @@ -175,8 +216,6 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
// Reset retries after a successful Accept().
retries = 0

// TODO: Close connections if in "non-serving" state

// Since the net.Conn represents an incoming connection, the source and
// destination address can be retrieved from the local address and
// remote address of the net.Conn respectively.
Expand All @@ -191,6 +230,17 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
}

l.mu.RLock()
if l.mode == ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
// "not-serving". Closing the connection immediately upon accepting
// is one of the other ways to implement the "not-serving" mode as
// outlined in gRFC A36.
l.mu.RUnlock()
conn.Close()
continue
}
fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{
IsUnspecifiedListener: l.isUnspecifiedAddr,
DestAddr: destAddr.IP,
Expand Down Expand Up @@ -236,14 +286,13 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
return
}

// TODO: Handle resource-not-found errors by moving to not-serving state.
if err != nil {
// We simply log an error here and hope we get a successful update
// in the future. The error could be because of a timeout or an
// actual error, like the requested resource not found. In any case,
// it is fine for the server to hang indefinitely until Stop() is
// called.
l.logger.Warningf("Received error for resource %q: %+v", l.name, err)
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
l.logger.Infof("Received update for resource %q: %+v", l.name, update)
Expand All @@ -258,18 +307,26 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
// appropriate context to perform this check.
//
// What this means is that the xdsClient has ACKed a resource which can push
// the server into a "not serving" state. This is not ideal, but this is
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
// TODO: Switch to "not serving" if the host:port does not match.
l.logger.Warningf("Received host:port (%s:%d) in Listener update does not match local listening address: (%s:%s", ilc.Address, ilc.Port, l.addr, l.port)
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}

l.mu.Lock()
l.filterChains = ilc.FilterChains
l.mu.Unlock()
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.goodUpdate.Fire()
// TODO: Move to serving state on receipt of a good response.
}

func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

l.filterChains = fcs
l.mode = newMode
if l.modeCallback != nil {
l.modeCallback(l.Listener.Addr(), newMode, err)
}
l.logger.Warningf("Listener %q entering mode: %q due to error: %v", l.Addr(), newMode, err)
}
3 changes: 2 additions & 1 deletion xds/internal/test/xds_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

const (
defaultTestTimeout = 10 * time.Second
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)

type s struct {
Expand Down
Loading

0 comments on commit 52a707c

Please sign in to comment.