Skip to content

Commit

Permalink
Add discovery-side label reconciler (gravitational#27476)
Browse files Browse the repository at this point in the history
This change adds a service to the auth server that periodically iterates through
stored ServerInfos and updates the labels of associated SSH servers over their
inventory control stream.
  • Loading branch information
atburke authored Jul 14, 2023
1 parent 7bfe300 commit da64b0c
Show file tree
Hide file tree
Showing 30 changed files with 3,105 additions and 2,522 deletions.
6 changes: 6 additions & 0 deletions api/client/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ func (i *downstreamICS) runRecvLoop(stream proto.AuthService_InventoryControlStr
msg = *oneOf.GetHello()
case oneOf.GetPing() != nil:
msg = *oneOf.GetPing()
case oneOf.GetUpdateLabels() != nil:
msg = *oneOf.GetUpdateLabels()
default:
// TODO: log unknown message variants once we have a better story around
// logging in api/* packages.
Expand Down Expand Up @@ -503,6 +505,10 @@ func (i *upstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStrea
oneOf.Msg = &proto.DownstreamInventoryOneOf_Ping{
Ping: &msg,
}
case proto.DownstreamInventoryUpdateLabels:
oneOf.Msg = &proto.DownstreamInventoryOneOf_UpdateLabels{
UpdateLabels: &msg,
}
default:
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
continue
Expand Down
438 changes: 222 additions & 216 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions api/internalutils/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,16 @@ func PageFunc[T any](fn func() ([]T, error), doneFuncs ...func()) Stream[T] {
},
}
}

// Take takes the next n items from a stream. It returns a slice of the items
// and the result of the last call to stream.Next().
func Take[T any](stream Stream[T], n int) ([]T, bool) {
items := make([]T, 0, n)
for i := 0; i < n; i++ {
if !stream.Next() {
return items, false
}
items = append(items, stream.Item())
}
return items, true
}
57 changes: 57 additions & 0 deletions api/internalutils/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,60 @@ func TestCollectPages(t *testing.T) {
})
}
}

