Skip to content

Commit

Permalink
New tools, new option (dgraph-io#645)
Browse files Browse the repository at this point in the history
- Add a new flatten command, which would flatten out all the LSM tree levels into a single level. It ensures that that level does not exceed its max size limits, as specified in the options.
- Add a new fill command, which would random key-values to Badger. Useful for testing.

## Option updates
- Add a new option to skip force compaction of Level 0 on DB.Close.
- Decrease the NumCompactors default value from 3 to 2.

Commits:

* Add two new commands: flatten and fill.
* Add an option to skip compacting Level 0 for performance reasons.
* Allow more compactors to run for flatten. Performance wise I didn't see any gains in latency while using multiple compactors. And there's a lot of resource consumption if multiple compactors are running concurrently. Could be due to all compactors working on the same level during flatten. In a live setting, compactors could be working on different levels, so the result might not directly apply.
* Decreased the NumCompactors default option to 2. Didn't set it to 1 to avoid a straggler compactor blocking all compactions.
* Add licenses.
  • Loading branch information
manishrjain authored Dec 18, 2018
1 parent 2d7e014 commit 7e41bba
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 36 deletions.
90 changes: 90 additions & 0 deletions badger/cmd/fill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
"crypto/rand"
"time"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/y"
"github.com/spf13/cobra"
)

var fillCmd = &cobra.Command{
Use: "fill",
Short: "Fill Badger with random data.",
Long: `
This command would fill Badger with random data. Useful for testing and performance analysis.
`,
RunE: fill,
}

var keySz, valSz int
var numKeys float64
var force bool

const mil float64 = 1e6

func init() {
RootCmd.AddCommand(fillCmd)
fillCmd.Flags().IntVarP(&keySz, "key-size", "k", 32, "Size of key")
fillCmd.Flags().IntVarP(&valSz, "val-size", "v", 128, "Size of value")
fillCmd.Flags().Float64VarP(&numKeys, "keys-mil", "m", 10.0,
"Number of keys to add in millions")
fillCmd.Flags().BoolVarP(&force, "force-compact", "f", true, "Force compact level 0 on close.")
}

func fill(cmd *cobra.Command, args []string) error {
opts := badger.DefaultOptions
opts.Dir = sstDir
opts.ValueDir = vlogDir
opts.Truncate = truncate
opts.SyncWrites = false
opts.CompactL0OnClose = force

db, err := badger.Open(opts)
if err != nil {
return err
}
defer func() {
start := time.Now()
err := db.Close()
badger.Infof("DB.Close. Error: %v. Time taken: %s", err, time.Since(start))
}()

start := time.Now()
batch := db.NewWriteBatch()
num := int64(numKeys * mil)
for i := int64(1); i <= num; i++ {
k := make([]byte, keySz)
v := make([]byte, valSz)
y.Check2(rand.Read(k))
y.Check2(rand.Read(v))
if err := batch.Set(k, v, 0); err != nil {
return err
}
if i%1e5 == 0 {
badger.Infof("Written keys: %d\n", i)
}
}
if err := batch.Flush(); err != nil {
return err
}
badger.Infof("%d keys written. Time taken: %s\n", num, time.Since(start))
return nil
}
56 changes: 56 additions & 0 deletions badger/cmd/flatten.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
"github.com/dgraph-io/badger"
"github.com/spf13/cobra"
)

var flattenCmd = &cobra.Command{
Use: "flatten",
Short: "Flatten the LSM tree.",
Long: `
This command would compact all the LSM tables into one level.
`,
RunE: flatten,
}

var numWorkers int

func init() {
RootCmd.AddCommand(flattenCmd)
flattenCmd.Flags().IntVarP(&numWorkers, "num-workers", "w", 1,
"Number of concurrent compactors to run. More compactors would use more"+
" server resources to potentially achieve faster compactions.")
}

func flatten(cmd *cobra.Command, args []string) error {
opts := badger.DefaultOptions
opts.Dir = sstDir
opts.ValueDir = vlogDir
opts.Truncate = truncate
opts.NumCompactors = 0

db, err := badger.Open(opts)
if err != nil {
return err
}
defer db.Close()

return db.Flatten(numWorkers)
}
7 changes: 7 additions & 0 deletions badger/cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ to the Dgraph team.
fmt.Println("Error:", err.Error())
os.Exit(1)
}
if !showTables {
return
}
err = tableInfo(sstDir, vlogDir)
if err != nil {
fmt.Println("Error:", err.Error())
Expand All @@ -55,8 +58,12 @@ to the Dgraph team.
},
}

var showTables bool

func init() {
RootCmd.AddCommand(infoCmd)
infoCmd.Flags().BoolVarP(&showTables, "show-tables", "s", false,
"If set to true, show tables as well.")
}

func hbytes(sz int64) string {
Expand Down
8 changes: 2 additions & 6 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,12 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef)
}
// Check whether this level really needs compaction or not. Otherwise, we'll end up
// running parallel compactions for the same level.
// NOTE: We can directly call thisLevel.totalSize, because we already have acquire a read lock
// over this and the next level.
if cd.thisLevel.totalSize-thisLevel.delSize < cd.thisLevel.maxTotalSize {
return false
}
// Update: We should not be checking size here. Compaction priority already did the size checks.
// Here we should just be executing the wish of others.

thisLevel.ranges = append(thisLevel.ranges, cd.thisRange)
nextLevel.ranges = append(nextLevel.ranges, cd.nextRange)
thisLevel.delSize += cd.thisSize

