Skip to content

Commit

Permalink
grpclb: fix issues caused by caching SubConns (grpc#1977)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored and lyuxuan committed Apr 10, 2018
1 parent cf3bf7f commit 844b2a5
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
14 changes: 11 additions & 3 deletions grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,17 @@ func TestDropRequest(t *testing.T) {
ServerName: lbServerName,
}})

// The 1st, non-fail-fast RPC should succeed. This ensures both server
// connections are made, because the first one has DropForLoadBalancing set to true.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
// Wait for the 1st, non-fail-fast RPC to succeed. This ensures both server
// connections are made, because the first one has DropForLoadBalancing set
// to true.
var i int
for i = 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err == nil {
break
}
time.Sleep(time.Millisecond)
}
if i >= 1000 {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", testC, err)
}
for _, failfast := range []bool{true, false} {
Expand Down
25 changes: 15 additions & 10 deletions grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
}

// Call refreshSubConns to create/remove SubConns.
backendsUpdated := lb.refreshSubConns(backendAddrs)
// If no backend was updated, no SubConn will be newed/removed. But since
// the full serverList was different, there might be updates in drops or
// pick weights(different number of duplicates). We need to update picker
// with the fulllist.
if !backendsUpdated {
lb.regeneratePicker()
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}
lb.refreshSubConns(backendAddrs)
// Regenerate and update picker no matter if there's update on backends (if
// any SubConn will be newed/removed). Because since the full serverList was
// different, there might be updates in drops or pick weights(different
// number of duplicates). We need to update picker with the fulllist.
//
// Now with cache, even if SubConn was newed/removed, there might be no
// state changes.
lb.regeneratePicker()
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}

// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
Expand Down Expand Up @@ -114,7 +115,11 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
lb.scStates[sc] = connectivity.Idle
if _, ok := lb.scStates[sc]; !ok {
// Only set state of new sc to IDLE. The state could already be
// READY for cached SubConns.
lb.scStates[sc] = connectivity.Idle
}
sc.Connect()
}
}
Expand Down
9 changes: 5 additions & 4 deletions grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,16 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
if len(addrs) != 1 {
return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
}
addr := addrs[0]
addrWithoutMD := addrs[0]
addrWithoutMD.Metadata = nil

ccc.mu.Lock()
defer ccc.mu.Unlock()
if entry, ok := ccc.subConnCache[addr]; ok {
if entry, ok := ccc.subConnCache[addrWithoutMD]; ok {
// If entry is in subConnCache, the SubConn was being deleted.
// cancel function will never be nil.
entry.cancel()
delete(ccc.subConnCache, addr)
delete(ccc.subConnCache, addrWithoutMD)
return entry.sc, nil
}

Expand All @@ -147,7 +148,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}

ccc.subConnToAddr[scNew] = addr
ccc.subConnToAddr[scNew] = addrWithoutMD
return scNew, nil
}

Expand Down

0 comments on commit 844b2a5

Please sign in to comment.