Skip to content

Commit

Permalink
raft: move more methods onto the progress tracker
Browse files Browse the repository at this point in the history
Continues what was initiated in the last commit.
  • Loading branch information
tbg committed May 21, 2019
1 parent e60a57a commit 518a731
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 123 deletions.
10 changes: 5 additions & 5 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,15 +353,15 @@ func (n *node) run(r *raft) {
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
case <-n.done:
}
break
Expand All @@ -384,8 +384,8 @@ func (n *node) run(r *raft) {
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
case <-n.done:
}
case <-n.tickc:
Expand Down
69 changes: 63 additions & 6 deletions progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,22 +291,26 @@ func (in *inflights) reset() {
// the nodes and learners in it. In particular, it tracks the match index for
// each peer which in turn allows reasoning about the committed index.
type prs struct {
nodes map[uint64]*Progress
learners map[uint64]*Progress
matchBuf uint64Slice
nodes map[uint64]*Progress
learners map[uint64]*Progress
maxInflight int
matchBuf uint64Slice
}

func makePRS() prs {
func makePRS(maxInflight int) prs {
return prs{
nodes: map[uint64]*Progress{},
learners: map[uint64]*Progress{},
nodes: map[uint64]*Progress{},
learners: map[uint64]*Progress{},
maxInflight: maxInflight,
}
}

func (p *prs) quorum() int {
return len(p.nodes)/2 + 1
}

// committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *prs) committed() uint64 {
// Preserving matchBuf across calls is an optimization
// used to avoid allocating a new slice on each call.
Expand All @@ -327,3 +331,56 @@ func (p *prs) removeAny(id uint64) {
delete(p.nodes, id)
delete(p.learners, id)
}

func (p *prs) getProgress(id uint64) *Progress {
if pr, ok := p.nodes[id]; ok {
return pr
}

return p.learners[id]
}

// initProgress initializes a new progress for the given node, replacing any that
// may exist. It is invalid to replace a voter by a learner and attempts to do so
// will result in a panic.
func (p *prs) initProgress(id, match, next uint64, isLearner bool) {
if !isLearner {
delete(p.learners, id)
p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
return
}

if _, ok := p.nodes[id]; ok {
panic(fmt.Sprintf("changing from voter to learner for %x", id))
}
p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
}

func (p *prs) voterNodes() []uint64 {
nodes := make([]uint64, 0, len(p.nodes))
for id := range p.nodes {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (p *prs) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(p.learners))
for id := range p.learners {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

// visit invokes the supplied closure for all tracked progresses.
func (p *prs) visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.nodes {
f(id, pr)
}

for id, pr := range p.learners {
f(id, pr)
}
}
110 changes: 29 additions & 81 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -261,7 +260,6 @@ type raft struct {

maxMsgSize uint64
maxUncommittedSize uint64
maxInflight int
prs prs

state StateType
Expand Down Expand Up @@ -346,9 +344,8 @@ func newRaft(c *Config) *raft {
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: makePRS(),
prs: makePRS(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand All @@ -358,13 +355,13 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
}
for _, p := range peers {
r.prs.nodes[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
// Add node to active config.
r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
}
for _, p := range learners {
if _, ok := r.prs.nodes[p]; ok {
panic(fmt.Sprintf("node %x is in both learner and peer list", p))
}
r.prs.learners[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
// Add learner to active config.
r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)

if r.id == p {
r.isLearner = true
}
Expand All @@ -379,7 +376,7 @@ func newRaft(c *Config) *raft {
r.becomeFollower(r.Term, None)

var nodesStrs []string
for _, n := range r.nodes() {
for _, n := range r.prs.voterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}

Expand All @@ -400,24 +397,6 @@ func (r *raft) hardState() pb.HardState {
}
}

func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs.nodes))
for id := range r.prs.nodes {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (r *raft) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs.learners))
for id := range r.prs.learners {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
Expand Down Expand Up @@ -452,14 +431,6 @@ func (r *raft) send(m pb.Message) {
r.msgs = append(r.msgs, m)
}

func (r *raft) getProgress(id uint64) *Progress {
if pr, ok := r.prs.nodes[id]; ok {
return pr
}

return r.prs.learners[id]
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
Expand All @@ -472,7 +443,7 @@ func (r *raft) sendAppend(to uint64) {
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.getProgress(to)
pr := r.prs.getProgress(to)
if pr.IsPaused() {
return false
}
Expand Down Expand Up @@ -541,7 +512,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.getProgress(to).Match, r.raftLog.committed)
commit := min(r.prs.getProgress(to).Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Expand All @@ -552,20 +523,10 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
r.send(m)
}

func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
for id, pr := range r.prs.nodes {
f(id, pr)
}

for id, pr := range r.prs.learners {
f(id, pr)
}
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
r.forEachProgress(func(id uint64, _ *Progress) {
r.prs.visit(func(id uint64, _ *Progress) {
if id == r.id {
return
}
Expand All @@ -585,7 +546,7 @@ func (r *raft) bcastHeartbeat() {
}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
r.forEachProgress(func(id uint64, _ *Progress) {
r.prs.visit(func(id uint64, _ *Progress) {
if id == r.id {
return
}
Expand Down Expand Up @@ -615,8 +576,8 @@ func (r *raft) reset(term uint64) {
r.abortLeaderTransfer()

r.votes = make(map[uint64]bool)
r.forEachProgress(func(id uint64, pr *Progress) {
*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
r.prs.visit(func(id uint64, pr *Progress) {
*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.prs.maxInflight), IsLearner: pr.IsLearner}
if id == r.id {
pr.Match = r.raftLog.lastIndex()
}
Expand Down Expand Up @@ -644,7 +605,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.getProgress(r.id).maybeUpdate(li)
r.prs.getProgress(r.id).maybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
Expand Down Expand Up @@ -738,7 +699,7 @@ func (r *raft) becomeLeader() {
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.nodes[r.id].becomeReplicate()
r.prs.getProgress(r.id).becomeReplicate()

// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
Expand Down Expand Up @@ -1040,7 +1001,7 @@ func stepLeader(r *raft, m pb.Message) error {
}

// All other message types require a progress for m.From (pr).
pr := r.getProgress(m.From)
pr := r.prs.getProgress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
Expand Down Expand Up @@ -1367,16 +1328,16 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
match = next - 1
r.isLearner = isLearner
}
r.setProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
r.prs.initProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n))
}
}

// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
_, ok := r.prs.nodes[r.id]
return ok
pr := r.prs.getProgress(r.id)
return pr != nil && !pr.IsLearner
}

func (r *raft) addNode(id uint64) {
Expand All @@ -1388,12 +1349,12 @@ func (r *raft) addLearner(id uint64) {
}

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
pr := r.getProgress(id)
pr := r.prs.getProgress(id)
if pr == nil {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// can only change Learner to Voter
// Can only change Learner to Voter.
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
}
Expand All @@ -1404,10 +1365,11 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
return
}

// change Learner to Voter, use origin Learner progress
delete(r.prs.learners, id)
// Change Learner to Voter, use origin Learner progress.
r.prs.removeAny(id)
r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
r.prs.nodes[id] = pr
*r.prs.getProgress(id) = *pr
}

if r.id == id {
Expand All @@ -1417,8 +1379,7 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
pr = r.getProgress(id)
pr.RecentActive = true
r.prs.getProgress(id).RecentActive = true
}

func (r *raft) removeNode(id uint64) {
Expand All @@ -1440,19 +1401,6 @@ func (r *raft) removeNode(id uint64) {
}
}

func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
if !isLearner {
delete(r.prs.learners, id)
r.prs.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
return
}

if _, ok := r.prs.nodes[id]; ok {
panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
}
r.prs.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
}

func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
Expand Down Expand Up @@ -1480,7 +1428,7 @@ func (r *raft) resetRandomizedElectionTimeout() {
func (r *raft) checkQuorumActive() bool {
var act int

r.forEachProgress(func(id uint64, pr *Progress) {
r.prs.visit(func(id uint64, pr *Progress) {
if id == r.id { // self is always active
act++
return
Expand Down
Loading

0 comments on commit 518a731

Please sign in to comment.