Skip to content

Commit

Permalink
Drop cache if configuration changes.
Browse files Browse the repository at this point in the history
Allow object TTLs to be configurable.
Default apikey TTL to 15m to avoid auth bottleneck.
  • Loading branch information
Sean Cunningham committed Jun 10, 2021
1 parent 0cd883a commit aadaa4c
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 383 deletions.
8 changes: 4 additions & 4 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/dgraph-io/ristretto
Version: v0.0.3
Version: v0.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/dgraph-io/ristretto@v0.0.3/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/dgraph-io/ristretto@v0.1.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down Expand Up @@ -35585,11 +35585,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------------------
Dependency : golang.org/x/sys
Version: v0.0.0-20200625212154-ddb9806d33ae
Version: v0.0.0-20200930185726-fdedc70b468f
Licence type (autodetected): BSD-3-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.0.0-20200625212154-ddb9806d33ae/LICENSE:
Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.0.0-20200930185726-fdedc70b468f/LICENSE:

Copyright (c) 2009 The Go Authors. All rights reserved.

Expand Down
24 changes: 17 additions & 7 deletions cmd/fleet/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ import (
"github.com/rs/zerolog/log"
)

const (
kAPIKeyTTL = 5 * time.Second
)

var (
ErrApiKeyNotEnabled = errors.New("APIKey not enabled")
ErrAgentCorrupted = errors.New("agent record corrupted")
ErrAgentInactive = errors.New("agent inactive")
)

