Skip to content

Commit

Permalink
feat: [2C] basic log compaction done.
Browse files Browse the repository at this point in the history
  • Loading branch information
YangWithU committed Aug 5, 2024
1 parent d4775fe commit e6f1fa4
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 47 deletions.
23 changes: 21 additions & 2 deletions src/raft/commiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,36 @@ type ApplyMsg struct {
func (rf *Raft) committer() {
rf.mu.Lock()
for !rf.killed() {
if newCommittedEntries := rf.log.newCommittedEntries(); len(newCommittedEntries) > 0 {
if rf.log.hasPendingSnapshot {
rf.logger.hasPendingSNP()
snapshot := rf.log.cloneSnapShot()
rf.mu.Unlock()

rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: snapshot.Data,
SnapshotTerm: int(snapshot.Term),
SnapshotIndex: int(snapshot.Index),
}

rf.mu.Lock()
rf.log.hasPendingSnapshot = false
rf.logger.pushSnap(snapshot.Index, snapshot.Term)
} else if newCommittedEntries := rf.log.newCommittedEntries(); len(newCommittedEntries) > 0 {
rf.logger.hasCmitEnt(newCommittedEntries)
rf.mu.Unlock()

for _, entry := range newCommittedEntries {
rf.applyCh <- ApplyMsg{CommandValid: true, Command: entry.Data, CommandIndex: int(entry.Index)}
}

rf.mu.Lock()
rf.log.appliedTo(newCommittedEntries[len(newCommittedEntries)-1].Index)
applied := max(rf.log.applied, newCommittedEntries[len(newCommittedEntries)-1].Index)
rf.log.appliedTo(applied)
} else {
rf.logger.printf(SNAP, "N%v waits", rf.me)
rf.hasNewCommittedEntries.Wait()
rf.logger.printf(SNAP, "N%v awakes", rf.me)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
cfg.lastApplied[i] = m.CommandIndex
cfg.mu.Unlock()

// 时间到,写入snapshot
// 每9个Index变一下snapshot.Data
if (m.CommandIndex+1)%SnapShotInterval == 0 {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
Expand Down
50 changes: 35 additions & 15 deletions src/raft/log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package raft

import "errors"
import (
"errors"
)

type LogEntry struct {
Index uint64
Expand All @@ -22,6 +24,8 @@ type SnapShot struct {
type Log struct {
snapShot SnapShot

hasPendingSnapshot bool

// persisted log entries.
entries []LogEntry

Expand All @@ -36,10 +40,11 @@ type Log struct {

func makeLog() Log {
log := Log{
snapShot: SnapShot{Index: 0, Term: 0, Data: nil},
entries: make([]LogEntry, 1),
applied: 0,
committed: 0,
snapShot: SnapShot{Index: 0, Term: 0, Data: nil},
hasPendingSnapshot: false,
entries: make([]LogEntry, 1),
applied: 0,
committed: 0,
}

log.setDummyLogEntry()
Expand Down Expand Up @@ -95,19 +100,24 @@ func (log *Log) slice(start, end uint64) ([]LogEntry, error) {
}

func (log *Log) committedTo(index uint64) {
oriCommitted := log.committed
log.committed = index
log.logger.updateCommitted(oriCommitted)
if index > log.committed {
oriCommitted := log.committed
log.committed = index
log.logger.updateCommitted(oriCommitted)
}
}

func (log *Log) appliedTo(index uint64) {
oriApplied := log.applied
log.applied = index
log.logger.updateApplied(oriApplied)
if index > log.applied {
oriApplied := log.applied
log.applied = index
log.logger.updateApplied(oriApplied)
}
}

// 压缩当前rf.log的entries
// 给定snapshot,
// 将old替换为给定snapshot,截断给定snapshot.Index+1前内容
// 设置entry[0] dummy
func (log *Log) toCompactSnapShot(snapshot SnapShot) {
snapSuffix := make([]LogEntry, 0)
pos := snapshot.Index + 1
Expand All @@ -120,10 +130,16 @@ func (log *Log) toCompactSnapShot(snapshot SnapShot) {
log.snapShot = snapshot
log.setDummyLogEntry()

log.committed = max(log.committed, log.snapShot.Index)
log.applied = max(log.applied, log.snapShot.Index)
if log.snapShot.Index > log.committed {
log.committedTo(log.snapShot.Index)
}
if log.snapShot.Index > log.applied {
log.appliedTo(log.snapShot.Index)
}

log.logger.compactedTo(log.snapShot.Index, log.snapShot.Term)
lastLogIndex := log.lastIndex()
lastLogTerm, _ := log.term(lastLogIndex)
log.logger.compactedTo(lastLogIndex, lastLogTerm)
}

func (log *Log) mayCommittedTo(leaderCommittedIndex uint64) {
Expand Down Expand Up @@ -163,6 +179,10 @@ func (log *Log) clone(entries []LogEntry) []LogEntry {
func (log *Log) newCommittedEntries() []LogEntry {
start := log.toArrayIndex(log.applied + 1)
end := log.toArrayIndex(log.committed + 1)

log.logger.printf(SNAP, "newCommittedEntries [start=%v, end=%v) LN=%v",
log.applied+1, log.committed+1, len(log.entries[start:end]))

if start >= end {
return nil
}
Expand Down
39 changes: 32 additions & 7 deletions src/raft/log_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@ import "time"
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
// 手动主动存snapshot,假如entries中存在
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()

rf.logger.pullSnap(uint64(index))

if rf.log.hasPendingSnapshot {
return
}

snapshotIndex := uint64(index)
snapshotTerm, err := rf.log.term(snapshotIndex)
if err != nil && snapshotIndex > rf.log.snapShot.Index {
if err == nil && snapshotIndex > rf.log.snapShot.Index {
initialSnapShot := SnapShot{Data: snapshot, Index: snapshotIndex, Term: snapshotTerm}
rf.log.toCompactSnapShot(initialSnapShot)
rf.persist()
Expand Down Expand Up @@ -63,13 +70,22 @@ func (rf *Raft) handleInstallSnapShot(args *InstallSnapshotArgs, reply *InstallS
if rf.currentTerm != reply.Term || rf.state != Leader {
return
}
if args.SnapShot.Index >= rf.peerTrackers[reply.From].nextIndex {
if args.SnapShot.Index < rf.peerTrackers[reply.From].nextIndex {
return
}

// TODO Why?
if reply.Installed {
rf.peerTrackers[reply.From].nextIndex = args.SnapShot.Index + 1
oriNext := rf.peerTrackers[reply.From].nextIndex
oriMatch := rf.peerTrackers[reply.From].matchIndex

rf.peerTrackers[reply.From].matchIndex = args.SnapShot.Index
rf.peerTrackers[reply.From].nextIndex = rf.peerTrackers[reply.From].matchIndex + 1

newNext := rf.peerTrackers[reply.From].nextIndex
newMatch := rf.peerTrackers[reply.From].matchIndex
if newNext != oriNext || newMatch != oriMatch {
rf.logger.updateProgOf(uint64(reply.From), oriNext, oriMatch, newNext, newMatch)
}
}
}

Expand All @@ -93,12 +109,21 @@ func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapsho
reply.Term = rf.currentTerm
defer rf.persist()
}
rf.resetElectionTimer()
defer rf.resetElectionTimer()

if args.SnapShot.Index >= rf.log.snapShot.Index {
if args.SnapShot.Index <= rf.log.snapShot.Index {
reply.Installed = true
return
}

if !rf.log.hasPendingSnapshot && args.SnapShot.Index > rf.log.snapShot.Index {
rf.log.toCompactSnapShot(args.SnapShot)
reply.Installed = true
if !termChanged {
rf.persist()
defer rf.persist()
}
}

rf.log.hasPendingSnapshot = true
rf.signalAndLog()
}
9 changes: 7 additions & 2 deletions src/raft/log_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (rf *Raft) quorumMatched(index uint64) bool {
return 2*matchCnt > len(rf.peers)
}

func (rf *Raft) signalAndLog() {
rf.hasNewCommittedEntries.Signal()
rf.logger.printf(SNAP, "N%v signal()", rf.me)
}

// 输入entries的index,反向遍历 (log.committed~index].
// 假如logEntries中的term与当前rf的term相同,
// 多半数peerTracker的matchIndex不小于当前index,
Expand All @@ -62,7 +67,7 @@ func (rf *Raft) mayCommittedMatched(index uint64) {
for i := index; i > rf.log.committed; i-- {
if term, err := rf.log.term(i); err == nil && term == rf.currentTerm && rf.quorumMatched(i) {
rf.log.committedTo(i)
rf.hasNewCommittedEntries.Signal()
rf.signalAndLog()
break
}
}
Expand All @@ -72,7 +77,7 @@ func (rf *Raft) mayCommittedMatched(index uint64) {
func (rf *Raft) mayCommittedTo(index uint64) {
if res := min(index, rf.log.lastIndex()); res > rf.log.committed {
rf.log.committedTo(res)
rf.hasNewCommittedEntries.Signal()
rf.signalAndLog()
}
}

Expand Down
81 changes: 66 additions & 15 deletions src/raft/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"os"
"runtime"
"strconv"
"strings"
"time"
)

// true to turn on debugging/logging.
const debug = true
const LOGTOFILE = true
const logPrintEnts = true
const logPrintEnts = false

// what topic the log message is related to.
// logs are organized by topics which further consists of events.
Expand Down Expand Up @@ -137,20 +138,50 @@ func (logger *Logger) printf(topic logTopic, format string, a ...interface{}) {
// e.g. 008256 VOTE ...
prefix := fmt.Sprintf("%010d %v ", time, string(topic))

pc, _, _, ok := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
if ok {
for i, x := range funcName {
//pc, _, _, ok := runtime.Caller(1)
//funcName := runtime.FuncForPC(pc).Name()
//if ok {
// for i, x := range funcName {
// if x == ')' {
// *str = (*str)[i+2:]
// break
// }
// }
// prefix = prefix + funcName + " | "
//}

pcs := make([]uintptr, 6)
n := runtime.Callers(2, pcs)
pc := pcs[:n]
frames := runtime.CallersFrames(pc)
fNames := ""

tpre := func(str *string) {
for i, x := range *str {
if x == ')' {
funcName = funcName[i+2:]
*str = (*str)[i+2:]
break
}
}
prefix = prefix + funcName + " | "
if p := strings.Index(*str, "raft."); p != -1 {
*str = (*str)[p+5:]
}
}
for {
frame, more := frames.Next()
cur := frame.Function
tpre(&cur)
fNames = fNames + cur + " <- "
if !more {
break
}
}

format = prefix + format
log.Printf(format, a...)
pre := fmt.Sprintf(format, a...)
pre = fmt.Sprintf("%-*s", 63, pre)
pre = pre + "| " + fNames
log.Printf(pre)
}
}

Expand Down Expand Up @@ -393,17 +424,17 @@ func (l *Logger) recvHBETRes(m *AppendEntriesReply) {

func (l *Logger) restoreLog() {
r := l.r
l.printf(PERS, "N%v rs (T:%v V:%v LI:%v CI:%v AI:%v)", r.me, r.currentTerm, r.votedTo,
r.log.lastIndex(), r.log.committed, r.log.applied)
l.printf(PERS, "N%v rs (T:%v V:%v LI:%v CI:%v AI:%v SI:%v ST:%v)", r.me, r.currentTerm, r.votedTo,
r.log.lastIndex(), r.log.committed, r.log.applied, r.log.snapShot.Index, r.log.snapShot.Term)
if logPrintEnts {
l.printEnts(PERS, uint64(r.me), r.log.entries)
}
}

func (l *Logger) persistLog() {
r := l.r
l.printf(PERS, "N%v pslog (T:%v V:%v LI:%v CI:%v AI:%v)", r.me, r.currentTerm, r.votedTo,
r.log.lastIndex(), r.log.committed, r.log.applied)
l.printf(PERS, "N%v pslog (T:%v V:%v LI:%v CI:%v AI:%v SI:%v ST:%v)", r.me, r.currentTerm, r.votedTo,
r.log.lastIndex(), r.log.committed, r.log.applied, r.log.snapShot.Index, r.log.snapShot.Term)
if logPrintEnts {
l.printEnts(PERS, uint64(r.me), r.log.entries)
}
Expand Down Expand Up @@ -436,9 +467,8 @@ func (l *Logger) PersistEnts(oldlastStabledIndex, lastStabledIndex uint64) {

func (l *Logger) compactedTo(snapshotIndex, snapshotTerm uint64) {
r := l.r
lastLogIndex := r.log.lastIndex()
lastLogTerm, _ := r.log.term(lastLogIndex)
l.printf(SNAP, "N%v cp (SI:%v ST:%v LI:%v LT:%v)", r.me, snapshotIndex, snapshotTerm, lastLogIndex, lastLogTerm)
l.printf(SNAP, "N%v cp (SI:%v ST:%v LI:%v LT:%v)", r.me, r.log.snapShot.Index, r.log.snapShot.Term,
snapshotIndex, snapshotTerm)
}

func (l *Logger) sendISNP(to int, snapshotIndex, snapshotTerm uint64) {
Expand All @@ -455,3 +485,24 @@ func (l *Logger) recvISNPRes(m *InstallSnapshotReply) {
r := l.r
l.printf(SNAP, "N%v <- N%v ISNP RES (IS:%v)", r.me, m.From, m.Installed)
}

func (l *Logger) hasPendingSNP() {
r := l.r
l.printf(SNAP, "N%v has pending snapshot", r.me)
}

func (l *Logger) hasCmitEnt(nCommittedEntries []LogEntry) {
r := l.r
l.printf(SNAP, "N%v has new committed entries. LN=%v FI=%v LI=%v", r.me,
len(nCommittedEntries), nCommittedEntries[0].Index, nCommittedEntries[len(nCommittedEntries)-1].Index)
}

func (l *Logger) pullSnap(snapshotIndex uint64) {
r := l.r
l.printf(SNAP, "N%v pull SNP (SI:%v)", r.me, snapshotIndex)
}

func (l *Logger) pushSnap(snapshotIndex, snapshotTerm uint64) {
r := l.r
l.printf(SNAP, "N%v push SNP (SI:%v ST:%v)", r.me, snapshotIndex, snapshotTerm)
}
Loading

0 comments on commit e6f1fa4

Please sign in to comment.