Skip to content

Commit

Permalink
peering: add peer health metric (hashicorp#14004)
Browse files Browse the repository at this point in the history
Signed-off-by: acpana <[email protected]>
  • Loading branch information
acpana authored Aug 25, 2022
1 parent fead3c5 commit 30ff2e9
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 47 deletions.
36 changes: 27 additions & 9 deletions agent/consul/leader_peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ import (
)

var leaderExportedServicesCountKey = []string{"consul", "peering", "exported_services"}
var leaderHealthyPeeringKey = []string{"consul", "peering", "healthy"}
var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
{
Name: leaderExportedServicesCountKey,
Help: "A gauge that tracks how many services are exported for the peering. " +
"The labels are \"peering\" and, for enterprise, \"partition\". " +
"The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
{
Name: leaderHealthyPeeringKey,
Help: "A gauge that tracks how if a peering is healthy (1) or not (0). " +
"The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
}
Expand Down Expand Up @@ -85,13 +92,6 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric
}

for _, peer := range peers {
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if !found {
logger.Trace("did not find status for", "peer_name", peer.Name)
continue
}

esc := status.GetExportedServicesCount()
part := peer.Partition
labels := []metrics.Label{
{Name: "peer_name", Value: peer.Name},
Expand All @@ -101,7 +101,25 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric
labels = append(labels, metrics.Label{Name: "partition", Value: part})
}

metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels)
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found {
// exported services count metric
esc := status.GetExportedServicesCount()
metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels)
}

// peering health metric
if status.NeverConnected {
metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(math.NaN()), labels)
} else {
healthy := status.IsHealthy()
healthyInt := 0
if healthy {
healthyInt = 1
}

metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(healthyInt), labels)
}
}

return nil
Expand Down
32 changes: 32 additions & 0 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math"
"testing"
"time"

Expand Down Expand Up @@ -974,6 +975,7 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
var (
s2PeerID1 = generateUUID()
s2PeerID2 = generateUUID()
s2PeerID3 = generateUUID()
testContextTimeout = 60 * time.Second
lastIdx = uint64(0)
)
Expand Down Expand Up @@ -1063,6 +1065,24 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
// mimic tracking exported services
mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})

// pretend that the hearbeat happened
mst2.TrackRecvHeartbeat()
}

// Simulate a peering that never connects
{
p3 := &pbpeering.Peering{
ID: s2PeerID3,
Name: "my-peer-s4",
PeerID: token.PeerID, // doesn't much matter what these values are
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p3.ShouldDial())
lastIdx++
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p3}))
}

// set up a metrics sink
Expand Down Expand Up @@ -1092,6 +1112,18 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric2))

require.Equal(r, float32(2), metric2.Value) // for d, e services

keyHealthyMetric2 := fmt.Sprintf("us-west.consul.peering.healthy;peer_name=my-peer-s3;peer_id=%s", s2PeerID2)
healthyMetric2, ok := intv.Gauges[keyHealthyMetric2]
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyHealthyMetric2))

require.Equal(r, float32(1), healthyMetric2.Value)

keyHealthyMetric3 := fmt.Sprintf("us-west.consul.peering.healthy;peer_name=my-peer-s4;peer_id=%s", s2PeerID3)
healthyMetric3, ok := intv.Gauges[keyHealthyMetric3]
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyHealthyMetric3))

require.True(r, math.IsNaN(float64(healthyMetric3.Value)))
})
}

Expand Down
1 change: 1 addition & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
})
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
s.peerStreamServer.Register(s.externalGRPCServer)

// Initialize internal gRPC server.
Expand Down
8 changes: 4 additions & 4 deletions agent/grpc-external/services/peerstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Config struct {
// outgoingHeartbeatInterval is how often we send a heartbeat.
outgoingHeartbeatInterval time.Duration

// incomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
incomingHeartbeatTimeout time.Duration
// IncomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
IncomingHeartbeatTimeout time.Duration
}

