diff --git a/README.md b/README.md index 7f2952349..881269287 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ We are currently gearing up for a [v1.0 release][v1-milestone]. - [Read-write transactions](#read-write-transactions) - [Managing transactions manually](#managing-transactions-manually) + [Using key/value pairs](#using-keyvalue-pairs) + + [Setting Time To Live(TTL) and User Metadata on Keys](#setting-time-to-livettl-and-user-metadata-on-keys) + [Iterating over keys](#iterating-over-keys) - [Prefix scans](#prefix-scans) - [Key-only iteration](#key-only-iteration) @@ -150,7 +151,7 @@ if err != nil { defer txn.Discard() // Use the transaction... -err := txn.Set([]byte("answer"), []byte("42"), 0) +err := txn.Set([]byte("answer"), []byte("42")) if err != nil { return err } @@ -178,7 +179,7 @@ To save a key/value pair, use the `Txn.Set()` method: ```go err := db.Update(func(txn *badger.Txn) error { - err := txn.Set([]byte("answer"), []byte("42"), 0) + err := txn.Set([]byte("answer"), []byte("42")) return err }) ``` @@ -209,6 +210,17 @@ then you must use `copy()` to copy it to another byte slice. Use the `Txn.Delete()` method to delete a key. +### Setting Time To Live(TTL) and User Metadata on Keys +Badger allows setting an optional Time to Live (TTL) value on keys. Once the TTL has +elapsed, the key will no longer be retrievable and will be eligible for garbage +collection. A TTL can be set as a `time.Duration` value using the `Txn.SetWithTTL()` +API method. + +An optional user metadata value can be set on each key. A user metadata value +is represented by a single byte. It can be used to set certain bits along +with the key to aid in interpreting or decoding the key-value pair. User +metadata can be set using the `Txn.SetWithMeta()` API method. + ### Iterating over keys To iterate over keys, we can use an `Iterator`, which can be obtained using the `Txn.NewIterator()` method. @@ -386,6 +398,7 @@ Values in SSD-conscious Storage][wisckey]_. 
| Pure Go (no Cgo) | Yes | No | Yes | | Transactions | Yes, ACID, concurrent with SSI3 | Yes (but non-ACID) | Yes, ACID | | Snapshots | Yes | Yes | Yes | +| TTL support | Yes | Yes | No | 1 The [WISCKEY paper][wisckey] (on which Badger is based) saw big wins with separating values from keys, significantly reducing the write diff --git a/backup.go b/backup.go index 8f8607e75..6b89ef6d1 100644 --- a/backup.go +++ b/backup.go @@ -48,10 +48,11 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) { } entry := &protos.KVPair{ - Key: y.Copy(item.Key()), - Value: y.Copy(val), - UserMeta: []byte{item.UserMeta()}, - Version: item.Version(), + Key: y.Copy(item.Key()), + Value: y.Copy(val), + UserMeta: []byte{item.UserMeta()}, + Version: item.Version(), + ExpiresAt: item.ExpiresAt(), } // Write entries to disk @@ -118,9 +119,10 @@ func (db *DB) Load(r io.Reader) error { return err } entries = append(entries, &entry{ - Key: y.KeyWithTs(e.Key, e.Version), - Value: e.Value, - UserMeta: e.UserMeta[0], + Key: y.KeyWithTs(e.Key, e.Version), + Value: e.Value, + UserMeta: e.UserMeta[0], + ExpiresAt: e.ExpiresAt, }) if len(entries) == 1000 { diff --git a/backup_test.go b/backup_test.go index ea926d3cf..74755b509 100644 --- a/backup_test.go +++ b/backup_test.go @@ -44,7 +44,7 @@ func TestDumpLoad(t *testing.T) { err = db.Update(func(txn *Txn) error { for _, e := range entries { - err := txn.Set(e.key, e.val, e.userMeta) + err := txn.SetWithMeta(e.key, e.val, e.userMeta) if err != nil { return err } diff --git a/db.go b/db.go index c75ba3ba9..76c679802 100644 --- a/db.go +++ b/db.go @@ -109,7 +109,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error { nk := make([]byte, len(e.Key)) copy(nk, e.Key) var nv []byte - meta := e.Meta + meta := e.meta if out.shouldWriteValueToLSM(e) { nv = make([]byte, len(e.Value)) copy(nv, e.Value) @@ -125,7 +125,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error { UserMeta: e.UserMeta, } - if e.Meta&bitFinTxn > 0 { + 
if e.meta&bitFinTxn > 0 { txnTs, err := strconv.ParseUint(string(e.Value), 10, 64) if err != nil { return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value) @@ -139,7 +139,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error { txn = txn[:0] lastCommit = 0 - } else if e.Meta&bitTxn == 0 { + } else if e.meta&bitTxn == 0 { // This entry is from a rewrite. toLSM(nk, v) @@ -488,23 +488,25 @@ func (db *DB) writeToLSM(b *request) error { } for i, entry := range b.Entries { - if entry.Meta&bitFinTxn != 0 { + if entry.meta&bitFinTxn != 0 { continue } if db.shouldWriteValueToLSM(*entry) { // Will include deletion / tombstone case. db.mt.Put(entry.Key, y.ValueStruct{ - Value: entry.Value, - Meta: entry.Meta, - UserMeta: entry.UserMeta, + Value: entry.Value, + Meta: entry.meta, + UserMeta: entry.UserMeta, + ExpiresAt: entry.ExpiresAt, }) } else { var offsetBuf [vptrSize]byte db.mt.Put(entry.Key, y.ValueStruct{ - Value: b.Ptrs[i].Encode(offsetBuf[:]), - Meta: entry.Meta | bitValuePointer, - UserMeta: entry.UserMeta, + Value: b.Ptrs[i].Encode(offsetBuf[:]), + Meta: entry.meta | bitValuePointer, + UserMeta: entry.UserMeta, + ExpiresAt: entry.ExpiresAt, }) } } @@ -561,6 +563,28 @@ func (db *DB) writeRequests(reqs []*request) error { return nil } +func (db *DB) sendToWriteCh(entries []*entry) (*request, error) { + var count, size int64 + for _, e := range entries { + size += int64(db.opt.estimateSize(e)) + count++ + } + if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize { + return nil, ErrTxnTooBig + } + + // We can only service one request because we need each txn to be stored in a contiguous section. + // Txns should not interleave among other txns or rewrites. + req := requestPool.Get().(*request) + req.Entries = entries + req.Wg = sync.WaitGroup{} + req.Wg.Add(1) + db.writeCh <- req // Handled in doWrites. 
+ y.NumPuts.Add(int64(len(entries))) + + return req, nil +} + func (db *DB) doWrites(lc *y.Closer) { defer lc.Done() pendingCh := make(chan struct{}, 1) @@ -621,28 +645,6 @@ func (db *DB) doWrites(lc *y.Closer) { } } -func (db *DB) sendToWriteCh(entries []*entry) (*request, error) { - var count, size int64 - for _, e := range entries { - size += int64(db.opt.estimateSize(e)) - count++ - } - if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize { - return nil, ErrTxnTooBig - } - - // We can only service one request because we need each txn to be stored in a contigous section. - // Txns should not interleave among other txns or rewrites. - req := requestPool.Get().(*request) - req.Entries = entries - req.Wg = sync.WaitGroup{} - req.Wg.Add(1) - db.writeCh <- req - y.NumPuts.Add(int64(len(entries))) - - return req, nil -} - // batchSet applies a list of badger.Entry. If a request level error occurs it // will be returned. // Check(kv.BatchSet(entries)) @@ -887,7 +889,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error { entries = append(entries, &entry{ Key: y.KeyWithTs(key, item.version), - Meta: bitDelete, + meta: bitDelete, }) db.vlog.updateGCStats(item) } @@ -942,7 +944,7 @@ func (db *DB) PurgeOlderVersions() error { entries = append(entries, &entry{ Key: y.KeyWithTs(lastKey, item.version), - Meta: bitDelete, + meta: bitDelete, }) db.vlog.updateGCStats(item) count++ diff --git a/db_test.go b/db_test.go index 432d4b498..c5f5be09e 100644 --- a/db_test.go +++ b/db_test.go @@ -28,6 +28,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/dgraph-io/badger/y" "github.com/stretchr/testify/require" @@ -56,7 +57,7 @@ func getItemValue(t *testing.T, item *Item) (val []byte) { func txnSet(t *testing.T, kv *DB, key []byte, val []byte, meta byte) { txn := kv.NewTransaction(true) - require.NoError(t, txn.Set(key, val, meta)) + require.NoError(t, txn.SetWithMeta(key, val, meta)) require.NoError(t, txn.Commit(nil)) } @@ -89,7 +90,7 @@ func 
TestUpdateAndView(t *testing.T) { err = db.Update(func(txn *Txn) error { for i := 0; i < 10; i++ { - err := txn.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)), 0x00) + err := txn.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i))) if err != nil { return err } @@ -289,7 +290,7 @@ func TestGetMore(t *testing.T) { for i := 0; i < n; i += m { txn := kv.NewTransaction(true) for j := i; j < i+m && j < n; j++ { - require.NoError(t, txn.Set(data(j), data(j), 0)) + require.NoError(t, txn.Set(data(j), data(j))) } require.NoError(t, txn.Commit(nil)) } @@ -311,8 +312,7 @@ func TestGetMore(t *testing.T) { for j := i; j < i+m && j < n; j++ { require.NoError(t, txn.Set(data(j), // Use a long value that will certainly exceed value threshold. - []byte(fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", j)), - 0x00)) + []byte(fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", j)))) } require.NoError(t, txn.Commit(nil)) } @@ -398,8 +398,7 @@ func TestExistsMore(t *testing.T) { txn := kv.NewTransaction(true) for j := i; j < i+m && j < n; j++ { require.NoError(t, txn.Set([]byte(fmt.Sprintf("%09d", j)), - []byte(fmt.Sprintf("%09d", j)), - 0x00)) + []byte(fmt.Sprintf("%09d", j)))) } require.NoError(t, txn.Commit(nil)) } @@ -694,11 +693,11 @@ func TestBigKeyValuePairs(t *testing.T) { small := make([]byte, 10) txn := kv.NewTransaction(true) - require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, small, 0)) - require.Regexp(t, regexp.MustCompile("Value.*exceeded"), txn.Set(small, bigV, 0)) + require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, small)) + require.Regexp(t, regexp.MustCompile("Value.*exceeded"), txn.Set(small, bigV)) - require.NoError(t, txn.Set(small, small, 0x00)) - require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, bigV, 0x00)) + require.NoError(t, txn.Set(small, small)) + require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, bigV)) 
require.NoError(t, kv.View(func(txn *Txn) error { _, err := txn.Get(small) @@ -775,7 +774,7 @@ func TestSetIfAbsentAsync(t *testing.T) { txn := kv.NewTransaction(true) _, err = txn.Get(bkey(i)) require.Equal(t, ErrKeyNotFound, err) - require.NoError(t, txn.Set(bkey(i), nil, byte(i%127))) + require.NoError(t, txn.SetWithMeta(bkey(i), nil, byte(i%127))) require.NoError(t, txn.Commit(f)) } @@ -858,7 +857,7 @@ func TestPurgeVersionsBelow(t *testing.T) { // Write 4 versions of the same key for i := 0; i < 4; i++ { err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer"), []byte(fmt.Sprintf("%25d", i)), 0) + return txn.Set([]byte("answer"), []byte(fmt.Sprintf("%25d", i))) }) require.NoError(t, err) } @@ -915,12 +914,12 @@ func TestPurgeOlderVersions(t *testing.T) { // Write two versions of a key err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer"), []byte("42"), 0) + return txn.Set([]byte("answer"), []byte("42")) }) require.NoError(t, err) err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer"), []byte("43"), 0) + return txn.Set([]byte("answer"), []byte("43")) }) require.NoError(t, err) @@ -965,6 +964,54 @@ func TestPurgeOlderVersions(t *testing.T) { require.NoError(t, err) } +func TestExpiry(t *testing.T) { + dir, err := ioutil.TempDir("", "badger") + require.NoError(t, err) + defer os.RemoveAll(dir) + db, err := Open(getTestOptions(dir)) + require.NoError(t, err) + + // Write two keys, one with a TTL + err = db.Update(func(txn *Txn) error { + return txn.Set([]byte("answer1"), []byte("42")) + }) + require.NoError(t, err) + + err = db.Update(func(txn *Txn) error { + return txn.SetWithTTL([]byte("answer2"), []byte("43"), 1*time.Second) + }) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + // Verify that only unexpired key is found during iteration + err = db.View(func(txn *Txn) error { + _, err := txn.Get([]byte("answer1")) + require.NoError(t, err) + + _, err = txn.Get([]byte("answer2")) + require.Error(t, 
ErrKeyNotFound, err) + return nil + }) + require.NoError(t, err) + + // Verify that only one key is found during iteration + opts := DefaultIteratorOptions + opts.PrefetchValues = false + err = db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + require.Equal(t, []byte("answer1"), item.Key()) + } + require.Equal(t, 1, count) + return nil + }) + require.NoError(t, err) +} + func ExampleOpen() { dir, err := ioutil.TempDir("", "badger") if err != nil { @@ -992,7 +1039,7 @@ func ExampleOpen() { } txn := db.NewTransaction(true) // Read-write txn - err = txn.Set([]byte("key"), []byte("value"), 0) + err = txn.Set([]byte("key"), []byte("value")) if err != nil { log.Fatal(err) } @@ -1052,7 +1099,7 @@ func ExampleTxn_NewIterator() { // Fill in 1000 items n := 1000 for i := 0; i < n; i++ { - err := txn.Set(bkey(i), bval(i), 0) + err := txn.Set(bkey(i), bval(i)) if err != nil { log.Fatal(err) } diff --git a/iterator.go b/iterator.go index b65546e08..25dc2095b 100644 --- a/iterator.go +++ b/iterator.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "sync" + "time" "github.com/dgraph-io/badger/y" farm "github.com/dgryski/go-farm" @@ -34,19 +35,20 @@ const ( // Item is returned during iteration. Both the Key() and Value() output is only valid until // iterator.Next() is called. type Item struct { - status prefetchStatus - err error - wg sync.WaitGroup - db *DB - key []byte - vptr []byte - meta byte - userMeta byte - val []byte - slice *y.Slice // Used only during prefetching. - next *Item - version uint64 - txn *Txn + status prefetchStatus + err error + wg sync.WaitGroup + db *DB + key []byte + vptr []byte + meta byte // We need to store meta to know about bitValuePointer. + userMeta byte + expiresAt uint64 + val []byte + slice *y.Slice // Used only during prefetching. 
+ next *Item + version uint64 + txn *Txn } // ToString returns a string representation of Item @@ -89,10 +91,6 @@ func (item *Item) hasValue() bool { // key not found return false } - if (item.meta & bitDelete) != 0 { - // Tombstone encountered. - return false - } return true } @@ -159,6 +157,12 @@ func (item *Item) UserMeta() byte { return item.userMeta } +// ExpiresAt returns a Unix time value indicating when the item will be +// considered expired. 0 indicates that the item will never expire. +func (item *Item) ExpiresAt() uint64 { + return item.expiresAt +} + // TODO: Switch this to use linked list container in Go. type list struct { head *Item @@ -304,6 +308,16 @@ func (it *Iterator) Next() { } } +func isDeletedOrExpired(vs y.ValueStruct) bool { + if vs.Meta&bitDelete > 0 { + return true + } + if vs.ExpiresAt == 0 { + return false + } + return vs.ExpiresAt <= uint64(time.Now().Unix()) +} + // parseItem is a complex function because it needs to handle both forward and reverse iteration // implementation. We store keys such that their versions are sorted in descending order. This makes // forward iteration efficient, but revese iteration complicated. This tradeoff is better because @@ -336,8 +350,8 @@ func (it *Iterator) parseItem() bool { } if it.opt.AllVersions { - // First check if value has been deleted - if mi.Value().Meta&bitDelete > 0 { + // First check if value has been deleted or has expired. + if isDeletedOrExpired(mi.Value()) { mi.Next() return false } @@ -365,7 +379,7 @@ func (it *Iterator) parseItem() bool { FILL: // If deleted, advance and return. 
- if mi.Value().Meta&bitDelete > 0 { + if isDeletedOrExpired(mi.Value()) { mi.Next() return false } @@ -397,6 +411,7 @@ func (it *Iterator) fill(item *Item) { vs := it.iitr.Value() item.meta = vs.Meta item.userMeta = vs.UserMeta + item.expiresAt = vs.ExpiresAt item.version = y.ParseTs(it.iitr.Key()) item.key = y.Safecopy(item.key, y.ParseKey(it.iitr.Key())) diff --git a/manifest.go b/manifest.go index fef9ee959..4c64b2e80 100644 --- a/manifest.go +++ b/manifest.go @@ -211,7 +211,7 @@ func (mf *manifestFile) addChanges(changesParam []*protos.ManifestChange) error var magicText = [4]byte{'B', 'd', 'g', 'r'} // The magic version number. -const magicVersion = 2 +const magicVersion = 3 func helpRewrite(dir string, m *Manifest) (*os.File, int, error) { rewritePath := filepath.Join(dir, manifestRewriteFilename) diff --git a/protos/backup.pb.go b/protos/backup.pb.go index b53cbcdbe..13a9f6199 100644 --- a/protos/backup.pb.go +++ b/protos/backup.pb.go @@ -33,10 +33,11 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type KVPair struct { - Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - UserMeta []byte `protobuf:"bytes,3,opt,name=userMeta,proto3" json:"userMeta,omitempty"` - Version uint64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + UserMeta []byte `protobuf:"bytes,3,opt,name=userMeta,proto3" json:"userMeta,omitempty"` + Version uint64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` + ExpiresAt uint64 `protobuf:"varint,5,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` } func (m *KVPair) Reset() { *m = KVPair{} } @@ -72,6 +73,13 @@ func (m *KVPair) GetVersion() uint64 { return 0 } +func (m 
*KVPair) GetExpiresAt() uint64 { + if m != nil { + return m.ExpiresAt + } + return 0 +} + func init() { proto.RegisterType((*KVPair)(nil), "protos.KVPair") } @@ -113,9 +121,32 @@ func (m *KVPair) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintBackup(dAtA, i, uint64(m.Version)) } + if m.ExpiresAt != 0 { + dAtA[i] = 0x28 + i++ + i = encodeVarintBackup(dAtA, i, uint64(m.ExpiresAt)) + } return i, nil } +func encodeFixed64Backup(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Backup(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} func encodeVarintBackup(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -143,6 +174,9 @@ func (m *KVPair) Size() (n int) { if m.Version != 0 { n += 1 + sovBackup(uint64(m.Version)) } + if m.ExpiresAt != 0 { + n += 1 + sovBackup(uint64(m.ExpiresAt)) + } return n } @@ -300,6 +334,25 @@ func (m *KVPair) Unmarshal(dAtA []byte) error { break } } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ExpiresAt", wireType) + } + m.ExpiresAt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ExpiresAt |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipBackup(dAtA[iNdEx:]) @@ -429,14 +482,16 @@ var ( func init() { proto.RegisterFile("backup.proto", fileDescriptorBackup) } var fileDescriptorBackup = []byte{ - // 144 bytes of a gzipped 
FileDescriptorProto + // 167 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4a, 0x4c, 0xce, - 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x53, 0xc5, 0x4a, 0x69, 0x5c, - 0x6c, 0xde, 0x61, 0x01, 0x89, 0x99, 0x45, 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x12, 0x8c, - 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x20, 0xa6, 0x90, 0x08, 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, - 0x04, 0x13, 0x58, 0x0c, 0xc2, 0x11, 0x92, 0xe2, 0xe2, 0x28, 0x2d, 0x4e, 0x2d, 0xf2, 0x4d, 0x2d, - 0x49, 0x94, 0x60, 0x06, 0x4b, 0xc0, 0xf9, 0x42, 0x12, 0x5c, 0xec, 0x65, 0xa9, 0x45, 0xc5, 0x99, - 0xf9, 0x79, 0x12, 0x2c, 0x0a, 0x8c, 0x1a, 0x2c, 0x41, 0x30, 0xae, 0x93, 0xc0, 0x89, 0x47, 0x72, - 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0xc4, - 0x05, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe0, 0xb7, 0xcb, 0x72, 0x98, 0x00, 0x00, 0x00, + 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x53, 0xc5, 0x4a, 0xad, 0x8c, + 0x5c, 0x6c, 0xde, 0x61, 0x01, 0x89, 0x99, 0x45, 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x12, + 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x20, 0xa6, 0x90, 0x08, 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, + 0xaa, 0x04, 0x13, 0x58, 0x0c, 0xc2, 0x11, 0x92, 0xe2, 0xe2, 0x28, 0x2d, 0x4e, 0x2d, 0xf2, 0x4d, + 0x2d, 0x49, 0x94, 0x60, 0x06, 0x4b, 0xc0, 0xf9, 0x42, 0x12, 0x5c, 0xec, 0x65, 0xa9, 0x45, 0xc5, + 0x99, 0xf9, 0x79, 0x12, 0x2c, 0x0a, 0x8c, 0x1a, 0x2c, 0x41, 0x30, 0xae, 0x90, 0x2c, 0x17, 0x57, + 0x6a, 0x45, 0x41, 0x66, 0x51, 0x6a, 0x71, 0x7c, 0x62, 0x89, 0x04, 0x2b, 0x58, 0x92, 0x13, 0x2a, + 0xe2, 0x58, 0xe2, 0x24, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, + 0x31, 0xce, 0x78, 0x2c, 0xc7, 0x90, 0x04, 0x71, 0xa1, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xe7, + 0x3f, 0x3f, 0x95, 0xb8, 0x00, 0x00, 0x00, } diff --git a/protos/backup.proto b/protos/backup.proto index ff7ec5680..0f4e3d61e 100644 --- a/protos/backup.proto +++ 
b/protos/backup.proto @@ -24,4 +24,5 @@ message KVPair { bytes value = 2; bytes userMeta = 3; uint64 version = 4; + uint64 expires_at = 5; } \ No newline at end of file diff --git a/protos/manifest.pb.go b/protos/manifest.pb.go index 361e9c02a..d8db55f99 100644 --- a/protos/manifest.pb.go +++ b/protos/manifest.pb.go @@ -1,16 +1,6 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: manifest.proto -/* - Package protos is a generated protocol buffer package. - - It is generated from these files: - manifest.proto - - It has these top-level messages: - ManifestChangeSet - ManifestChange -*/ package protos import proto "github.com/golang/protobuf/proto" @@ -24,12 +14,6 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - type ManifestChange_Operation int32 const ( diff --git a/structs.go b/structs.go index 6ceceffdd..841915207 100644 --- a/structs.go +++ b/structs.go @@ -47,40 +47,42 @@ func (p *valuePointer) Decode(b []byte) { // header is used in value log as a header before Entry. type header struct { - klen uint32 - vlen uint32 - meta byte - userMeta byte + klen uint32 + vlen uint32 + expiresAt uint64 + meta byte + userMeta byte } const ( - headerBufSize = 10 + headerBufSize = 18 ) func (h header) Encode(out []byte) { y.AssertTrue(len(out) >= headerBufSize) binary.BigEndian.PutUint32(out[0:4], h.klen) binary.BigEndian.PutUint32(out[4:8], h.vlen) - out[8] = h.meta - out[9] = h.userMeta + binary.BigEndian.PutUint64(out[8:16], h.expiresAt) + out[16] = h.meta + out[17] = h.userMeta } // Decodes h from buf. 
func (h *header) Decode(buf []byte) { h.klen = binary.BigEndian.Uint32(buf[0:4]) h.vlen = binary.BigEndian.Uint32(buf[4:8]) - h.meta = buf[8] - h.userMeta = buf[9] + h.expiresAt = binary.BigEndian.Uint64(buf[8:16]) + h.meta = buf[16] + h.userMeta = buf[17] } -// entry provides Key, Value and if required, CASCounterCheck to kv.BatchSet() API. -// If CASCounterCheck is provided, it would be compared against the current casCounter -// assigned to this key-value. Set be done on this key only if the counters match. +// entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data. type entry struct { - Key []byte - Value []byte - Meta byte - UserMeta byte + Key []byte + Value []byte + UserMeta byte + ExpiresAt uint64 // time.Unix + meta byte // Fields maintained internally. offset uint32 @@ -95,11 +97,13 @@ func (e *entry) estimateSize(threshold int) int { // Encodes e to buf. Returns number of bytes written. func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) { - var h header - h.klen = uint32(len(e.Key)) - h.vlen = uint32(len(e.Value)) - h.meta = e.Meta - h.userMeta = e.UserMeta + h := header{ + klen: uint32(len(e.Key)), + vlen: uint32(len(e.Value)), + expiresAt: e.ExpiresAt, + meta: e.meta, + userMeta: e.UserMeta, + } var headerEnc [headerBufSize]byte h.Encode(headerEnc[:]) @@ -124,5 +128,5 @@ func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) { func (e entry) print(prefix string) { fmt.Printf("%s Key: %s Meta: %d UserMeta: %d Offset: %d len(val)=%d", - prefix, e.Key, e.Meta, e.UserMeta, e.offset, len(e.Value)) + prefix, e.Key, e.meta, e.UserMeta, e.offset, len(e.Value)) } diff --git a/table/builder.go b/table/builder.go index efa308400..7ce0d61f9 100644 --- a/table/builder.go +++ b/table/builder.go @@ -144,11 +144,7 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct) { b.buf.Write(hbuf[:]) b.buf.Write(diffKey) // We only need to store the key difference. 
- // This should be kept in sync with ValueStruct encode function. - b.buf.WriteByte(v.Meta) // Meta byte precedes actual value. - b.buf.WriteByte(v.UserMeta) - b.buf.Write(v.Value) - + v.EncodeTo(b.buf) b.counter++ // Increment number of keys added for this current block. } diff --git a/transaction.go b/transaction.go index b1d3c32ed..f4aed1e44 100644 --- a/transaction.go +++ b/transaction.go @@ -24,6 +24,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "github.com/dgraph-io/badger/y" farm "github.com/dgryski/go-farm" @@ -185,36 +186,53 @@ type Txn struct { discarded bool } -// Set sets the provided value for a given key. If key is not present, it is created. +// Set adds a key-value pair to the database. // -// Along with key and value, Set can also take an optional userMeta byte. This byte is stored -// alongside the key, and can be used as an aid to interpret the value or store other contextual -// bits corresponding to the key-value pair. -// -// This would fail with ErrReadOnlyTxn if update flag was set to false when creating the +// It will return ErrReadOnlyTxn if update flag was set to false when creating the // transaction. -func (txn *Txn) Set(key, val []byte, userMeta byte) error { - if !txn.update { +func (txn *Txn) Set(key, val []byte) error { + e := &entry{ + Key: key, + Value: val, + } + return txn.setEntry(e) +} + +// SetWithMeta adds a key-value pair to the database, along with a metadata +// byte. This byte is stored alongside the key, and can be used as an aid to +// interpret the value or store other contextual bits corresponding to the +// key-value pair. +func (txn *Txn) SetWithMeta(key, val []byte, meta byte) error { + e := &entry{Key: key, Value: val, UserMeta: meta} + return txn.setEntry(e) +} + +// SetWithTTL adds a key-value pair to the database, along with a time-to-live +// (TTL) setting. A key stored with a TTL would automatically expire after +// the time has elapsed, and be eligible for garbage collection. 
+func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error { + expire := time.Now().Add(dur).Unix() + e := &entry{Key: key, Value: val, ExpiresAt: uint64(expire)} + return txn.setEntry(e) +} + +func (txn *Txn) setEntry(e *entry) error { + switch { + case !txn.update: return ErrReadOnlyTxn - } else if txn.discarded { + case txn.discarded: return ErrDiscardedTxn - } else if len(key) == 0 { + case len(e.Key) == 0: return ErrEmptyKey - } else if len(key) > maxKeySize { - return exceedsMaxKeySizeError(key) - } else if int64(len(val)) > txn.db.opt.ValueLogFileSize { - return exceedsMaxValueSizeError(val, txn.db.opt.ValueLogFileSize) + case len(e.Key) > maxKeySize: + return exceedsMaxKeySizeError(e.Key) + case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize: + return exceedsMaxValueSizeError(e.Value, txn.db.opt.ValueLogFileSize) } - fp := farm.Fingerprint64(key) // Avoid dealing with byte arrays. + fp := farm.Fingerprint64(e.Key) // Avoid dealing with byte arrays. txn.writes = append(txn.writes, fp) - - e := &entry{ - Key: key, - Value: val, - UserMeta: userMeta, - } - txn.pendingWrites[string(key)] = e + txn.pendingWrites[string(e.Key)] = e return nil } @@ -237,7 +255,7 @@ func (txn *Txn) Delete(key []byte) error { e := &entry{ Key: key, - Meta: bitDelete, + meta: bitDelete, } txn.pendingWrites[string(key)] = e return nil @@ -256,7 +274,7 @@ func (txn *Txn) Get(key []byte) (item *Item, rerr error) { if txn.update { if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) { // Fulfill from cache. 
- item.meta = e.Meta + item.meta = e.meta item.val = e.Value item.userMeta = e.UserMeta item.key = key @@ -279,7 +297,7 @@ func (txn *Txn) Get(key []byte) (item *Item, rerr error) { if vs.Value == nil && vs.Meta == 0 { return nil, ErrKeyNotFound } - if (vs.Meta & bitDelete) != 0 { + if isDeletedOrExpired(vs) { return nil, ErrKeyNotFound } @@ -354,13 +372,13 @@ func (txn *Txn) Commit(callback func(error)) error { // Suffix the keys with commit ts, so the key versions are sorted in // descending order of commit timestamp. e.Key = y.KeyWithTs(e.Key, commitTs) - e.Meta |= bitTxn + e.meta |= bitTxn entries = append(entries, e) } e := &entry{ Key: y.KeyWithTs(txnKey, commitTs), Value: []byte(strconv.FormatUint(commitTs, 10)), - Meta: bitFinTxn, + meta: bitFinTxn, } entries = append(entries, e) diff --git a/transaction_test.go b/transaction_test.go index 41d10c47f..dc4a775fd 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -41,7 +41,7 @@ func TestTxnSimple(t *testing.T) { for i := 0; i < 10; i++ { k := []byte(fmt.Sprintf("key=%d", i)) v := []byte(fmt.Sprintf("val=%d", i)) - txn.Set(k, v, 0) + txn.Set(k, v) } item, err := txn.Get([]byte("key=8")) @@ -67,7 +67,7 @@ func TestTxnVersions(t *testing.T) { for i := 1; i < 10; i++ { txn := kv.NewTransaction(true) - txn.Set(k, []byte(fmt.Sprintf("valversion=%d", i)), 0) + txn.Set(k, []byte(fmt.Sprintf("valversion=%d", i))) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(i), kv.orc.readTs()) } @@ -179,8 +179,8 @@ func TestTxnWriteSkew(t *testing.T) { txn := kv.NewTransaction(true) defer txn.Discard() val := []byte(strconv.Itoa(100)) - txn.Set(ax, val, 0) - txn.Set(ay, val, 0) + txn.Set(ax, val) + txn.Set(ay, val) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(1), kv.orc.readTs()) @@ -201,7 +201,7 @@ func TestTxnWriteSkew(t *testing.T) { sum := getBal(txn1, ax) sum += getBal(txn1, ay) require.Equal(t, 200, sum) - txn1.Set(ax, []byte("0"), 0) // Deduct 100 from ax. 
+ txn1.Set(ax, []byte("0")) // Deduct 100 from ax. // Let's read this back. sum = getBal(txn1, ax) @@ -215,7 +215,7 @@ func TestTxnWriteSkew(t *testing.T) { sum = getBal(txn2, ax) sum += getBal(txn2, ay) require.Equal(t, 200, sum) - txn2.Set(ay, []byte("0"), 0) // Deduct 100 from ay. + txn2.Set(ay, []byte("0")) // Deduct 100 from ay. // Let's read this back. sum = getBal(txn2, ax) @@ -249,21 +249,21 @@ func TestTxnIterationEdgeCase(t *testing.T) { // c1 txn := kv.NewTransaction(true) - txn.Set(kc, []byte("c1"), 0) + txn.Set(kc, []byte("c1")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(1), kv.orc.readTs()) // a2, c2 txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a2"), 0) - txn.Set(kc, []byte("c2"), 0) + txn.Set(ka, []byte("a2")) + txn.Set(kc, []byte("c2")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(2), kv.orc.readTs()) // b3 txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a3"), 0) - txn.Set(kb, []byte("b3"), 0) + txn.Set(ka, []byte("a3")) + txn.Set(kb, []byte("b3")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(3), kv.orc.readTs()) @@ -332,21 +332,21 @@ func TestTxnIterationEdgeCase2(t *testing.T) { // c1 txn := kv.NewTransaction(true) - txn.Set(kc, []byte("c1"), 0) + txn.Set(kc, []byte("c1")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(1), kv.orc.readTs()) // a2, c2 txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a2"), 0) - txn.Set(kc, []byte("c2"), 0) + txn.Set(ka, []byte("a2")) + txn.Set(kc, []byte("c2")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(2), kv.orc.readTs()) // b3 txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a3"), 0) - txn.Set(kb, []byte("b3"), 0) + txn.Set(ka, []byte("a3")) + txn.Set(kb, []byte("b3")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(3), kv.orc.readTs()) @@ -426,13 +426,13 @@ func TestTxnIterationEdgeCase3(t *testing.T) { // c1 txn := kv.NewTransaction(true) - txn.Set(kc, []byte("c1"), 0) + txn.Set(kc, []byte("c1")) 
require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(1), kv.orc.readTs()) // b2 txn = kv.NewTransaction(true) - txn.Set(kb, []byte("b2"), 0) + txn.Set(kb, []byte("b2")) require.NoError(t, txn.Commit(nil)) require.Equal(t, uint64(2), kv.orc.readTs()) @@ -488,8 +488,8 @@ func TestIteratorAllVersionsButDeleted(t *testing.T) { // Write two keys err = db.Update(func(txn *Txn) error { - txn.Set([]byte("answer1"), []byte("42"), 0) - txn.Set([]byte("answer2"), []byte("43"), 0) + txn.Set([]byte("answer1"), []byte("42")) + txn.Set([]byte("answer2"), []byte("43")) return nil }) require.NoError(t, err) @@ -501,7 +501,7 @@ func TestIteratorAllVersionsButDeleted(t *testing.T) { err = txn.db.batchSet([]*entry{ { Key: y.KeyWithTs(item.key, item.version), - Meta: bitDelete, + meta: bitDelete, }, }) require.NoError(t, err) @@ -558,7 +558,7 @@ func TestManagedDB(t *testing.T) { // Write data at t=3. txn := kv.NewTransactionAt(3, true) for i := 0; i <= 3; i++ { - require.NoError(t, txn.Set(key(i), val(i), 0)) + require.NoError(t, txn.Set(key(i), val(i))) } require.Error(t, ErrManagedTxn, txn.Commit(nil)) require.NoError(t, txn.CommitAt(3, nil)) @@ -590,7 +590,7 @@ func TestManagedDB(t *testing.T) { if err == nil { continue // Don't overwrite existing keys. 
} - require.NoError(t, txn.Set(key(i), val(i), 0)) + require.NoError(t, txn.Set(key(i), val(i))) } require.NoError(t, txn.CommitAt(7, nil)) diff --git a/value.go b/value.go index 2f6842997..d0a061929 100644 --- a/value.go +++ b/value.go @@ -210,8 +210,6 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error { } return err } - e.Meta = h.meta - e.UserMeta = h.userMeta if _, err = io.ReadFull(tee, e.Value); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { truncate = true @@ -233,9 +231,11 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error { truncate = true break } + e.meta = h.meta + e.UserMeta = h.userMeta + e.ExpiresAt = h.expiresAt var vp valuePointer - vp.Len = headerBufSize + h.klen + h.vlen + uint32(len(crcBuf)) recordOffset += vp.Len @@ -303,7 +303,7 @@ func (vlog *valueLog) rewrite(f *logFile) error { if vp.Fid == f.fid && vp.Offset == e.offset { // This new entry only contains the key, and a pointer to the value. ne := new(entry) - ne.Meta = 0 // Remove all bits. + ne.meta = 0 // Remove all bits. ne.UserMeta = e.UserMeta ne.Key = make([]byte, len(e.Key)) copy(ne.Key, e.Key) @@ -771,12 +771,7 @@ func (vlog *valueLog) Read(vp valuePointer) ([]byte, func(), error) { } var h header h.Decode(buf) - if (h.meta & bitDelete) != 0 { - // Tombstone key - return nil, cb, nil - } - n := uint32(headerBufSize) - n += h.klen + n := uint32(headerBufSize) + h.klen return buf[n : n+h.vlen], cb, nil } @@ -798,7 +793,7 @@ func valueBytesToEntry(buf []byte) (e entry) { e.Key = buf[n : n+h.klen] n += h.klen - e.Meta = h.meta + e.meta = h.meta e.UserMeta = h.userMeta e.Value = buf[n : n+h.vlen] return @@ -851,8 +846,7 @@ func discardEntry(e entry, vs y.ValueStruct) bool { // Version not found. Discard. return true } - if (vs.Meta & bitDelete) > 0 { - // Key deleted. Discard. 
+ if isDeletedOrExpired(vs) { return true } if (vs.Meta & bitValuePointer) == 0 { diff --git a/value_test.go b/value_test.go index 7298bf21f..220339621 100644 --- a/value_test.go +++ b/value_test.go @@ -44,12 +44,12 @@ func TestValueBasic(t *testing.T) { e := &entry{ Key: []byte("samplekey"), Value: []byte(val1), - Meta: bitValuePointer, + meta: bitValuePointer, } e2 := &entry{ Key: []byte("samplekeyb"), Value: []byte(val2), - Meta: bitValuePointer, + meta: bitValuePointer, } b := new(request) @@ -71,12 +71,12 @@ func TestValueBasic(t *testing.T) { { Key: []byte("samplekey"), Value: []byte(val1), - Meta: bitValuePointer, + meta: bitValuePointer, }, { Key: []byte("samplekeyb"), Value: []byte(val2), - Meta: bitValuePointer, + meta: bitValuePointer, }, }, readEntries) @@ -97,7 +97,7 @@ func TestValueGC(t *testing.T) { for i := 0; i < 100; i++ { v := make([]byte, sz) rand.Read(v[:rand.Intn(sz)]) - require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v, 0)) + require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v)) if i%20 == 0 { require.NoError(t, txn.Commit(nil)) txn = kv.NewTransaction(true) @@ -148,7 +148,7 @@ func TestValueGC2(t *testing.T) { for i := 0; i < 100; i++ { v := make([]byte, sz) rand.Read(v[:rand.Intn(sz)]) - require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v, 0)) + require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v)) if i%20 == 0 { require.NoError(t, txn.Commit(nil)) txn = kv.NewTransaction(true) @@ -231,7 +231,7 @@ func TestValueGC3(t *testing.T) { } rand.Read(v[:]) // Keys key000, key001, key002, such that sorted order matches insertion order - require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%03d", i)), v, 0)) + require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%03d", i)), v)) if i%20 == 0 { require.NoError(t, txn.Commit(nil)) txn = kv.NewTransaction(true) @@ -295,7 +295,7 @@ func TestValueGC4(t *testing.T) { for i := 0; i < 24; i++ { v := make([]byte, sz) rand.Read(v[:rand.Intn(sz)]) - require.NoError(t, 
txn.Set([]byte(fmt.Sprintf("key%d", i)), v, 0)) + require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v)) if i%3 == 0 { require.NoError(t, txn.Commit(nil)) txn = kv.NewTransaction(true) @@ -508,7 +508,7 @@ func TestValueLogTrigger(t *testing.T) { for i := 0; i < 100; i++ { v := make([]byte, sz) rand.Read(v[:rand.Intn(sz)]) - require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v, 0)) + require.NoError(t, txn.Set([]byte(fmt.Sprintf("key%d", i)), v)) if i%20 == 0 { require.NoError(t, txn.Commit(nil)) txn = kv.NewTransaction(true) @@ -538,11 +538,11 @@ func createVlog(t *testing.T, entries []*entry) []byte { opts.ValueLogFileSize = 100 * 1024 * 1024 // 100Mb kv, err := Open(opts) require.NoError(t, err) - txnSet(t, kv, entries[0].Key, entries[0].Value, entries[0].Meta) + txnSet(t, kv, entries[0].Key, entries[0].Value, entries[0].meta) entries = entries[1:] txn := kv.NewTransaction(true) for _, entry := range entries { - require.NoError(t, txn.Set(entry.Key, entry.Value, entry.Meta)) + require.NoError(t, txn.SetWithMeta(entry.Key, entry.Value, entry.meta)) } require.NoError(t, txn.Commit(nil)) require.NoError(t, kv.Close()) diff --git a/y/iterator.go b/y/iterator.go index 80e0c77be..719e8ec8e 100644 --- a/y/iterator.go +++ b/y/iterator.go @@ -19,6 +19,7 @@ package y import ( "bytes" "container/heap" + "encoding/binary" "github.com/pkg/errors" ) @@ -26,31 +27,63 @@ import ( // ValueStruct represents the value info that can be associated with a key, but also the internal // Meta field. type ValueStruct struct { - Meta byte - UserMeta byte - Value []byte + Meta byte + UserMeta byte + ExpiresAt uint64 + Value []byte Version uint64 // This field is not serialized. Only for internal usage. 
} +func sizeVarint(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} + // EncodedSize is the size of the ValueStruct when encoded func (v *ValueStruct) EncodedSize() uint16 { - return uint16(len(v.Value) + 2) + sz := len(v.Value) + 2 // meta, usermeta. + if v.ExpiresAt == 0 { + return uint16(sz + 1) + } + + enc := sizeVarint(v.ExpiresAt) + return uint16(sz + enc) } // Decode uses the length of the slice to infer the length of the Value field. func (v *ValueStruct) Decode(b []byte) { v.Meta = b[0] v.UserMeta = b[1] - v.Value = b[2:] + var sz int + v.ExpiresAt, sz = binary.Uvarint(b[2:]) + v.Value = b[2+sz:] } // Encode expects a slice of length at least v.EncodedSize(). -// TODO: Who calls this? func (v *ValueStruct) Encode(b []byte) { b[0] = v.Meta b[1] = v.UserMeta - copy(b[2:], v.Value) + sz := binary.PutUvarint(b[2:], v.ExpiresAt) + copy(b[2+sz:], v.Value) +} + +// EncodeTo should be kept in sync with the Encode function above. The reason +// this function exists is to avoid creating byte arrays per key-value pair in +// table/builder.go. +func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) { + buf.WriteByte(v.Meta) + buf.WriteByte(v.UserMeta) + var enc [binary.MaxVarintLen64]byte + sz := binary.PutUvarint(enc[:], v.ExpiresAt) + buf.Write(enc[:sz]) + buf.Write(v.Value) } // Iterator is an interface for a basic iterator.