Skip to content

Commit

Permalink
dragonboat: NodeHost.GetLeaderID() now returns the Raft term value
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Jun 15, 2022
1 parent 6ae1d0b commit 7b2d6c1
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
6 changes: 3 additions & 3 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,13 +569,13 @@ func (n *node) requestAddWitnessWithOrderID(replicaID uint64,
return n.requestConfigChange(pb.AddWitness, replicaID, target, order, timeout)
}

func (n *node) getLeaderID() (uint64, bool) {
func (n *node) getLeaderID() (uint64, uint64, bool) {
lv := n.leaderInfo.Load()
if lv == nil {
return 0, false
return 0, 0, false
}
leaderInfo := lv.(*leaderInfo)
return leaderInfo.leaderID, leaderInfo.leaderID != raft.NoLeader
return leaderInfo.leaderID, leaderInfo.term, leaderInfo.leaderID != raft.NoLeader
}

func (n *node) destroy() error {
Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestLeaderIDCanBeQueried(t *testing.T) {
tf := func(t *testing.T, nodes []*node,
smList []*rsm.StateMachine, router *testRouter, ldb raftio.ILogDB) {
n := nodes[0]
v, ok := n.getLeaderID()
v, _, ok := n.getLeaderID()
if !ok {
t.Errorf("failed to get leader id")
}
Expand Down
14 changes: 7 additions & 7 deletions nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (nh *NodeHost) GetNodeHostRegistry() (INodeHostRegistry, bool) {
// started is backed by a regular state machine that implements the
// sm.IStateMachine interface.
//
// The input parameter initialMembers is a map of node ID to node target for all
// The input parameter initialMembers is a map of replica ID to replica target for all
// Raft shard's initial member nodes. By default, the target is the
// RaftAddress value of the NodeHost where the node will be running. When running
// in the AddressByNodeHostID mode, target should be set to the NodeHostID value
Expand Down Expand Up @@ -675,19 +675,19 @@ func (nh *NodeHost) SyncGetShardMembership(ctx context.Context,
return v.(*Membership), nil
}

// GetLeaderID returns the leader node ID of the specified Raft shard based
// GetLeaderID returns the leader replica ID of the specified Raft shard based
// on local node's knowledge. The returned boolean value indicates whether the
// leader information is available.
func (nh *NodeHost) GetLeaderID(shardID uint64) (uint64, bool, error) {
func (nh *NodeHost) GetLeaderID(shardID uint64) (uint64, uint64, bool, error) {
if atomic.LoadInt32(&nh.closed) != 0 {
return 0, false, ErrClosed
return 0, 0, false, ErrClosed
}
v, ok := nh.getShard(shardID)
if !ok {
return 0, false, ErrShardNotFound
return 0, 0, false, ErrShardNotFound
}
leaderID, valid := v.getLeaderID()
return leaderID, valid, nil
leaderID, term, valid := v.getLeaderID()
return leaderID, term, valid, nil
}

// GetNoOPSession returns a NO-OP client session ready to be used for making
Expand Down
13 changes: 8 additions & 5 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ func createRateLimitedTwoTestNodeHosts(addr1 string, addr2 string,
var followerNh *NodeHost

for i := 0; i < 200; i++ {
leaderID, ready, err := nh1.GetLeaderID(1)
leaderID, _, ready, err := nh1.GetLeaderID(1)
if err == nil && ready {
if leaderID == 1 {
leaderNh = nh1
Expand Down Expand Up @@ -1523,8 +1523,11 @@ func rateLimitedTwoNodeHostTest(t *testing.T,

func waitForLeaderToBeElected(t *testing.T, nh *NodeHost, shardID uint64) {
for i := 0; i < 200; i++ {
_, ready, err := nh.GetLeaderID(shardID)
_, term, ready, err := nh.GetLeaderID(shardID)
if err == nil && ready {
if term == 0 {
panic("term is 0")
}
return
}
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -1797,7 +1800,7 @@ func TestErrShardNotFoundCanBeReturned(t *testing.T) {
defaultTestNode: true,
tf: func(nh *NodeHost) {
pto := lpto(nh)
_, _, err := nh.GetLeaderID(1234)
_, _, _, err := nh.GetLeaderID(1234)
if err != ErrShardNotFound {
t.Errorf("failed to return ErrShardNotFound, %v", err)
}
Expand Down Expand Up @@ -4854,7 +4857,7 @@ func TestDroppedRequestsAreReported(t *testing.T) {
t.Fatalf("failed to add node %v", err)
}
for i := 0; i < 1000; i++ {
_, ok, err := nh.GetLeaderID(1)
_, _, ok, err := nh.GetLeaderID(1)
if err != nil {
t.Fatalf("failed to get leader id %v", err)
}
Expand Down Expand Up @@ -5429,7 +5432,7 @@ func TestUsingClosedNodeHostIsNotAllowed(t *testing.T) {
if _, err := nh.SyncGetShardMembership(ctx, 1); err != ErrClosed {
t.Errorf("failed to return ErrClosed")
}
if _, _, err := nh.GetLeaderID(1); err != ErrClosed {
if _, _, _, err := nh.GetLeaderID(1); err != ErrClosed {
t.Errorf("failed to return ErrClosed")
}
if _, err := nh.Propose(nil, nil, time.Second); err != ErrClosed {
Expand Down

0 comments on commit 7b2d6c1

Please sign in to comment.