Skip to content

Commit

Permalink
Allow ManagedDB to set a discard below timestamp. So, only versions b…
Browse files Browse the repository at this point in the history
…elow this timestamp get discarded, hence ensuring that we keep all the versions since the last snapshot.
  • Loading branch information
manishrjain committed Jul 11, 2018
1 parent 391b6d3 commit 937af5f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
10 changes: 5 additions & 5 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ func (s *levelsController) compactBuildTables(

it.Rewind()

// Pick up the currently pending transactions' min readTs, so we can discard versions below this
// readTs. We should never discard any versions starting from above this timestamp, because that
// would affect the snapshot view guarantee provided by transactions.
minReadTs := s.kv.orc.readMark.MinReadTs()
// Pick a discard ts, so we can discard versions below this ts. We should
// never discard any versions starting from above this timestamp, because
// that would affect the snapshot view guarantee provided by transactions.
discardTs := s.kv.orc.discardAtOrBelow()

// Start generating new tables.
type newTableResult struct {
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *levelsController) compactBuildTables(

vs := it.Value()
version := y.ParseTs(it.Key())
if version <= minReadTs {
if version <= discardTs {
// Keep track of the number of versions encountered for this key. Only consider the
// versions which are below the minReadTs, otherwise, we might end up discarding the
// only valid version for a running transaction.
Expand Down
7 changes: 7 additions & 0 deletions managed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,10 @@ func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
func (db *ManagedDB) GetSequence(_ []byte, _ uint64) (*Sequence, error) {
panic("Cannot use GetSequence for ManagedDB.")
}

// SetDiscardTs sets a timestamp at or below which, any invalid or deleted
// versions can be discarded from the LSM tree, and thence from the value log to
// reclaim disk space.
func (db *ManagedDB) SetDiscardTs(ts uint64) {
db.orc.setDiscardTs(ts)
}
22 changes: 21 additions & 1 deletion transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ type oracle struct {
writeLock sync.Mutex
nextCommit uint64

readMark y.WaterMark
// Either of these is used to determine which versions can be permanently
// discarded during compaction.
discardTs uint64 // Used by ManagedDB.
readMark y.WaterMark // Used by DB.

// commits stores a key fingerprint and latest commit counter for it.
// refCount is used to clear out commits map to avoid a memory blowup.
Expand Down Expand Up @@ -81,6 +84,23 @@ func (o *oracle) commitTs() uint64 {
return o.nextCommit
}

// Any deleted or invalid versions at or below ts would be discarded during
// compaction to reclaim disk space in LSM tree and thence value log.
func (o *oracle) setDiscardTs(ts uint64) {
o.Lock()
defer o.Unlock()
o.discardTs = ts
}

func (o *oracle) discardAtOrBelow() uint64 {
if o.isManaged {
o.Lock()
defer o.Unlock()
return o.discardTs
}
return o.readMark.MinReadTs()
}

// hasConflict must be called while having a lock.
func (o *oracle) hasConflict(txn *Txn) bool {
if len(txn.reads) == 0 {
Expand Down

0 comments on commit 937af5f

Please sign in to comment.