Skip to content

Commit

Permalink
Chore(backups): Consolidate the backup manifests (hypermodeinc#7506)
Browse files Browse the repository at this point in the history
This PR consolidates the backup manifests. There will now be just
one manifest file for all the backups. This change is backward
compatible: when a new backup is created in an old (version < 21.03)
backup directory, a master manifest is created.
  • Loading branch information
ahsanbarkati authored Mar 4, 2021
1 parent 323a915 commit ca8624c
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 247 deletions.
15 changes: 2 additions & 13 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -271,14 +270,6 @@ func runLsbackupCmd() error {
return errors.Wrapf(err, "while listing manifests")
}

var paths []string
for path := range manifests {
paths = append(paths, path)
}
sort.Slice(paths, func(i, j int) bool {
return paths[i] < paths[j]
})

type backupEntry struct {
Path string `json:"path"`
Since uint64 `json:"since"`
Expand All @@ -293,12 +284,10 @@ func runLsbackupCmd() error {
type backupOutput []backupEntry

var output backupOutput
for i := 0; i < len(paths); i++ {
path := paths[i]
manifest := manifests[path]
for _, manifest := range manifests {

be := backupEntry{
Path: path,
Path: manifest.Path,
Since: manifest.Since,
BackupId: manifest.BackupId,
BackupNum: manifest.BackupNum,
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func DirCleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(restoreDir))
require.NoError(t, os.RemoveAll(copyBackupDir))

cmd := []string{"bash", "-c", "rm -rf /data/backups/dgraph.*"}
cmd := []string{"bash", "-c", "rm -rf /data/backups/*"}
require.NoError(t, testutil.DockerExec(alphaContainers[0], cmd...))
}

Expand Down
25 changes: 14 additions & 11 deletions systest/backup/encryption/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,13 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
})
require.Equal(t, numExpectedDirs, len(dirs))

manifests := x.WalkPathFunc(backupDir, func(path string, isdir bool) bool {
return !isdir && strings.Contains(path, "manifest.json")
})
require.Equal(t, numExpectedDirs, len(manifests))
b, err = ioutil.ReadFile(filepath.Join(backupDir, "manifest.json"))
require.NoError(t, err)

var manifest worker.MasterManifest
err = json.Unmarshal(b, &manifest)
require.NoError(t, err)
require.Equal(t, numExpectedDirs, len(manifest.Manifests))
return dirs
}

Expand All @@ -314,12 +316,11 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string

t.Logf("--- Restoring from: %q", localBackupDst)
testutil.KeyFile = "../../../ee/enc/test-fixtures/enc-key"
argv := []string{"dgraph", "restore", "-l", localBackupDst, "-p", "data/restore",
"--encryption_key_file", testutil.KeyFile, "--force_zero=false"}
cwd, err := os.Getwd()
require.NoError(t, err)
err = testutil.ExecWithOpts(argv, testutil.CmdOpts{Dir: cwd})
key, err := ioutil.ReadFile("../../../ee/enc/test-fixtures/enc-key")
require.NoError(t, err)
result := worker.RunRestore("./data/restore", localBackupDst, lastDir,
x.SensitiveByteSlice(key), options.Snappy, 0)
require.NoError(t, result.Err)

for i, pdir := range []string{"p1", "p2", "p3"} {
pdir = filepath.Join("./data/restore", pdir)
Expand Down Expand Up @@ -398,8 +399,10 @@ func copyToLocalFs(t *testing.T) {
objectCh1 := mc.ListObjectsV2(bucketName, "", false, lsCh1)
for object := range objectCh1 {
require.NoError(t, object.Err)
dstDir := backupDir + "/" + object.Key
os.MkdirAll(dstDir, os.ModePerm)
if object.Key != "manifest.json" {
dstDir := backupDir + "/" + object.Key
require.NoError(t, os.MkdirAll(dstDir, os.ModePerm))
}

// Get all the files in that folder and copy them to the local filesystem.
lsCh2 := make(chan struct{})
Expand Down
10 changes: 6 additions & 4 deletions systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,12 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
})
require.Equal(t, numExpectedDirs, len(dirs))

manifests := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.Contains(path, "manifest.json") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedDirs, len(manifests))
b, err = ioutil.ReadFile(filepath.Join(copyBackupDir, "manifest.json"))
require.NoError(t, err)
var manifest worker.MasterManifest
err = json.Unmarshal(b, &manifest)
require.NoError(t, err)
require.Equal(t, numExpectedDirs, len(manifest.Manifests))

return dirs
}
Expand Down
6 changes: 4 additions & 2 deletions systest/backup/minio-large/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ func copyToLocalFs(t *testing.T) {
objectCh1 := mc.ListObjectsV2(bucketName, "", false, lsCh1)
for object := range objectCh1 {
require.NoError(t, object.Err)
dstDir := backupDir + "/" + object.Key
require.NoError(t, os.MkdirAll(dstDir, os.ModePerm))
if object.Key != "manifest.json" {
dstDir := backupDir + "/" + object.Key
require.NoError(t, os.MkdirAll(dstDir, os.ModePerm))
}

// Get all the files in that folder and copy them to the local filesystem.
lsCh2 := make(chan struct{})
Expand Down
26 changes: 14 additions & 12 deletions systest/backup/minio/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,13 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
})
require.Equal(t, numExpectedDirs, len(dirs))

