Skip to content

Commit

Permalink
Add logic for fake transaction buffering in VTGate.
Browse files Browse the repository at this point in the history
  • Loading branch information
aaijazi committed Mar 14, 2016
1 parent b50bcda commit cd8858e
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 0 deletions.
5 changes: 5 additions & 0 deletions go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/vtgate/txbuffer"

topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -141,9 +142,13 @@ func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard s
// Begin starts a transaction for the specified keyspace, shard, and tablet type.
// It returns the transaction ID.
func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType topodatapb.TabletType) (transactionID int64, err error) {
attemptNumber := 0
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
var innerErr error
// Potentially buffer this transaction.
txbuffer.FakeBuffer(keyspace, shard, attemptNumber)
transactionID, innerErr = conn.Begin(ctx)
attemptNumber++
return innerErr
}, 0, false)
return transactionID, err
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/shard_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/vtgate/txbuffer"
"golang.org/x/net/context"

topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -166,9 +167,13 @@ func (sdc *ShardConn) StreamExecute(ctx context.Context, query string, bindVars

// Begin begins a transaction. The retry rules are the same as Execute.
func (sdc *ShardConn) Begin(ctx context.Context) (transactionID int64, err error) {
attemptNumber := 0
err = sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
var innerErr error
// Potentially buffer this transaction.
txbuffer.FakeBuffer(sdc.keyspace, sdc.shard, attemptNumber)
transactionID, innerErr = conn.Begin(ctx)
attemptNumber++
return innerErr
}, 0, false)
return transactionID, err
Expand Down
70 changes: 70 additions & 0 deletions go/vt/vtgate/txbuffer/tx_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

/*
Package txbuffer contains experimental logic to buffer transactions in VTGate.
Only the Begin statement of transactions will be buffered.
The reason why it might be useful to buffer transactions is during failovers:
the master vttablet can become unavailable for a few seconds. Upstream clients
(e.g., web workers) might not retry on failures, and instead may prefer for VTGate to wait for
a few seconds for the failover to complete. Thiis will block upstream callers for that time,
but will not return transient errors during the buffering time.
*/
package txbuffer

import (
"flag"
"sync"
"time"

"github.com/youtube/vitess/go/stats"
)

var (
enableFakeTxBuffer = flag.Bool("enable-fake-tx_buffer", false, "Enable fake transaction buffering.")
bufferKeyspace = flag.String("buffer-keyspace", "", "The name of the keyspace to buffer transactions on.")
bufferShard = flag.String("buffer-shard", "", "The name of the shard to buffer transactions on.")
maxBufferSize = flag.Int("max-buffer-size", 10, "The maximum number of transactions to buffer at a time.")
fakeBufferDelay = flag.Duration("fake-buffer-delay", 1*time.Second, "The amount of time that we should delay all transactions for, to fake a transaction buffer.")

bufferedTransactionsAttempted = stats.NewInt("BufferedTransactionsAttempted")
bufferedTransactionsSuccessful = stats.NewInt("BufferedTransactionsSuccessful")
// Use this lock when adding to the number of currently buffered transactions.
bufferMu sync.Mutex
bufferedTransactions = stats.NewInt("BufferedTransactions")
)

// timeSleep can be mocked out in unit tests
var timeSleep = time.Sleep

// FakeBuffer will pretend to buffer new transactions in VTGate.
// Transactions *will NOT actually be buffered*, they will just have a delayed start time.
// This can be useful to understand what the impact of trasaction buffering will be
// on upstream callers. Once the impact is measured, it can be used to tweak parameter values
// for the best behavior.
// FakeBuffer should be called before the VtTablet Begin, otherwise it will increase transaction times.
func FakeBuffer(keyspace, shard string, attemptNumber int) {
// Only buffer on the first Begin attempt, not on possible retries.
if !*enableFakeTxBuffer || attemptNumber != 0 {
return
}
if keyspace != *bufferKeyspace || shard != *bufferShard {
return
}
bufferedTransactionsAttempted.Add(1)

bufferMu.Lock()
if int(bufferedTransactions.Get()) >= *maxBufferSize {
bufferMu.Unlock()
return
}
bufferedTransactions.Add(1)
bufferMu.Unlock()

defer bufferedTransactionsSuccessful.Add(1)
timeSleep(*fakeBufferDelay)
// Don't need to lock for this, as there's no race when decrementing the count
bufferedTransactions.Add(-1)
}
209 changes: 209 additions & 0 deletions go/vt/vtgate/txbuffer/tx_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package txbuffer

import (
"sync"
"testing"
"time"
)

// fakeSleepController is used to control a fake sleepFunc
type fakeSleepController struct {
called bool
block bool
// block until the done channel if closed, if configured to do so.
done chan struct{}
}

type sleepFunc func(d time.Duration)

// createFakeSleep creates a function that can be called to fake sleeping.
// The created fake is managed by the passed in fakeSleepController.
func createFakeSleep(c *fakeSleepController) sleepFunc {
return func(d time.Duration) {
c.called = true
if !c.block {
return
}
select {
case <-c.done:
return
}
}
}

