remove async snapshot (talent-plan#118)
* remove async snapshot

Signed-off-by: Connor1996 <[email protected]>
Connor1996 committed Mar 12, 2020
1 parent 42ecb06 commit fad70d6
Showing 13 changed files with 121 additions and 415 deletions.
28 changes: 3 additions & 25 deletions kv/raftstore/batch_system.go
@@ -114,14 +114,12 @@ func (bs *RaftBatchSystem) loadPeers() ([]*peer, error) {
kvEngine := ctx.engine.Kv
storeID := ctx.store.Id

var totalCount, tombStoneCount, applyingCount int
var totalCount, tombStoneCount int
var regionPeers []*peer

t := time.Now()
kvWB := new(engine_util.WriteBatch)
raftWB := new(engine_util.WriteBatch)
var applyingRegions []*metapb.Region
var mergingCount int
err := kvEngine.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
@@ -153,14 +151,6 @@ func (bs *RaftBatchSystem) loadPeers() ([]*peer, error) {
bs.clearStaleMeta(kvWB, raftWB, localState)
continue
}
if localState.State == rspb.PeerState_Applying {
// in case of restart happen when we just write region state to Applying,
// but not write raft_local_state to raft rocksdb in time.
recoverFromApplyingState(ctx.engine, raftWB, regionID)
applyingCount++
applyingRegions = append(applyingRegions, region)
continue
}

peer, err := createPeer(storeID, ctx.cfg, ctx.regionTaskSender, ctx.engine, region)
if err != nil {
@@ -180,20 +170,8 @@ func (bs *RaftBatchSystem) loadPeers() ([]*peer, error) {
kvWB.MustWriteToDB(ctx.engine.Kv)
raftWB.MustWriteToDB(ctx.engine.Raft)

// schedule applying snapshot after raft write batch were written.
for _, region := range applyingRegions {
log.Infof("region %d is applying snapshot", region.Id)
peer, err := createPeer(storeID, ctx.cfg, ctx.regionTaskSender, ctx.engine, region)
if err != nil {
return nil, err
}
peer.peerStorage.ScheduleApplyingSnapshot()
ctx.storeMeta.regionRanges.ReplaceOrInsert(&regionItem{region: region})
ctx.storeMeta.regions[region.Id] = region
regionPeers = append(regionPeers, peer)
}
log.Infof("start store %d, region_count %d, tombstone_count %d, applying_count %d, merge_count %d, takes %v",
storeID, totalCount, tombStoneCount, applyingCount, mergingCount, time.Since(t))
log.Infof("start store %d, region_count %d, tombstone_count %d, takes %v",
storeID, totalCount, tombStoneCount, time.Since(t))
return regionPeers, nil
}

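With the PeerState_Applying branch gone, loadPeers no longer defers any region to a second "applying snapshot" pass after the write batches are flushed. The following standalone Go sketch (not part of the diff) restates that simplified per-region decision with stand-in types; the real code additionally drives a badger iterator, builds write batches, and calls createPeer/clearStaleMeta.

package main

import "fmt"

// Stand-in for the persisted region peer state; only the states relevant to
// startup after this change are modeled.
type peerState int

const (
	peerStateNormal peerState = iota
	peerStateTombstone
)

type regionLocalState struct {
	regionID uint64
	state    peerState
}

// loadPeersSketch mirrors the simplified startup loop: every non-tombstone
// region goes straight through peer creation; there is no longer a deferred
// "applying snapshot" pass.
func loadPeersSketch(states []regionLocalState) (created, tombstones int) {
	for _, s := range states {
		if s.state == peerStateTombstone {
			tombstones++ // clearStaleMeta in the real code
			continue
		}
		created++ // createPeer + register the region in storeMeta in the real code
	}
	return
}

func main() {
	created, tombstones := loadPeersSketch([]regionLocalState{
		{regionID: 1, state: peerStateNormal},
		{regionID: 2, state: peerStateTombstone},
	})
	fmt.Println("created:", created, "tombstones:", tombstones)
}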
11 changes: 3 additions & 8 deletions kv/raftstore/meta/keys.go
@@ -22,10 +22,9 @@ const (

// Following are the suffix after the local prefix.
// For region id
RaftLogSuffix byte = 0x01
RaftStateSuffix byte = 0x02
ApplyStateSuffix byte = 0x03
SnapshotRaftStateSuffix byte = 0x04
RaftLogSuffix byte = 0x01
RaftStateSuffix byte = 0x02
ApplyStateSuffix byte = 0x03

// For region meta
RegionStateSuffix byte = 0x01
@@ -83,10 +82,6 @@ func ApplyStateKey(regionID uint64) []byte {
return makeRegionPrefix(regionID, ApplyStateSuffix)
}

func SnapshotRaftStateKey(regionID uint64) []byte {
return makeRegionPrefix(regionID, SnapshotRaftStateSuffix)
}

func IsRaftStateKey(key []byte) bool {
return len(key) == 11 && key[0] == LocalPrefix && key[1] == RegionRaftPrefix
}
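All of the region-scoped keys touched here, including the removed SnapshotRaftStateKey, are built the same way: a local prefix byte, a region-raft prefix byte, the 8-byte big-endian region id, and a one-byte suffix, which is why IsRaftStateKey checks for an 11-byte key. The standalone sketch below (not part of the diff) illustrates that layout; only the suffix values come from the diff above, the two prefix byte values and the helper name are assumptions for illustration.

package main

import (
	"encoding/binary"
	"fmt"
)

// Suffix values match the diff above; the two prefix values are assumed.
const (
	localPrefix      byte = 0x01
	regionRaftPrefix byte = 0x02
	raftStateSuffix  byte = 0x02
	applyStateSuffix byte = 0x03
)

// makeRegionKey builds an 11-byte key: local prefix, region-raft prefix,
// 8-byte big-endian region id, then a one-byte suffix -- which is why
// IsRaftStateKey checks for len(key) == 11.
func makeRegionKey(regionID uint64, suffix byte) []byte {
	key := make([]byte, 11)
	key[0] = localPrefix
	key[1] = regionRaftPrefix
	binary.BigEndian.PutUint64(key[2:10], regionID)
	key[10] = suffix
	return key
}

func main() {
	fmt.Printf("raft state key:  % x\n", makeRegionKey(42, raftStateSuffix))
	fmt.Printf("apply state key: % x\n", makeRegionKey(42, applyStateSuffix))
}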
8 changes: 0 additions & 8 deletions kv/raftstore/meta/values.go
@@ -26,14 +26,6 @@ func GetRaftLocalState(db *badger.DB, regionId uint64) (*rspb.RaftLocalState, er
return raftLocalState, nil
}

func GetSnapRaftState(db *badger.DB, regionId uint64) (*rspb.RaftLocalState, error) {
snapRaftState := new(rspb.RaftLocalState)
if err := engine_util.GetMsg(db, SnapshotRaftStateKey(regionId), snapRaftState); err != nil {
return nil, err
}
return snapRaftState, nil
}

func GetApplyState(db *badger.DB, regionId uint64) (*rspb.RaftApplyState, error) {
applyState := new(rspb.RaftApplyState)
if err := engine_util.GetMsg(db, ApplyStateKey(regionId), applyState); err != nil {
17 changes: 2 additions & 15 deletions kv/raftstore/peer.go
@@ -116,9 +116,6 @@ type peer struct {

// Index of last scheduled committed raft log.
LastCompactedIdx uint64

// If a snapshot is being applied asynchronously, messages should not be sent.
pendingMessages []eraftpb.Message
}

func NewPeer(storeId uint64, cfg *config.Config, engines *engine_util.Engines, region *metapb.Region, regionSched chan<- worker.Task,
@@ -155,8 +152,8 @@ func NewPeer(storeId uint64, cfg *config.Config, engines *engine_util.Engines, r
peerCache: make(map[uint64]*metapb.Peer),
PeerHeartbeats: make(map[uint64]time.Time),
PeersStartPendingTime: make(map[uint64]time.Time),
Tag: tag,
ticker: newTicker(region.GetId(), cfg),
Tag: tag,
ticker: newTicker(region.GetId(), cfg),
}

// If this region has only one peer and I am the one, campaign directly.
@@ -201,12 +198,6 @@ func (p *peer) MaybeDestroy() bool {
log.Infof("%v is being destroyed, skip", p.Tag)
return false
}
if p.IsApplyingSnapshot() {
if !p.Store().CancelApplyingSnap() {
log.Infof("%v stale peer %v is applying snapshot", p.Tag, p.Meta.Id)
return false
}
}
return true
}

@@ -285,10 +276,6 @@ func (p *peer) Store() *PeerStorage {
return p.peerStorage
}

func (p *peer) IsApplyingSnapshot() bool {
return p.Store().IsApplyingSnapshot()
}

func (p *peer) Send(trans Transport, msgs []eraftpb.Message) {
for _, msg := range msgs {
err := p.sendRaftMessage(msg, trans)
12 changes: 1 addition & 11 deletions kv/raftstore/peer_msg_handler.go
@@ -147,14 +147,6 @@ func (d *peerMsgHandler) startTicker() {
}

func (d *peerMsgHandler) onRaftBaseTick() {
// When having pending snapshot, if election timeout is met, it can't pass
// the pending conf change check because first index has been updated to
// a value that is larger than last index.
if d.peer.IsApplyingSnapshot() {
// need to check if snapshot is applied.
d.ticker.schedule(PeerTickRaft)
return
}
d.peer.RaftGroup.Tick()
d.ticker.schedule(PeerTickRaft)
}
@@ -379,7 +371,6 @@ func (d *peerMsgHandler) destroyPeer() {
log.Infof("%s starts destroy", d.tag())
regionID := d.regionID()
// We can't destroy a peer which is applying snapshot.
y.Assert(!d.peer.IsApplyingSnapshot())
meta := d.ctx.storeMeta
isInitialized := d.peer.isInitialized()
if err := d.peer.Destroy(d.ctx.engine, false); err != nil {
@@ -541,7 +532,6 @@ func (d *peerMsgHandler) onGCSnap(snaps []snap.SnapKeyWithSending) {
store := d.peer.Store()
compactedIdx := store.truncatedIndex()
compactedTerm := store.truncatedTerm()
isApplyingSnap := store.IsApplyingSnapshot()
for _, snapKeyWithSending := range snaps {
key := snapKeyWithSending.SnapKey
if snapKeyWithSending.IsSending {
@@ -561,7 +551,7 @@ func (d *peerMsgHandler) onGCSnap(snaps []snap.SnapKeyWithSending) {
}
}
} else if key.Term <= compactedTerm &&
(key.Index < compactedIdx || (key.Index == compactedIdx && !isApplyingSnap)) {
(key.Index < compactedIdx || key.Index == compactedIdx) {
log.Infof("%s snap file %s has been applied, delete", d.tag(), key)
a, err := d.ctx.snapMgr.GetSnapshotForApplying(key)
if err != nil {
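With isApplyingSnap removed, the GC condition in onGCSnap reduces to a comparison against the log compaction point. The standalone snippet below (not part of the diff; the function name is illustrative) restates that predicate.

package main

import "fmt"

// snapCanBeGCed reports whether a snapshot file identified by (term, index) is
// already covered by the log compaction point and may be deleted. With the
// async-apply state gone, the old check
//   index < compactedIdx || (index == compactedIdx && !isApplyingSnap)
// collapses to index <= compactedIdx.
func snapCanBeGCed(term, index, compactedTerm, compactedIdx uint64) bool {
	return term <= compactedTerm && index <= compactedIdx
}

func main() {
	fmt.Println(snapCanBeGCed(5, 10, 5, 10)) // true: at the compaction point
	fmt.Println(snapCanBeGCed(5, 11, 5, 10)) // false: newer than the compaction point
}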
114 changes: 22 additions & 92 deletions kv/raftstore/peer_storage.go
@@ -3,7 +3,6 @@ package raftstore
import (
"bytes"
"fmt"
"sync/atomic"
"time"

"github.com/Connor1996/badger"
@@ -37,36 +36,6 @@ func (ps *PeerStorage) saveApplyStateTo(wb *engine_util.WriteBatch, applyState *
wb.SetMsg(meta.ApplyStateKey(ps.region.GetId()), applyState)
}

func (ps *PeerStorage) saveSnapshotRaftStateTo(snapshotIdx uint64, wb *engine_util.WriteBatch) {
snapshotRaftState := ps.raftState
snapshotRaftState.HardState.Commit = snapshotIdx
snapshotRaftState.LastIndex = snapshotIdx
wb.SetMsg(meta.SnapshotRaftStateKey(ps.region.GetId()), &snapshotRaftState)
}

func recoverFromApplyingState(engines *engine_util.Engines, raftWB *engine_util.WriteBatch, regionID uint64) error {
snapRaftState, err := meta.GetSnapRaftState(engines.Kv, regionID)
if err != nil {
return errors.Errorf("region %d failed to get raftstate from kv engine when recover from applying state", regionID)
}

raftState, err := meta.GetRaftLocalState(engines.Raft, regionID)
if err != nil && err != badger.ErrKeyNotFound {
return errors.WithStack(err)
}

// if we recv append log when applying snapshot, last_index in raft_local_state will
// larger than snapshot_index. since raft_local_state is written to raft engine, and
// raft write_batch is written after kv write_batch, raft_local_state may wrong if
// restart happen between the two write. so we copy raft_local_state to kv engine
// (snapshot_raft_state), and set snapshot_raft_state.last_index = snapshot_index.
// after restart, we need check last_index.
if snapRaftState.LastIndex > raftState.LastIndex {
raftWB.SetMsg(meta.RaftStateKey(regionID), snapRaftState)
}
return nil
}

var _ raft.Storage = new(PeerStorage)

type PeerStorage struct {
@@ -246,10 +215,6 @@ func (ps *PeerStorage) Region() *metapb.Region {
return ps.region
}

func (ps *PeerStorage) IsApplyingSnapshot() bool {
return ps.snapState.StateType == snap.SnapState_Applying
}

func (ps *PeerStorage) checkRange(low, high uint64) error {
if low > high {
return errors.Errorf("low %d is greater than high %d", low, high)
@@ -403,7 +368,7 @@ func WritePeerState(kvWB *engine_util.WriteBatch, region *metapb.Region, state r
}

// Apply the peer with given snapshot.
func (ps *PeerStorage) ApplySnapshot(snap *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*metapb.Region, error) {
func (ps *PeerStorage) ApplySnapshot(snap *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
log.Infof("%v begin to apply snapshot", ps.Tag)

snapData := new(rspb.RaftSnapshotData)
@@ -425,6 +390,15 @@ func (ps *PeerStorage) ApplySnapshot(snap *eraftpb.Snapshot, kvWB *engine_util.W
ps.raftState.LastIndex = snap.Metadata.Index
ps.lastTerm = snap.Metadata.Term

applyRes := &ApplySnapResult{
PrevRegion: ps.region,
Region: snapData.Region,
}
// cleanup data before scheduling apply worker.Task
if ps.isInitialized() {
ps.clearExtraData(snapData.Region)
}
ps.region = snapData.Region
applyState := &rspb.RaftApplyState{
AppliedIndex: snap.Metadata.Index,
// The snapshot only contains log which index > applied index, so
@@ -435,10 +409,11 @@ func (ps *PeerStorage) ApplySnapshot(snap *eraftpb.Snapshot, kvWB *engine_util.W
},
}
ps.saveApplyStateTo(kvWB, applyState)
WritePeerState(kvWB, snapData.Region, rspb.PeerState_Applying)
ps.ScheduleApplyingSnapshotAndWait(snapData.Region, snap.Metadata)
WritePeerState(kvWB, snapData.Region, rspb.PeerState_Normal)

log.Debugf("%v apply snapshot for region %v with state %v ok", ps.Tag, snapData.Region, applyState)
return snapData.Region, nil
return applyRes, nil
}

/// Save memory states to disk.
@@ -447,20 +422,13 @@ func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, erro
kvWB, raftWB := new(engine_util.WriteBatch), new(engine_util.WriteBatch)
prevRaftState := ps.raftState

var snapshotIdx uint64 = 0
var snapRegion *metapb.Region = nil
var applyRes *ApplySnapResult = nil
var err error
if !raft.IsEmptySnap(&ready.Snapshot) {
region, err := ps.ApplySnapshot(&ready.Snapshot, kvWB, raftWB)
applyRes, err = ps.ApplySnapshot(&ready.Snapshot, kvWB, raftWB)
if err != nil {
return nil, err
}
applyRes = &ApplySnapResult{
PrevRegion: ps.region,
Region: region,
}
snapshotIdx = ps.raftState.LastIndex
snapRegion = region
}

if len(ready.Entries) != 0 {
@@ -475,44 +443,29 @@ func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, erro

if !proto.Equal(&prevRaftState, &ps.raftState) {
ps.saveRaftStateTo(raftWB)
if snapshotIdx > 0 {
// in case of restart happen when we just write region state to Applying,
// but not write raft_local_state to raft rocksdb in time.
// we write raft state to default rocksdb, with last index set to snap index,
// in case of recv raft log after snapshot.
ps.saveSnapshotRaftStateTo(snapshotIdx, kvWB)
}
}

kvWB.MustWriteToDB(ps.Engines.Kv)
raftWB.MustWriteToDB(ps.Engines.Raft)

// If we apply snapshot ok, we should update some infos like applied index too.
if snapshotIdx > 0 {
// cleanup data before scheduling apply worker.Task
if ps.isInitialized() {
ps.clearExtraData(snapRegion)
}
ps.region = snapRegion
ps.ScheduleApplyingSnapshot()
}

return applyRes, nil
}

func (ps *PeerStorage) ScheduleApplyingSnapshot() {
status := snap.JobStatus_Pending
func (ps *PeerStorage) ScheduleApplyingSnapshotAndWait(snapRegion *metapb.Region, snapMeta *eraftpb.SnapshotMetadata) {
ch := make(chan *eraftpb.Snapshot, 1)
ps.snapState = snap.SnapState{
StateType: snap.SnapState_Applying,
Status: &status,
}
ps.regionSched <- worker.Task{
Tp: worker.TaskTypeRegionApply,
Data: &runner.RegionTask{
RegionId: ps.region.Id,
Status: &status,
Notifier: ch,
SnapMeta: snapMeta,
StartKey: snapRegion.GetStartKey(),
EndKey: snapRegion.GetEndKey(),
},
}
<-ch
}

func (ps *PeerStorage) SetRegion(region *metapb.Region) {
@@ -529,26 +482,3 @@ func (ps *PeerStorage) ClearData() {
},
}
}

func (p *PeerStorage) CancelApplyingSnap() bool {
// Todo: currently it is a place holder
return true
}

// Check if the storage is applying a snapshot.
func (ps *PeerStorage) CheckApplyingSnap() bool {
switch ps.snapState.StateType {
case snap.SnapState_Applying:
switch atomic.LoadUint32(ps.snapState.Status) {
case snap.JobStatus_Finished:
ps.snapState = snap.SnapState{StateType: snap.SnapState_Relax}
case snap.JobStatus_Cancelled:
ps.snapState = snap.SnapState{StateType: snap.SnapState_ApplyAborted}
case snap.JobStatus_Failed:
panic(fmt.Sprintf("%v applying snapshot failed", ps.Tag))
default:
return true
}
}
return false
}
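The heart of the change is in ApplySnapshot: instead of marking the region PeerState_Applying and letting a background job finish later, the peer now schedules the region worker task and blocks on a notifier channel until the data is ingested, then records PeerState_Normal in the same write batch. The standalone sketch below (not part of the diff) shows that schedule-and-wait pattern with simplified stand-in types; in the diff the notifier carries an *eraftpb.Snapshot and the task is a runner.RegionTask.

package main

import "fmt"

// Simplified stand-ins for the worker task and snapshot metadata used above.
type snapshotMeta struct{ Index, Term uint64 }

type regionApplyTask struct {
	regionID uint64
	snapMeta snapshotMeta
	notifier chan<- snapshotMeta // the worker signals here when the apply finishes
}

// regionWorker drains apply tasks and notifies the waiting raftstore thread.
func regionWorker(tasks <-chan regionApplyTask) {
	for t := range tasks {
		// ... ingest the snapshot data for t.regionID into the kv engine ...
		t.notifier <- t.snapMeta // unblock the caller
	}
}

// scheduleApplyingSnapshotAndWait mirrors the new synchronous flow: enqueue the
// apply task, then block until the worker reports completion, so the snapshot
// data is durable before the peer state is written as Normal.
func scheduleApplyingSnapshotAndWait(tasks chan<- regionApplyTask, regionID uint64, meta snapshotMeta) {
	done := make(chan snapshotMeta, 1)
	tasks <- regionApplyTask{regionID: regionID, snapMeta: meta, notifier: done}
	<-done
}

func main() {
	tasks := make(chan regionApplyTask, 1)
	go regionWorker(tasks)
	scheduleApplyingSnapshotAndWait(tasks, 1, snapshotMeta{Index: 5, Term: 5})
	fmt.Println("snapshot applied synchronously")
}

Because the region data is already ingested by the time the write batches are flushed, a restart can no longer observe a half-applied region, which is what lets the PeerState_Applying recovery path in loadPeers and the snapshot_raft_state bookkeeping above be deleted.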
