Skip to content

Commit

Permalink
store/tikv: pre-split regions during 2PC to avoid hotspot (pingcap#16920
Browse files Browse the repository at this point in the history
)
  • Loading branch information
tiancaiamao authored Apr 30, 2020
1 parent c0814f0 commit 2f9a487
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
66 changes: 66 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ func txnLockTTL(startTime time.Time, txnSize int) uint64 {
return lockTTL + uint64(elapsed)
}

var preSplitDetectThreshold uint32 = 100000
var preSplitSizeThreshold uint32 = 32 << 20

// doActionOnMutations groups keys into primary batch and secondary batches, if primary batch exists in the key,
// it does action on primary batch first, then on secondary batches. If action is commit, secondary batches
// is done in background goroutine.
Expand All @@ -435,6 +438,68 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
return errors.Trace(err)
}

// Pre-split regions to avoid too much write workload into a single region.
// In the large transaction case, this operation is important to avoid TiKV 'server is busy' error.
var preSplited bool
preSplitDetectThresholdVal := atomic.LoadUint32(&preSplitDetectThreshold)
for _, group := range groups {
if uint32(group.mutations.len()) >= preSplitDetectThresholdVal {
logutil.BgLogger().Info("2PC detect large amount of mutations on a single region",
zap.Uint64("region", group.region.GetID()),
zap.Int("mutations count", group.mutations.len()))
// Use context.Background, this time should not add up to Backoffer.
if preSplitAndScatterIn2PC(context.Background(), c.store, group) {
preSplited = true
}
}
}
// Reload region cache again.
if preSplited {
groups, err = c.store.regionCache.GroupSortedMutationsByRegion(bo, mutations)
if err != nil {
return errors.Trace(err)
}
}

return c.doActionOnGroupMutations(bo, action, groups)
}

func preSplitAndScatterIn2PC(ctx context.Context, store *tikvStore, group groupedMutations) bool {
length := group.mutations.len()
splitKeys := make([][]byte, 0, 4)

preSplitSizeThresholdVal := atomic.LoadUint32(&preSplitSizeThreshold)
regionSize := 0
for i := 0; i < length; i++ {
regionSize = regionSize + len(group.mutations.keys[i]) + len(group.mutations.values[i])
// The second condition is used for testing.
if regionSize >= int(preSplitSizeThresholdVal) {
regionSize = 0
splitKeys = append(splitKeys, group.mutations.keys[i])
}
}
if len(splitKeys) == 0 {
return false
}

regionIDs, err := store.SplitRegions(ctx, splitKeys, true)
if err != nil {
logutil.BgLogger().Warn("2PC split regions failed", zap.Uint64("regionID", group.region.id), zap.Int("keys count", length), zap.Error(err))
return false
}

for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID, 0)
if err != nil {
logutil.BgLogger().Warn("2PC wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
// Invalidate the old region cache information.
store.regionCache.InvalidateCachedRegion(group.region)
return true
}

func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))

var batches []batchMutations
Expand Down Expand Up @@ -467,6 +532,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
_, actionIsCleanup := action.(actionCleanup)
_, actionIsPessimiticLock := action.(actionPessimisticLock)

var err error
failpoint.Inject("skipKeyReturnOK", func(val failpoint.Value) {
valStr, ok := val.(string)
if ok && c.connID > 0 {
Expand Down
42 changes: 41 additions & 1 deletion store/tikv/2pc_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@

package tikv

import . "github.com/pingcap/check"
import (
"context"
"sync/atomic"

. "github.com/pingcap/check"
)

// TestCommitMultipleRegions tests commit multiple regions.
// The test takes too long under the race detector.
Expand All @@ -35,3 +40,38 @@ func (s *testCommitterSuite) TestCommitMultipleRegions(c *C) {
}
s.mustCommit(c, m)
}

func (s *testTiclientSuite) TestSplitRegionIn2PC(c *C) {
const preSplitThresholdInTest = 500
old := atomic.LoadUint32(&preSplitDetectThreshold)
defer atomic.StoreUint32(&preSplitDetectThreshold, old)
atomic.StoreUint32(&preSplitDetectThreshold, preSplitThresholdInTest)

old = atomic.LoadUint32(&preSplitSizeThreshold)
defer atomic.StoreUint32(&preSplitSizeThreshold, old)
atomic.StoreUint32(&preSplitSizeThreshold, 5000)

bo := NewBackoffer(context.Background(), 1)
startKey := encodeKey(s.prefix, s08d("key", 0))
endKey := encodeKey(s.prefix, s08d("key", preSplitThresholdInTest))
checkKeyRegion := func(bo *Backoffer, start, end []byte, checker Checker) {
// Check regions after split.
loc1, err := s.store.regionCache.LocateKey(bo, start)
c.Assert(err, IsNil)
loc2, err := s.store.regionCache.LocateKey(bo, end)
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, checker, loc2.Region.id)
}

// Check before test.
checkKeyRegion(bo, startKey, endKey, Equals)
txn := s.beginTxn(c)
for i := 0; i < preSplitThresholdInTest; i++ {
err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i))
c.Assert(err, IsNil)
}
err := txn.Commit(context.Background())
c.Assert(err, IsNil)
// Check region split after test.
checkKeyRegion(bo, startKey, endKey, Not(Equals))
}

0 comments on commit 2f9a487

Please sign in to comment.