Skip to content

Commit

Permalink
Merge branch 'compact-db' of https://github.com/vincent-petithory/bolt
Browse files Browse the repository at this point in the history
…into vincent-petithory-compact-db
  • Loading branch information
benbjohnson committed Sep 1, 2016
2 parents 94c8db5 + 3279c88 commit 52d0f5e
Show file tree
Hide file tree
Showing 2 changed files with 360 additions and 0 deletions.
180 changes: 180 additions & 0 deletions cmd/bolt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
return newBenchCommand(m).Run(args[1:]...)
case "check":
return newCheckCommand(m).Run(args[1:]...)
case "compact":
return newCompactCommand(m).Run(args[1:]...)
case "dump":
return newDumpCommand(m).Run(args[1:]...)
case "info":
Expand Down Expand Up @@ -130,6 +132,7 @@ The commands are:
bench run synthetic benchmark against bolt
check verifies integrity of bolt database
compact copies a bolt database, compacting it in the process
info print basic info
help print this screen
pages print list of pages with their types
Expand Down Expand Up @@ -1530,3 +1533,180 @@ func (n *leafPageElement) value() []byte {
buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
}

// CompactCommand represents the "compact" command execution.
// The three streams are injected from Main so output can be captured in tests.
type CompactCommand struct {
	Stdin  io.Reader // unused by Run; kept for symmetry with the other commands — TODO confirm
	Stdout io.Writer // progress and size-gain report
	Stderr io.Writer // flag-parse errors, usage text, and stat failures
}

// newCompactCommand returns a CompactCommand wired to m's standard streams.
func newCompactCommand(m *Main) *CompactCommand {
	cmd := &CompactCommand{}
	cmd.Stdin = m.Stdin
	cmd.Stdout = m.Stdout
	cmd.Stderr = m.Stderr
	return cmd
}

// BucketWalkFunc is the type of the function called by Walk for every item
// it discovers — both nested buckets and "normal" key/value pairs.
// keys is the path of bucket names leading to the bucket that owns the
// discovered pair k/v; v is nil when k itself names a sub-bucket.
type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error

// Walk recursively traverses every bucket and key in db inside a single
// read transaction, invoking walkFn once per item found.
func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error {
	walkTx := func(tx *bolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
			return cmd.walkBucket(b, nil, name, nil, walkFn)
		})
	}
	return db.View(walkTx)
}

// walkBucket reports the pair k/v to walkFn, then — when k names a bucket
// (v == nil) — descends into that bucket's children with k appended to the
// key path.
func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error {
	if walkErr := walkFn(keys, k, v); walkErr != nil {
		return walkErr
	}
	if v != nil {
		// Plain value: nothing to descend into.
		return nil
	}

	path := append(keys, k)
	return b.ForEach(func(ck, cv []byte) error {
		child := b
		if cv == nil {
			child = b.Bucket(ck)
		}
		return cmd.walkBucket(child, path, ck, cv, walkFn)
	})
}

