Skip to content

Commit

Permalink
Merge branch 'master' into vtgate
Browse files Browse the repository at this point in the history
  • Loading branch information
guoliang100 committed Mar 30, 2016
2 parents fbeb682 + 6bec606 commit e5d45a8
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 34 deletions.
76 changes: 58 additions & 18 deletions go/vt/vtgate/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,60 @@ type GatewayEndPointCacheStatus struct {
AvgLatency float64 // in milliseconds
}

const (
aggrChanSize = 10000
)

var (
// aggrChan buffers queryInfo objects to be processed.
aggrChan chan *queryInfo
// muAggr protects below vars.
muAggr sync.Mutex
// aggregators holds all Aggregators created.
aggregators []*GatewayEndPointStatusAggregator
// gatewayStatsChanFull tracks the number of times
// aggrChan becomes full.
gatewayStatsChanFull *stats.Int
)

func init() {
// init global goroutines to aggregate stats.
aggrChan = make(chan *queryInfo, aggrChanSize)
gatewayStatsChanFull = stats.NewInt("GatewayStatsChanFullCount")
go resetAggregators()
go processQueryInfo()
}

// registerAggregator registers an aggregator to the global list.
func registerAggregator(a *GatewayEndPointStatusAggregator) {
muAggr.Lock()
defer muAggr.Unlock()
aggregators = append(aggregators, a)
}

// resetAggregators resets the next stats slot for all aggregators every second.
func resetAggregators() {
ticker := time.NewTicker(time.Second)
for range ticker.C {
muAggr.Lock()
for _, a := range aggregators {
a.resetNextSlot()
}
muAggr.Unlock()
}
}

// processQueryInfo processes the next queryInfo object.
func processQueryInfo() {
for qi := range aggrChan {
qi.aggr.processQueryInfo(qi)
}
}

// NewGatewayEndPointStatusAggregator creates a GatewayEndPointStatusAggregator.
func NewGatewayEndPointStatusAggregator() *GatewayEndPointStatusAggregator {
gepsa := &GatewayEndPointStatusAggregator{
qiChan: make(chan *queryInfo, 10000),
}
go func() {
ticker := time.NewTicker(time.Second)
for range ticker.C {
gepsa.resetNextSlot()
}
}()
go func() {
for qi := range gepsa.qiChan {
gepsa.processQueryInfo(qi)
}
}()
gepsa := &GatewayEndPointStatusAggregator{}
registerAggregator(gepsa)
return gepsa
}

