Skip to content

Commit

Permalink
Fetch metadata for heartbeat in background (gravitational#30528)
Browse files Browse the repository at this point in the history
This change updates SSH server heartbeats to fetch metadata in the
background so that the first heartbeat won't be delayed (this was
causing flakiness in tests).
  • Loading branch information
atburke authored Aug 24, 2023
1 parent 6c1a0d3 commit 92c8442
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 15 deletions.
46 changes: 31 additions & 15 deletions lib/srv/heartbeatv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package srv

import (
"context"
"sync/atomic"
"time"

"github.com/gravitational/trace"
Expand Down Expand Up @@ -73,20 +74,32 @@ func NewSSHServerHeartbeat(cfg SSHServerHeartbeatConfig) (*HeartbeatV2, error) {
return nil, trace.Wrap(err)
}

var metadataPtr atomic.Pointer[metadata.Metadata]
inner := &sshServerHeartbeatV2{
getServer: func(ctx context.Context) *types.ServerV2 {
server := cfg.GetServer()
metadata, err := metadata.Get(ctx)
if err == nil {
if metadata.CloudMetadata != nil {
server.SetCloudMetadata(metadata.CloudMetadata)
getMetadata: metadata.Get,
announcer: cfg.Announcer,
}
inner.getServer = func(ctx context.Context) *types.ServerV2 {
server := cfg.GetServer()

if meta := metadataPtr.Load(); meta == nil {
go func() {
meta, err := inner.getMetadata(ctx)
if err != nil {
log.Warnf("Failed to get metadata: %v", err)
} else if meta != nil && meta.CloudMetadata != nil {
// Set the metadata immediately to give the heartbeat
// a chance to use it.
server.SetCloudMetadata(meta.CloudMetadata)
metadataPtr.CompareAndSwap(nil, meta)
}
} else {
log.Warnf("Failed to get metadata: %v", err)
}
return server
},
announcer: cfg.Announcer,
}()
} else if meta.CloudMetadata != nil {
// Server isn't cached between heartbeats, so set the metadata again.
server.SetCloudMetadata(meta.CloudMetadata)
}

return server
}

return newHeartbeatV2(cfg.InventoryHandle, inner, heartbeatV2Config{
Expand Down Expand Up @@ -444,11 +457,14 @@ type heartbeatV2Driver interface {
SupportsFallback() bool
}

// metadataGetter is the signature of the function used to fetch instance
// metadata for a heartbeat. It has the same shape as metadata.Get, which is the
// value assigned to sshServerHeartbeatV2.getMetadata by NewSSHServerHeartbeat.
type metadataGetter func(ctx context.Context) (*metadata.Metadata, error)

// sshServerHeartbeatV2 is the heartbeatV2 implementation for ssh servers.
//
// NOTE(review): this listing comes from a diff view without +/- markers; the
// first three field lines appear to be the pre-change declarations and the
// four that follow the post-change ones — confirm against the real file.
type sshServerHeartbeatV2 struct {
getServer func(ctx context.Context) *types.ServerV2
announcer auth.Announcer
prev *types.ServerV2
// getServer returns the server resource for a heartbeat; the constructor
// installs a closure that also attaches cloud metadata once it is known.
getServer func(ctx context.Context) *types.ServerV2
// getMetadata fetches instance metadata; it is a field so that tests can
// substitute a fake getter.
getMetadata metadataGetter
// announcer is taken from the heartbeat config (cfg.Announcer).
announcer auth.Announcer
// prev is presumably the last server value that was announced — TODO confirm
// against Poll/Announce, which are not visible here.
prev *types.ServerV2
}

func (h *sshServerHeartbeatV2) Poll(ctx context.Context) (changed bool) {
Expand Down
74 changes: 74 additions & 0 deletions lib/srv/heartbeatv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/inventory"
"github.com/gravitational/teleport/lib/inventory/metadata"
)

type fakeHeartbeatDriver struct {
Expand Down Expand Up @@ -459,3 +461,75 @@ func awaitEvents(t *testing.T, ch <-chan hbv2TestEvent, opts ...eventOption) {
}
}
}

// fakeDownstreamHandle is a test double for inventory.DownstreamHandle. It
// embeds the interface so that only the methods a test actually needs have to
// be overridden.
type fakeDownstreamHandle struct {
	inventory.DownstreamHandle
}

// CloseContext returns a background context, i.e. one that is never canceled,
// so a heartbeat built on this handle does not observe a closed handle.
func (h *fakeDownstreamHandle) CloseContext() context.Context {
	neverClosed := context.Background()
	return neverClosed
}

// mockMetadataGetter returns a metadataGetter whose results are driven by the
// returned channel, together with that channel.
//
// Each call to the getter blocks until a value is available on the channel:
//   - a non-nil *metadata.Metadata is returned as a successful fetch;
//   - a nil value (including the zero value produced by a closed channel) is
//     reported as a fetch error.
//
// The getter also returns ctx.Err() as soon as ctx is done, so a background
// fetch goroutine is not left blocked forever when nothing is ever sent.
func mockMetadataGetter() (metadataGetter, chan *metadata.Metadata) {
	// Buffer of one lets a test queue a result without a getter already waiting.
	ch := make(chan *metadata.Metadata, 1)
	getter := func(ctx context.Context) (*metadata.Metadata, error) {
		select {
		case meta := <-ch:
			if meta == nil {
				return nil, fmt.Errorf("error fetching metadata")
			}
			return meta, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return getter, ch
}

// makeMetadata builds a metadata.Metadata whose cloud metadata describes an
// AWS instance with the given instance ID. All other fields are left at their
// zero values.
func makeMetadata(id string) *metadata.Metadata {
	awsInfo := &types.AWSInfo{InstanceID: id}
	cloud := &types.CloudMetadata{AWS: awsInfo}
	return &metadata.Metadata{CloudMetadata: cloud}
}

// TestNewHeartbeatFetchMetadata verifies that an SSH server heartbeat fetches
// cloud metadata in the background:
//   - getServer returns immediately, without metadata, while the fetch is pending;
//   - a failed fetch leaves the server without metadata and is retried later;
//   - a successful fetch is applied to subsequently returned servers;
//   - once metadata has been obtained it is not fetched (or replaced) again.
func TestNewHeartbeatFetchMetadata(t *testing.T) {
	t.Parallel()

	// settle is how long the test waits for a background fetch goroutine to
	// consume a queued result and publish it.
	const settle = 100 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	heartbeat, err := NewSSHServerHeartbeat(SSHServerHeartbeatConfig{
		InventoryHandle: &fakeDownstreamHandle{},
		GetServer: func() *types.ServerV2 {
			return &types.ServerV2{
				Spec: types.ServerSpecV2{},
			}
		},
	})
	require.NoError(t, err)

	// Named getter (not metadataGetter) so the package-level type is not shadowed.
	getter, metaCh := mockMetadataGetter()
	t.Cleanup(func() { close(metaCh) })
	inner, ok := heartbeat.inner.(*sshServerHeartbeatV2)
	require.True(t, ok, "unexpected heartbeat driver type %T", heartbeat.inner)
	inner.getMetadata = getter

	// Metadata won't be set before metadata getter returns.
	server := inner.getServer(ctx)
	assert.Nil(t, server.GetCloudMetadata(), "Metadata was set before background process returned")

	// Metadata won't be set if the getter fails.
	metaCh <- nil
	time.Sleep(settle) // Wait for goroutines to complete
	assert.Nil(t, inner.getServer(ctx).GetCloudMetadata(), "Metadata was set despite metadata getter failing")

	// getServer gets updated metadata value.
	metaCh <- makeMetadata("foo")
	time.Sleep(settle) // Wait for goroutines to complete
	meta := inner.getServer(ctx).GetCloudMetadata()
	// require (not assert): the next line dereferences meta and would panic on nil.
	require.NotNil(t, meta, "Heartbeat never got metadata")
	assert.Equal(t, "foo", meta.AWS.InstanceID)

	// Metadata won't be fetched more than once.
	metaCh <- makeMetadata("bar")
	time.Sleep(settle) // Wait for goroutines to complete
	meta = inner.getServer(ctx).GetCloudMetadata()
	require.NotNil(t, meta, "Lost metadata")
	assert.NotEqual(t, "bar", meta.AWS.InstanceID)
}

0 comments on commit 92c8442

Please sign in to comment.