// Run executes the "compact" command: it walks the database at the given
// PATH and copies every bucket and key into a freshly created database,
// reclaiming space left by deleted data. The original db is not modified.
func (cmd *CompactCommand) Run(args ...string) (err error) {
	// Parse flags.
	fs := flag.NewFlagSet("", flag.ContinueOnError)
	fs.SetOutput(cmd.Stderr)
	var txMaxSize int64
	fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit tx when key/value size sum exceed this value. If 0, only one transaction is used. If you are compacting a large database, set this to a value appropriate for the available memory.")
	help := fs.Bool("h", false, "print this help")
	if err := fs.Parse(args); err != nil {
		return err
	} else if *help {
		fmt.Fprintln(cmd.Stderr, cmd.Usage())
		fs.PrintDefaults()
		return ErrUsage
	}

	// Require database path.
	path := fs.Arg(0)
	if path == "" {
		return ErrPathRequired
	} else if _, err := os.Stat(path); os.IsNotExist(err) {
		return ErrFileNotFound
	}
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}
	initialSize := fi.Size()

	// Open database.
	db, err := bolt.Open(path, 0444, nil)
	if err != nil {
		return err
	}
	defer db.Close()

	// Destination: explicit second argument, or a fresh temp-file path.
	var dstPath string
	if fs.NArg() < 2 {
		f, err := ioutil.TempFile("", "bolt-compact-")
		if err != nil {
			return fmt.Errorf("temp file: %v", err)
		}
		// Only the unique name is wanted; bolt recreates the file itself.
		_ = f.Close()
		_ = os.Remove(f.Name())
		dstPath = f.Name()
		fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath)
	} else {
		dstPath = fs.Arg(1)
	}

	// Report the size delta once compaction finishes.
	defer func() {
		fi, err := os.Stat(dstPath)
		if err != nil {
			fmt.Fprintln(cmd.Stderr, err)
			// BUG FIX: fi is nil when Stat fails; falling through to
			// fi.Size() would panic with a nil-pointer dereference.
			return
		}
		newSize := fi.Size()
		if newSize == 0 {
			fmt.Fprintln(cmd.Stderr, "db size is 0")
			// BUG FIX: return early rather than report a gain of +Inf
			// from dividing by zero below.
			return
		}
		fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, newSize, float64(initialSize)/float64(newSize))
	}()

	dstdb, err := bolt.Open(dstPath, 0666, nil)
	if err != nil {
		return err
	}
	defer dstdb.Close()

	// commit regularly, or we'll run out of memory for large datasets if using one transaction.
	var size int64
	tx, err := dstdb.Begin(true)
	if err != nil {
		return err
	}
	// Commit (or roll back, on error) whichever transaction is open when
	// the walk ends. Note this writes to the named return value err.
	defer func() {
		if err != nil {
			_ = tx.Rollback()
		} else {
			err = tx.Commit()
		}
	}()
	return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
		// Once the accumulated key/value size crosses txMaxSize, commit and
		// start a new transaction (txMaxSize == 0 disables batching).
		s := int64(len(k) + len(v))
		if size+s > txMaxSize && txMaxSize != 0 {
			if err := tx.Commit(); err != nil {
				return err
			}
			tx, err = dstdb.Begin(true)
			if err != nil {
				return err
			}
			size = 0
		}
		size += s
		nk := len(keys)
		if nk == 0 {
			// Top-level item: always a bucket name.
			_, err := tx.CreateBucket(k)
			return err
		}

		// Re-resolve the owning bucket from the path; buckets created in a
		// previously committed transaction are still visible here.
		b := tx.Bucket(keys[0])
		if nk > 1 {
			for _, k := range keys[1:] {
				b = b.Bucket(k)
			}
		}
		if v == nil {
			// k names a nested bucket.
			_, err := b.CreateBucket(k)
			return err
		}
		return b.Put(k, v)
	})
}

// Usage returns the help message for the compact command.
func (cmd *CompactCommand) Usage() string {
	const text = `
usage: bolt compact PATH [DST_PATH]
Compact opens a database at PATH and walks it recursively entirely,
copying keys as they are found from all buckets, to a newly created db.
If DST_PATH is non-empty, the new db is created at DST_PATH, else it will be
in a temporary location.
The original db is left untouched.
`
	return strings.TrimLeft(text, "\n")
}
180 changes: 180 additions & 0 deletions cmd/bolt/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package main_test

