diff --git a/server/cache.go b/server/cache.go index f8c6881a82c..5516d1f0e45 100644 --- a/server/cache.go +++ b/server/cache.go @@ -30,8 +30,25 @@ var ( errStoreNotFound = func(storeID uint64) error { return errors.Errorf("store %v not found", storeID) } + errRegionNotFound = func(regionID uint64) error { + return errors.Errorf("region %v not found", regionID) + } + errRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { + return errors.Errorf("region is stale: region %v origin %v", region, origin) + } ) +func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { + o := origin.GetRegionEpoch() + e := region.GetRegionEpoch() + + if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() { + return errors.Trace(errRegionIsStale(region, origin)) + } + + return nil +} + type storesInfo struct { stores map[uint64]*storeInfo } @@ -98,18 +115,6 @@ func cloneRegion(r *metapb.Region) *metapb.Region { return proto.Clone(r).(*metapb.Region) } -func checkStaleRegion(region *metapb.Region, checkRegion *metapb.Region) error { - epoch := region.GetRegionEpoch() - checkEpoch := checkRegion.GetRegionEpoch() - - if checkEpoch.GetVersion() >= epoch.GetVersion() && - checkEpoch.GetConfVer() >= epoch.GetConfVer() { - return nil - } - - return errors.Errorf("epoch %s is staler than %s", checkEpoch, epoch) -} - type leaders struct { // store id -> region id -> struct{} storeRegions map[uint64]map[uint64]struct{} @@ -379,11 +384,11 @@ func (r *regionsInfo) heartbeat(region *metapb.Region, leaderPeer *metapb.Peer) return resp, nil } -func (r *regionsInfo) getStoreRegionCount(storeID uint64) uint64 { +func (r *regionsInfo) getStoreRegionCount(storeID uint64) int { r.RLock() defer r.RUnlock() - return r.storeRegionCount[storeID] + return int(r.storeRegionCount[storeID]) } func (r *regionsInfo) getStoreLeaderCount(storeID uint64) int { @@ -552,10 +557,22 @@ func (c *clusterInfo) updateRegion(region *regionInfo) { 
c.regions.updateRegion(region.Region) } +func (c *clusterInfo) getMetaRegions() []*metapb.Region { + return c.regions.getRegions() +} + func (c *clusterInfo) getRegionCount() int { return c.regions.getRegionCount() } +func (c *clusterInfo) getStoreRegionCount(storeID uint64) int { + return c.regions.getStoreRegionCount(storeID) +} + +func (c *clusterInfo) getStoreLeaderCount(storeID uint64) int { + return c.regions.getStoreLeaderCount(storeID) +} + func (c *clusterInfo) randLeaderRegion(storeID uint64) *regionInfo { region := c.regions.randLeaderRegion(storeID) if region == nil { diff --git a/server/cache_test.go b/server/cache_test.go index 8c40369bef5..69341861679 100644 --- a/server/cache_test.go +++ b/server/cache_test.go @@ -85,6 +85,36 @@ func newTestRegions(n, np uint64) []*regionInfo { return regions } +func checkRegion(c *C, a *regionInfo, b *regionInfo) { + c.Assert(a.Region, DeepEquals, b.Region) + c.Assert(a.Leader, DeepEquals, b.Leader) +} + +func checkRegions(c *C, cache *regionsInfo, regions []*regionInfo) { + regionCount := make(map[uint64]int) + leaderCount := make(map[uint64]int) + for _, region := range regions { + for _, peer := range region.Peers { + regionCount[peer.StoreId]++ + if peer.Id == region.Leader.Id { + leaderCount[peer.StoreId]++ + } + } + } + + c.Assert(cache.getRegionCount(), Equals, len(regions)) + for id, count := range regionCount { + c.Assert(cache.getStoreRegionCount(id), Equals, count) + } + for id, count := range leaderCount { + c.Assert(cache.getStoreLeaderCount(id), Equals, count) + } + + for _, region := range cache.getRegions() { + c.Assert(region, DeepEquals, regions[region.GetId()].Region) + } +} + var _ = Suite(&testClusterInfoSuite{}) type testClusterInfoSuite struct{} @@ -126,6 +156,88 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { c.Assert(cache.getStoreCount(), Equals, int(n)) } +func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { + n, np := uint64(3), uint64(3) + cache := 
newClusterInfo(newMockIDAllocator()) + regions := newTestRegions(n, np) + + for i, region := range regions { + // region does not exist. + _, err := cache.handleRegionHeartbeat(region) + c.Assert(err, IsNil) + checkRegions(c, cache.regions, regions[0:i+1]) + + // region is the same, not updated. + _, err = cache.handleRegionHeartbeat(region) + c.Assert(err, IsNil) + checkRegions(c, cache.regions, regions[0:i+1]) + + epoch := region.clone().GetRegionEpoch() + + // region is updated. + region.RegionEpoch = &metapb.RegionEpoch{ + Version: epoch.GetVersion() + 1, + } + _, err = cache.handleRegionHeartbeat(region) + c.Assert(err, IsNil) + checkRegions(c, cache.regions, regions[0:i+1]) + + // region is stale (Version). + stale := region.clone() + stale.RegionEpoch = &metapb.RegionEpoch{ + ConfVer: epoch.GetConfVer() + 1, + } + _, err = cache.handleRegionHeartbeat(stale) + c.Assert(err, NotNil) + checkRegions(c, cache.regions, regions[0:i+1]) + + // region is updated. + region.RegionEpoch = &metapb.RegionEpoch{ + Version: epoch.GetVersion() + 1, + ConfVer: epoch.GetConfVer() + 1, + } + _, err = cache.handleRegionHeartbeat(region) + c.Assert(err, IsNil) + checkRegions(c, cache.regions, regions[0:i+1]) + + // region is stale (ConfVer). + stale = region.clone() + stale.RegionEpoch = &metapb.RegionEpoch{ + Version: epoch.GetVersion() + 1, + } + _, err = cache.handleRegionHeartbeat(stale) + c.Assert(err, NotNil) + checkRegions(c, cache.regions, regions[0:i+1]) + } +} + +var _ = Suite(&testClusterUtilSuite{}) + +type testClusterUtilSuite struct{} + +func (s *testClusterUtilSuite) TestCheckStaleRegion(c *C) { + // (0, 0) v.s. (0, 0) + region := newRegion([]byte{}, []byte{}) + origin := newRegion([]byte{}, []byte{}) + c.Assert(checkStaleRegion(region, origin), IsNil) + c.Assert(checkStaleRegion(origin, region), IsNil) + + // (1, 0) v.s. 
(0, 0) + region.RegionEpoch.Version++ + c.Assert(checkStaleRegion(origin, region), IsNil) + c.Assert(checkStaleRegion(region, origin), NotNil) + + // (1, 1) v.s. (0, 0) + region.RegionEpoch.ConfVer++ + c.Assert(checkStaleRegion(origin, region), IsNil) + c.Assert(checkStaleRegion(region, origin), NotNil) + + // (0, 1) v.s. (0, 0) + region.RegionEpoch.Version-- + c.Assert(checkStaleRegion(origin, region), IsNil) + c.Assert(checkStaleRegion(region, origin), NotNil) +} + var _ = Suite(&testClusterCacheSuite{}) type testClusterCacheSuite struct { @@ -323,7 +435,7 @@ func randRegions(count int) []*metapb.Region { } func checkStoreRegionCount(c *C, r *regionsInfo, regions []*metapb.Region) { - stores := make(map[uint64]uint64) + stores := make(map[uint64]int) for _, region := range regions { for _, peer := range region.GetPeers() { stores[peer.GetStoreId()]++ diff --git a/server/cluster.go b/server/cluster.go index 801ff95d001..eebb43cdc6c 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -30,9 +30,6 @@ import ( var ( errClusterNotBootstrapped = errors.New("cluster is not bootstrapped") - errRegionNotFound = func(regionID uint64) error { - return errors.Errorf("region %v not found", regionID) - } ) const ( @@ -352,22 +349,25 @@ func (c *RaftCluster) cacheAllRegions() error { } func (c *RaftCluster) getRegion(regionKey []byte) (*metapb.Region, *metapb.Peer) { - return c.cachedCluster.regions.getRegion(regionKey) + region := c.cachedCluster.searchRegion(regionKey) + if region == nil { + return nil, nil + } + return region.Region, region.Leader } // GetRegionByID gets region and leader peer by regionID from cluster. func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) { - return c.cachedCluster.regions.getRegionByID(regionID) -} - -// GetRegion returns the region from cluster. 
-func (c *RaftCluster) getRegionByID(regionID uint64) *regionInfo { - return c.cachedCluster.getRegion(regionID) + region := c.cachedCluster.getRegion(regionID) + if region == nil { + return nil, nil + } + return region.Region, region.Leader } // GetRegions gets regions from cluster. func (c *RaftCluster) GetRegions() []*metapb.Region { - return c.cachedCluster.regions.getRegions() + return c.cachedCluster.getMetaRegions() } // GetStores gets stores from cluster. @@ -500,7 +500,7 @@ func (c *RaftCluster) checkStores() { if store.GetState() != metapb.StoreState_Offline { continue } - if cluster.regions.getStoreRegionCount(store.GetId()) == 0 { + if cluster.getStoreRegionCount(store.GetId()) == 0 { err := c.BuryStore(store.GetId(), false) if err != nil { log.Errorf("bury store %v failed: %v", store, err) @@ -627,7 +627,7 @@ func (c *RaftCluster) putConfig(meta *metapb.Cluster) error { // NewAddPeerOperator creates an operator to add a peer to the region. // If storeID is 0, it will be chosen according to the balance rules. func (c *RaftCluster) NewAddPeerOperator(regionID uint64, storeID uint64) (Operator, error) { - region := c.getRegionByID(regionID) + region := c.cachedCluster.getRegion(regionID) if region == nil { return nil, errRegionNotFound(regionID) } @@ -679,7 +679,7 @@ func (c *RaftCluster) NewRemovePeerOperator(regionID uint64, peerID uint64) (Ope // SetAdminOperator sets the balance operator of the region. 
func (c *RaftCluster) SetAdminOperator(regionID uint64, ops []Operator) error { - region := c.getRegionByID(regionID) + region := c.cachedCluster.getRegion(regionID) if region == nil { return errRegionNotFound(regionID) } diff --git a/server/cluster_test.go b/server/cluster_test.go index 359ff92cd9f..720a7546616 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -457,7 +457,7 @@ func (s *testClusterSuite) testCheckStores(c *C, conn net.Conn, clusterID uint64 regionInfo := newRegionInfo(region, leader) _, err := cluster.handleRegionHeartbeat(regionInfo) c.Assert(err, IsNil) - c.Assert(cluster.cachedCluster.regions.getStoreRegionCount(store.GetId()), Equals, 1) + c.Assert(cluster.cachedCluster.getStoreRegionCount(store.GetId()), Equals, 1) // store is up w/ region peers will not be buried. cluster.checkStores() @@ -479,7 +479,7 @@ func (s *testClusterSuite) testCheckStores(c *C, conn net.Conn, clusterID uint64 region.Peers = []*metapb.Peer{leader} _, err = cluster.handleRegionHeartbeat(regionInfo) c.Assert(err, IsNil) - c.Assert(cluster.cachedCluster.regions.getStoreRegionCount(store.GetId()), Equals, 0) + c.Assert(cluster.cachedCluster.getStoreRegionCount(store.GetId()), Equals, 0) // store is offline w/o region peers will be buried. 
cluster.checkStores() diff --git a/server/region_test.go b/server/region_test.go index 693e504c889..d16b5b43ee1 100644 --- a/server/region_test.go +++ b/server/region_test.go @@ -94,9 +94,9 @@ func (s *testRegionSuite) TestRegionTree(c *C) { c.Assert(tree.search([]byte("a")), IsNil) - regionA := newRegionItem([]byte("a"), []byte("b")).region - regionC := newRegionItem([]byte("c"), []byte("d")).region - regionD := newRegionItem([]byte("d"), []byte{}).region + regionA := newRegion([]byte("a"), []byte("b")) + regionC := newRegion([]byte("c"), []byte("d")) + regionD := newRegion([]byte("d"), []byte{}) tree.insert(regionA) tree.insert(regionC) @@ -119,11 +119,14 @@ func (s *testRegionSuite) TestRegionTree(c *C) { c.Assert(tree.search([]byte("e")), Equals, regionD) } -func newRegionItem(start, end []byte) *regionItem { - return &regionItem{ - region: &metapb.Region{ - StartKey: start, - EndKey: end, - }, +func newRegion(start, end []byte) *metapb.Region { + return &metapb.Region{ + StartKey: start, + EndKey: end, + RegionEpoch: &metapb.RegionEpoch{}, } } + +func newRegionItem(start, end []byte) *regionItem { + return &regionItem{region: newRegion(start, end)} +}