Skip to content

Commit

Permalink
Improve value garbage collection.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain committed Apr 25, 2017
1 parent 60f4269 commit ddf126d
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 131 deletions.
8 changes: 3 additions & 5 deletions badger/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Options struct {
DoNotCompact bool
MapTablesTo int
NumMemtables int
DoNotRunValueGC bool
ValueGCThreshold float64
SyncWrites bool
}

Expand All @@ -67,7 +67,7 @@ var DefaultOptions = Options{
MapTablesTo: table.MemoryMap,
NumMemtables: 5,
MemtableSlack: 10 << 20,
DoNotRunValueGC: false, // Only for testing.
ValueGCThreshold: 0.8, // Set to zero to not run GC.
SyncWrites: true,
}

Expand Down Expand Up @@ -244,9 +244,7 @@ func (s *KV) writeToLSM(b *block) {
var offsetBuf [16]byte
y.AssertTrue(len(b.Ptrs) == len(b.Entries))
for i, entry := range b.Entries {
isPointer := entry.Meta&BitValuePointer > 0

if !isPointer && len(entry.Value) < s.opt.ValueThreshold { // Will include deletion / tombstone case.
if len(entry.Value) < s.opt.ValueThreshold { // Will include deletion / tombstone case.
s.mt.Put(entry.Key, entry.Value, entry.Meta)
} else {
s.mt.Put(entry.Key, b.Ptrs[i].Encode(offsetBuf[:]), entry.Meta|BitValuePointer)
Expand Down
2 changes: 1 addition & 1 deletion badger/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func getTestOptions(dir string) *Options {
opt.LevelOneSize = 4 << 15 // Force more compaction.
opt.Verbose = true
opt.Dir = dir
// opt.ValueGCThreshold = 0.0
opt.SyncWrites = true // Some tests seem to need this to pass.
opt.ValueGCThreshold = 0.0
return opt
}

Expand Down
231 changes: 121 additions & 110 deletions badger/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type logFile struct {
fd *os.File
fid int32
offset int64
size int64
}

func (lf *logFile) read(buf []byte, offset int64) error {
Expand All @@ -73,6 +74,9 @@ func (lf *logFile) doneWriting() {
y.Check(lf.fd.Close())
lf.fd, err = os.OpenFile(path, os.O_RDONLY, 0666)
y.Check(err)

fi, err := lf.fd.Stat()
lf.size = fi.Size()
}

type logEntry func(e Entry)
Expand Down Expand Up @@ -140,17 +144,18 @@ func (f *logFile) iterate(offset int64, fn logEntry) error {
return nil
}

func (lf *valueLog) move(f *logFile, newlf *logFile) {
func (vlog *valueLog) move(f *logFile) {
fmt.Println("move callled")
tr := trace.New("badger", "valuelog-move")
ctx := trace.NewContext(context.Background(), tr)

var b block
var buf bytes.Buffer
b := &block{
Wg: sync.WaitGroup{},
}

y.AssertTrue(lf.kv != nil)
y.AssertTrue(vlog.kv != nil)
fe := func(e Entry) {
vptr, meta := lf.kv.get(ctx, e.Key)
vptr, meta := vlog.kv.get(ctx, e.Key)

if (meta & BitDelete) > 0 {
return
Expand All @@ -173,19 +178,14 @@ func (lf *valueLog) move(f *logFile, newlf *logFile) {
if int32(vp.Fid) == f.fid && int64(vp.Offset) == e.Offset {
// This new entry only contains the key, and a pointer to the value.
var ne Entry
ne.Meta = e.Meta | BitValuePointer
y.AssertTruef(e.Meta == 0, "Got meta: %v", e.Meta)
ne.Meta = e.Meta
ne.Key = make([]byte, len(e.Key))
copy(ne.Key, e.Key)
ne.Value = make([]byte, len(e.Value))
copy(ne.Value, e.Value)
b.Entries = append(b.Entries, &ne)

var np valuePointer
np.Fid = uint32(newlf.fid)
np.Len = uint32(8 + len(e.Key) + len(e.Value) + 1)
np.Offset = uint64(buf.Len())
b.Ptrs = append(b.Ptrs, np)

e.EncodeTo(&buf) // This would be written to file. Note the usage of e, not ne.

} else {
y.Fatalf("This shouldn't happen. Latest Pointer:%+v. Meta:%v.", vp, meta)
}
Expand All @@ -195,23 +195,26 @@ func (lf *valueLog) move(f *logFile, newlf *logFile) {
fe(e)
})

n, err := newlf.fd.Write(buf.Bytes())
fmt.Printf("NEWFD: %d Bytes written: %d. Error: %v\n", newlf.fid, n, err)
y.Check(err)
newlf.doneWriting()

// Remove f from files. Push newlf to files.
lf.swapFiles(f, newlf)

fmt.Printf("block has %d entries\n", len(b.Entries))
lf.kv.writeToLSM(&b)
// This should NOT update the value log offset, because we still have
// an older fid doing newer writes. So, when that finishes, the offset would automatically
// jump through the newlf here, to a newer log file with fid > newlf.fid.
b.Wg.Add(1)
vlog.kv.writeCh <- b
b.Wg.Wait()

// Entries written to LSM. Remove the older file now.
rem := lf.fpath(f.fid)
lf.elog.Printf("Removing %s", rem)
{
vlog.RLock()
idx := sort.Search(len(vlog.files), func(idx int) bool {
return vlog.files[idx].fid >= f.fid
})
if idx == len(vlog.files) || vlog.files[idx].fid != f.fid {
y.Fatalf("Unable to find fid: %d", f.fid)
}
vlog.files = append(vlog.files[:idx], vlog.files[idx+1:]...)
vlog.RUnlock()
}

rem := vlog.fpath(f.fid)
vlog.elog.Printf("Removing %s", rem)
y.Check(os.Remove(rem))
}

Expand Down Expand Up @@ -297,26 +300,6 @@ func (l *valueLog) fpath(fid int32) string {
return fmt.Sprintf("%s/%06d.vlog", l.dirPath, fid)
}

func (l *valueLog) swapFiles(rem *logFile, add *logFile) {
rem.Lock()
defer rem.Unlock()
rem.fd.Close()

l.Lock()
defer l.Unlock()

idx := sort.Search(len(l.files), func(idx int) bool {
return l.files[idx].fid >= rem.fid
})
if idx == len(l.files) || l.files[idx].fid != rem.fid {
y.Fatalf("Unable to find fid: %d\n", rem.fid)
}
l.files[idx] = add
sort.Slice(l.files, func(i, j int) bool {
return l.files[i].fid < l.files[i].fid
})
}

func (l *valueLog) openOrCreateFiles() {
files, err := ioutil.ReadDir(l.dirPath)
y.Check(err)
Expand Down Expand Up @@ -438,6 +421,20 @@ func (l *valueLog) Write(blocks []*block) {
l.elog.Printf("Done")
curlf.offset += int64(n)
l.buf.Reset()

if curlf.offset > LogSize {
var err error
curlf.doneWriting()

newlf := &logFile{fid: atomic.AddInt32(&l.maxFid, 1), offset: 0}
newlf.fd, err = y.OpenSyncedFile(l.fpath(newlf.fid), l.opt.SyncWrites)
y.Check(err)

l.Lock()
l.files = append(l.files, newlf)
l.Unlock()
curlf = newlf
}
}

for i := range blocks {
Expand All @@ -453,24 +450,15 @@ func (l *valueLog) Write(blocks []*block) {
b.Ptrs = append(b.Ptrs, p)

e.EncodeTo(&l.buf)
if p.Offset > uint64(LogSize) {
toDisk()
}
}
}
toDisk()

// Acquire mutex locks around this manipulation, so that the reads don't try to use
// an invalid file descriptor.
if curlf.offset > LogSize {
var err error
curlf.doneWriting()

newlf := &logFile{fid: atomic.AddInt32(&l.maxFid, 1), offset: 0}
newlf.fd, err = y.OpenSyncedFile(l.fpath(newlf.fid), l.opt.SyncWrites)
y.Check(err)

l.Lock()
l.files = append(l.files, newlf)
l.Unlock()
}
}

// Write batches the write of an array of entries to value log.
Expand Down Expand Up @@ -523,99 +511,122 @@ func (l *valueLog) Read(ctx context.Context, p valuePointer) (e Entry, err error

func (l *valueLog) runGCInLoop() {
defer l.closer.Done()
if l.opt.DoNotRunValueGC {
defer func() {
fmt.Printf("Stopping runGCInLoop. Signal: %v", l.closer.GotSignal())
}()

if l.opt.ValueGCThreshold == 0.0 {
fmt.Println("l.opt.ValueGCThreshold = 0.0")
return
}

tick := time.NewTicker(10 * time.Second)
for {
select {
case <-tick.C:
l.doRunGC()
case <-l.closer.HasBeenClosed():
fmt.Println("has been closed")
return
case <-tick.C:
l.doRunGC()
}
}
}

func (l *valueLog) doRunGC() {
var lf *logFile
func (l *valueLog) pickLog() *logFile {
l.RLock()
defer l.RUnlock()
if len(l.files) <= 1 {
fmt.Println("Need at least 2 value log files to run GC.")
l.RUnlock()
return
return nil
}
// This file shouldn't be being written to.
lfi := rand.Intn(len(l.files) - 1)
lf = l.files[lfi]
l.RUnlock()
lfi := rand.Intn(len(l.files))
if lfi > 0 {
lfi = rand.Intn(lfi) // Another level of rand to favor smaller fids.
}
return l.files[lfi]
}

func (vlog *valueLog) doRunGC() {
lf := vlog.pickLog()
if lf == nil {
return
}
fmt.Printf("Picked fid: %d for GC\n", lf.fid)

reason := make(map[string]int)
tr := trace.New("badger", "value-gc")
ctx := trace.NewContext(context.Background(), tr)
count := 0

y.AssertTrue(vlog.kv != nil)
err := lf.iterate(0, func(e Entry) {
count++
if count%1000 == 0 {
time.Sleep(time.Millisecond)
}

esz := len(e.Key) + len(e.Value) + 1
vptr, meta := l.kv.get(ctx, e.Key)
vptr, meta := vlog.kv.get(ctx, e.Key)

if (meta & BitDelete) > 0 {
// Key has been deleted. Discard.
reason["discard"] += esz
reason["discard-deleted"] += esz

} else if (meta & BitValuePointer) == 0 {
return
}
if (meta & BitValuePointer) == 0 {
// Value is stored alongside key. Discard.
reason["discard"] += esz
reason["discard-value-alongside"] += esz
return
}

// Value is still present in value log.
y.AssertTrue(len(vptr) > 0)
var vp valuePointer
vp.Decode(vptr)

if int32(vp.Fid) > lf.fid {
// Value is present in a later log. Discard.
reason["discard"] += esz
reason["discard-new-value-higher-fid"] += esz
return
}
if int64(vp.Offset) > e.Offset {
// Value is present in a later offset, but in the same log.
reason["discard"] += esz
reason["discard-new-value-higher-offset"] += esz
return
}
if int32(vp.Fid) == lf.fid && int64(vp.Offset) == e.Offset {
// This is still the active entry. This would need to be rewritten.
reason["keep"] += esz

} else {
// Value is still present in value log.
y.AssertTrue(len(vptr) > 0)
var vp valuePointer
vp.Decode(vptr)

if int32(vp.Fid) > lf.fid {
// Value is present in a later log. Discard.
reason["discard"] += esz
reason["discard-new-value-higher-fid"] += esz

} else if int64(vp.Offset) > e.Offset {
// Value is present in a later offset, but in the same log.
reason["discard"] += esz
reason["discard-new-value-higher-offset"] += esz

} else if int32(vp.Fid) == lf.fid && int64(vp.Offset) == e.Offset {
// This is still the active entry. This would need to be rewritten.
reason["keep"] += esz

} else {
for k, v := range reason {
fmt.Printf("%s = %d\n", k, v)
}
ne, err := l.Read(context.Background(), vp)
y.Check(err)
ne.Offset = int64(vp.Offset)
ne.print("Latest Entry in LSM")
e.print("Latest Entry in Log")
y.Fatalf("This shouldn't happen. Latest Pointer:%+v. Meta:%v.", vp, meta)
for k, v := range reason {
fmt.Printf("%s = %d\n", k, v)
}
ne, err := vlog.Read(context.Background(), vp)
y.Check(err)
ne.Offset = int64(vp.Offset)
ne.print("Latest Entry in LSM")
e.print("Latest Entry in Log")
y.Fatalf("This shouldn't happen. Latest Pointer:%+v. Meta:%v.", vp, meta)
}
})

y.Checkf(err, "While iterating for RunGC.")
for k, v := range reason {
fmt.Printf("%s = %d\n", k, v)
}
fmt.Printf("\nFid: %d Keep: %d. Discard: %d\n", lfi, reason["keep"]/M, reason["discard"]/M)
if reason["keep"]/M >= 800 {
fmt.Printf("Fid: %d Keep: %d. Discard: %d\n\n", lf.fid, reason["keep"]/M, reason["discard"]/M)
if reason["keep"] >= int(vlog.opt.ValueGCThreshold*float64(lf.size)) {
return
}

newlf := &logFile{fid: atomic.AddInt32(&l.maxFid, 1), offset: 0}
newlf.fd, err = y.OpenSyncedFile(l.fpath(newlf.fid), false)
y.Check(err)
fmt.Printf("REWRITING VLOG %d to NEW %d\n", lf.fid, newlf.fid)
l.move(lf, newlf)
l.elog.Printf("Done rewriting.")
fmt.Printf("REWRITING VLOG %d\n", lf.fid)
vlog.move(lf)
fmt.Println("REWRITE DONE")
vlog.elog.Printf("Done rewriting.")
}
Loading

0 comments on commit ddf126d

Please sign in to comment.