import (
"bytes"
crypto "crypto/rand"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -183,3 +188,178 @@ func (db *DB) Close() error {
defer os.Remove(db.Path)
return db.DB.Close()
}

// TestCompactCommand_Run fills a database with random data, bloats it with
// large values that are then deleted, compacts it, and verifies that
// (a) the source db is untouched and (b) the compacted db has identical
// contents (compared via the chkdb fingerprint).
func TestCompactCommand_Run(t *testing.T) {
	// Seed math/rand from crypto/rand so each run exercises different data.
	var s int64
	if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
		t.Fatal(err)
	}
	rand.Seed(s)

	// Open the destination just to reserve a temp path; close it so the
	// compact command can create the file itself.
	dstdb := MustOpen(0666, nil)
	dstdb.Close()

	// fill the db
	db := MustOpen(0666, nil)
	if err := db.Update(func(tx *bolt.Tx) error {
		n := 2 + rand.Intn(5)
		for i := 0; i < n; i++ {
			k := []byte(fmt.Sprintf("b%d", i))
			b, err := tx.CreateBucketIfNotExists(k)
			if err != nil {
				return err
			}
			if err := fillBucket(b, append(k, '.')); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}

	// make the db grow by adding large values, and delete them.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
		if err != nil {
			return err
		}
		n := 5 + rand.Intn(5)
		for i := 0; i < n; i++ {
			// 1–5 MB of random bytes per value.
			v := make([]byte, 1000*1000*(1+rand.Intn(5)))
			_, err := crypto.Read(v)
			if err != nil {
				return err
			}
			if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}
	// Delete all the large values and their bucket: the freed pages remain
	// in the file, which is exactly what compaction should reclaim.
	if err := db.Update(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("large_vals")).Cursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			if err := c.Delete(); err != nil {
				return err
			}
		}
		return tx.DeleteBucket([]byte("large_vals"))
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}
	// Close the underlying bolt handle now; the deferred db.Close() runs
	// again later only to remove the temp file.
	db.DB.Close()
	defer db.Close()
	defer dstdb.Close()

	// Fingerprint the source before compacting.
	dbChk, err := chkdb(db.Path)
	if err != nil {
		t.Fatal(err)
	}

	m := NewMain()
	if err := m.Run("compact", db.Path, dstdb.Path); err != nil {
		t.Fatal(err)
	}

	// Fingerprint the source again and the destination once.
	dbChkAfterCompact, err := chkdb(db.Path)
	if err != nil {
		t.Fatal(err)
	}

	dstdbChk, err := chkdb(dstdb.Path)
	if err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(dbChk, dbChkAfterCompact) {
		t.Error("the original db has been touched")
	}
	if !bytes.Equal(dbChk, dstdbChk) {
		t.Error("the compacted db data isn't the same than the original db")
	}
}

// fillBucket populates b with a random number of key/value pairs whose keys
// share prefix, then recursively fills a few sub-buckets up to a random
// depth limit.
func fillBucket(b *bolt.Bucket, prefix []byte) error {
	n := 10 + rand.Intn(50)
	for i := 0; i < n; i++ {
		v := make([]byte, 10*(1+rand.Intn(4)))
		_, err := crypto.Read(v)
		if err != nil {
			return err
		}
		// BUG FIX: build each key in a fresh slice instead of appending to
		// the shared prefix. bolt keeps references to keys for the life of
		// the transaction, and append(prefix, ...) can reuse prefix's
		// backing array, letting a later iteration overwrite the bytes of
		// a key that was already Put.
		k := []byte(fmt.Sprintf("%sk%d", prefix, i))
		if err := b.Put(k, v); err != nil {
			return err
		}
	}
	// limit depth of subbuckets
	s := 2 + rand.Intn(4)
	if len(prefix) > (2*s + 1) {
		return nil
	}
	n = 1 + rand.Intn(3)
	for i := 0; i < n; i++ {
		// Fresh slices here too, for the same aliasing reason as above.
		k := []byte(fmt.Sprintf("%sb%d", prefix, i))
		sb, err := b.CreateBucket(k)
		if err != nil {
			return err
		}
		if err := fillBucket(sb, []byte(fmt.Sprintf("%s.", k))); err != nil {
			return err
		}
	}
	return nil
}

// chkdb opens the database at path and returns a byte fingerprint of its
// entire contents (every bucket name, key, and value, in traversal order),
// suitable for equality comparison between two databases.
func chkdb(path string) ([]byte, error) {
	db, err := bolt.Open(path, 0666, nil)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	var buf bytes.Buffer
	view := func(tx *bolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
			return walkBucket(b, name, nil, &buf)
		})
	}
	if err := db.View(view); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// walkBucket writes "k:v\n" for the given pair to w, then recurses into k's
// children when k names a sub-bucket (v == nil).
func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
	// %s on a nil []byte prints nothing, matching the bucket case.
	if _, err := fmt.Fprintf(w, "%s:%s\n", k, v); err != nil {
		return err
	}
	if v != nil {
		// Regular value: no children to visit.
		return nil
	}
	return parent.ForEach(func(ck, cv []byte) error {
		next := parent
		if cv == nil {
			next = parent.Bucket(ck)
		}
		return walkBucket(next, ck, cv, w)
	})
}

0 comments on commit 52d0f5e

Please sign in to comment.