//go:generate mockery --name ACLResolver --inpackage
Expand All @@ -63,8 +63,8 @@ func NewServer(cfg Config) *Server {
if cfg.outgoingHeartbeatInterval == 0 {
cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval
}
if cfg.incomingHeartbeatTimeout == 0 {
cfg.incomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
if cfg.IncomingHeartbeatTimeout == 0 {
cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
}
return &Server{
Config: cfg,
Expand Down
5 changes: 3 additions & 2 deletions agent/grpc-external/services/peerstream/stream_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {

// incomingHeartbeatCtx will complete if incoming heartbeats time out.
incomingHeartbeatCtx, incomingHeartbeatCtxCancel :=
context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout)
// NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're
// re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned
// value, not the current value.
Expand Down Expand Up @@ -605,7 +605,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// They just can't trace the execution properly for some reason (possibly golang/go#29587).
//nolint:govet
incomingHeartbeatCtx, incomingHeartbeatCtxCancel =
context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout)
}

case update := <-subCh:
Expand Down Expand Up @@ -642,6 +642,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
if err := streamSend(replResp); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
}
status.TrackSendSuccess()
}
}
}
Expand Down
52 changes: 33 additions & 19 deletions agent/grpc-external/services/peerstream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
})

var lastSendSuccess time.Time
var lastSendAck, lastSendSuccess time.Time

testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
ack := &pbpeerstream.ReplicationMessage{
Expand All @@ -587,19 +587,22 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
}

lastSendSuccess = it.FutureNow(1)
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 2, 0, time.UTC)
lastSendSuccess = time.Date(2000, time.January, 1, 0, 0, 3, 0, time.UTC)
err := client.Send(ack)
require.NoError(t, err)

expect := Status{
Connected: true,
LastAck: lastSendSuccess,
Connected: true,
LastAck: lastSendAck,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}

retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(testPeerID)
rStatus, ok := srv.StreamStatus(testPeerID)
require.True(r, ok)
require.Equal(r, expect, status)
require.Equal(r, expect, rStatus)
})
})

Expand All @@ -621,23 +624,26 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
}

lastNack = it.FutureNow(1)
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 4, 0, time.UTC)
lastNack = time.Date(2000, time.January, 1, 0, 0, 5, 0, time.UTC)
err := client.Send(nack)
require.NoError(t, err)

lastNackMsg = "client peer was unable to apply resource: bad bad not good"

expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
Connected: true,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}

retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(testPeerID)
rStatus, ok := srv.StreamStatus(testPeerID)
require.True(r, ok)
require.Equal(r, expect, status)
require.Equal(r, expect, rStatus)
})
})

Expand Down Expand Up @@ -694,13 +700,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {

expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}

retry.Run(t, func(r *retry.R) {
Expand Down Expand Up @@ -753,7 +761,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {

expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
Expand All @@ -762,6 +770,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}

retry.Run(t, func(r *retry.R) {
Expand All @@ -785,7 +795,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {

expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
Expand All @@ -795,6 +805,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}

retry.Run(t, func(r *retry.R) {
Expand All @@ -816,7 +828,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: false,
DisconnectErrorMessage: lastRecvErrorMsg,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime,
Expand All @@ -827,6 +839,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}

retry.Run(t, func(r *retry.R) {
Expand Down Expand Up @@ -1129,7 +1143,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {

srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.incomingHeartbeatTimeout = 5 * time.Millisecond
c.IncomingHeartbeatTimeout = 5 * time.Millisecond
})

p := writePeeringToBeDialed(t, store, 1, "my-peer")
Expand Down Expand Up @@ -1236,7 +1250,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {

srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.incomingHeartbeatTimeout = incomingHeartbeatTimeout
c.IncomingHeartbeatTimeout = incomingHeartbeatTimeout
})

p := writePeeringToBeDialed(t, store, 1, "my-peer")
Expand Down
Loading

0 comments on commit 30ff2e9

Please sign in to comment.