diff --git a/backup.go b/backup.go index 6b89ef6d1..375b5a316 100644 --- a/backup.go +++ b/backup.go @@ -75,12 +75,12 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) { func (db *DB) Load(r io.Reader) error { br := bufio.NewReaderSize(r, 16<<10) unmarshalBuf := make([]byte, 1<<10) - var entries []*entry + var entries []*Entry var wg sync.WaitGroup errChan := make(chan error, 1) // func to check for pending error before sending off a batch for writing - batchSetAsyncIfNoErr := func(entries []*entry) error { + batchSetAsyncIfNoErr := func(entries []*Entry) error { select { case err := <-errChan: return err @@ -118,7 +118,7 @@ func (db *DB) Load(r io.Reader) error { if err = e.Unmarshal(unmarshalBuf[:sz]); err != nil { return err } - entries = append(entries, &entry{ + entries = append(entries, &Entry{ Key: y.KeyWithTs(e.Key, e.Version), Value: e.Value, UserMeta: e.UserMeta[0], diff --git a/db.go b/db.go index 654124580..236bf1769 100644 --- a/db.go +++ b/db.go @@ -78,7 +78,7 @@ const ( kvWriteChCapacity = 1000 ) -func replayFunction(out *DB) func(entry, valuePointer) error { +func replayFunction(out *DB) func(Entry, valuePointer) error { type txnEntry struct { nk []byte v y.ValueStruct @@ -96,7 +96,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error { } first := true - return func(e entry, vp valuePointer) error { // Function for replaying. + return func(e Entry, vp valuePointer) error { // Function for replaying. if first { out.elog.Printf("First key=%s\n", e.Key) } @@ -482,7 +482,7 @@ var requestPool = sync.Pool{ }, } -func (db *DB) shouldWriteValueToLSM(e entry) bool { +func (db *DB) shouldWriteValueToLSM(e Entry) bool { return len(e.Value) < db.opt.ValueThreshold } @@ -567,7 +567,7 @@ func (db *DB) writeRequests(reqs []*request) error { return nil } -func (db *DB) sendToWriteCh(entries []*entry) (*request, error) { +func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) { var count, size int64 for _, e := range entries { size += int64(e.estimateSize(db.opt.ValueThreshold)) @@ -652,7 +652,7 @@ func (db *DB) doWrites(lc *y.Closer) { // batchSet applies a list of badger.Entry. If a request level error occurs it // will be returned. // Check(kv.BatchSet(entries)) -func (db *DB) batchSet(entries []*entry) error { +func (db *DB) batchSet(entries []*Entry) error { req, err := db.sendToWriteCh(entries) if err != nil { return err @@ -671,7 +671,7 @@ func (db *DB) batchSet(entries []*entry) error { // err := kv.BatchSetAsync(entries, func(err error)) { // Check(err) // } -func (db *DB) batchSetAsync(entries []*entry, f func(error)) error { +func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error { req, err := db.sendToWriteCh(entries) if err != nil { return err @@ -881,7 +881,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error { opts.PrefetchValues = false it := txn.NewIterator(opts) - var entries []*entry + var entries []*Entry for it.Seek(key); it.ValidForPrefix(key); it.Next() { item := it.Item() @@ -891,7 +891,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error { // Found an older version. Mark for deletion entries = append(entries, - &entry{ + &Entry{ Key: y.KeyWithTs(key, item.version), meta: bitDelete, }) @@ -913,14 +913,14 @@ func (db *DB) PurgeOlderVersions() error { opts.PrefetchValues = false it := txn.NewIterator(opts) - var entries []*entry + var entries []*Entry var lastKey []byte var count int var wg sync.WaitGroup errChan := make(chan error, 1) // func to check for pending error before sending off a batch for writing - batchSetAsyncIfNoErr := func(entries []*entry) error { + batchSetAsyncIfNoErr := func(entries []*Entry) error { select { case err := <-errChan: return err @@ -946,7 +946,7 @@ func (db *DB) PurgeOlderVersions() error { } // Found an older version. Mark for deletion entries = append(entries, - &entry{ + &Entry{ Key: y.KeyWithTs(lastKey, item.version), meta: bitDelete, }) @@ -959,7 +959,7 @@ func (db *DB) PurgeOlderVersions() error { return err } count = 0 - entries = []*entry{} + entries = []*Entry{} } } diff --git a/structs.go b/structs.go index 841915207..09547a42f 100644 --- a/structs.go +++ b/structs.go @@ -76,8 +76,8 @@ func (h *header) Decode(buf []byte) { h.userMeta = buf[17] } -// entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data. -type entry struct { +// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data. +type Entry struct { Key []byte Value []byte UserMeta byte @@ -88,7 +88,7 @@ type entry struct { offset uint32 } -func (e *entry) estimateSize(threshold int) int { +func (e *Entry) estimateSize(threshold int) int { if len(e.Value) < threshold { return len(e.Key) + len(e.Value) + 2 // Meta, UserMeta } @@ -96,7 +96,7 @@ func (e *entry) estimateSize(threshold int) int { } // Encodes e to buf. Returns number of bytes written. -func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) { +func encodeEntry(e *Entry, buf *bytes.Buffer) (int, error) { h := header{ klen: uint32(len(e.Key)), vlen: uint32(len(e.Value)), @@ -126,7 +126,7 @@ func encodeEntry(e *entry, buf *bytes.Buffer) (int, error) { return len(headerEnc) + len(e.Key) + len(e.Value) + len(crcBuf), nil } -func (e entry) print(prefix string) { +func (e Entry) print(prefix string) { fmt.Printf("%s Key: %s Meta: %d UserMeta: %d Offset: %d len(val)=%d", prefix, e.Key, e.meta, e.UserMeta, e.offset, len(e.Value)) } diff --git a/transaction.go b/transaction.go index 962606a4b..7617b3be9 100644 --- a/transaction.go +++ b/transaction.go @@ -185,7 +185,7 @@ type Txn struct { reads []uint64 // contains fingerprints of keys read. writes []uint64 // contains fingerprints of keys written. - pendingWrites map[string]*entry // cache stores any writes done by txn. + pendingWrites map[string]*Entry // cache stores any writes done by txn. db *DB callbacks []func() @@ -196,7 +196,7 @@ type Txn struct { } type pendingWritesIterator struct { - entries []*entry + entries []*Entry nextIdx int readTs uint64 reversed bool @@ -251,7 +251,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator { if !txn.update || len(txn.pendingWrites) == 0 { return nil } - entries := make([]*entry, 0, len(txn.pendingWrites)) + entries := make([]*Entry, 0, len(txn.pendingWrites)) for _, e := range txn.pendingWrites { entries = append(entries, e) } @@ -270,7 +270,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator { } } -func (txn *Txn) checkSize(e *entry) error { +func (txn *Txn) checkSize(e *Entry) error { count := txn.count + 1 // Extra bytes for version in key. size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10 @@ -286,11 +286,11 @@ func (txn *Txn) checkSize(e *entry) error { // It will return ErrReadOnlyTxn if update flag was set to false when creating the // transaction. func (txn *Txn) Set(key, val []byte) error { - e := &entry{ + e := &Entry{ Key: key, Value: val, } - return txn.setEntry(e) + return txn.SetEntry(e) } // SetWithMeta adds a key-value pair to the database, along with a metadata @@ -298,8 +298,8 @@ func (txn *Txn) Set(key, val []byte) error { // interpret the value or store other contextual bits corresponding to the // key-value pair. func (txn *Txn) SetWithMeta(key, val []byte, meta byte) error { - e := &entry{Key: key, Value: val, UserMeta: meta} - return txn.setEntry(e) + e := &Entry{Key: key, Value: val, UserMeta: meta} + return txn.SetEntry(e) } // SetWithTTL adds a key-value pair to the database, along with a time-to-live @@ -307,11 +307,13 @@ func (txn *Txn) SetWithMeta(key, val []byte, meta byte) error { // the time has elapsed , and be eligible for garbage collection. func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error { expire := time.Now().Add(dur).Unix() - e := &entry{Key: key, Value: val, ExpiresAt: uint64(expire)} - return txn.setEntry(e) + e := &Entry{Key: key, Value: val, ExpiresAt: uint64(expire)} + return txn.SetEntry(e) } -func (txn *Txn) setEntry(e *entry) error { +// SetEntry takes an Entry struct and adds the key-value pair in the struct, along +// with other metadata to the database. +func (txn *Txn) SetEntry(e *Entry) error { switch { case !txn.update: return ErrReadOnlyTxn @@ -348,7 +350,7 @@ func (txn *Txn) Delete(key []byte) error { return exceedsMaxKeySizeError(key) } - e := &entry{ + e := &Entry{ Key: key, meta: bitDelete, } @@ -474,7 +476,7 @@ func (txn *Txn) Commit(callback func(error)) error { return ErrConflict } - entries := make([]*entry, 0, len(txn.pendingWrites)+1) + entries := make([]*Entry, 0, len(txn.pendingWrites)+1) for _, e := range txn.pendingWrites { // Suffix the keys with commit ts, so the key versions are sorted in // descending order of commit timestamp. @@ -482,7 +484,7 @@ func (txn *Txn) Commit(callback func(error)) error { e.meta |= bitTxn entries = append(entries, e) } - e := &entry{ + e := &Entry{ Key: y.KeyWithTs(txnKey, commitTs), Value: []byte(strconv.FormatUint(commitTs, 10)), meta: bitFinTxn, @@ -532,7 +534,7 @@ func (db *DB) NewTransaction(update bool) *Txn { size: int64(len(txnKey) + 10), // Some buffer for the extra entry. } if update { - txn.pendingWrites = make(map[string]*entry) + txn.pendingWrites = make(map[string]*Entry) txn.db.orc.addRef() } return txn diff --git a/transaction_test.go b/transaction_test.go index fe1566ce9..d88e0b82c 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -584,7 +584,7 @@ func TestIteratorAllVersionsButDeleted(t *testing.T) { err = db.View(func(txn *Txn) error { item, err := txn.Get([]byte("answer1")) require.NoError(t, err) - err = txn.db.batchSet([]*entry{ + err = txn.db.batchSet([]*Entry{ { Key: y.KeyWithTs(item.key, item.version), meta: bitDelete, diff --git a/value.go b/value.go index 67e3797ca..1e99f274e 100644 --- a/value.go +++ b/value.go @@ -151,7 +151,7 @@ func (lf *logFile) sync() error { var errStop = errors.New("Stop iteration") -type logEntry func(e entry, vp valuePointer) error +type logEntry func(e Entry, vp valuePointer) error // iterate iterates over log file. It doesn't not allocate new memory for every kv pair. // Therefore, the kv pair is only valid for the duration of fn call. @@ -184,7 +184,7 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error { return err } - var e entry + var e Entry e.offset = recordOffset h.Decode(hbuf[:]) if h.klen > maxKeySize { @@ -268,12 +268,12 @@ func (vlog *valueLog) rewrite(f *logFile) error { defer elog.Finish() elog.Printf("Rewriting fid: %d", f.fid) - wb := make([]*entry, 0, 1000) + wb := make([]*Entry, 0, 1000) var size int64 y.AssertTrue(vlog.kv != nil) var count int - fe := func(e entry) error { + fe := func(e Entry) error { count++ if count%10000 == 0 { elog.Printf("Processing entry %d", count) @@ -302,7 +302,7 @@ func (vlog *valueLog) rewrite(f *logFile) error { } if vp.Fid == f.fid && vp.Offset == e.offset { // This new entry only contains the key, and a pointer to the value. - ne := new(entry) + ne := new(Entry) ne.meta = 0 // Remove all bits. ne.UserMeta = e.UserMeta ne.Key = make([]byte, len(e.Key)) @@ -325,7 +325,7 @@ func (vlog *valueLog) rewrite(f *logFile) error { return nil } - err := vlog.iterate(f, 0, func(e entry, vp valuePointer) error { + err := vlog.iterate(f, 0, func(e Entry, vp valuePointer) error { return fe(e) }) if err != nil { @@ -631,7 +631,7 @@ func (vlog *valueLog) Replay(ptr valuePointer, fn logEntry) error { type request struct { // Input values - Entries []*entry + Entries []*Entry // Output values and wait group stuff below Ptrs []valuePointer Wg sync.WaitGroup @@ -786,7 +786,7 @@ func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, func(), error) { } // Test helper -func valueBytesToEntry(buf []byte) (e entry) { +func valueBytesToEntry(buf []byte) (e Entry) { var h header h.Decode(buf) n := uint32(headerBufSize) @@ -841,7 +841,7 @@ func (vlog *valueLog) pickLog(head valuePointer) *logFile { return vlog.filesMap[fids[idx]] } -func discardEntry(e entry, vs y.ValueStruct) bool { +func discardEntry(e Entry, vs y.ValueStruct) bool { if vs.Version != y.ParseTs(e.Key) { // Version not found. Discard. return true @@ -892,7 +892,7 @@ func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) (err error start := time.Now() y.AssertTrue(vlog.kv != nil) - err = vlog.iterate(lf, 0, func(e entry, vp valuePointer) error { + err = vlog.iterate(lf, 0, func(e Entry, vp valuePointer) error { esz := float64(vp.Len) / (1 << 20) // in MBs. +4 for the CAS stuff. skipped += esz if skipped < skipFirstM { diff --git a/value_test.go b/value_test.go index 220339621..294a359dc 100644 --- a/value_test.go +++ b/value_test.go @@ -41,19 +41,19 @@ func TestValueBasic(t *testing.T) { const val2 = "samplevalb012345678901234567890123" require.True(t, len(val1) >= kv.opt.ValueThreshold) - e := &entry{ + e := &Entry{ Key: []byte("samplekey"), Value: []byte(val1), meta: bitValuePointer, } - e2 := &entry{ + e2 := &Entry{ Key: []byte("samplekeyb"), Value: []byte(val2), meta: bitValuePointer, } b := new(request) - b.Entries = []*entry{e, e2} + b.Entries = []*Entry{e, e2} log.write([]*request{b}) require.Len(t, b.Ptrs, 2) @@ -66,8 +66,8 @@ func TestValueBasic(t *testing.T) { defer runCallback(cb1) defer runCallback(cb2) - readEntries := []entry{valueBytesToEntry(buf1), valueBytesToEntry(buf2)} - require.EqualValues(t, []entry{ + readEntries := []Entry{valueBytesToEntry(buf1), valueBytesToEntry(buf2)} + require.EqualValues(t, []Entry{ { Key: []byte("samplekey"), Value: []byte(val1), @@ -375,7 +375,7 @@ func TestChecksums(t *testing.T) { require.True(t, len(v0) >= kv.opt.ValueThreshold) // Use a vlog with K0=V0 and a (corrupted) second transaction(k1,k2) - buf := createVlog(t, []*entry{ + buf := createVlog(t, []*Entry{ {Key: k0, Value: v0}, {Key: k1, Value: v1}, {Key: k2, Value: v2}, @@ -458,7 +458,7 @@ func TestPartialAppendToValueLog(t *testing.T) { // Create truncated vlog to simulate a partial append. // k0 - single transaction, k1 and k2 in another transaction - buf := createVlog(t, []*entry{ + buf := createVlog(t, []*Entry{ {Key: k0, Value: v0}, {Key: k1, Value: v1}, {Key: k2, Value: v2}, @@ -529,7 +529,7 @@ func TestValueLogTrigger(t *testing.T) { require.Equal(t, ErrRejected, err, "Error should be returned after closing DB.") } -func createVlog(t *testing.T, entries []*entry) []byte { +func createVlog(t *testing.T, entries []*Entry) []byte { dir, err := ioutil.TempDir("", "badger") require.NoError(t, err) defer os.RemoveAll(dir) @@ -585,11 +585,11 @@ func BenchmarkReadWrite(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - e := new(entry) + e := new(Entry) e.Key = make([]byte, 16) e.Value = make([]byte, vsz) bl := new(request) - bl.Entries = []*entry{e} + bl.Entries = []*Entry{e} var ptrs []valuePointer