return true
}

Expand Down
88 changes: 75 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/badger/skl"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"golang.org/x/net/trace"
)
Expand Down Expand Up @@ -360,7 +361,7 @@ func (db *DB) Close() (err error) {
time.Sleep(10 * time.Millisecond)
}
}
db.flushChan <- flushTask{nil, valuePointer{}} // Tell flusher to quit.
close(db.flushChan) // Tell flusher to quit.

if db.closers.memtable != nil {
db.closers.memtable.Wait()
Expand All @@ -373,19 +374,12 @@ func (db *DB) Close() (err error) {

// Force Compact L0
// We don't need to care about cstatus since no parallel compaction is running.
cd := compactDef{
elog: trace.New("Badger", "Compact"),
thisLevel: db.lc.levels[0],
nextLevel: db.lc.levels[1],
}
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()
if db.lc.fillTablesL0(&cd) {
if err := db.lc.runCompactDef(0, cd); err != nil {
cd.elog.LazyPrintf("\tLOG Compact FAILED with error: %+v: %+v", err, cd)
if db.opt.CompactL0OnClose {
if err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73}); err != nil {
Warningf("Error while forcing compaction on level 0: %v", err)
} else {
Infof("Force compaction on level 0 done")
}
} else {
cd.elog.LazyPrintf("fillTables failed for level zero. No compaction required")
}

if lcErr := db.lc.close(); err == nil {
Expand Down Expand Up @@ -1143,6 +1137,74 @@ func (db *DB) MaxBatchSize() int64 {
return db.opt.maxBatchSize
}

func (db *DB) Flatten(workers int) error {
if db.opt.NumCompactors > 0 {
return errors.New("Flatten can only be run when no compact workers are running.")
}

compactAway := func(cp compactionPriority) error {
Infof("Attempting to compact with %+v\n", cp)
errCh := make(chan error, 1)
for i := 0; i < workers; i++ {
go func() {
errCh <- db.lc.doCompact(cp)
}()
}
var success int
var rerr error
for i := 0; i < workers; i++ {
err := <-errCh
if err != nil {
rerr = err
Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
} else {
success++
}
}
if success == 0 {
return rerr
}
// We could do at least one successful compaction. So, we'll consider this a success.
Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
success, cp.level)
return nil
}

hbytes := func(sz int64) string {
return humanize.Bytes(uint64(sz))
}

for {
Infof("\n")
var levels []int
for i, l := range db.lc.levels {
sz := l.getTotalSize()
Infof("Level: %d. %8s Size. %8s Max.\n",
i, hbytes(l.getTotalSize()), hbytes(l.maxTotalSize))
if sz > 0 {
levels = append(levels, i)
}
}
if len(levels) == 1 {
prios := db.lc.pickCompactLevels()
if len(prios) == 0 || prios[0].score <= 1.0 {
Infof("All tables consolidated into one level. Flattening done.\n")
return nil
}
if err := compactAway(prios[0]); err != nil {
return err
}
continue
}
// Create an artificial compaction priority, to ensure that we compact the level.
cp := compactionPriority{level: levels[0], score: 1.71}
if err := compactAway(cp); err != nil {
return err
}
}
return nil
}

// MergeOperator represents a Badger merge operator.
type MergeOperator struct {
sync.RWMutex
Expand Down
20 changes: 8 additions & 12 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ func (s *levelsController) startCompact(lc *y.Closer) {

func (s *levelsController) runWorker(lc *y.Closer) {
defer lc.Done()
if s.kv.opt.DoNotCompact {
return
}

randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
select {
Expand All @@ -257,10 +254,10 @@ func (s *levelsController) runWorker(lc *y.Closer) {
case <-ticker.C:
prios := s.pickCompactLevels()
for _, p := range prios {
// TODO: Handle error.
didCompact, _ := s.doCompact(p)
if didCompact {
if err := s.doCompact(p); err == nil {
break
} else {
Errorf("Error while running doCompact: %v\n", err)
}
}
case <-lc.HasBeenClosed():
Expand Down Expand Up @@ -624,7 +621,6 @@ func (s *levelsController) fillTables(cd *compactDef) bool {
if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
continue
}

if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
continue
}
Expand Down Expand Up @@ -677,7 +673,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
}

// doCompact picks some table on level l and compacts it away to the next level.
func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
func (s *levelsController) doCompact(p compactionPriority) error {
l := p.level
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

Expand All @@ -696,13 +692,13 @@ func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
if l == 0 {
if !s.fillTablesL0(&cd) {
cd.elog.LazyPrintf("fillTables failed for level: %d\n", l)
return false, nil
return fmt.Errorf("Unable to fill tables for level: %d\n", l)
}

} else {
if !s.fillTables(&cd) {
cd.elog.LazyPrintf("fillTables failed for level: %d\n", l)
return false, nil
return fmt.Errorf("Unable to fill tables for level: %d\n", l)
}
}
defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
Expand All @@ -712,12 +708,12 @@ func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
if err := s.runCompactDef(l, cd); err != nil {
// This compaction couldn't be done successfully.
cd.elog.LazyPrintf("\tLOG Compact FAILED with error: %+v: %+v", err, cd)
return false, err
return err
}

s.cstatus.toLog(cd.elog)
cd.elog.LazyPrintf("Compaction for level: %d DONE", cd.thisLevel.level)
return true, nil
return nil
}

func (s *levelsController) addLevel0Table(t *table.Table) error {
Expand Down
Loading

0 comments on commit 7e41bba

Please sign in to comment.