Skip to content

Commit

Permalink
dragonboat: add SyncRequestImportSnapshot method.
Browse files Browse the repository at this point in the history
This method synchronously imports a snapshot from a snapshot
directory and then asynchronously compacts the log entries
at index [snapshotIndex-1].
  • Loading branch information
volgariver6 committed Nov 15, 2022
1 parent aadd6a9 commit a82c3e6
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 18 deletions.
28 changes: 28 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,34 @@ func (n *node) requestCompaction() (*SysOpState, error) {
return nil, ErrRejected
}

// requestImportSnapshot commits the given snapshot through the snapshotter,
// hands it to the log reader, and then asks for the log to be compacted at
// the index just before the snapshot index.
//
// It returns nil without going any further when the commit is classified as
// aborted (snapshotCommitAborted / saveAborted) or when the log reader
// reports a soft snapshot error. Any other failure is returned wrapped with
// the node id. It panics when the committed snapshot fails validation.
func (n *node) requestImportSnapshot(ss pb.Snapshot) error {
	// Override the regular compaction settings so that entries are compacted
	// at the index immediately preceding the snapshot index.
	// NOTE(review): ss.Index - 1 wraps around for ss.Index == 0 - presumably
	// such a snapshot never reaches this point; confirm against callers.
	compaction := rsm.SSRequest{
		OverrideCompaction: true,
		CompactionIndex:    ss.Index - 1,
	}
	commitErr := n.snapshotter.Commit(ss, compaction)
	switch {
	case commitErr == nil:
		// committed, keep going
	case snapshotCommitAborted(commitErr) || saveAborted(commitErr):
		return nil
	default:
		return errors.Wrapf(commitErr, "%s commit snapshot failed", n.id())
	}
	if valid := ss.Validate(n.snapshotter.fs); !valid {
		plog.Panicf("%s generated invalid snapshot %v", n.id(), ss)
	}
	createErr := n.logReader.CreateSnapshot(ss)
	switch {
	case createErr == nil:
		// registered with the log reader, keep going
	case isSoftSnapshotError(createErr):
		return nil
	default:
		return errors.Wrapf(createErr, "%s create snapshot failed", n.id())
	}
	n.compactLog(compaction, ss.Index)
	n.ss.setIndex(ss.Index)
	return nil
}