Expand All @@ -154,7 +192,6 @@ type GatewayEndPointStatusAggregator struct {
TabletType topodatapb.TabletType
Name string // the alternative name of an endpoint
Addr string // the host:port of an endpoint
qiChan chan *queryInfo

// mu protects below fields.
mu sync.RWMutex
Expand All @@ -167,6 +204,7 @@ type GatewayEndPointStatusAggregator struct {
}

type queryInfo struct {
aggr *GatewayEndPointStatusAggregator
addr string
tabletType topodatapb.TabletType
elapsed time.Duration
Expand All @@ -176,20 +214,24 @@ type queryInfo struct {
// UpdateQueryInfo updates the aggregator with the given information about a query.
func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(addr string, tabletType topodatapb.TabletType, elapsed time.Duration, hasError bool) {
qi := &queryInfo{
aggr: gepsa,
addr: addr,
tabletType: tabletType,
elapsed: elapsed,
hasError: hasError,
}
select {
case gepsa.qiChan <- qi:
case aggrChan <- qi:
default:
gatewayStatsChanFull.Add(1)
}
}

func (gepsa *GatewayEndPointStatusAggregator) processQueryInfo(qi *queryInfo) {
gepsa.mu.Lock()
defer gepsa.mu.Unlock()
if gepsa.TabletType != qi.tabletType {
gepsa.TabletType = qi.tabletType
// reset counters
gepsa.QueryCount = 0
gepsa.QueryError = 0
Expand All @@ -203,14 +245,12 @@ func (gepsa *GatewayEndPointStatusAggregator) processQueryInfo(qi *queryInfo) {
if qi.addr != "" {
gepsa.Addr = qi.addr
}
gepsa.TabletType = qi.tabletType
gepsa.QueryCount++
gepsa.queryCountInMinute[gepsa.tick]++
gepsa.latencyInMinute[gepsa.tick] += qi.elapsed
if qi.hasError {
gepsa.QueryError++
}
gepsa.mu.Unlock()
}

// GetCacheStatus returns a GatewayEndPointCacheStatus representing the current gateway status.
Expand Down
61 changes: 45 additions & 16 deletions go/vt/vtgate/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,37 @@ func TestGatwayEndPointStatusAggregator(t *testing.T) {
TabletType: topodatapb.TabletType_REPLICA,
Name: "n",
Addr: "a",
qiChan: make(chan *queryInfo, 10000),
}
t.Logf("aggr = GatwayEndPointStatusAggregator{k, s, replica, n, a}")
aggr.UpdateQueryInfo("", topodatapb.TabletType_REPLICA, 10*time.Millisecond, false)
aggr.processQueryInfo(<-aggr.qiChan)
t.Logf("aggr.UpdateQueryInfo(, replica, 10ms, false)")
qi := &queryInfo{
aggr: aggr,
addr: "",
tabletType: topodatapb.TabletType_REPLICA,
elapsed: 10 * time.Millisecond,
hasError: false,
}
aggr.processQueryInfo(qi)
t.Logf("aggr.processQueryInfo(, replica, 10ms, false)")
aggr.resetNextSlot()
t.Logf("aggr.resetNextSlot()")
aggr.UpdateQueryInfo("", topodatapb.TabletType_REPLICA, 8*time.Millisecond, false)
aggr.processQueryInfo(<-aggr.qiChan)
t.Logf("aggr.UpdateQueryInfo(, replica, 8ms, false)")
aggr.UpdateQueryInfo("", topodatapb.TabletType_REPLICA, 3*time.Millisecond, true)
aggr.processQueryInfo(<-aggr.qiChan)
t.Logf("aggr.UpdateQueryInfo(, replica, 3ms, true)")
qi = &queryInfo{
aggr: aggr,
addr: "",
tabletType: topodatapb.TabletType_REPLICA,
elapsed: 8 * time.Millisecond,
hasError: false,
}
aggr.processQueryInfo(qi)
t.Logf("aggr.processQueryInfo(, replica, 8ms, false)")
qi = &queryInfo{
aggr: aggr,
addr: "",
tabletType: topodatapb.TabletType_REPLICA,
elapsed: 3 * time.Millisecond,
hasError: true,
}
aggr.processQueryInfo(qi)
t.Logf("aggr.processQueryInfo(, replica, 3ms, true)")
want := &GatewayEndPointCacheStatus{
Keyspace: "k",
Shard: "s",
Expand All @@ -53,12 +70,24 @@ func TestGatwayEndPointStatusAggregator(t *testing.T) {
aggr.resetNextSlot()
}
t.Logf("59 aggr.resetNextSlot()")
aggr.UpdateQueryInfo("b", topodatapb.TabletType_MASTER, 9*time.Millisecond, false)
aggr.processQueryInfo(<-aggr.qiChan)
t.Logf("aggr.UpdateQueryInfo(b, master, 9ms, false)")
aggr.UpdateQueryInfo("", topodatapb.TabletType_MASTER, 6*time.Millisecond, true)
aggr.processQueryInfo(<-aggr.qiChan)
t.Logf("aggr.UpdateQueryInfo(, master, 6ms, true)")
qi = &queryInfo{
aggr: aggr,
addr: "b",
tabletType: topodatapb.TabletType_MASTER,
elapsed: 9 * time.Millisecond,
hasError: false,
}
aggr.processQueryInfo(qi)
t.Logf("aggr.processQueryInfo(b, master, 9ms, false)")
qi = &queryInfo{
aggr: aggr,
addr: "",
tabletType: topodatapb.TabletType_MASTER,
elapsed: 6 * time.Millisecond,
hasError: true,
}
aggr.processQueryInfo(qi)
t.Logf("aggr.processQueryInfo(, master, 6ms, true)")
want = &GatewayEndPointCacheStatus{
Keyspace: "k",
Shard: "s",
Expand Down

0 comments on commit e5d45a8

Please sign in to comment.