Skip to content

Commit

Permalink
Add oldest log metric to leader (hashicorp#452)
Browse files Browse the repository at this point in the history
Add oldest log metric to leader
  • Loading branch information
banks authored Apr 6, 2021
1 parent 6b4e41b commit f3ecdb6
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 1 deletion.
68 changes: 68 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package raft

import (
"time"

metrics "github.com/armon/go-metrics"
)

// LogType describes various types of log entries.
type LogType uint8

Expand Down Expand Up @@ -62,6 +68,19 @@ type Log struct {
// trouble, so gating extension behavior via some flag in the client
// program is also a good idea.
Extensions []byte

// AppendedAt stores the time the leader first appended this log to it's
// LogStore. Followers will observe the leader's time. It is not used for
// coordination or as part of the replication protocol at all. It exists only
// to provide operational information for example how many seconds worth of
// logs are present on the leader which might impact follower's ability to
// catch up after restoring a large snapshot. We should never rely on this
// being in the past when appending on a follower or reading a log back since
// the clock skew can mean a follower could see a log with a future timestamp.
// In general too the leader is not required to persist the log before
// delivering to followers although the current implementation happens to do
// this.
AppendedAt time.Time
}

// LogStore is used to provide an interface for storing
Expand All @@ -85,3 +104,52 @@ type LogStore interface {
// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}

func oldestLog(s LogStore) (Log, error) {
var l Log

// We might get unlucky and have a truncate right between getting first log
// index and fetching it so keep trying until we succeed or hard fail.
var lastFailIdx uint64
var lastErr error
for {
firstIdx, err := s.FirstIndex()
if err != nil {
return l, err
}
if firstIdx == 0 {
return l, ErrLogNotFound
}
if firstIdx == lastFailIdx {
// Got same index as last time around which errored, don't bother trying
// to fetch it again just return the error.
return l, lastErr
}
err = s.GetLog(firstIdx, &l)
if err == nil {
// We found the oldest log, break the loop
break
}
// We failed, keep trying to see if there is a new firstIndex
lastFailIdx = firstIdx
lastErr = err
}
return l, nil
}

func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(interval):
// In error case emit 0 as the age
ageMs := float32(0.0)
l, err := oldestLog(s)
if err == nil && !l.AppendedAt.IsZero() {
ageMs = float32(time.Since(l.AppendedAt).Milliseconds())
}
metrics.SetGauge(append(prefix, "oldestLogAge"), ageMs)
case <-stopCh:
return
}
}
}
155 changes: 155 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package raft

import (
"bytes"
"fmt"
"testing"
"time"

metrics "github.com/armon/go-metrics"
)

func TestOldestLog(t *testing.T) {
cases := []struct {
Name string
Logs []*Log
WantIdx uint64
WantErr bool
}{
{
Name: "empty logs",
Logs: nil,
WantIdx: 0,
WantErr: true,
},
{
Name: "simple case",
Logs: []*Log{
&Log{
Index: 1234,
Term: 1,
},
&Log{
Index: 1235,
Term: 1,
},
&Log{
Index: 1236,
Term: 2,
},
},
WantIdx: 1234,
WantErr: false,
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
s := NewInmemStore()
if err := s.StoreLogs(tc.Logs); err != nil {
t.Fatalf("expected store logs not to fail: %s", err)
}

got, err := oldestLog(s)
switch {
case tc.WantErr && err == nil:
t.Fatalf("wanted error got nil")
case !tc.WantErr && err != nil:
t.Fatalf("wanted no error got: %s", err)
}

if got.Index != tc.WantIdx {
t.Fatalf("got index %v, want %v", got.Index, tc.WantIdx)
}
})
}
}

func TestEmitsLogStoreMetrics(t *testing.T) {
sink := testSetupMetrics(t)

start := time.Now()

s := NewInmemStore()
logs := []*Log{
&Log{
Index: 1234,
Term: 1,
AppendedAt: time.Now(),
},
&Log{
Index: 1235,
Term: 1,
},
&Log{
Index: 1236,
Term: 2,
},
}
if err := s.StoreLogs(logs); err != nil {
t.Fatalf("expected store logs not to fail: %s", err)
}

stopCh := make(chan struct{})
defer close(stopCh)

go emitLogStoreMetrics(s, []string{"foo"}, time.Millisecond, stopCh)

// Wait for at least one interval
time.Sleep(5 * time.Millisecond)

got := getCurrentGaugeValue(t, sink, "raft.test.foo.oldestLogAge")

// Assert the age is in a reasonable range.
if got > float32(time.Since(start).Milliseconds()) {
t.Fatalf("max age before test start: %v", got)
}

if got < 1 {
t.Fatalf("max age less than interval: %v", got)
}
}

func testSetupMetrics(t *testing.T) *metrics.InmemSink {
// Record for ages (5 mins) so we can be confident that our assertions won't
// fail on silly long test runs due to dropped data.
s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
cfg := metrics.DefaultConfig("raft.test")
cfg.EnableHostname = false
metrics.NewGlobal(cfg, s)
return s
}

func getCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, name string) float32 {
t.Helper()

data := sink.Data()

// Loop backward through intervals until there is a non-empty one
// Addresses flakiness around recording to one interval but accessing during the next
for i := len(data) - 1; i >= 0; i-- {
currentInterval := data[i]

currentInterval.RLock()
if gv, ok := currentInterval.Gauges[name]; ok {
currentInterval.RUnlock()
return gv.Value
}
currentInterval.RUnlock()
}

// Debug print all the gauges
buf := bytes.NewBuffer(nil)
for _, intv := range data {
intv.RLock()
for name, val := range intv.Gauges {
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
}
intv.RUnlock()
}
t.Log(buf.String())

t.Fatalf("didn't find gauge %q", name)
return 0
}
10 changes: 9 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
)

const (
minCheckInterval = 10 * time.Millisecond
minCheckInterval = 10 * time.Millisecond
oldestLogGaugeInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -389,8 +390,14 @@ func (r *Raft) runLeader() {
// leaderloop.
r.setupLeaderState()

// Run a background go-routine to emit metrics on log age
stopCh := make(chan struct{})
go emitLogStoreMetrics(r.logs, []string{"raft", "leader"}, oldestLogGaugeInterval, stopCh)

// Cleanup state on step down
defer func() {
close(stopCh)

// Since we were the leader previously, we update our
// last contact time when we step down, so that we are not
// reporting a last contact time from before we were the
Expand Down Expand Up @@ -1094,6 +1101,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
applyLog.log.AppendedAt = now
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
Expand Down

0 comments on commit f3ecdb6

Please sign in to comment.