// This authenticates that the provided API key exists and is enabled.
Expand Down Expand Up @@ -67,9 +64,8 @@ func authApiKey(r *http.Request, bulker bulk.Bulk, c cache.Cache) (*apikey.ApiKe
RawJSON("meta", info.Metadata).
Msg("ApiKey authenticated")

if info.Enabled {
c.SetApiKey(*key, kAPIKeyTTL)
} else {
c.SetApiKey(*key, info.Enabled)
if !info.Enabled {
err = ErrApiKeyNotEnabled
log.Info().
Err(err).
Expand Down Expand Up @@ -126,5 +122,19 @@ func authAgent(r *http.Request, id string, bulker bulk.Bulk, c cache.Cache) (*mo
return nil, ErrAgentCorrupted
}

// validate active, an api key can be valid for an inactive agent record
// if it is in our cache and has not timed out.
if !agent.Active {
log.Info().
Err(ErrAgentInactive).
Str("agentId", id).
Str(EcsHttpRequestId, r.Header.Get(logger.HeaderRequestID)).
Msg("agent record inactive")

// Update the cache to mark the api key id associated with this agent as not enabled
c.SetApiKey(*key, false)
return nil, ErrAgentInactive
}

return agent, nil
}
2 changes: 1 addition & 1 deletion cmd/fleet/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (ack *AckT) handleAckEvents(ctx context.Context, agent *model.Agent, events
return errors.New("no matching action")
}
action = actions[0]
ack.cache.SetAction(action, time.Minute)
ack.cache.SetAction(action)
}

acr := model.ActionResult{
Expand Down
7 changes: 3 additions & 4 deletions cmd/fleet/handleArtifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ import (
)

const (
defaultMaxParallel = 8 // TODO: configurable
defaultCacheTTL = time.Hour * 24 // TODO: configurable
defaultThrottleTTL = time.Minute // TODO: configurable
defaultMaxParallel = 8 // TODO: configurable
defaultThrottleTTL = time.Minute // TODO: configurable
)

var (
Expand Down Expand Up @@ -244,7 +243,7 @@ func (at ArtifactT) getArtifact(ctx context.Context, zlog zerolog.Logger, ident,
art.Body = dstPayload

// Update the cache.
at.cache.SetArtifact(*art, defaultCacheTTL)
at.cache.SetArtifact(*art)

return art, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func processPolicy(ctx context.Context, bulker bulk.Bulk, agentId, reqId string,
Str("newHash", defaultRole.Sha2).
Msg("Generating a new API key")

defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, policy.DefaultOutputName, defaultRole.Raw)
defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker, agent.Id, policy.DefaultOutputName, defaultRole.Raw)
if err != nil {
zlog.Error().Err(err).Msg("fail generate output key")
return nil, err
Expand Down
32 changes: 19 additions & 13 deletions cmd/fleet/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/elastic/go-elasticsearch/v7"
"github.com/gofrs/uuid"
"github.com/hashicorp/go-version"
"github.com/julienschmidt/httprouter"
Expand All @@ -34,9 +33,6 @@ import (
const (
kEnrollMod = "enroll"

kCacheAccessInitTTL = time.Second * 30 // Cache a bit longer to handle expensive initial checkin
kCacheEnrollmentTTL = time.Second * 30

EnrollEphemeral = "EPHEMERAL"
EnrollPermanent = "PERMANENT"
EnrollTemporary = "TEMPORARY"
Expand Down Expand Up @@ -195,7 +191,7 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq

agentId := u.String()

accessApiKey, err := generateAccessApiKey(ctx, bulker.Client(), agentId)
accessApiKey, err := generateAccessApiKey(ctx, bulker, agentId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -238,7 +234,7 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq
}

// We are Kool & and the Gang; cache the access key to avoid the roundtrip on impending checkin
c.SetApiKey(*accessApiKey, kCacheAccessInitTTL)
c.SetApiKey(*accessApiKey, true)

return &resp, nil
}
Expand Down Expand Up @@ -312,15 +308,25 @@ func createFleetAgent(ctx context.Context, bulker bulk.Bulk, id string, agent mo
return nil
}

func generateAccessApiKey(ctx context.Context, client *elasticsearch.Client, agentId string) (*apikey.ApiKey, error) {
return apikey.Create(ctx, client, agentId, "", []byte(kFleetAccessRolesJSON),
apikey.NewMetadata(agentId, apikey.TypeAccess))
func generateAccessApiKey(ctx context.Context, bulk bulk.Bulk, agentId string) (*apikey.ApiKey, error) {
return bulk.ApiKeyCreate(
ctx,
agentId,
"",
[]byte(kFleetAccessRolesJSON),
apikey.NewMetadata(agentId, apikey.TypeAccess),
)
}

func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId, outputName string, roles []byte) (*apikey.ApiKey, error) {
func generateOutputApiKey(ctx context.Context, bulk bulk.Bulk, agentId, outputName string, roles []byte) (*apikey.ApiKey, error) {
name := fmt.Sprintf("%s:%s", agentId, outputName)
return apikey.Create(ctx, client, name, "", roles,
apikey.NewMetadata(agentId, apikey.TypeOutput))
return bulk.ApiKeyCreate(
ctx,
name,
"",
roles,
apikey.NewMetadata(agentId, apikey.TypeOutput),
)
}

func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (*model.EnrollmentApiKey, error) {
Expand All @@ -340,7 +346,7 @@ func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (*
}

cost := int64(len(rec.ApiKey))
et.cache.SetEnrollmentApiKey(id, rec, cost, kCacheEnrollmentTTL)
et.cache.SetEnrollmentApiKey(id, rec, cost)

return &rec, nil
}
Expand Down
70 changes: 43 additions & 27 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,23 @@ func installSignalHandler() context.Context {
}

func makeCache(cfg *config.Config) (cache.Cache, error) {
cacheCfg := makeCacheConfig(cfg)
log.Info().Interface("cfg", cacheCfg).Msg("makeCache")
return cache.New(cacheCfg)
}

log.Info().
Int64("numCounters", cfg.Inputs[0].Cache.NumCounters).
Int64("maxCost", cfg.Inputs[0].Cache.MaxCost).
Msg("makeCache")
func makeCacheConfig(cfg *config.Config) cache.Config {
ccfg := cfg.Inputs[0].Cache

cacheCfg := cache.Config{
NumCounters: cfg.Inputs[0].Cache.NumCounters,
MaxCost: cfg.Inputs[0].Cache.MaxCost,
return cache.Config{
NumCounters: ccfg.NumCounters,
MaxCost: ccfg.MaxCost,
ActionTTL: ccfg.ActionTTL,
EnrollKeyTTL: ccfg.EnrollKeyTTL,
ArtifactTTL: ccfg.ArtifactTTL,
ApiKeyTTL: ccfg.ApiKeyTTL,
ApiKeyJitter: ccfg.ApiKeyJitter,
}

return cache.New(cacheCfg)
}

func initLogger(cfg *config.Config, version, commit string) (*logger.Logger, error) {
Expand Down Expand Up @@ -110,12 +115,7 @@ func getRunCommand(version, commit string) func(cmd *cobra.Command, args []strin
return err
}

c, err := makeCache(cfg)
if err != nil {
return err
}

agent, err := NewAgentMode(cliCfg, os.Stdin, c, version, l)
agent, err := NewAgentMode(cliCfg, os.Stdin, version, l)
if err != nil {
return err
}
Expand Down Expand Up @@ -144,12 +144,7 @@ func getRunCommand(version, commit string) func(cmd *cobra.Command, args []strin
return err
}

c, err := makeCache(cfg)
if err != nil {
return err
}

srv, err := NewFleetServer(cfg, c, version, status.NewLog())
srv, err := NewFleetServer(cfg, version, status.NewLog())
if err != nil {
return err
}
Expand Down Expand Up @@ -186,7 +181,6 @@ type firstCfg struct {

type AgentMode struct {
cliCfg *ucfg.Config
cache cache.Cache
version string

reloadables []reload.Reloadable
Expand All @@ -201,12 +195,11 @@ type AgentMode struct {
startChan chan struct{}
}

func NewAgentMode(cliCfg *ucfg.Config, reader io.Reader, c cache.Cache, version string, reloadables ...reload.Reloadable) (*AgentMode, error) {
func NewAgentMode(cliCfg *ucfg.Config, reader io.Reader, version string, reloadables ...reload.Reloadable) (*AgentMode, error) {
var err error

a := &AgentMode{
cliCfg: cliCfg,
cache: c,
version: version,
reloadables: reloadables,
}
Expand Down Expand Up @@ -252,7 +245,7 @@ func (a *AgentMode) Run(ctx context.Context) error {
srvCtx, srvCancel := context.WithCancel(ctx)
defer srvCancel()
log.Info().Msg("received initial configuration starting Fleet Server")
srv, err := NewFleetServer(cfg.cfg, a.cache, a.version, status.NewChained(status.NewLog(), a.agent))
srv, err := NewFleetServer(cfg.cfg, a.version, status.NewChained(status.NewLog(), a.agent))
if err != nil {
// unblock startChan even though there was an error
a.startChan <- struct{}{}
Expand Down Expand Up @@ -400,17 +393,23 @@ type FleetServer struct {
}

// NewFleetServer creates the actual fleet server service.
func NewFleetServer(cfg *config.Config, c cache.Cache, verStr string, reporter status.Reporter) (*FleetServer, error) {
func NewFleetServer(cfg *config.Config, verStr string, reporter status.Reporter) (*FleetServer, error) {
verCon, err := buildVersionConstraint(verStr)
if err != nil {
return nil, err
}

cache, err := makeCache(cfg)
if err != nil {
return nil, err
}

return &FleetServer{
ver: verStr,
verCon: verCon,
cfg: cfg,
cfgCh: make(chan *config.Config, 1),
cache: c,
cache: cache,
reporter: reporter,
}, nil
}
Expand Down Expand Up @@ -469,6 +468,16 @@ LOOP:
f.reporter.Status(proto.StateObserved_STARTING, "Starting", nil)
}

// Create or recreate cache
if configCacheChanged(curCfg, newCfg) {
cacheCfg := makeCacheConfig(newCfg)
err := f.cache.Reconfigure(cacheCfg)
log.Info().Err(err).Interface("cfg", cacheCfg).Msg("Reconfigure cache")
if err != nil {
return err
}
}

// Start or restart profiler
if configChangedProfiler(curCfg, newCfg) {
stop(proCancel, proEg)
Expand Down Expand Up @@ -535,6 +544,13 @@ func configChangedServer(curCfg, newCfg *config.Config) bool {
return curCfg == nil || curCfg.Inputs[0].Server != newCfg.Inputs[0].Server
}

// configCacheChanged reports whether the cache settings of the first input
// differ between curCfg and newCfg.
//
// A nil curCfg (no configuration applied yet) is reported as "not changed".
// NOTE(review): presumably this is because the cache has already been built
// from the initial configuration by the time the first config is observed —
// confirm against NewFleetServer.
func configCacheChanged(curCfg, newCfg *config.Config) bool {
	// Short-circuit keeps newCfg untouched when there is no previous config.
	return curCfg != nil && curCfg.Inputs[0].Cache != newCfg.Inputs[0].Cache
}

func safeWait(g *errgroup.Group, to time.Duration) (err error) {
waitCh := make(chan error)
go func() {
Expand Down
8 changes: 1 addition & 7 deletions cmd/fleet/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/sleep"
Expand Down Expand Up @@ -63,11 +62,6 @@ func startTestServer(ctx context.Context) (*tserver, error) {
return nil, err
}

c, err := cache.New(cache.Config{NumCounters: 100, MaxCost: 100000})
if err != nil {
return nil, err
}

logger.Init(cfg)

port, err := ftesting.FreePort()
Expand All @@ -82,7 +76,7 @@ func startTestServer(ctx context.Context) (*tserver, error) {
cfg.Inputs[0].Server = *srvcfg
log.Info().Uint16("port", port).Msg("Test fleet server")

srv, err := NewFleetServer(cfg, c, serverVersion, status.NewLog())
srv, err := NewFleetServer(cfg, serverVersion, status.NewLog())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.16
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/aleksmaus/generate v0.0.0-20210326194607-c630e07a2742
github.com/dgraph-io/ristretto v0.0.3
github.com/dgraph-io/ristretto v0.1.0
github.com/elastic/beats/v7 v7.11.1
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
github.com/elastic/go-elasticsearch/v7 v7.13.1
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dF
github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b h1:mUDs72Rlzv6A4YN8w3Ra3hU9x/plOQPcQjZYL/1f5SM=
github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.3 h1:jh22xisGBjrEVnRZ1DVTpBVQm0Xndu8sMl0CWDzSIBI=
github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible h1:4jGdduO4ceTJFKf0IhgaB8NJapGqKHwC2b4xQ/cXujM=
github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down Expand Up @@ -922,8 +922,9 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
Loading

0 comments on commit aadaa4c

Please sign in to comment.