Skip to content

Commit

Permalink
fix(badger): Do not reuse variable across badger commands (dgraph-io#…
Browse files Browse the repository at this point in the history
…1624)

The same variables were being used across multiple badger commands. The
default value of a flag that uses the same variable across two commands
could be from one of the commands and it isn't clear which default value
would be used.

The numVersions variable was being used in flatten (default value 1)
and stream (default value 0). Running badger stream --help would show
the default value to be 0 but if you print the value of the
numVersions variable, it would turn out to be 1. This is misleading.

This PR separates the variables so that they don't overlap.

This PR also adds the compression flag to the flatten tool.
  • Loading branch information
Ibrahim Jarif authored Dec 23, 2020
1 parent c20628f commit 3adc574
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 143 deletions.
15 changes: 9 additions & 6 deletions badger/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"github.com/spf13/cobra"
)

// bo holds the flag values for the backup command. Flags live in a
// command-local struct so they are not shared with other badger
// sub-commands (a shared variable makes the effective default depend on
// flag-registration order).
var bo = struct {
	backupFile  string // destination file for the backup
	numVersions int    // versions per key to keep; <= 0 keeps all
}{}

// backupCmd represents the backup command
var backupCmd = &cobra.Command{
Expand All @@ -42,9 +45,9 @@ database.`,

func init() {
RootCmd.AddCommand(backupCmd)
backupCmd.Flags().StringVarP(&backupFile, "backup-file", "f",
backupCmd.Flags().StringVarP(&bo.backupFile, "backup-file", "f",
"badger.bak", "File to backup to")
backupCmd.Flags().IntVarP(&numVersions, "num-versions", "n",
backupCmd.Flags().IntVarP(&bo.numVersions, "num-versions", "n",
0, "Number of versions to keep. A value <= 0 means keep all versions.")
}

Expand All @@ -53,8 +56,8 @@ func doBackup(cmd *cobra.Command, args []string) error {
WithValueDir(vlogDir).
WithNumVersionsToKeep(math.MaxInt32)

if numVersions > 0 {
opt.NumVersionsToKeep = numVersions
if bo.numVersions > 0 {
opt.NumVersionsToKeep = bo.numVersions
}

// Open DB
Expand All @@ -65,7 +68,7 @@ func doBackup(cmd *cobra.Command, args []string) error {
defer db.Close()

// Create File
f, err := os.Create(backupFile)
f, err := os.Create(bo.backupFile)
if err != nil {
return err
}
Expand Down
34 changes: 24 additions & 10 deletions badger/cmd/flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"math"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

Expand All @@ -33,36 +35,48 @@ This command would compact all the LSM tables into one level.
RunE: flatten,
}

// fo holds the flag values for the flatten command, kept command-local so
// the defaults cannot be clobbered by another sub-command reusing the same
// package-level variable.
var fo = struct {
	keyPath         string // path of the encryption key file
	numWorkers      int    // concurrent compactors to run
	numVersions     int    // versions per key; <= 0 means keep all
	compressionType uint32 // 0 = none, 1 = Snappy, 2 = ZSTD
}{}

func init() {
RootCmd.AddCommand(flattenCmd)
flattenCmd.Flags().IntVarP(&numWorkers, "num-workers", "w", 1,
flattenCmd.Flags().IntVarP(&fo.numWorkers, "num-workers", "w", 1,
"Number of concurrent compactors to run. More compactors would use more"+
" server resources to potentially achieve faster compactions.")
flattenCmd.Flags().IntVarP(&numVersions, "num_versions", "", 1,
flattenCmd.Flags().IntVarP(&fo.numVersions, "num_versions", "", 0,
"Option to configure the maximum number of versions per key. "+
"Values <= 0 will be considered to have the max number of versions.")
flattenCmd.Flags().StringVar(&keyPath, "encryption-key-file", "",
flattenCmd.Flags().StringVar(&fo.keyPath, "encryption-key-file", "",
"Path of the encryption key file.")
flattenCmd.Flags().Uint32VarP(&fo.compressionType, "compression", "", 1,
"Option to configure the compression type in output DB. "+
"0 to disable, 1 for Snappy, and 2 for ZSTD.")
}

func flatten(cmd *cobra.Command, args []string) error {
if numVersions <= 0 {
if fo.numVersions <= 0 {
// Keep all versions.
numVersions = math.MaxInt32
fo.numVersions = math.MaxInt32
}
encKey, err := getKey(keyPath)
encKey, err := getKey(fo.keyPath)
if err != nil {
return err
}
if fo.compressionType < 0 || fo.compressionType > 2 {
return errors.Errorf(
"compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)")
}
opt := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithNumVersionsToKeep(numVersions).
WithNumVersionsToKeep(fo.numVersions).
WithNumCompactors(0).
WithBlockCacheSize(100 << 20).
WithIndexCacheSize(200 << 20).
WithCompression(options.CompressionType(fo.compressionType)).
WithEncryptionKey(encKey)
fmt.Printf("Opening badger with options = %+v\n", opt)
db, err := badger.Open(opt)
Expand All @@ -71,5 +85,5 @@ func flatten(cmd *cobra.Command, args []string) error {
}
defer db.Close()

return db.Flatten(numWorkers)
return db.Flatten(fo.numWorkers)
}
44 changes: 21 additions & 23 deletions badger/cmd/read_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ var (
entriesRead uint64 // will store entries read till now
startTime time.Time // start time of read benchmarking

blockCacheSize int64
indexCacheSize int64

sampleSize int
loadingMode string
keysOnly bool
readOnly bool
fullScan bool
ro = struct {
blockCacheSize int64
indexCacheSize int64

sampleSize int
keysOnly bool
readOnly bool
fullScan bool
}{}
)

func init() {
Expand All @@ -64,18 +65,15 @@ func init() {
readBenchCmd.Flags().StringVarP(
&duration, "duration", "d", "1m", "How long to run the benchmark.")
readBenchCmd.Flags().IntVar(
&sampleSize, "sample-size", 1000000, "Keys sample size to be used for random lookup.")
&ro.sampleSize, "sample-size", 1000000, "Keys sample size to be used for random lookup.")
readBenchCmd.Flags().BoolVar(
&keysOnly, "keys-only", false, "If false, values will also be read.")
&ro.keysOnly, "keys-only", false, "If false, values will also be read.")
readBenchCmd.Flags().BoolVar(
&readOnly, "read-only", true, "If true, DB will be opened in read only mode.")
readBenchCmd.Flags().StringVar(
&loadingMode, "loading-mode", "mmap", "Mode for accessing SSTables and value log files. "+
"Valid loading modes are fileio and mmap.")
&ro.readOnly, "read-only", true, "If true, DB will be opened in read only mode.")
readBenchCmd.Flags().BoolVar(
&fullScan, "full-scan", false, "If true, full db will be scanned using iterators.")
readBenchCmd.Flags().Int64Var(&blockCacheSize, "block-cache", 0, "Max size of block cache in MB")
readBenchCmd.Flags().Int64Var(&indexCacheSize, "index-cache", 0, "Max size of index cache in MB")
&ro.fullScan, "full-scan", false, "If true, full db will be scanned using iterators.")
readBenchCmd.Flags().Int64Var(&ro.blockCacheSize, "block-cache", 256, "Max size of block cache in MB")
readBenchCmd.Flags().Int64Var(&ro.indexCacheSize, "index-cache", 0, "Max size of index cache in MB")
}

// Scan the whole database using the iterators
Expand Down Expand Up @@ -108,9 +106,9 @@ func readBench(cmd *cobra.Command, args []string) error {
y.AssertTrue(numGoroutines > 0)
opt := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithReadOnly(readOnly).
WithBlockCacheSize(blockCacheSize << 20).
WithIndexCacheSize(indexCacheSize << 20)
WithReadOnly(ro.readOnly).
WithBlockCacheSize(ro.blockCacheSize << 20).
WithIndexCacheSize(ro.indexCacheSize << 20)
fmt.Printf("Opening badger with options = %+v\n", opt)
db, err := badger.OpenManaged(opt)
if err != nil {
Expand All @@ -123,7 +121,7 @@ func readBench(cmd *cobra.Command, args []string) error {
fmt.Println("*********************************************************")

// if fullScan is true then do a complete scan of the db and return
if fullScan {
if ro.fullScan {
fullScanDB(db)
return nil
}
Expand Down Expand Up @@ -210,7 +208,7 @@ func getSampleKeys(db *badger.DB) ([][]byte, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream.Send = func(buf *z.Buffer) error {
if count >= sampleSize {
if count >= ro.sampleSize {
return nil
}
err := buf.SliceIterate(func(s []byte) error {
Expand All @@ -220,7 +218,7 @@ func getSampleKeys(db *badger.DB) ([][]byte, error) {
}
keys = append(keys, kv.Key)
count++
if count >= sampleSize {
if count >= ro.sampleSize {
cancel()
return errStop
}
Expand Down
55 changes: 30 additions & 25 deletions badger/cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,53 @@ This command streams the contents of this DB into another DB with the given opti
RunE: stream,
}

// so holds the flag values for the stream command, kept command-local so
// flags such as num_versions and encryption-key-file do not share state
// with other badger sub-commands.
var so = struct {
	outDir          string // path of the (empty) output DB directory
	outFile         string // if set, stream a backup to this file instead
	compressionType uint32 // 0 = none, 1 = Snappy, 2 = ZSTD
	numVersions     int    // versions per key; <= 0 means keep all
	readOnly        bool   // open the input DB read-only
	keyPath         string // path of the encryption key file
}{}

func init() {
// TODO: Add more options.
RootCmd.AddCommand(streamCmd)
streamCmd.Flags().StringVarP(&outDir, "out", "o", "",
streamCmd.Flags().StringVarP(&so.outDir, "out", "o", "",
"Path to output DB. The directory should be empty.")
streamCmd.Flags().StringVarP(&outFile, "", "f", "",
streamCmd.Flags().StringVarP(&so.outFile, "", "f", "",
"Run a backup to this file.")
streamCmd.Flags().BoolVarP(&readOnly, "read_only", "", true,
streamCmd.Flags().BoolVarP(&so.readOnly, "read_only", "", true,
"Option to open input DB in read-only mode")
streamCmd.Flags().IntVarP(&numVersions, "num_versions", "", 0,
streamCmd.Flags().IntVarP(&so.numVersions, "num_versions", "", 0,
"Option to configure the maximum number of versions per key. "+
"Values <= 0 will be considered to have the max number of versions.")
streamCmd.Flags().Uint32VarP(&compressionType, "compression", "", 1,
streamCmd.Flags().Uint32VarP(&so.compressionType, "compression", "", 1,
"Option to configure the compression type in output DB. "+
"0 to disable, 1 for Snappy, and 2 for ZSTD.")
streamCmd.Flags().StringVarP(&keyPath, "encryption-key-file", "e", "",
streamCmd.Flags().StringVarP(&so.keyPath, "encryption-key-file", "e", "",
"Path of the encryption key file.")
}

func stream(cmd *cobra.Command, args []string) error {
// Options for input DB.
if numVersions <= 0 {
numVersions = math.MaxInt32
if so.numVersions <= 0 {
so.numVersions = math.MaxInt32
}
encKey, err := getKey(keyPath)
encKey, err := getKey(so.keyPath)
if err != nil {
return err
}
inOpt := badger.DefaultOptions(sstDir).
WithReadOnly(readOnly).
WithReadOnly(so.readOnly).
WithValueThreshold(1 << 10 /* 1KB */).
WithNumVersionsToKeep(numVersions).
WithNumVersionsToKeep(so.numVersions).
WithBlockCacheSize(100 << 20).
WithIndexCacheSize(200 << 20).
WithEncryptionKey(encKey)

// Options for output DB.
if compressionType < 0 || compressionType > 2 {
if so.compressionType < 0 || so.compressionType > 2 {
return errors.Errorf(
"compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)")
}
Expand All @@ -91,9 +96,9 @@ func stream(cmd *cobra.Command, args []string) error {

stream := inDB.NewStreamAt(math.MaxUint64)

if len(outDir) > 0 {
if _, err := os.Stat(outDir); err == nil {
f, err := os.Open(outDir)
if len(so.outDir) > 0 {
if _, err := os.Stat(so.outDir); err == nil {
f, err := os.Open(so.outDir)
if err != nil {
return err
}
Expand All @@ -102,22 +107,22 @@ func stream(cmd *cobra.Command, args []string) error {
_, err = f.Readdirnames(1)
if err != io.EOF {
return errors.Errorf(
"cannot run stream tool on non-empty output directory %s", outDir)
"cannot run stream tool on non-empty output directory %s", so.outDir)
}
}

stream.LogPrefix = "DB.Stream"
outOpt := inOpt.
WithDir(outDir).
WithValueDir(outDir).
WithNumVersionsToKeep(numVersions).
WithCompression(options.CompressionType(compressionType)).
WithDir(so.outDir).
WithValueDir(so.outDir).
WithNumVersionsToKeep(so.numVersions).
WithCompression(options.CompressionType(so.compressionType)).
WithReadOnly(false)
err = inDB.StreamDB(outOpt)

} else if len(outFile) > 0 {
} else if len(so.outFile) > 0 {
stream.LogPrefix = "DB.Backup"
f, err := os.OpenFile(outFile, os.O_RDWR|os.O_CREATE, 0666)
f, err := os.OpenFile(so.outFile, os.O_RDWR|os.O_CREATE, 0666)
y.Check(err)
_, err = stream.Backup(f, 0)
}
Expand Down
Loading

0 comments on commit 3adc574

Please sign in to comment.