Skip to content

Commit

Permalink
Introduce ManagedDB struct to perform managed transactions.
Browse files Browse the repository at this point in the history
This is a bit of a cleanup of dgraph-io#280 (committed as
1d8f9b1)

* Make ManagedTxns option private, so that it can be set only within the
package.

* Introduce a separate struct ManagedDB, which embeds badger.DB.

* Introduce ManagedDB.NewTransaction() and ManagedDB.NewTransactionAt()
methods.

* Introduce Txn.CommitAt, applicable only to ManageDB.
  • Loading branch information
deepakjois committed Oct 23, 2017
1 parent 9a3390d commit 436e090
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 38 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func Open(opt Options) (db *DB, err error) {
}()

orc := &oracle{
isManaged: opt.ManagedTxns,
isManaged: opt.managedTxns,
nextCommit: 1,
pendingCommits: make(map[uint64]struct{}),
commits: make(map[uint64]uint64),
Expand Down
7 changes: 4 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ var (
// ErrInvalidRequest is returned if the user request is invalid.
ErrInvalidRequest = errors.New("Invalid request")

// ErrManagedTxn is returned if the user tries to use an API which isn't allowed due to
// external management of transactions.
ErrManagedTxn = errors.New("Invalid API request for managed transaction")
// ErrManagedTxn is returned if the user tries to use an API which isn't
// allowed due toV external management of transactions, when using ManagedDB.
ErrManagedTxn = errors.New(
"Invalid API request. Not allowed to perform this action using ManagedDB")
)

const maxKeySize = 1 << 20
Expand Down
73 changes: 73 additions & 0 deletions managed_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

// ManagedDB allows end users to manage the transactions themselves. Transaction
// start and commit timestamps are set by end-user.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
//
// WARNING: This is an experimental feature and may be changed significantly in
// a future release. So please proceed with caution.
type ManagedDB struct {
*DB
}

// OpenManaged returns a new ManagedDB, which allows more control over setting
// transaction timestamps.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func OpenManaged(opts Options) (*ManagedDB, error) {
opts.managedTxns = true
db, err := Open(opts)
if err != nil {
return nil, err
}
return &ManagedDB{db}, nil
}

// NewTransaction overrides DB.NewTransaction() and panics when invoked. Use
// NewTransactionAt() instead.
func (db *ManagedDB) NewTransaction(update bool) {
panic("Cannot use NewTransaction() for ManagedDB. Use NewTransactionAt() instead.")
}

// NewTransactionAt follows the same logic as DB.NewTransaction(), but uses the
// provided read timestamp.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (db *ManagedDB) NewTransactionAt(readTs uint64, update bool) *Txn {
txn := db.DB.NewTransaction(update)
txn.readTs = readTs
return txn
}

// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp. This will panic if not used with ManagedDB.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
if !txn.db.opt.managedTxns {
panic("CommitAt() can only be used with ManagedDB. Use Commit() instead.")
}
txn.commitTs = commitTs
return txn.Commit(callback)
}
5 changes: 3 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ type Options struct {
// Number of compaction workers to run concurrently.
NumCompactors int

// Transaction start and commit timestamps are managed by end-user.
ManagedTxns bool
// Transaction start and commit timestamps are manaVgedTxns by end-user. This
// is a private option used by ManagedDB.
managedTxns bool

// 4. Flags for testing purposes
// ------------------------------
Expand Down
30 changes: 4 additions & 26 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ func (txn *Txn) Discard() {
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit(callback func(error)) error {
if txn.commitTs == 0 && txn.db.opt.ManagedTxns {
return ErrManagedTxn
if txn.commitTs == 0 && txn.db.opt.managedTxns {
panic("Cannot use Commit() for ManagedDB. Use CommitAt() instead.")
}
if txn.discarded {
return ErrDiscardedTxn
Expand Down Expand Up @@ -374,19 +374,6 @@ func (txn *Txn) Commit(callback func(error)) error {
return txn.db.batchSetAsync(entries, callback)
}

// CommitAt commits the transaction, following the same logic as Commit(), but at the given
// commit timestamp. It returns an error if ManagedTxns option is not set.
//
// This API is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
if !txn.db.opt.ManagedTxns {
return ErrManagedTxn
}
txn.commitTs = commitTs
return txn.Commit(callback)
}

// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
Expand Down Expand Up @@ -420,19 +407,10 @@ func (db *DB) NewTransaction(update bool) *Txn {
return txn
}

// NewTransactionAt follows the same logic as NewTransaction, but uses the provided read timestamp.
// This API is only useful for databases built on top of Badger (like Dgraph), and can be ignored by
// most users.
func (db *DB) NewTransactionAt(readTs uint64, update bool) *Txn {
txn := db.NewTransaction(update)
txn.readTs = readTs
return txn
}

// View executes a function creating and managing a read-only transaction for the user. Error
// returned by the function is relayed by the View method.
func (db *DB) View(fn func(txn *Txn) error) error {
if db.opt.ManagedTxns {
if db.opt.managedTxns {
return ErrManagedTxn
}
txn := db.NewTransaction(false)
Expand All @@ -444,7 +422,7 @@ func (db *DB) View(fn func(txn *Txn) error) error {
// Update executes a function, creating and managing a read-write transaction
// for the user. Error returned by the function is relayed by the Update method.
func (db *DB) Update(fn func(txn *Txn) error) error {
if db.opt.ManagedTxns {
if db.opt.managedTxns {
return ErrManagedTxn
}
txn := db.NewTransaction(true)
Expand Down
13 changes: 7 additions & 6 deletions transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestTxnSimple(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte("val=8"), val)

require.Equal(t, ErrManagedTxn, txn.CommitAt(100, nil))
require.Panics(t, func() { txn.CommitAt(100, nil) })
require.NoError(t, txn.Commit(nil))
}

Expand Down Expand Up @@ -528,14 +528,13 @@ func TestIteratorAllVersionsButDeleted(t *testing.T) {
require.NoError(t, err)
}

func TestTxnManaged(t *testing.T) {
func TestManagedDB(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err)
defer os.RemoveAll(dir)

opt := getTestOptions(dir)
opt.ManagedTxns = true
kv, err := Open(opt)
kv, err := OpenManaged(opt)
require.NoError(t, err)
defer kv.Close()

Expand All @@ -547,7 +546,9 @@ func TestTxnManaged(t *testing.T) {
return []byte(fmt.Sprintf("val-%d", i))
}

// Don't allow these APIs in managed.
// Don't allow these APIs in ManagedDB
require.Panics(t, func() { kv.NewTransaction(false) })

err = kv.Update(func(tx *Txn) error { return nil })
require.Equal(t, ErrManagedTxn, err)

Expand All @@ -559,7 +560,7 @@ func TestTxnManaged(t *testing.T) {
for i := 0; i <= 3; i++ {
require.NoError(t, txn.Set(key(i), val(i), 0))
}
require.Equal(t, ErrManagedTxn, txn.Commit(nil))
require.Panics(t, func() { txn.Commit(nil) })
require.NoError(t, txn.CommitAt(3, nil))

// Read data at t=2.
Expand Down

0 comments on commit 436e090

Please sign in to comment.