Skip to content

Commit

Permalink
fix transfer leader bugs and add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed May 8, 2019
1 parent b81537f commit 09546f5
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 27 deletions.
5 changes: 2 additions & 3 deletions cmd/bench/main.go → cmd/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func main() {
flag.Parse()
log.InitLog()

for i := 0; i < 1; i++ {
for i := 0; i < 10; i++ {
go func(idx int) {
res := fmt.Sprintf("res-%d", idx)
name := fmt.Sprintf("test-%d", idx)
Expand All @@ -49,7 +49,7 @@ func main() {
var bids []uint64

b := c.CreateBatch()
for j := 0; j < 1; j++ {
for j := 0; j < 8; j++ {
b.CreateGlobal(name, time.Second*30)
}

Expand Down Expand Up @@ -101,7 +101,6 @@ func main() {
}

log.Infof("commit global complete")
return
}
}(i)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,6 @@ func (c *Client) sentTask(addr string, q *task.Queue) {
if !cc.hasCallback() || cc.batch {
err := conn.WriteAndFlush(c.buildRPCMessage(cc.id, cc.msg))
if err != nil {
// TODO: remove
log.Fatalf("write failed %+v", err)
c.addToSendWithExclude(cc, addr)
available = false
conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/coordinator_cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewCellTransactionCoordinator(id, peerID uint64, trans transport.Transport,
tc.peerID = peerID
tc.gidKey = gCellKey(id)
tc.manualKey = manualCellKey(id)
tc.cmds = task.NewRingBuffer(uint64(tc.opts.concurrency) * 4)
tc.cmds = task.NewRingBuffer(uint64(tc.opts.concurrency) * 64)
tc.trans = trans
tc.cell = tc.opts.cell
ctx, cancel := context.WithCancel(context.Background())
Expand Down
8 changes: 1 addition & 7 deletions pkg/core/coordinator_cell_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,7 @@ func (tc *cellTransactionCoordinator) doRegistryGlobalTransaction(g *meta.Global
tc.doAddG(g)

if g.Action != meta.NoneAction {
c := acquireCMD()
c.cmdType = cmdGComplete
c.gid = g.ID
err := tc.cmds.Put(c)
if err != nil {
return err
}
tc.doComplete(g.ID)
} else {
err := tc.calcGTimeout(g)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/metrics/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ const (
// PhaseTwo phase two
PhaseTwo = "two"
)

const (
// RoleLeader leader fragment
RoleLeader = "leader"
// RoleFollower follower fragment
RoleFollower = "follower"
)
3 changes: 3 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ func init() {
prometheus.Register(GDurationHistogram)
prometheus.Register(ActionBCounter)
prometheus.Register(BDurationHistogram)

prometheus.Register(FragmentGauge)
prometheus.Register(FragmentPeersGauge)
}
17 changes: 4 additions & 13 deletions pkg/metrics/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,14 @@ import (
)

var (
// FragmentCounter fragment count
FragmentCounter = prometheus.NewCounter(
prometheus.CounterOpts{
// FragmentGauge fragment count
FragmentGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "taas",
Subsystem: "sharding",
Name: "fragment_total",
Help: "Total number of Fragment.",
})

// FragmentLeaderCounter fragment count
FragmentLeaderCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "taas",
Subsystem: "sharding",
Name: "leader_total",
Help: "Total number of Fragment leader.",
})
}, []string{"role"})

// FragmentPeersGauge fragment peers value
FragmentPeersGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type router struct {
stores map[uint64]*meta.StoreMeta
frags map[uint64]*meta.Fragment
availableFrags []uint64
leaders map[uint64]uint64
leaders map[uint64]uint64 // fid->peer id
opts map[uint64]uint64
transports map[uint64]*transport
initC chan struct{}
Expand Down
1 change: 1 addition & 0 deletions pkg/sharding/peer_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (pr *PeerReplicate) collectDownPeers(maxDuration time.Duration) []*prophet.

func (pr *PeerReplicate) becomeLeader() {
pr.stopTasks(true)
// TODO: too many concurrency
pr.addTask(pr.startHB)
pr.addTask(pr.startCheckConcurrency)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sharding/prophet_adapter.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package sharding

import (
"fmt"
"time"

"github.com/fagongzi/log"
"github.com/fagongzi/util/json"
"github.com/infinivision/prophet"
"github.com/infinivision/taas/pkg/meta"
"github.com/infinivision/taas/pkg/metrics"
"github.com/infinivision/taas/pkg/util"
)

Expand Down Expand Up @@ -69,6 +71,9 @@ func (pa *ProphetAdapter) FetchContainerHB() *prophet.ContainerHeartbeatReq {
req.LeaderCount = st.fragmentLeaderCount
req.ReplicaCount = st.fragmentCount
req.Busy = false

metrics.FragmentGauge.WithLabelValues(metrics.RoleLeader).Set(float64(st.fragmentLeaderCount))
metrics.FragmentGauge.WithLabelValues(metrics.RoleFollower).Set(float64(st.fragmentCount - st.fragmentLeaderCount))
return req
}

Expand Down Expand Up @@ -226,6 +231,8 @@ func getResourceHB(pr *PeerReplicate) *prophet.ResourceHeartbeatReq {
req.LeaderPeer = &pr.peer
req.PendingPeers = pr.collectPendingPeers()
req.DownPeers = pr.collectDownPeers(pr.store.cfg.MaxPeerDownDuration)

metrics.FragmentPeersGauge.WithLabelValues(fmt.Sprintf("%d", pr.id)).Set(float64(len(pr.frag.Peers)))
return req
}

Expand Down

0 comments on commit 09546f5

Please sign in to comment.