Allow setting TTL and User metadata at the same time.
We expose a public struct Entry, which can be used to set the key,
value, user metadata and TTL all at the same time.
deepakjois committed Nov 30, 2017
1 parent bc45f5c commit 5bc2e16
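As a rough illustration of the API described in the commit message, here is a minimal sketch of setting a key with a value, user metadata and a TTL in one call. It assumes the v1-era import path github.com/dgraph-io/badger, the DefaultOptions variable and the DB.Update helper; the directory paths and field values are hypothetical.

package main

import (
	"log"
	"time"

	"github.com/dgraph-io/badger"
)

func main() {
	opts := badger.DefaultOptions
	opts.Dir = "/tmp/badger"      // hypothetical data directory
	opts.ValueDir = "/tmp/badger" // hypothetical value log directory
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(txn *badger.Txn) error {
		// A single Entry carries the key, value, user metadata byte and
		// expiry, instead of choosing between SetWithMeta and SetWithTTL.
		e := &badger.Entry{
			Key:       []byte("answer"),
			Value:     []byte("42"),
			UserMeta:  0x01,
			ExpiresAt: uint64(time.Now().Add(time.Hour).Unix()),
		}
		return txn.SetEntry(e)
	})
	if err != nil {
		log.Fatal(err)
	}
}

Set, SetWithMeta and SetWithTTL remain available; as the transaction.go diff below shows, they now build an Entry internally and call SetEntry.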
Showing 7 changed files with 58 additions and 56 deletions.
6 changes: 3 additions & 3 deletions backup.go
@@ -75,12 +75,12 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
func (db *DB) Load(r io.Reader) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)
var entries []*entry
var entries []*Entry
var wg sync.WaitGroup
errChan := make(chan error, 1)

// func to check for pending error before sending off a batch for writing
batchSetAsyncIfNoErr := func(entries []*entry) error {
batchSetAsyncIfNoErr := func(entries []*Entry) error {
select {
case err := <-errChan:
return err
@@ -118,7 +118,7 @@ func (db *DB) Load(r io.Reader) error {
if err = e.Unmarshal(unmarshalBuf[:sz]); err != nil {
return err
}
entries = append(entries, &entry{
entries = append(entries, &Entry{
Key: y.KeyWithTs(e.Key, e.Version),
Value: e.Value,
UserMeta: e.UserMeta[0],
24 changes: 12 additions & 12 deletions db.go
@@ -78,7 +78,7 @@ const (
kvWriteChCapacity = 1000
)

func replayFunction(out *DB) func(entry, valuePointer) error {
func replayFunction(out *DB) func(Entry, valuePointer) error {
type txnEntry struct {
nk []byte
v y.ValueStruct
@@ -96,7 +96,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error {
}

first := true
return func(e entry, vp valuePointer) error { // Function for replaying.
return func(e Entry, vp valuePointer) error { // Function for replaying.
if first {
out.elog.Printf("First key=%s\n", e.Key)
}
@@ -482,7 +482,7 @@ var requestPool = sync.Pool{
},
}

func (db *DB) shouldWriteValueToLSM(e entry) bool {
func (db *DB) shouldWriteValueToLSM(e Entry) bool {
return len(e.Value) < db.opt.ValueThreshold
}

@@ -567,7 +567,7 @@ func (db *DB) writeRequests(reqs []*request) error {
return nil
}

func (db *DB) sendToWriteCh(entries []*entry) (*request, error) {
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
var count, size int64
for _, e := range entries {
size += int64(e.estimateSize(db.opt.ValueThreshold))
@@ -652,7 +652,7 @@ func (db *DB) doWrites(lc *y.Closer) {
// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
// Check(kv.BatchSet(entries))
func (db *DB) batchSet(entries []*entry) error {
func (db *DB) batchSet(entries []*Entry) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
return err
@@ -671,7 +671,7 @@ func (db *DB) batchSet(entries []*entry) error {
// err := kv.BatchSetAsync(entries, func(err error)) {
// Check(err)
// }
func (db *DB) batchSetAsync(entries []*entry, f func(error)) error {
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
return err
@@ -881,7 +881,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
opts.PrefetchValues = false
it := txn.NewIterator(opts)

var entries []*entry
var entries []*Entry

for it.Seek(key); it.ValidForPrefix(key); it.Next() {
item := it.Item()
@@ -891,7 +891,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {

// Found an older version. Mark for deletion
entries = append(entries,
&entry{
&Entry{
Key: y.KeyWithTs(key, item.version),
meta: bitDelete,
})
@@ -913,14 +913,14 @@ func (db *DB) PurgeOlderVersions() error {
opts.PrefetchValues = false
it := txn.NewIterator(opts)

var entries []*entry
var entries []*Entry
var lastKey []byte
var count int
var wg sync.WaitGroup
errChan := make(chan error, 1)

// func to check for pending error before sending off a batch for writing
batchSetAsyncIfNoErr := func(entries []*entry) error {
batchSetAsyncIfNoErr := func(entries []*Entry) error {
select {
case err := <-errChan:
return err
@@ -946,7 +946,7 @@ func (db *DB) PurgeOlderVersions() error {
}
// Found an older version. Mark for deletion
entries = append(entries,
&entry{
&Entry{
Key: y.KeyWithTs(lastKey, item.version),
meta: bitDelete,
})
@@ -959,7 +959,7 @@ func (db *DB) PurgeOlderVersions() error {
return err
}
count = 0
entries = []*entry{}
entries = []*Entry{}
}
}

10 changes: 5 additions & 5 deletions structs.go
@@ -76,8 +76,8 @@ func (h *header) Decode(buf []byte) {
h.userMeta = buf[17]
}

// entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data.
type entry struct {
// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data.
type Entry struct {
Key []byte
Value []byte
UserMeta byte
@@ -88,15 +88,15 @@ type entry struct {
offset uint32
}

func (e *entry) estimateSize(threshold int) int {
func (e *Entry) estimateSize(threshold int) int {
if len(e.Value) < threshold {
return len(e.Key) + len(e.Value) + 2 // Meta, UserMeta
}
return len(e.Key) + 12 + 2 // 12 for ValuePointer, 2 for metas.
}

// Encodes e to buf. Returns number of bytes written.
func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) {
func encodeEntry(e *Entry, buf *bytes.Buffer) (int, error) {
h := header{
klen: uint32(len(e.Key)),
vlen: uint32(len(e.Value)),
@@ -126,7 +126,7 @@ func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) {
return len(headerEnc) + len(e.Key) + len(e.Value) + len(crcBuf), nil
}

func (e entry) print(prefix string) {
func (e Entry) print(prefix string) {
fmt.Printf("%s Key: %s Meta: %d UserMeta: %d Offset: %d len(val)=%d",
prefix, e.Key, e.meta, e.UserMeta, e.offset, len(e.Value))
}
32 changes: 17 additions & 15 deletions transaction.go
@@ -185,7 +185,7 @@ type Txn struct {
reads []uint64 // contains fingerprints of keys read.
writes []uint64 // contains fingerprints of keys written.

pendingWrites map[string]*entry // cache stores any writes done by txn.
pendingWrites map[string]*Entry // cache stores any writes done by txn.

db *DB
callbacks []func()
@@ -196,7 +196,7 @@ type Txn struct {
}

type pendingWritesIterator struct {
entries []*entry
entries []*Entry
nextIdx int
readTs uint64
reversed bool
@@ -251,7 +251,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
if !txn.update || len(txn.pendingWrites) == 0 {
return nil
}
entries := make([]*entry, 0, len(txn.pendingWrites))
entries := make([]*Entry, 0, len(txn.pendingWrites))
for _, e := range txn.pendingWrites {
entries = append(entries, e)
}
@@ -270,7 +270,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
}
}

func (txn *Txn) checkSize(e *entry) error {
func (txn *Txn) checkSize(e *Entry) error {
count := txn.count + 1
// Extra bytes for version in key.
size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10
@@ -286,32 +286,34 @@
// It will return ErrReadOnlyTxn if update flag was set to false when creating the
// transaction.
func (txn *Txn) Set(key, val []byte) error {
e := &entry{
e := &Entry{
Key: key,
Value: val,
}
return txn.setEntry(e)
return txn.SetEntry(e)
}

// SetWithMeta adds a key-value pair to the database, along with a metadata
// byte. This byte is stored alongside the key, and can be used as an aid to
// interpret the value or store other contextual bits corresponding to the
// key-value pair.
func (txn *Txn) SetWithMeta(key, val []byte, meta byte) error {
e := &entry{Key: key, Value: val, UserMeta: meta}
return txn.setEntry(e)
e := &Entry{Key: key, Value: val, UserMeta: meta}
return txn.SetEntry(e)
}

// SetWithTTL adds a key-value pair to the database, along with a time-to-live
// (TTL) setting. A key stored with a TTL would automatically expire after
// the time has elapsed, and be eligible for garbage collection.
func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error {
expire := time.Now().Add(dur).Unix()
e := &entry{Key: key, Value: val, ExpiresAt: uint64(expire)}
return txn.setEntry(e)
e := &Entry{Key: key, Value: val, ExpiresAt: uint64(expire)}
return txn.SetEntry(e)
}

func (txn *Txn) setEntry(e *entry) error {
// SetEntry takes an Entry struct and adds the key-value pair in the struct, along
// with other metadata to the database.
func (txn *Txn) SetEntry(e *Entry) error {
switch {
case !txn.update:
return ErrReadOnlyTxn
@@ -348,7 +350,7 @@ func (txn *Txn) Delete(key []byte) error {
return exceedsMaxKeySizeError(key)
}

e := &entry{
e := &Entry{
Key: key,
meta: bitDelete,
}
Expand Down Expand Up @@ -474,15 +476,15 @@ func (txn *Txn) Commit(callback func(error)) error {
return ErrConflict
}

entries := make([]*entry, 0, len(txn.pendingWrites)+1)
entries := make([]*Entry, 0, len(txn.pendingWrites)+1)
for _, e := range txn.pendingWrites {
// Suffix the keys with commit ts, so the key versions are sorted in
// descending order of commit timestamp.
e.Key = y.KeyWithTs(e.Key, commitTs)
e.meta |= bitTxn
entries = append(entries, e)
}
e := &entry{
e := &Entry{
Key: y.KeyWithTs(txnKey, commitTs),
Value: []byte(strconv.FormatUint(commitTs, 10)),
meta: bitFinTxn,
@@ -532,7 +534,7 @@ func (db *DB) NewTransaction(update bool) *Txn {
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
if update {
txn.pendingWrites = make(map[string]*entry)
txn.pendingWrites = make(map[string]*Entry)
txn.db.orc.addRef()
}
return txn
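To make the relationship above concrete, here is a small sketch of how the existing helpers map onto the new SetEntry, based on the hunks in this file. The package name, placeholder key/value and the import path are ours, not part of the commit.

package example

import (
	"time"

	"github.com/dgraph-io/badger"
)

// demo writes the same key three ways; each helper is shown next to the
// Entry it would construct internally before calling SetEntry.
func demo(txn *badger.Txn) error {
	key, val := []byte("k"), []byte("v")

	// txn.Set(key, val) is equivalent to:
	if err := txn.SetEntry(&badger.Entry{Key: key, Value: val}); err != nil {
		return err
	}
	// txn.SetWithMeta(key, val, 0x02) is equivalent to:
	if err := txn.SetEntry(&badger.Entry{Key: key, Value: val, UserMeta: 0x02}); err != nil {
		return err
	}
	// txn.SetWithTTL(key, val, time.Minute) is equivalent to:
	return txn.SetEntry(&badger.Entry{
		Key:       key,
		Value:     val,
		ExpiresAt: uint64(time.Now().Add(time.Minute).Unix()),
	})
}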
2 changes: 1 addition & 1 deletion transaction_test.go
@@ -584,7 +584,7 @@ func TestIteratorAllVersionsButDeleted(t *testing.T) {
err = db.View(func(txn *Txn) error {
item, err := txn.Get([]byte("answer1"))
require.NoError(t, err)
err = txn.db.batchSet([]*entry{
err = txn.db.batchSet([]*Entry{
{
Key: y.KeyWithTs(item.key, item.version),
meta: bitDelete,
20 changes: 10 additions & 10 deletions value.go
@@ -151,7 +151,7 @@ func (lf *logFile) sync() error {

var errStop = errors.New("Stop iteration")

type logEntry func(e entry, vp valuePointer) error
type logEntry func(e Entry, vp valuePointer) error

// iterate iterates over the log file. It doesn't allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of fn call.
@@ -184,7 +184,7 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error {
return err
}

var e entry
var e Entry
e.offset = recordOffset
h.Decode(hbuf[:])
if h.klen > maxKeySize {
@@ -268,12 +268,12 @@ func (vlog *valueLog) rewrite(f *logFile) error {
defer elog.Finish()
elog.Printf("Rewriting fid: %d", f.fid)

wb := make([]*entry, 0, 1000)
wb := make([]*Entry, 0, 1000)
var size int64

y.AssertTrue(vlog.kv != nil)
var count int
fe := func(e entry) error {
fe := func(e Entry) error {
count++
if count%10000 == 0 {
elog.Printf("Processing entry %d", count)
@@ -302,7 +302,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
}
if vp.Fid == f.fid && vp.Offset == e.offset {
// This new entry only contains the key, and a pointer to the value.
ne := new(entry)
ne := new(Entry)
ne.meta = 0 // Remove all bits.
ne.UserMeta = e.UserMeta
ne.Key = make([]byte, len(e.Key))
@@ -325,7 +325,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
return nil
}

err := vlog.iterate(f, 0, func(e entry, vp valuePointer) error {
err := vlog.iterate(f, 0, func(e Entry, vp valuePointer) error {
return fe(e)
})
if err != nil {
@@ -631,7 +631,7 @@ func (vlog *valueLog) Replay(ptr valuePointer, fn logEntry) error {

type request struct {
// Input values
Entries []*entry
Entries []*Entry
// Output values and wait group stuff below
Ptrs []valuePointer
Wg sync.WaitGroup
@@ -786,7 +786,7 @@ func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, func(), error) {
}

// Test helper
func valueBytesToEntry(buf []byte) (e entry) {
func valueBytesToEntry(buf []byte) (e Entry) {
var h header
h.Decode(buf)
n := uint32(headerBufSize)
@@ -841,7 +841,7 @@ func (vlog *valueLog) pickLog(head valuePointer) *logFile {
return vlog.filesMap[fids[idx]]
}

func discardEntry(e entry, vs y.ValueStruct) bool {
func discardEntry(e Entry, vs y.ValueStruct) bool {
if vs.Version != y.ParseTs(e.Key) {
// Version not found. Discard.
return true
@@ -892,7 +892,7 @@ func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) (err error

start := time.Now()
y.AssertTrue(vlog.kv != nil)
err = vlog.iterate(lf, 0, func(e entry, vp valuePointer) error {
err = vlog.iterate(lf, 0, func(e Entry, vp valuePointer) error {
esz := float64(vp.Len) / (1 << 20) // in MBs. +4 for the CAS stuff.
skipped += esz
if skipped < skipFirstM {