forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request vitessio#1560 from aaijazi/aaijazi_fake_tx_buffer
Add logic for fake transaction buffering in VTGate.
- Loading branch information
Showing
4 changed files
with
289 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
// Copyright 2015, Google Inc. All rights reserved. | ||
// Use of this source code is governed by a BSD_style | ||
// license that can be found in the LICENSE file. | ||
|
||
/* | ||
Package txbuffer contains experimental logic to buffer transactions in VTGate. | ||
Only the Begin statement of transactions will be buffered. | ||
The reason why it might be useful to buffer transactions is during failovers: | ||
the master vttablet can become unavailable for a few seconds. Upstream clients | ||
(e.g., web workers) might not retry on failures, and instead may prefer for VTGate to wait for | ||
a few seconds for the failover to complete. Thiis will block upstream callers for that time, | ||
but will not return transient errors during the buffering time. | ||
*/ | ||
package txbuffer | ||
|
||
import ( | ||
"flag" | ||
"sync" | ||
"time" | ||
|
||
"github.com/youtube/vitess/go/stats" | ||
) | ||
|
||
var ( | ||
enableFakeTxBuffer = flag.Bool("enable_fake_tx_buffer", false, "Enable fake transaction buffering.") | ||
bufferKeyspace = flag.String("buffer_keyspace", "", "The name of the keyspace to buffer transactions on.") | ||
bufferShard = flag.String("buffer_shard", "", "The name of the shard to buffer transactions on.") | ||
maxBufferSize = flag.Int("max_buffer_size", 10, "The maximum number of transactions to buffer at a time.") | ||
fakeBufferDelay = flag.Duration("fake_buffer_delay", 1*time.Second, "The amount of time that we should delay all transactions for, to fake a transaction buffer.") | ||
|
||
bufferedTransactionsAttempted = stats.NewInt("BufferedTransactionsAttempted") | ||
bufferedTransactionsSuccessful = stats.NewInt("BufferedTransactionsSuccessful") | ||
// Use this lock when adding to the number of currently buffered transactions. | ||
bufferMu sync.Mutex | ||
bufferedTransactions = stats.NewInt("BufferedTransactions") | ||
) | ||
|
||
// timeSleep can be mocked out in unit tests | ||
var timeSleep = time.Sleep | ||
|
||
// FakeBuffer will pretend to buffer new transactions in VTGate. | ||
// Transactions *will NOT actually be buffered*, they will just have a delayed start time. | ||
// This can be useful to understand what the impact of trasaction buffering will be | ||
// on upstream callers. Once the impact is measured, it can be used to tweak parameter values | ||
// for the best behavior. | ||
// FakeBuffer should be called before the VtTablet Begin, otherwise it will increase transaction times. | ||
func FakeBuffer(keyspace, shard string, attemptNumber int) { | ||
// Only buffer on the first Begin attempt, not on possible retries. | ||
if !*enableFakeTxBuffer || attemptNumber != 0 { | ||
return | ||
} | ||
if keyspace != *bufferKeyspace || shard != *bufferShard { | ||
return | ||
} | ||
bufferedTransactionsAttempted.Add(1) | ||
|
||
bufferMu.Lock() | ||
if int(bufferedTransactions.Get()) >= *maxBufferSize { | ||
bufferMu.Unlock() | ||
return | ||
} | ||
bufferedTransactions.Add(1) | ||
bufferMu.Unlock() | ||
|
||
defer bufferedTransactionsSuccessful.Add(1) | ||
timeSleep(*fakeBufferDelay) | ||
// Don't need to lock for this, as there's no race when decrementing the count | ||
bufferedTransactions.Add(-1) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
// Copyright 2015, Google Inc. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package txbuffer | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
"time" | ||
) | ||
|
||
// fakeSleepController is used to control a fake sleepFunc | ||
type fakeSleepController struct { | ||
called bool | ||
block bool | ||
// block until the done channel if closed, if configured to do so. | ||
done chan struct{} | ||
} | ||
|
||
type sleepFunc func(d time.Duration) | ||
|
||
// createFakeSleep creates a function that can be called to fake sleeping. | ||
// The created fake is managed by the passed in fakeSleepController. | ||
func createFakeSleep(c *fakeSleepController) sleepFunc { | ||
return func(d time.Duration) { | ||
c.called = true | ||
if !c.block { | ||
return | ||
} | ||
select { | ||
case <-c.done: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func TestFakeBuffer(t *testing.T) { | ||
unbufferedKeyspace := "ukeyspace" | ||
unbufferedShard := "80-" | ||
bufferedKeyspace := "bkeyspace" | ||
bufferedShard := "-80" | ||
|
||
*bufferKeyspace = bufferedKeyspace | ||
*bufferShard = bufferedShard | ||
|
||
for _, test := range []struct { | ||
desc string | ||
enableFakeBuffer bool | ||
keyspace string | ||
shard string | ||
attemptNumber int | ||
bufferedTransactions int | ||
// was this transaction buffered? | ||
wantCalled bool | ||
// expected value of BufferedTransactionAttempts | ||
wantAttempted int | ||
}{ | ||
{ | ||
desc: "enableFakeBuffer=False", | ||
enableFakeBuffer: false, | ||
}, | ||
{ | ||
desc: "attemptNumber != 0", | ||
enableFakeBuffer: true, | ||
attemptNumber: 1, | ||
}, | ||
{ | ||
desc: "unbuffered keyspace", | ||
enableFakeBuffer: true, | ||
keyspace: unbufferedKeyspace, | ||
shard: bufferedShard, | ||
}, | ||
{ | ||
desc: "unbuffered shard", | ||
enableFakeBuffer: true, | ||
keyspace: bufferedKeyspace, | ||
shard: unbufferedShard, | ||
}, | ||
{ | ||
desc: "buffer full", | ||
enableFakeBuffer: true, | ||
keyspace: bufferedKeyspace, | ||
shard: bufferedShard, | ||
bufferedTransactions: *maxBufferSize, | ||
// When the buffer is full, bufferedTransactionsAttempted should still be incremented | ||
wantAttempted: 1, | ||
}, | ||
{ | ||
desc: "buffered successful", | ||
enableFakeBuffer: true, | ||
keyspace: bufferedKeyspace, | ||
shard: bufferedShard, | ||
wantCalled: true, | ||
wantAttempted: 1, | ||
}, | ||
} { | ||
controller := &fakeSleepController{} | ||
timeSleep = createFakeSleep(controller) | ||
// reset counters | ||
bufferedTransactionsAttempted.Set(0) | ||
bufferedTransactionsSuccessful.Set(0) | ||
bufferedTransactions.Set(int64(test.bufferedTransactions)) | ||
|
||
*enableFakeTxBuffer = test.enableFakeBuffer | ||
|
||
FakeBuffer(test.keyspace, test.shard, test.attemptNumber) | ||
|
||
if controller.called != test.wantCalled { | ||
t.Errorf("With %v, FakeBuffer() => timeSleep.called: %v; want: %v", | ||
test.desc, controller.called, test.wantCalled) | ||
} | ||
|
||
if bufferedTransactionsAttempted.Get() != int64(test.wantAttempted) { | ||
t.Errorf("With %v, FakeBuffer() => BufferedTransactionsAttempted got: %v; want: %v", | ||
test.desc, bufferedTransactionsAttempted.Get(), test.wantAttempted) | ||
} | ||
|
||
if (!test.wantCalled && (bufferedTransactionsSuccessful.Get() == 1)) || | ||
(test.wantCalled && (bufferedTransactionsSuccessful.Get() != 1)) { | ||
t.Errorf("With %v, FakeBuffer() => BufferedTransactionsSuccessful got: %v; want: 1", | ||
test.desc, bufferedTransactionsSuccessful.Get()) | ||
} | ||
} | ||
} | ||
|
||
// min for ints | ||
func min(x, y int) int { | ||
if x < y { | ||
return x | ||
} | ||
return y | ||
} | ||
|
||
func TestParallelFakeBuffer(t *testing.T) { | ||
bufferedKeyspace := "bkeyspace" | ||
bufferedShard := "-80" | ||
|
||
*bufferKeyspace = bufferedKeyspace | ||
*bufferShard = bufferedShard | ||
*enableFakeTxBuffer = true | ||
|
||
// reset counters | ||
bufferedTransactionsAttempted.Set(0) | ||
bufferedTransactionsSuccessful.Set(0) | ||
|
||
var controllers []*fakeSleepController | ||
var wg sync.WaitGroup | ||
|
||
for i := 1; i <= *maxBufferSize+2; i++ { | ||
controller := &fakeSleepController{ | ||
block: true, | ||
done: make(chan struct{}), | ||
} | ||
timeSleep = createFakeSleep(controller) | ||
// Only the first maxBufferSize calls to FakeBuffer should actually call fakeSleep | ||
wantFakeSleepCalled := (i <= *maxBufferSize) | ||
|
||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
FakeBuffer(*bufferKeyspace, *bufferShard, 0) | ||
}() | ||
// Give the goroutine some time to run. | ||
// Ideally, we'd use a channel here to indicate when the fake sleep starts blocking. | ||
// However, we can't do that because for some of the goroutines, the fake sleep is never | ||
// even called. | ||
time.Sleep(10 * time.Microsecond) | ||
|
||
if controller.called { | ||
controllers = append(controllers, controller) | ||
} | ||
|
||
if controller.called != wantFakeSleepCalled { | ||
t.Errorf("On iteration %v, FakeBuffer() => timeSleep.called: %v; want: %v", | ||
i, controller.called, wantFakeSleepCalled) | ||
} | ||
|
||
if int(bufferedTransactionsAttempted.Get()) != i { | ||
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactionsAttempted got: %v; want: %v", | ||
i, bufferedTransactionsAttempted.Get(), i) | ||
} | ||
|
||
if int(bufferedTransactions.Get()) != min(i, *maxBufferSize) { | ||
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactions got: %v; want: %v", | ||
i, bufferedTransactions.Get(), min(i, *maxBufferSize)) | ||
} | ||
|
||
if int(bufferedTransactionsSuccessful.Get()) != 0 { | ||
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactionsSuccessful got: %v; want: 0", | ||
i, bufferedTransactionsSuccessful.Get()) | ||
} | ||
} | ||
|
||
// signal to all the buffered calls that they can stop buffering, and wait for them. | ||
for _, c := range controllers { | ||
close(c.done) | ||
} | ||
wg.Wait() | ||
|
||
if int(bufferedTransactionsSuccessful.Get()) != *maxBufferSize { | ||
t.Errorf("After all FakeBuffer() calls are done, BufferedTransactionsSuccessful got: %v; want: %v", | ||
bufferedTransactionsSuccessful.Get(), *maxBufferSize) | ||
} | ||
if int(bufferedTransactions.Get()) != 0 { | ||
t.Errorf("After all FakeBuffer() calls are done, BufferedTransactions got: %v; want: %v", | ||
bufferedTransactions.Get(), 0) | ||
} | ||
} |