Skip to content

Commit

Permalink
Merge pull request etcd-io#10590 from jingyih/fix_readIndex_for_learner
Browse files Browse the repository at this point in the history
raft: leader respond to learner read index message
  • Loading branch information
xiang90 authored Mar 28, 2019
2 parents a645e27 + 5088d70 commit 952b9e7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
8 changes: 6 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,12 @@ func stepLeader(r *raft, m pb.Message) error {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
} else {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else { // there is only one voting member (the leader) in the cluster
if m.From == None || m.From == r.id { // from leader itself
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else { // from learner member
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
}
}

return nil
Expand Down
51 changes: 51 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2416,6 +2416,57 @@ func TestReadOnlyOptionSafe(t *testing.T) {
}
}

func TestReadOnlyWithLearner(t *testing.T) {
a := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
b := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())

nt := newNetwork(a, b)
setRandomizedElectionTimeout(b, b.electionTimeout+1)

for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

if a.state != StateLeader {
t.Fatalf("state = %s, want %s", a.state, StateLeader)
}

tests := []struct {
sm *raft
proposals int
wri uint64
wctx []byte
}{
{a, 10, 11, []byte("ctx1")},
{b, 10, 21, []byte("ctx2")},
{a, 10, 31, []byte("ctx3")},
{b, 10, 41, []byte("ctx4")},
}

for i, tt := range tests {
for j := 0; j < tt.proposals; j++ {
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
}

nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})

r := tt.sm
if len(r.readStates) == 0 {
t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
}
rs := r.readStates[0]
if rs.Index != tt.wri {
t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
}

if !bytes.Equal(rs.RequestCtx, tt.wctx) {
t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
}
r.readStates = nil
}
}

func TestReadOnlyOptionLease(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
Expand Down

0 comments on commit 952b9e7

Please sign in to comment.