Skip to content

Commit

Permalink
Merge branch 'master' into adding-evict-method
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosy committed Jan 25, 2019
2 parents 98babed + c569c06 commit 8219fc2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 29 deletions.
43 changes: 20 additions & 23 deletions pkg/agent/plugin/workloadattestor/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/spiffe/spire/proto/agent/workloadattestor"
"github.com/spiffe/spire/proto/common"
spi "github.com/spiffe/spire/proto/common/plugin"
"github.com/zeebo/errs"
)

const (
Expand All @@ -31,6 +32,8 @@ const (
containerMaybeInPod
)

var k8sErr = errs.Class("k8s")

type k8sPlugin struct {
kubeletReadOnlyPort int
maxPollAttempts int
Expand Down Expand Up @@ -78,11 +81,9 @@ func (p *k8sPlugin) Attest(ctx context.Context, req *workloadattestor.AttestRequ
p.mtx.RLock()
defer p.mtx.RUnlock()

resp := workloadattestor.AttestResponse{}

cgroups, err := cgroups.GetCgroups(req.Pid, p.fs)
if err != nil {
return &resp, err
return nil, k8sErr.Wrap(err)
}

var containerID string
Expand All @@ -108,7 +109,7 @@ func (p *k8sPlugin) Attest(ctx context.Context, req *workloadattestor.AttestRequ

// Not a Kubernetes pod
if containerID == "" {
return &resp, nil
return &workloadattestor.AttestResponse{}, nil
}

// Poll pod information and search for the pod with the container. If
Expand All @@ -117,15 +118,16 @@ func (p *k8sPlugin) Attest(ctx context.Context, req *workloadattestor.AttestRequ
for attempt := 1; ; attempt++ {
list, err := p.getPodListFromInsecureKubeletPort()
if err != nil {
return &resp, err
return nil, k8sErr.Wrap(err)
}

notAllContainersReady := false
for _, item := range list.Items {
switch lookUpContainerInPod(containerID, item.Status) {
case containerInPod:
resp.Selectors = getSelectorsFromPodInfo(item)
return &resp, nil
return &workloadattestor.AttestResponse{
Selectors: getSelectorsFromPodInfo(item),
}, nil
case containerMaybeInPod:
notAllContainersReady = true
case containerNotInPod:
Expand All @@ -136,14 +138,17 @@ func (p *k8sPlugin) Attest(ctx context.Context, req *workloadattestor.AttestRequ
// uninitialized containers, then the search is over.
if !notAllContainersReady || attempt >= p.maxPollAttempts {
log.Printf("container id %q not found (attempt %d of %d)", containerID, attempt, p.maxPollAttempts)
return &resp, fmt.Errorf("no selectors found")
return nil, k8sErr.New("no selectors found")
}

// wait a bit for containers to initialize before trying again.
log.Printf("container id %q not found (attempt %d of %d); trying again in %s", containerID, attempt, p.maxPollAttempts, p.pollRetryInterval)

// TODO: bail early via context cancelation
time.Sleep(p.pollRetryInterval)
select {
case <-time.After(p.pollRetryInterval):
case <-ctx.Done():
return nil, k8sErr.New("no selectors found: %v", ctx.Err())
}
}
}

Expand Down Expand Up @@ -224,31 +229,23 @@ func (p *k8sPlugin) Configure(ctx context.Context, req *spi.ConfigureRequest) (*
p.mtx.Lock()
defer p.mtx.Unlock()

resp := &spi.ConfigureResponse{}

// Parse HCL config payload into config struct
config := &k8sPluginConfig{}
hclTree, err := hcl.Parse(req.Configuration)
if err != nil {
resp.ErrorList = []string{err.Error()}
return resp, err
}
err = hcl.DecodeObject(&config, hclTree)
if err != nil {
resp.ErrorList = []string{err.Error()}
return resp, err
config := new(k8sPluginConfig)
if err := hcl.Decode(config, req.Configuration); err != nil {
return nil, k8sErr.Wrap(err)
}

// set up defaults
if config.MaxPollAttempts <= 0 {
config.MaxPollAttempts = defaultMaxPollAttempts
}

var err error
var pollRetryInterval time.Duration
if config.PollRetryInterval != "" {
pollRetryInterval, err = time.ParseDuration(config.PollRetryInterval)
if err != nil {
return resp, err
return nil, k8sErr.Wrap(err)
}
}
if pollRetryInterval <= 0 {
Expand Down
43 changes: 37 additions & 6 deletions pkg/agent/plugin/workloadattestor/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
ctx = context.Background()
)

func InitPlugin(t *testing.T, client httpClient, fs cgroups.FileSystem) workloadattestor.WorkloadAttestor {
func InitPlugin(t *testing.T, client httpClient, fs cgroups.FileSystem, opts ...func(*k8sPlugin)) workloadattestor.WorkloadAttestor {
pluginConfig := &spi.ConfigureRequest{
Configuration: validConfig,
}
Expand All @@ -54,6 +54,9 @@ func InitPlugin(t *testing.T, client httpClient, fs cgroups.FileSystem) workload
// the default retry config is much too long for tests.
p.pollRetryInterval = time.Millisecond
p.maxPollAttempts = 3
for _, opt := range opts {
opt(p)
}
return p
}

Expand Down Expand Up @@ -144,6 +147,33 @@ func TestK8s_AttestPidInPodAfterRetry(t *testing.T) {
require.NotEmpty(t, resp.Selectors)
}

func TestK8s_AttestPidNotInPodCancelsEarly(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

podListNotRunning, err := ioutil.ReadFile(podListNotRunningFilePath)
require.NoError(t, err)

mockHttpClient := http_client_mock.NewMockhttpClient(mockCtrl)
mockHttpClient.EXPECT().Get(podsURL).Return(
&http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(podListNotRunning)),
}, nil)

mockFilesystem := filesystem_mock.NewMockfileSystem(mockCtrl)
mockFilesystem.EXPECT().Open(pidCgroupPath).Return(os.Open(cgPidInPodFilePath))

plugin := InitPlugin(t, mockHttpClient, mockFilesystem, func(p *k8sPlugin) { p.pollRetryInterval = time.Hour })
ctx, cancel := context.WithCancel(ctx)
cancel()
req := workloadattestor.AttestRequest{Pid: int32(pid)}
resp, err := plugin.Attest(ctx, &req)
require.Error(t, err)
require.Contains(t, err.Error(), "k8s: no selectors found: context canceled")
require.Nil(t, resp)
}

func TestK8s_AttestPidNotInPodAfterRetry(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
Expand Down Expand Up @@ -175,7 +205,8 @@ func TestK8s_AttestPidNotInPodAfterRetry(t *testing.T) {
req := workloadattestor.AttestRequest{Pid: int32(pid)}
resp, err := plugin.Attest(ctx, &req)
require.Error(t, err)
require.Empty(t, resp.Selectors)
require.Contains(t, err.Error(), "k8s: no selectors found")
require.Nil(t, resp)
}

func TestK8s_AttestPidNotInPod(t *testing.T) {
Expand Down Expand Up @@ -207,13 +238,13 @@ func TestK8s_ConfigureValidConfig(t *testing.T) {
}

func TestK8s_ConfigureInvalidConfig(t *testing.T) {
assert := assert.New(t)
p := New()
r, err := p.Configure(ctx, &spi.ConfigureRequest{
res, err := p.Configure(ctx, &spi.ConfigureRequest{
Configuration: invalidConfig,
})
assert.Error(err)
assert.Equal(&spi.ConfigureResponse{ErrorList: []string{`strconv.ParseInt: parsing "invalid": invalid syntax`}}, r)
require.Error(t, err)
require.Contains(t, err.Error(), `k8s: strconv.ParseInt: parsing "invalid": invalid syntax`)
require.Nil(t, res)
}

func TestK8s_GetPluginInfo(t *testing.T) {
Expand Down

0 comments on commit 8219fc2

Please sign in to comment.