func TestTake(t *testing.T) {
t.Parallel()

intSlice := func(n int) []int {
s := make([]int, 0, n)
for i := 0; i < n; i++ {
s = append(s, i)
}
return s
}

tests := []struct {
name string
input []int
n int
expectedOutput []int
expectMore bool
}{
{
name: "empty stream",
input: []int{},
n: 10,
expectedOutput: []int{},
expectMore: false,
},
{
name: "full stream",
input: intSlice(20),
n: 10,
expectedOutput: intSlice(10),
expectMore: true,
},
{
name: "drain stream of size n",
input: intSlice(10),
n: 10,
expectedOutput: intSlice(10),
expectMore: true,
},
{
name: "drain stream of size < n",
input: intSlice(5),
n: 10,
expectedOutput: intSlice(5),
expectMore: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stream := Slice(tc.input)
output, more := Take(stream, tc.n)
require.Equal(t, tc.expectedOutput, output)
require.Equal(t, tc.expectMore, more)
})
}
}
5 changes: 4 additions & 1 deletion api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2160,8 +2160,11 @@ message DownstreamInventoryHello {

// LabelUpdateKind is the type of service to update labels for.
enum LabelUpdateKind {
// SSHServer is an SSH server.
// SSHServer is a label update for an SSH server.
SSHServer = 0;
// SSHServerCloudLabels is a label update for an SSH server coming from a
// cloud provider.
SSHServerCloudLabels = 1;
}

// InventoryUpdateLabelsRequest is used to request that a specified instance
Expand Down
33 changes: 22 additions & 11 deletions api/proto/teleport/legacy/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -789,12 +789,30 @@ message ServerSpecV2 {
repeated string ProxyIDs = 12 [(gogoproto.jsontag) = "proxy_ids,omitempty"];
// PublicAddrs is a list of public addresses where this server can be reached.
repeated string public_addrs = 13;
// CloudMetadata contains info about the cloud instance the server is running
// on, if any.
CloudMetadata CloudMetadata = 14 [(gogoproto.jsontag) = "cloud_metadata,omitempty"];

reserved 8;
reserved 10;
reserved "KubernetesClusters";
}

// AWSInfo contains attributes to match to an EC2 instance.
message AWSInfo {
// AccountID is an AWS account ID.
string AccountID = 1 [(gogoproto.jsontag) = "account_id"];
// InstanceID is an EC2 instance ID.
string InstanceID = 2 [(gogoproto.jsontag) = "instance_id"];
}

// CloudMetadata contains info about the cloud instance a server is running
// on, if any.
message CloudMetadata {
// AWSInfo contains attributes to match to an EC2 instance.
AWSInfo AWS = 1 [(gogoproto.jsontag) = "aws,omitempty"];
}

// AppServerV3 represents a single proxied web app.
message AppServerV3 {
option (gogoproto.goproto_stringer) = false;
Expand Down Expand Up @@ -5823,8 +5841,8 @@ message WatchStatusSpecV1 {
message ServerInfoV1 {
// Kind is the resource kind.
string Kind = 1 [(gogoproto.jsontag) = "kind"];
// SubKind is an optional resource subkind. Currently unused for this resource.
string SubKind = 2 [(gogoproto.jsontag) = "sub_kind,omitempty"];
// SubKind is an optional resource subkind.
string SubKind = 2 [(gogoproto.jsontag) = "sub_kind"];
// Version is the resource version.
string Version = 3 [(gogoproto.jsontag) = "version"];
// Metadata is the resource metadata.
Expand All @@ -5841,15 +5859,8 @@ message ServerInfoV1 {

// ServerInfoSpecV1 contains fields used to match Nodes to this ServerInfo.
message ServerInfoSpecV1 {
// AWSInfo contains attributes to match to an EC2 instance.
message AWSInfo {
// AccountID is an AWS account ID.
string AccountID = 1 [(gogoproto.jsontag) = "account_id"];
// InstanceID is an EC2 instance ID.
string InstanceID = 2 [(gogoproto.jsontag) = "instance_id"];
}
// AWS matches an EC2 instance.
AWSInfo AWS = 1 [(gogoproto.jsontag) = "aws,omitempty"];
reserved 1;
reserved "AWS";
// NewLabels is the set of labels to add to nodes matching this ServerInfo.
map<string, string> NewLabels = 2 [(gogoproto.jsontag) = "new_labels,omitempty"];
}
Expand Down
4 changes: 4 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ const (
// KindServerInfo contains info that should be applied to joining Nodes.
KindServerInfo = "server_info"

// SubKindCloudInfo is a ServerInfo that was created by the Discovery
// service to match with a single discovered instance.
SubKindCloudInfo = "cloud_info"

// MetaNameClusterMaintenanceConfig is the only allowed metadata.name value for the maintenance
// window singleton resource.
MetaNameClusterMaintenanceConfig = "cluster-maintenance-config"
Expand Down
15 changes: 15 additions & 0 deletions api/types/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ type Server interface {

// DeepCopy creates a clone of this server value
DeepCopy() Server

// GetCloudMetadata gets the cloud metadata for the server.
GetCloudMetadata() *CloudMetadata
// SetCloudMetadata sets the server's cloud metadata.
SetCloudMetadata(meta *CloudMetadata)
}

// NewServer creates an instance of Server.
Expand Down Expand Up @@ -471,6 +476,16 @@ func (s *ServerV2) DeepCopy() Server {
return utils.CloneProtoMsg(s)
}

// GetCloudMetadata gets the cloud metadata for the server.
func (s *ServerV2) GetCloudMetadata() *CloudMetadata {
return s.Spec.CloudMetadata
}

// SetCloudMetadata sets the server's cloud metadata.
func (s *ServerV2) SetCloudMetadata(meta *CloudMetadata) {
s.Spec.CloudMetadata = meta
}

// IsAWSConsole returns true if this app is AWS management console.
func (a *App) IsAWSConsole() bool {
return strings.HasPrefix(a.URI, constants.AWSConsoleURL)
Expand Down
10 changes: 7 additions & 3 deletions api/types/server_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package types

import (
"fmt"
"time"

"github.com/gravitational/trace"
Expand Down Expand Up @@ -139,9 +140,6 @@ func (s *ServerInfoV1) MatchSearch(searchValues []string) bool {
utils.MapToStrings(s.GetAllLabels()),
s.GetName(),
)
if s.Spec.AWS != nil {
fieldVals = append(fieldVals, s.Spec.AWS.AccountID, s.Spec.AWS.InstanceID)
}
return MatchSearch(fieldVals, searchValues, nil)
}

Expand All @@ -166,3 +164,9 @@ func (s *ServerInfoV1) CheckAndSetDefaults() error {
s.setStaticFields()
return trace.Wrap(s.Metadata.CheckAndSetDefaults())
}

// GetServerInfoName gets the name of the ServerInfo generated for a discovered
// EC2 instance with this account ID and instance ID.
func (a *AWSInfo) GetServerInfoName() string {
return fmt.Sprintf("aws-%v-%v", a.AccountID, a.InstanceID)
}
Loading

0 comments on commit da64b0c

Please sign in to comment.