manifests := x.WalkPathFunc(backupDir, func(path string, isdir bool) bool {
return !isdir && strings.Contains(path, "manifest.json")
})
require.Equal(t, numExpectedDirs, len(manifests))
b, err = ioutil.ReadFile(filepath.Join(backupDir, "manifest.json"))
require.NoError(t, err)

var manifest worker.MasterManifest
err = json.Unmarshal(b, &manifest)
require.NoError(t, err)
require.Equal(t, numExpectedDirs, len(manifest.Manifests))

return dirs
}
Expand All @@ -334,12 +337,9 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string
require.NoError(t, os.RemoveAll(restoreDir))

t.Logf("--- Restoring from: %q", localBackupDst)
argv := []string{"dgraph", "restore", "-l", localBackupDst, "-p", "data/restore",
"--force_zero=false"}
cwd, err := os.Getwd()
require.NoError(t, err)
err = testutil.ExecWithOpts(argv, testutil.CmdOpts{Dir: cwd})
require.NoError(t, err)
result := worker.RunRestore("./data/restore", localBackupDst, lastDir,
x.SensitiveByteSlice(nil), options.Snappy, 0)
require.NoError(t, result.Err)

for i, pdir := range []string{"p1", "p2", "p3"} {
pdir = filepath.Join("./data/restore", pdir)
Expand Down Expand Up @@ -389,8 +389,10 @@ func copyToLocalFs(t *testing.T) {
objectCh1 := mc.ListObjectsV2(bucketName, "", false, lsCh1)
for object := range objectCh1 {
require.NoError(t, object.Err)
dstDir := backupDir + "/" + object.Key
os.MkdirAll(dstDir, os.ModePerm)
if object.Key != "manifest.json" {
dstDir := backupDir + "/" + object.Key
require.NoError(t, os.MkdirAll(dstDir, os.ModePerm))
}

// Get all the files in that folder and copy them to the local filesystem.
lsCh2 := make(chan struct{})
Expand Down
5 changes: 0 additions & 5 deletions systest/backup/multi-tenancy/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,6 @@ func runBackupInternal(t *testing.T, token *testutil.HttpToken, forceFull bool,
})
require.Equal(t, numExpectedDirs, len(dirs))

manifests := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.Contains(path, "manifest.json") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedDirs, len(manifests))

return dirs
}

Expand Down
9 changes: 6 additions & 3 deletions worker/backup_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,19 @@ type Manifest struct {
// versions (<= 20.11), the predicates in Group map do not have namespace. Version will be zero
// for older versions.
Version int `json:"version"`
// Path is the path to the manifest file. This field is only used during
// processing and is not written to disk.
Path string `json:"-"`
// Path is the name of the backup directory to which this manifest belongs.
Path string `json:"path"`
// Encrypted indicates whether this backup was encrypted or not.
Encrypted bool `json:"encrypted"`
// DropOperations lists the various DROP operations that took place since the last backup.
// These are used during restore to redo those operations before applying the backup.
DropOperations []*pb.DropOperation `json:"drop_operations"`
}

// MasterManifest is the consolidated manifest for a backup location. It collects the
// per-backup Manifest entries into a single value, which is what gets stored in the
// top-level manifest.json file.
type MasterManifest struct {
// Manifests holds one entry per backup. There is no json tag, so encoding/json
// uses the field name "Manifests" as the key.
// NOTE(review): entries appear to be appended in the order backups complete — confirm
// before relying on the ordering.
Manifests []*Manifest
}