func TestFakeBuffer(t *testing.T) {
unbufferedKeyspace := "ukeyspace"
unbufferedShard := "80-"
bufferedKeyspace := "bkeyspace"
bufferedShard := "-80"

*bufferKeyspace = bufferedKeyspace
*bufferShard = bufferedShard

for _, test := range []struct {
desc string
enableFakeBuffer bool
keyspace string
shard string
attemptNumber int
bufferedTransactions int
// was this transaction buffered?
wantCalled bool
// expected value of BufferedTransactionAttempts
wantAttempted int
}{
{
desc: "enableFakeBuffer=False",
enableFakeBuffer: false,
},
{
desc: "attemptNumber != 0",
enableFakeBuffer: true,
attemptNumber: 1,
},
{
desc: "unbuffered keyspace",
enableFakeBuffer: true,
keyspace: unbufferedKeyspace,
shard: bufferedShard,
},
{
desc: "unbuffered shard",
enableFakeBuffer: true,
keyspace: bufferedKeyspace,
shard: unbufferedShard,
},
{
desc: "buffer full",
enableFakeBuffer: true,
keyspace: bufferedKeyspace,
shard: bufferedShard,
bufferedTransactions: *maxBufferSize,
// When the buffer is full, bufferedTransactionsAttempted should still be incremented
wantAttempted: 1,
},
{
desc: "buffered successful",
enableFakeBuffer: true,
keyspace: bufferedKeyspace,
shard: bufferedShard,
wantCalled: true,
wantAttempted: 1,
},
} {
controller := &fakeSleepController{}
timeSleep = createFakeSleep(controller)
// reset counters
bufferedTransactionsAttempted.Set(0)
bufferedTransactionsSuccessful.Set(0)
bufferedTransactions.Set(int64(test.bufferedTransactions))

*enableFakeTxBuffer = test.enableFakeBuffer

FakeBuffer(test.keyspace, test.shard, test.attemptNumber)

if controller.called != test.wantCalled {
t.Errorf("With %v, FakeBuffer() => timeSleep.called: %v; want: %v",
test.desc, controller.called, test.wantCalled)
}

if bufferedTransactionsAttempted.Get() != int64(test.wantAttempted) {
t.Errorf("With %v, FakeBuffer() => BufferedTransactionsAttempted got: %v; want: %v",
test.desc, bufferedTransactionsAttempted.Get(), test.wantAttempted)
}

if (!test.wantCalled && (bufferedTransactionsSuccessful.Get() == 1)) ||
(test.wantCalled && (bufferedTransactionsSuccessful.Get() != 1)) {
t.Errorf("With %v, FakeBuffer() => BufferedTransactionsSuccessful got: %v; want: 1",
test.desc, bufferedTransactionsSuccessful.Get())
}
}
}

// min for ints
func min(x, y int) int {
if x < y {
return x
}
return y
}

func TestParallelFakeBuffer(t *testing.T) {
bufferedKeyspace := "bkeyspace"
bufferedShard := "-80"

*bufferKeyspace = bufferedKeyspace
*bufferShard = bufferedShard
*enableFakeTxBuffer = true

// reset counters
bufferedTransactionsAttempted.Set(0)
bufferedTransactionsSuccessful.Set(0)

var controllers []*fakeSleepController
var wg sync.WaitGroup

for i := 1; i <= *maxBufferSize+2; i++ {
controller := &fakeSleepController{
block: true,
done: make(chan struct{}),
}
timeSleep = createFakeSleep(controller)
// Only the first maxBufferSize calls to FakeBuffer should actually call fakeSleep
wantFakeSleepCalled := (i <= *maxBufferSize)

wg.Add(1)
go func() {
defer wg.Done()
FakeBuffer(*bufferKeyspace, *bufferShard, 0)
}()
// Give the goroutine some time to run.
// Ideally, we'd use a channel here to indicate when the fake sleep starts blocking.
// However, we can't do that because for some of the goroutines, the fake sleep is never
// even called.
time.Sleep(10 * time.Microsecond)

if controller.called {
controllers = append(controllers, controller)
}

if controller.called != wantFakeSleepCalled {
t.Errorf("On iteration %v, FakeBuffer() => timeSleep.called: %v; want: %v",
i, controller.called, wantFakeSleepCalled)
}

if int(bufferedTransactionsAttempted.Get()) != i {
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactionsAttempted got: %v; want: %v",
i, bufferedTransactionsAttempted.Get(), i)
}

if int(bufferedTransactions.Get()) != min(i, *maxBufferSize) {
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactions got: %v; want: %v",
i, bufferedTransactions.Get(), min(i, *maxBufferSize))
}

if int(bufferedTransactionsSuccessful.Get()) != 0 {
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactionsSuccessful got: %v; want: 0",
i, bufferedTransactionsSuccessful.Get())
}
}

// signal to all the buffered calls that they can stop buffering, and wait for them.
for _, c := range controllers {
close(c.done)
}
wg.Wait()

if int(bufferedTransactionsSuccessful.Get()) != *maxBufferSize {
t.Errorf("After all FakeBuffer() calls are done, BufferedTransactionsSuccessful got: %v; want: %v",
bufferedTransactionsSuccessful.Get(), *maxBufferSize)
}
if int(bufferedTransactions.Get()) != 0 {
t.Errorf("After all FakeBuffer() calls are done, BufferedTransactions got: %v; want: %v",
bufferedTransactions.Get(), 0)
}
}

0 comments on commit cd8858e

Please sign in to comment.