// isFreeOrderMessage reports whether m is a Replicate or a Ping message.
func isFreeOrderMessage(m pb.Message) bool {
	switch m.Type {
	case pb.Replicate, pb.Ping:
		return true
	default:
		return false
	}
}
Expand Down
78 changes: 78 additions & 0 deletions nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (

"github.com/lni/dragonboat/v4/client"
"github.com/lni/dragonboat/v4/config"
"github.com/lni/dragonboat/v4/internal/fileutil"
"github.com/lni/dragonboat/v4/internal/id"
"github.com/lni/dragonboat/v4/internal/invariants"
"github.com/lni/dragonboat/v4/internal/logdb"
Expand All @@ -83,6 +84,7 @@ import (
"github.com/lni/dragonboat/v4/raftio"
pb "github.com/lni/dragonboat/v4/raftpb"
sm "github.com/lni/dragonboat/v4/statemachine"
"github.com/lni/dragonboat/v4/tools"
)

const (
Expand Down Expand Up @@ -1010,6 +1012,82 @@ func (nh *NodeHost) RequestCompaction(shardID uint64,
return n.requestCompaction()
}

// SyncRequestImportSnapshot imports the snapshot stored in srcDir for the
// specified shard replica. It validates the snapshot image, copies it into
// the replica's snapshot directory, and hands it to the node, which commits
// it and then tries to compact the log entries just before the snapshot
// index.
//
// ctx is checked by getTimeoutFromContext before any work is done and is
// also used for the membership read. shardID selects the node on this
// NodeHost, replicaID selects the destination snapshot directory, and srcDir
// is the directory holding the snapshot image and its metadata file.
//
// All checks that do not modify the destination directory (shard lookup,
// snapshot validation, membership read) run first, so a request that is
// going to fail for one of those reasons leaves the replica's existing
// snapshot directory untouched.
//
// ErrShardNotFound is returned when the shard is not found on this NodeHost,
// tools.ErrIncompleteSnapshot when the snapshot image does not match its
// recorded checksum. Other errors are returned as reported by the failing
// step.
func (nh *NodeHost) SyncRequestImportSnapshot(
	ctx context.Context, shardID uint64, replicaID uint64, srcDir string,
) error {
	// Only the error matters here, the timeout value itself is not used.
	if _, err := getTimeoutFromContext(ctx); err != nil {
		return err
	}
	// Fail fast: do not touch any directory for a shard we do not host.
	if _, ok := nh.getShard(shardID); !ok {
		return ErrShardNotFound
	}
	ssFilePath, err := tools.GetSnapshotFilepath(srcDir, nh.fs)
	if err != nil {
		return err
	}
	srcSnapshot, err := tools.GetSnapshotRecord(srcDir, server.MetadataFilename, nh.fs)
	if err != nil {
		return err
	}
	// Check whether it is a valid snapshot.
	ok, err := tools.IsCompleteSnapshotImage(ssFilePath, srcSnapshot, nh.fs)
	if err != nil {
		return err
	}
	if !ok {
		return tools.ErrIncompleteSnapshot
	}
	// The members recorded in the source snapshot may differ from the current
	// ones, so the current membership is used for the imported snapshot
	// record. It is read before the destination directory is modified so that
	// a failed read has no side effects on disk.
	members, err := nh.SyncGetShardMembership(ctx, shardID)
	if err != nil {
		return err
	}
	// Get the snapshot destination directory for the replica.
	// NOTE(review): directories are derived from srcSnapshot.ShardID while the
	// node is looked up by shardID; nothing here checks that the two match -
	// confirm whether a mismatch should be rejected.
	ssDir := nh.env.GetSnapshotDir(nh.nhConfig.DeploymentID,
		srcSnapshot.ShardID, replicaID)
	exist, err := fileutil.Exist(ssDir, nh.fs)
	if err != nil {
		return err
	}
	if exist {
		// If the destination directory exists, clean it to make it ready.
		if err := tools.CleanupSnapshotDir(ssDir, nh.fs); err != nil {
			return err
		}
	} else {
		// If the destination directory does not exist, create it.
		if err := nh.env.CreateSnapshotDir(nh.nhConfig.DeploymentID,
			srcSnapshot.ShardID, replicaID); err != nil {
			return err
		}
	}
	getSnapshotDir := func(cid uint64, nid uint64) string {
		return nh.env.GetSnapshotDir(nh.nhConfig.DeploymentID, cid, nid)
	}
	ssEnv := server.NewSSEnv(getSnapshotDir,
		srcSnapshot.ShardID, replicaID, srcSnapshot.Index, replicaID, server.SnapshotMode, nh.fs)
	if err := ssEnv.CreateTempDir(); err != nil {
		return err
	}
	dstDir := ssEnv.GetTempDir()
	finalDir := ssEnv.GetFinalDir()
	// Build the new snapshot record pointing at the final directory and
	// carrying the current members.
	ss := tools.GetProcessedSnapshotRecord(finalDir, srcSnapshot, members.Nodes, nh.fs)
	// Just copy source snapshot directory to destination directory.
	if err := tools.CopySnapshot(srcSnapshot, srcDir, dstDir, nh.fs); err != nil {
		return err
	}
	// Look the node up again right before use, the shard may have been
	// stopped while the snapshot was being prepared.
	n, ok := nh.getShard(shardID)
	if !ok {
		return ErrShardNotFound
	}
	defer nh.engine.setStepReady(shardID)
	return n.requestImportSnapshot(ss)
}

// SyncRequestDeleteReplica is the synchronous variant of the RequestDeleteReplica
// method. See RequestDeleteReplica for more details.
//
Expand Down
122 changes: 122 additions & 0 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4375,6 +4375,128 @@ func TestShardWithoutQuorumCanBeRestoreByImportingSnapshot(t *testing.T) {
runNodeHostTestDC(t, tf, true, fs)
}

// TestRequestImportSnapshotAndQueryLog starts a single replica shard, makes
// proposals, exports a snapshot, makes more proposals, imports the exported
// snapshot with SyncRequestImportSnapshot and finally checks that the raft
// log entries after the snapshot index can still be queried.
func TestRequestImportSnapshotAndQueryLog(t *testing.T) {
	if vfs.GetTestFS() != vfs.DefaultFS {
		t.Skip("not using the default fs")
	}
	fs := vfs.GetTestFS()
	tf := func() {
		rc := config.Config{
			ShardID:                 1,
			ReplicaID:               1,
			ElectionRTT:             3,
			HeartbeatRTT:            1,
			CheckQuorum:             true,
			CompactionOverhead:      2,
			SnapshotCompressionType: config.NoCompression,
		}
		peers := make(map[uint64]string)
		peers[1] = nodeHostTestAddr1
		nhc := config.NodeHostConfig{
			DeploymentID:   1,
			NodeHostDir:    singleNodeHostTestDir,
			RTTMillisecond: getRTTMillisecond(fs, singleNodeHostTestDir),
			RaftAddress:    nodeHostTestAddr1,
			Expert:         getTestExpertConfig(fs),
		}
		nh, err := NewNodeHost(nhc)
		if err != nil {
			t.Fatalf("failed to create node host %v", err)
		}
		pto := lpto(nh)
		newSM := func(uint64, uint64) sm.IOnDiskStateMachine {
			return tests.NewSimDiskSM(0)
		}
		if err := nh.StartOnDiskReplica(peers, false, newSM, rc); err != nil {
			t.Fatalf("failed to start shard %v", err)
		}
		waitForLeaderToBeElected(t, nh, 1)
		// makeProposals submits 16 proposals through a no-op session.
		makeProposals := func(nn *NodeHost) {
			session := nn.GetNoOPSession(1)
			for i := 0; i < 16; i++ {
				ctx, cancel := context.WithTimeout(context.Background(), pto)
				_, err := nn.SyncPropose(ctx, session, []byte("test-data"))
				cancel()
				if err != nil {
					t.Errorf("failed to make proposal %v", err)
				}
			}
		}
		makeProposals(nh)
		sspath := "exported_snapshot_safe_to_delete"
		if err := fs.RemoveAll(sspath); err != nil {
			t.Fatalf("%v", err)
		}
		if err := fs.MkdirAll(sspath, 0755); err != nil {
			t.Fatalf("%v", err)
		}
		defer func() {
			if err := fs.RemoveAll(sspath); err != nil {
				t.Fatalf("%v", err)
			}
		}()
		opt := SnapshotOption{
			Exported:   true,
			ExportPath: sspath,
		}
		// Keep retrying while the snapshot request is rejected.
		var index uint64
		for {
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			index, err = nh.SyncRequestSnapshot(ctx, 1, opt)
			cancel()
			if err != nil {
				// errors.Is takes the error first and the target second.
				if errors.Is(err, ErrRejected) {
					time.Sleep(500 * time.Millisecond)
					continue
				}
				t.Fatalf("failed to sync request snapshot %v", err)
			}
			break
		}

		// Make more proposals so that there are log entries after the
		// exported snapshot index.
		makeProposals(nh)
		ctx, cancel := context.WithTimeout(context.Background(), pto)
		rv, err := nh.SyncRead(ctx, 1, nil)
		cancel()
		if err != nil {
			t.Fatalf("failed to read applied value %v", err)
		}
		applied := rv.(uint64)
		if applied <= index {
			t.Fatalf("invalid applied value %d", applied)
		}
		snapshotDir := fmt.Sprintf("snapshot-%016X", index)
		dir := fs.PathJoin(sspath, snapshotDir)
		ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
		err = nh.SyncRequestImportSnapshot(ctx, 1, 1, dir)
		// Release the context before checking the result so that it is not
		// leaked when the import fails.
		cancel()
		if err != nil {
			t.Fatalf("failed to import snapshot %v", err)
		}

		// SyncRequestImportSnapshot imports the snapshot synchronously, but
		// compacts log entries asynchronously. This is fine for applications
		// because less compaction is acceptable. Wait for the compaction to
		// complete here, then query the raft log.
		time.Sleep(2 * time.Second)
		rs, err := nh.QueryRaftLog(1, 18, 35, math.MaxUint64)
		assert.NoError(t, err)
		// One-shot timeout for the query result.
		timeout := time.NewTimer(2 * time.Second)
		defer timeout.Stop()
		select {
		case v := <-rs.ResultC():
			assert.True(t, v.Completed())
			entries, logRange := v.RaftLogs()
			assert.Equal(t, 17, len(entries))
			assert.Equal(t, LogRange{FirstIndex: 18, LastIndex: 35}, logRange)
		case <-timeout.C:
			t.Fatalf("no results")
		}
		rs.Release()
		nh.Close()
	}
	runNodeHostTestDC(t, tf, true, fs)
}

type chunks struct {
received uint64
confirmed uint64
Expand Down
26 changes: 13 additions & 13 deletions tools/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,15 @@ func ImportSnapshot(nhConfig config.NodeHostConfig,
if err := checkImportSettings(nhConfig, memberNodes, replicaID); err != nil {
return err
}
ssfp, err := getSnapshotFilepath(srcDir, fs)
ssfp, err := GetSnapshotFilepath(srcDir, fs)
if err != nil {
return err
}
oldss, err := getSnapshotRecord(srcDir, server.MetadataFilename, fs)
oldss, err := GetSnapshotRecord(srcDir, server.MetadataFilename, fs)
if err != nil {
return err
}
ok, err := isCompleteSnapshotImage(ssfp, oldss, fs)
ok, err := IsCompleteSnapshotImage(ssfp, oldss, fs)
if err != nil {
return err
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func ImportSnapshot(nhConfig config.NodeHostConfig,
return err
}
if exist {
if err := cleanupSnapshotDir(ssDir, fs); err != nil {
if err := CleanupSnapshotDir(ssDir, fs); err != nil {
return err
}
} else {
Expand All @@ -213,8 +213,8 @@ func ImportSnapshot(nhConfig config.NodeHostConfig,
}
dstDir := ssEnv.GetTempDir()
finalDir := ssEnv.GetFinalDir()
ss := getProcessedSnapshotRecord(finalDir, oldss, memberNodes, fs)
if err := copySnapshot(oldss, srcDir, dstDir, fs); err != nil {
ss := GetProcessedSnapshotRecord(finalDir, oldss, memberNodes, fs)
if err := CopySnapshot(oldss, srcDir, dstDir, fs); err != nil {
return err
}
if err := ssEnv.FinalizeSnapshot(&ss); err != nil {
Expand All @@ -223,7 +223,7 @@ func ImportSnapshot(nhConfig config.NodeHostConfig,
return logdb.ImportSnapshot(ss, replicaID)
}

func cleanupSnapshotDir(dir string, fs vfs.IFS) error {
func CleanupSnapshotDir(dir string, fs vfs.IFS) error {
files, err := fs.List(dir)
if err != nil {
return err
Expand Down Expand Up @@ -264,7 +264,7 @@ func checkImportSettings(nhConfig config.NodeHostConfig,
return nil
}

func isCompleteSnapshotImage(ssfp string,
func IsCompleteSnapshotImage(ssfp string,
ss pb.Snapshot, fs vfs.IFS) (bool, error) {
checksum, err := rsm.GetV2PayloadChecksum(ssfp, fs)
if err != nil {
Expand All @@ -273,7 +273,7 @@ func isCompleteSnapshotImage(ssfp string,
return bytes.Equal(checksum, ss.Checksum), nil
}

func getSnapshotFilepath(dir string, fs vfs.IFS) (string, error) {
func GetSnapshotFilepath(dir string, fs vfs.IFS) (string, error) {
exist, err := fileutil.Exist(dir, fs)
if err != nil {
return "", err
Expand Down Expand Up @@ -324,7 +324,7 @@ func getSnapshotFilenames(path string, fs vfs.IFS) ([]string, error) {
return results, nil
}

func getSnapshotRecord(dir string,
func GetSnapshotRecord(dir string,
filename string, fs vfs.IFS) (pb.Snapshot, error) {
var ss pb.Snapshot
if err := fileutil.GetFlagFileContent(dir, filename, &ss, fs); err != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ func checkMembers(old pb.Membership, members map[uint64]string) error {
return nil
}

func getProcessedSnapshotRecord(dstDir string,
func GetProcessedSnapshotRecord(dstDir string,
old pb.Snapshot, members map[uint64]string, fs vfs.IFS) pb.Snapshot {
for _, file := range old.Files {
file.Filepath = fs.PathJoin(dstDir, fs.PathBase(file.Filepath))
Expand Down Expand Up @@ -412,9 +412,9 @@ func getProcessedSnapshotRecord(dstDir string,
return ss
}

func copySnapshot(ss pb.Snapshot,
func CopySnapshot(ss pb.Snapshot,
srcDir string, dstDir string, fs vfs.IFS) error {
fp, err := getSnapshotFilepath(srcDir, fs)
fp, err := GetSnapshotFilepath(srcDir, fs)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit a82c3e6

Please sign in to comment.