func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {
preds, ok := m.Groups[gid]
if !ok {
Expand Down
6 changes: 3 additions & 3 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ package worker

import (
"context"
"fmt"
"net/url"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -211,8 +211,9 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull
}
}

dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
m := Manifest{Since: req.ReadTs, Groups: predMap, Version: x.DgraphVersion,
DropOperations: dropOperations}
DropOperations: dropOperations, Path: dir}
if req.SinceTs == 0 {
m.Type = "full"
m.BackupId = x.GetRandomName(1)
Expand Down Expand Up @@ -247,6 +248,5 @@ func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCred
for _, m := range manifests {
res = append(res, m)
}
sort.Slice(res, func(i, j int) bool { return res[i].Path < res[j].Path })
return res, nil
}
55 changes: 13 additions & 42 deletions worker/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ import (
"io"
"net/url"
"sort"
"sync"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"golang.org/x/sync/errgroup"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -51,6 +49,8 @@ const (
// because it used by subsequent incremental backups.
// "groups" are the group IDs that participated.
backupManifest = `manifest.json`

tmpManifest = `manifest_tmp.json`
)

// UriHandler interface is implemented by URI scheme handlers.
Expand All @@ -63,7 +63,12 @@ type UriHandler interface {
// These function calls are used by both Create and Load.
io.WriteCloser

// GetManifests returns the list of manfiests for the given backup series ID
// GetManifest returns the master manifest, containing information about all the
// backups. If the backup directory is using old formats (version < 21.03) of manifests,
// then it will return a consolidated master manifest.
GetManifest(*url.URL) (*MasterManifest, error)

// GetManifests returns the list of manifests for the given backup series ID
// and backup number at the specified location. If backupNum is set to zero,
// all the manifests for the backup series will be returned. If it's greater
// than zero, manifests from one to backupNum will be returned.
Expand All @@ -76,8 +81,8 @@ type UriHandler interface {
// CreateBackupFile prepares the object or file to save the backup file.
CreateBackupFile(*url.URL, *pb.BackupRequest) error

// CreateManifest prepares the manifest for writing.
CreateManifest(*url.URL, *pb.BackupRequest) error
// CreateManifest creates the given manifest.
CreateManifest(*url.URL, *MasterManifest) error

// Load will scan location URI for backup files, then load them via loadFn.
// It optionally takes the name of the last directory to consider. Any backup directories
Expand All @@ -90,14 +95,6 @@ type UriHandler interface {
// given groups. The last manifest of that backup should have the same number of
// groups as given list of groups.
Verify(*url.URL, *pb.RestoreRequest, []uint32) error

// ListManifests will scan the provided URI and return the paths to the manifests stored
// in that location.
ListManifests(*url.URL) ([]string, error)

// ReadManifest will read the manifest at the given location and load it into the given
// Manifest object.
ReadManifest(string, *Manifest) error
}

// NewUriHandler parses the requested URI and finds the corresponding UriHandler.
Expand Down Expand Up @@ -175,7 +172,7 @@ func VerifyBackup(req *pb.RestoreRequest, creds *x.MinioCredentials, currentGrou
}

// ListBackupManifests scans location l for backup files and returns the list of manifests.
func ListBackupManifests(l string, creds *x.MinioCredentials) (map[string]*Manifest, error) {
func ListBackupManifests(l string, creds *x.MinioCredentials) ([]*Manifest, error) {
uri, err := url.Parse(l)
if err != nil {
return nil, err
Expand All @@ -186,37 +183,11 @@ func ListBackupManifests(l string, creds *x.MinioCredentials) (map[string]*Manif
return nil, errors.Wrap(err, "ListBackupManifests")
}

paths, err := h.ListManifests(uri)
m, err := h.GetManifest(uri)
if err != nil {
return nil, err
}

res := struct {
sync.Mutex
listedManifests map[string]*Manifest
}{
listedManifests: make(map[string]*Manifest),
}

var g errgroup.Group
for _, path := range paths {
path := path // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
var m Manifest
if err := h.ReadManifest(path, &m); err != nil {
return errors.Wrapf(err, "ReadManifest: path=%q", path)
}
m.Path = path
res.Lock()
res.listedManifests[path] = &m
res.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return res.listedManifests, nil
return m.Manifests, nil
}

// filterManifests takes a list of manifests and returns the list of manifests
Expand Down
12 changes: 7 additions & 5 deletions worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/url"
Expand Down Expand Up @@ -295,7 +294,7 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse,
}

// CompleteBackup will finalize a backup by writing the manifest at the backup destination.
func (pr *BackupProcessor) CompleteBackup(ctx context.Context, manifest *Manifest) error {
func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) error {
if err := ctx.Err(); err != nil {
return err
}
Expand All @@ -310,12 +309,15 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, manifest *Manifes
return err
}

if err := handler.CreateManifest(uri, pr.Request); err != nil {
manifest, err := handler.GetManifest(uri)
if err != nil {
return err
}

if err = json.NewEncoder(handler).Encode(manifest); err != nil {
return err
manifest.Manifests = append(manifest.Manifests, m)

if err := handler.CreateManifest(uri, manifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
}

if err = handler.Close(); err != nil {
Expand Down
Loading

0 comments on commit ca8624c

Please sign in to comment.