[BREAKING] feat(index): Use flatbuffers instead of protobuf (dgraph-io#1546)

This PR:
- Uses flatbuffers instead of protobufs for the table index, and stores the raw index byte slices directly in the cache (sketch below).
- Uses MaxVersion to pick the oldest tables for compaction first (sketch below).
- Uses leveldb/bloom so that bloom filters can be queried in tests without unmarshaling the index.
- Adds uncompressed size and key count to the table index.
- Updates the write bench tool to use managed mode (sketched further down, after writeRandom).
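Two of these changes are easy to illustrate. The point of the flatbuffers switch is that a protobuf-encoded index must be unmarshaled into heap objects before any field can be read, while flatbuffers accessors read fields in place from the raw byte slice, which is what lets the cache hold the slice itself. A minimal sketch, assuming protobuf- and flatbuffers-generated packages (called pb and fb here; the type and accessor names are illustrative, not the commit's generated code):

    // Old path: decode the entire index before reading a single field.
    func keyCountProto(data []byte) uint32 {
        var index pb.TableIndex // hypothetical protobuf-generated type
        y.Check(proto.Unmarshal(data, &index))
        return index.KeyCount
    }

    // New path: the accessor walks the byte slice directly, so the cached
    // []byte is the index and no unmarshal or copy is needed.
    func keyCountFB(data []byte) uint32 {
        index := fb.GetRootAsTableIndex(data, 0) // hypothetical generated root accessor
        return index.KeyCount()
    }

The compaction-ordering change amounts to preferring tables whose newest entry is oldest. A sketch of that ordering, assuming each table handle exposes the new MaxVersion field:

    // Compact tables holding the oldest data first: sort candidates by
    // ascending MaxVersion before picking one.
    sort.Slice(tables, func(i, j int) bool {
        return tables[i].MaxVersion() < tables[j].MaxVersion()
    })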
Ibrahim Jarif authored Oct 3, 2020
1 parent 4b6872e commit 599363b
Showing 28 changed files with 1,207 additions and 977 deletions.
30 changes: 25 additions & 5 deletions badger/cmd/info.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io/ioutil"
"os"
+"path"
"path/filepath"
"sort"
"strings"
@@ -239,27 +240,46 @@ func dur(src, dst time.Time) string {
return humanize.RelTime(dst, src, "earlier", "later")
}

+func getInfo(fileInfos []os.FileInfo, tid uint64) int64 {
+fileName := table.IDToFilename(tid)
+for _, fi := range fileInfos {
+if path.Base(fi.Name()) == fileName {
+return fi.Size()
+}
+}
+return 0
+}

func tableInfo(dir, valueDir string, db *badger.DB) {
+// We want all tables along with their key counts here.
-tables := db.Tables(true)
+tables := db.Tables()
fileInfos, err := ioutil.ReadDir(dir)
y.Check(err)

fmt.Println()
fmt.Println("SSTable [Li, Id, Total Keys including internal keys] " +
-"[Left Key, Version -> Right Key, Version] [Index Size] [BF Size]")
+"[Compression Ratio, Uncompressed Size, Index Size, BF Size] " +
+"[Left Key, Version -> Right Key, Version]")
totalIndex := uint64(0)
totalBloomFilter := uint64(0)
+totalCompressionRatio := float64(0.0)
for _, t := range tables {
lk, lt := y.ParseKey(t.Left), y.ParseTs(t.Left)
rk, rt := y.ParseKey(t.Right), y.ParseTs(t.Right)

+compressionRatio := float64(t.UncompressedSize) /
+float64(getInfo(fileInfos, t.ID)-int64(t.IndexSz))
+fmt.Printf("SSTable [L%d, %03d, %07d] [%.2f, %s, %s, %s] [%20X, v%d -> %20X, v%d]\n",
+t.Level, t.ID, t.KeyCount, compressionRatio, hbytes(int64(t.UncompressedSize)),
+hbytes(int64(t.IndexSz)), hbytes(int64(t.BloomFilterSize)), lk, lt, rk, rt)
totalIndex += uint64(t.IndexSz)
totalBloomFilter += uint64(t.BloomFilterSize)
-fmt.Printf("SSTable [L%d, %03d, %07d] [%20X, v%d -> %20X, v%d] [%s] [%s] \n",
-t.Level, t.ID, t.KeyCount, lk, lt, rk, rt, hbytes(int64(t.IndexSz)),
-hbytes(int64(t.BloomFilterSize)))
+totalCompressionRatio += compressionRatio
}
fmt.Println()
fmt.Printf("Total Index Size: %s\n", hbytes(int64(totalIndex)))
fmt.Printf("Total BloomFilter Size: %s\n", hbytes(int64(totalBloomFilter)))
+fmt.Printf("Mean Compression Ratio: %.2f\n", totalCompressionRatio/float64(len(tables)))
fmt.Println()
}
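For context on the ratio printed above: the table index (bloom filter included) is not part of the compressed data blocks, so the on-disk size of the data alone is the file size reported by getInfo minus the index size. A worked example with made-up numbers:

    // 64 MB of raw key-value data compressed into a 20 MB file that
    // carries a 4 MB uncompressed index: the data blocks occupy 16 MB.
    func exampleRatio() float64 {
        uncompressedSize := int64(64 << 20)
        fileSize := int64(20 << 20) // what getInfo reads from disk
        indexSz := int64(4 << 20)
        return float64(uncompressedSize) / float64(fileSize-indexSz) // 4.00
    }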

53 changes: 16 additions & 37 deletions badger/cmd/read_bench.go
@@ -19,6 +19,7 @@ package cmd
import (
"context"
"fmt"
+"math"
"math/rand"
"strings"
"sync/atomic"
@@ -120,7 +121,6 @@ func readBench(cmd *cobra.Command, args []string) error {
return y.Wrapf(err, "unable to open DB")
}
defer db.Close()
-now := time.Now()

fmt.Println("*********************************************************")
fmt.Println("Starting to benchmark Reads")
@@ -131,32 +131,7 @@ func readBench(cmd *cobra.Command, args []string) error {
fullScanDB(db)
return nil
}
-keys, err := getSampleKeys(db)
-if err != nil {
-return y.Wrapf(err, "error while sampling keys")
-}
-fmt.Println("*********************************************************")
-fmt.Printf("Total Sampled Keys: %d, read in time: %s\n", len(keys), time.Since(now))
-fmt.Println("*********************************************************")
-
-if len(keys) == 0 {
-fmt.Println("DB is empty, hence returning")
-return nil
-}
-c := z.NewCloser(0)
-startTime = time.Now()
-for i := 0; i < numGoroutines; i++ {
-c.AddRunning(1)
-go readKeys(db, c, keys)
-}
-
-// also start printing stats
-c.AddRunning(1)
-go printStats(c)
-
-<-time.After(dur)
-c.SignalAndWait()
-
+readTest(db, dur)
return nil
}

@@ -199,16 +174,20 @@ func readKeys(db *badger.DB, c *z.Closer, keys [][]byte) {

func lookupForKey(db *badger.DB, key []byte) (sz uint64) {
err := db.View(func(txn *badger.Txn) error {
-itm, err := txn.Get(key)
-y.Check(err)
-
-if keysOnly {
-sz = uint64(itm.KeySize())
-} else {
-y.Check2(itm.ValueCopy(nil))
-sz = uint64(itm.EstimatedSize())
+iopt := badger.DefaultIteratorOptions
+iopt.AllVersions = true
+it := txn.NewKeyIterator(key, iopt)
+defer it.Close()
+
+cnt := 0
+for it.Seek(key); it.Valid(); it.Next() {
+itm := it.Item()
+sz += uint64(itm.EstimatedSize())
+cnt++
+if cnt == 10 {
+break
+}
}

return nil
})
y.Check(err)
@@ -219,7 +198,7 @@ func lookupForKey(db *badger.DB, key []byte) (sz uint64) {
func getSampleKeys(db *badger.DB) ([][]byte, error) {
var keys [][]byte
count := 0
-stream := db.NewStream()
+stream := db.NewStreamAt(math.MaxUint64)

// Override stream.KeyToList as we only want keys. Also,
// we can take only the first version of each key.
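The override itself is elided above; a sketch of how such a KeyToList replacement can look with badger's Stream API, as an illustration of the pattern rather than the commit's exact code:

    // Emit only the key, skipping value fetches and older versions.
    stream.KeyToList = func(key []byte, itr *badger.Iterator) (*pb.KVList, error) {
        return &pb.KVList{Kv: []*pb.KV{{Key: y.Copy(key)}}}, nil
    }
    stream.Send = func(list *pb.KVList) error {
        // Send can be called concurrently; real code must synchronize and
        // stop once enough keys have been sampled.
        for _, kv := range list.Kv {
            keys = append(keys, kv.Key)
        }
        return nil
    }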
85 changes: 73 additions & 12 deletions badger/cmd/write_bench.go
@@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"log"
+"math"
"math/rand"
"os"
"path/filepath"
@@ -88,20 +89,20 @@ const (
func init() {
benchCmd.AddCommand(writeBenchCmd)
writeBenchCmd.Flags().IntVarP(&keySz, "key-size", "k", 32, "Size of key")
-writeBenchCmd.Flags().IntVarP(&valSz, "val-size", "v", 128, "Size of value")
+writeBenchCmd.Flags().IntVar(&valSz, "val-size", 128, "Size of value")
writeBenchCmd.Flags().Float64VarP(&numKeys, "keys-mil", "m", 10.0,
"Number of keys to add in millions")
writeBenchCmd.Flags().BoolVar(&syncWrites, "sync", true,
"If true, sync writes to disk.")
writeBenchCmd.Flags().BoolVarP(&force, "force-compact", "f", true,
"Force compact level 0 on close.")
writeBenchCmd.Flags().BoolVarP(&sorted, "sorted", "s", false, "Write keys in sorted order.")
-writeBenchCmd.Flags().BoolVarP(&showLogs, "logs", "l", false, "Show Badger logs.")
+writeBenchCmd.Flags().BoolVarP(&showLogs, "verbose", "v", false, "Show Badger logs.")
writeBenchCmd.Flags().IntVarP(&valueThreshold, "value-th", "t", 1<<10, "Value threshold")
writeBenchCmd.Flags().IntVarP(&numVersions, "num-version", "n", 1, "Number of versions to keep")
-writeBenchCmd.Flags().Int64Var(&blockCacheSize, "block-cache", 0,
+writeBenchCmd.Flags().Int64Var(&blockCacheSize, "block-cache-mb", 0,
"Size of block cache in MB")
-writeBenchCmd.Flags().Int64Var(&indexCacheSize, "index-cache", 0,
+writeBenchCmd.Flags().Int64Var(&indexCacheSize, "index-cache-mb", 0,
"Size of index cache in MB.")
writeBenchCmd.Flags().Uint32Var(&vlogMaxEntries, "vlog-maxe", 1000000, "Value log Max Entries")
writeBenchCmd.Flags().StringVarP(&encryptionKey, "encryption-key", "e", "",
@@ -112,7 +113,7 @@ func init() {
"Load Bloom filter on DB open.")
writeBenchCmd.Flags().BoolVar(&detectConflicts, "conficts", true,
"If true, badger will detect conflicts.")
-writeBenchCmd.Flags().BoolVar(&compression, "compression", false,
+writeBenchCmd.Flags().BoolVar(&compression, "compression", true,
"If true, badger will use ZSTD mode")
writeBenchCmd.Flags().BoolVar(&showDir, "show-dir", false,
"If true, the report will include the directory contents")
@@ -133,7 +134,7 @@ func writeRandom(db *badger.DB, num uint64) error {
y.Check2(rand.Read(value))

es := uint64(keySz + valSz) // entry size is keySz + valSz
-batch := db.NewWriteBatch()
+batch := db.NewManagedWriteBatch()

ttlPeriod, errParse := time.ParseDuration(ttlDuration)
y.Check(errParse)
@@ -146,11 +147,11 @@ func writeRandom(db *badger.DB, num uint64) error {
if ttlPeriod != 0 {
e.WithTTL(ttlPeriod)
}
-err := batch.SetEntry(e)
+err := batch.SetEntryAt(e, 1)
for err == badger.ErrBlockedWrites {
time.Sleep(time.Second)
-batch = db.NewWriteBatch()
-err = batch.SetEntry(e)
+batch = db.NewManagedWriteBatch()
+err = batch.SetEntryAt(e, 1)
}
if err != nil {
panic(err)
@@ -162,6 +163,34 @@ func writeRandom(db *badger.DB, num uint64) error {
return batch.Flush()
}
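writeRandom above now writes through a managed write batch: in managed mode the application, not badger's oracle, supplies every commit timestamp, and reads pin an explicit snapshot version. A minimal self-contained sketch of the pattern, assuming badger v2's managed API and a throwaway directory:

    package main

    import (
        "math"

        badger "github.com/dgraph-io/badger/v2"
        "github.com/dgraph-io/badger/v2/y"
    )

    func main() {
        db, err := badger.OpenManaged(badger.DefaultOptions("/tmp/managed-demo"))
        y.Check(err)
        defer db.Close()

        // Every write carries an explicit commit timestamp; the bench above
        // writes everything at version 1 the same way.
        wb := db.NewManagedWriteBatch()
        y.Check(wb.SetEntryAt(badger.NewEntry([]byte("key"), []byte("value")), 1))
        y.Check(wb.Flush())

        // Reads pin a snapshot version; math.MaxUint64 sees all versions.
        txn := db.NewTransactionAt(math.MaxUint64, false)
        defer txn.Discard()
        item, err := txn.Get([]byte("key"))
        y.Check(err)
        y.Check2(item.ValueCopy(nil))
    }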

+func readTest(db *badger.DB, dur time.Duration) {
+now := time.Now()
+keys, err := getSampleKeys(db)
+if err != nil {
+panic(err)
+}
+fmt.Println("*********************************************************")
+fmt.Printf("Total Sampled Keys: %d, read in time: %s\n", len(keys), time.Since(now))
+fmt.Println("*********************************************************")
+
+if len(keys) == 0 {
+fmt.Println("DB is empty, hence returning")
+return
+}
+c := z.NewCloser(0)
+readStartTime := time.Now()
+for i := 0; i < numGoroutines; i++ {
+c.AddRunning(1)
+go readKeys(db, c, keys)
+}
+
+// also start printing stats
+c.AddRunning(1)
+go printReadStats(c, readStartTime)
+<-time.After(dur)
+c.SignalAndWait()
+}
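readTest and printReadStats coordinate shutdown through ristretto's z.Closer, the idiom used throughout these benchmarks: AddRunning before spawning each goroutine, Done inside it, and SignalAndWait to broadcast shutdown and block until everything finishes. A standalone sketch of the idiom:

    package main

    import (
        "fmt"
        "time"

        "github.com/dgraph-io/ristretto/z"
    )

    func worker(c *z.Closer, id int) {
        defer c.Done() // marks this goroutine as finished
        for {
            select {
            case <-c.HasBeenClosed(): // closed by SignalAndWait
                return
            case <-time.After(100 * time.Millisecond):
                fmt.Println("worker", id, "tick")
            }
        }
    }

    func main() {
        c := z.NewCloser(0)
        for i := 0; i < 4; i++ {
            c.AddRunning(1)
            go worker(c, i)
        }
        time.Sleep(time.Second)
        c.SignalAndWait() // signal shutdown, then wait for every Done
    }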

func writeSorted(db *badger.DB, num uint64) error {
value := make([]byte, valSz)
y.Check2(rand.Read(value))
@@ -257,7 +286,7 @@ func writeBench(cmd *cobra.Command, args []string) error {
}

fmt.Printf("Opening badger with options = %+v\n", opt)
-db, err := badger.Open(opt)
+db, err := badger.OpenManaged(opt)
if err != nil {
return err
}
@@ -282,6 +311,16 @@ func writeBench(cmd *cobra.Command, args []string) error {
if sorted {
err = writeSorted(db, num)
} else {
+go func() {
+for {
+select {
+case <-c.HasBeenClosed():
+return
+case <-time.After(30 * time.Second):
+readTest(db, 5*time.Minute)
+}
+}
+}()
err = writeRandom(db, num)
}

@@ -296,7 +335,7 @@ func showKeysStats(db *badger.DB) {
validKeyCount uint32
)

-txn := db.NewTransaction(false)
+txn := db.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()

iopt := badger.DefaultIteratorOptions
@@ -363,7 +402,7 @@ func reportStats(c *z.Closer, db *badger.DB) {
entries := atomic.LoadUint64(&entriesWritten)
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
-fmt.Printf("Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
+fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
"entries written: %d, speed: %d/sec, gcSuccess: %d\n", y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate, gcSuccess)
}
@@ -453,3 +492,25 @@ func dropPrefix(c *z.Closer, db *badger.DB) {
}
}
}

+func printReadStats(c *z.Closer, startTime time.Time) {
+defer c.Done()
+
+t := time.NewTicker(time.Second)
+defer t.Stop()
+for {
+select {
+case <-c.HasBeenClosed():
+return
+case <-t.C:
+dur := time.Since(startTime)
+sz := atomic.LoadUint64(&sizeRead)
+entries := atomic.LoadUint64(&entriesRead)
+bytesRate := sz / uint64(dur.Seconds())
+entriesRate := entries / uint64(dur.Seconds())
+fmt.Printf("[READ] Time elapsed: %s, bytes read: %s, speed: %s/sec, "+
+"entries read: %d, speed: %d/sec\n", y.FixedDuration(time.Since(startTime)),
+humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate)
+}
+}
+}
15 changes: 0 additions & 15 deletions compaction.go
@@ -23,8 +23,6 @@ import (
"math"
"sync"

-"golang.org/x/net/trace"
-
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
)
@@ -128,19 +126,6 @@ type compactStatus struct {
levels []*levelCompactStatus
}

-func (cs *compactStatus) toLog(tr trace.Trace) {
-cs.RLock()
-defer cs.RUnlock()
-
-tr.LazyPrintf("Compaction status:")
-for i, l := range cs.levels {
-if l.debug() == "" {
-continue
-}
-tr.LazyPrintf("[%d] %s", i, l.debug())
-}
-}
-
func (cs *compactStatus) overlapsWith(level int, this keyRange) bool {
cs.RLock()
defer cs.RUnlock()
[Diff truncated: the remaining 24 changed files are not shown.]
