forked from gruntwork-io/cloud-nuke
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement nuking for Kinesis firehose (gruntwork-io#719)
* implement kinesis firehose * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose
- Loading branch information
1 parent
99aceba
commit 03b54f4
Showing
7 changed files
with
218 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package resources | ||
|
||
import ( | ||
"context" | ||
|
||
awsgo "github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/firehose" | ||
"github.com/gruntwork-io/cloud-nuke/config" | ||
"github.com/gruntwork-io/cloud-nuke/logging" | ||
"github.com/gruntwork-io/cloud-nuke/report" | ||
"github.com/gruntwork-io/go-commons/errors" | ||
) | ||
|
||
func (kf *KinesisFirehose) getAll(_ context.Context, configObj config.Config) ([]*string, error) { | ||
allStreams := []*string{} | ||
output, err := kf.Client.ListDeliveryStreamsWithContext(kf.Context, &firehose.ListDeliveryStreamsInput{}) | ||
|
||
for _, stream := range output.DeliveryStreamNames { | ||
if configObj.KinesisFirehose.ShouldInclude(config.ResourceValue{ | ||
Name: stream, | ||
}) { | ||
allStreams = append(allStreams, stream) | ||
} | ||
} | ||
|
||
if err != nil { | ||
return nil, errors.WithStackTrace(err) | ||
} | ||
|
||
return allStreams, nil | ||
} | ||
|
||
func (kf *KinesisFirehose) nukeAll(identifiers []*string) error { | ||
if len(identifiers) == 0 { | ||
logging.Debugf("No Kinesis Firhose to nuke in region %s", kf.Region) | ||
return nil | ||
} | ||
|
||
logging.Debugf("Deleting all Kinesis Firhose in region %s", kf.Region) | ||
var deleted []*string | ||
for _, id := range identifiers { | ||
_, err := kf.Client.DeleteDeliveryStreamWithContext(kf.Context, &firehose.DeleteDeliveryStreamInput{ | ||
AllowForceDelete: awsgo.Bool(true), | ||
DeliveryStreamName: id, | ||
}) | ||
e := report.Entry{ | ||
Identifier: awsgo.StringValue(id), | ||
ResourceType: "Kinesis Firehose", | ||
Error: err, | ||
} | ||
report.Record(e) | ||
|
||
if err != nil { | ||
logging.Debugf("[Failed] %s", err) | ||
} else { | ||
deleted = append(deleted, id) | ||
logging.Debugf("Deleted Kinesis Firehose: %s", *id) | ||
} | ||
} | ||
|
||
logging.Debugf("[OK] %d Kinesis Firehose(s) deleted in %s", len(deleted), kf.Region) | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package resources | ||
|
||
import ( | ||
"context" | ||
"regexp" | ||
"testing" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/request" | ||
"github.com/aws/aws-sdk-go/service/firehose" | ||
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface" | ||
"github.com/gruntwork-io/cloud-nuke/config" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type mockedKinesisFirehose struct { | ||
firehoseiface.FirehoseAPI | ||
ListDeliveryStreamsOutput firehose.ListDeliveryStreamsOutput | ||
DeleteDeliveryStreamOutput firehose.DeleteDeliveryStreamOutput | ||
} | ||
|
||
func (m mockedKinesisFirehose) ListDeliveryStreamsWithContext(aws.Context, *firehose.ListDeliveryStreamsInput, ...request.Option) (*firehose.ListDeliveryStreamsOutput, error) { | ||
return &m.ListDeliveryStreamsOutput, nil | ||
} | ||
|
||
func (m mockedKinesisFirehose) DeleteDeliveryStreamWithContext(aws.Context, *firehose.DeleteDeliveryStreamInput, ...request.Option) (*firehose.DeleteDeliveryStreamOutput, error) { | ||
return &m.DeleteDeliveryStreamOutput, nil | ||
} | ||
|
||
func TestKinesisFirehoseStreams_GetAll(t *testing.T) { | ||
|
||
t.Parallel() | ||
|
||
testName1 := "stream1" | ||
testName2 := "stream2" | ||
ks := KinesisFirehose{ | ||
Client: mockedKinesisFirehose{ | ||
ListDeliveryStreamsOutput: firehose.ListDeliveryStreamsOutput{ | ||
DeliveryStreamNames: []*string{aws.String(testName1), aws.String(testName2)}, | ||
}, | ||
}, | ||
} | ||
|
||
tests := map[string]struct { | ||
configObj config.ResourceType | ||
expected []string | ||
}{ | ||
"emptyFilter": { | ||
configObj: config.ResourceType{}, | ||
expected: []string{testName1, testName2}, | ||
}, | ||
"nameExclusionFilter": { | ||
configObj: config.ResourceType{ | ||
ExcludeRule: config.FilterRule{ | ||
NamesRegExp: []config.Expression{{ | ||
RE: *regexp.MustCompile(testName1), | ||
}}}, | ||
}, | ||
expected: []string{testName2}, | ||
}, | ||
} | ||
for name, tc := range tests { | ||
t.Run(name, func(t *testing.T) { | ||
names, err := ks.getAll(context.Background(), config.Config{ | ||
KinesisFirehose: tc.configObj, | ||
}) | ||
require.NoError(t, err) | ||
require.Equal(t, tc.expected, aws.StringValueSlice(names)) | ||
}) | ||
} | ||
|
||
} | ||
|
||
func TestKinesisFirehose_NukeAll(t *testing.T) { | ||
|
||
t.Parallel() | ||
|
||
ks := KinesisFirehose{ | ||
Client: mockedKinesisFirehose{ | ||
DeleteDeliveryStreamOutput: firehose.DeleteDeliveryStreamOutput{}, | ||
}, | ||
} | ||
|
||
err := ks.nukeAll([]*string{aws.String("test")}) | ||
require.NoError(t, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package resources | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/firehose" | ||
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface" | ||
"github.com/gruntwork-io/cloud-nuke/config" | ||
"github.com/gruntwork-io/go-commons/errors" | ||
) | ||
|
||
type KinesisFirehose struct { | ||
BaseAwsResource | ||
Client firehoseiface.FirehoseAPI | ||
Region string | ||
Names []string | ||
} | ||
|
||
func (kf *KinesisFirehose) Init(session *session.Session) { | ||
kf.Client = firehose.New(session) | ||
} | ||
|
||
// ResourceName - The simple name of the AWS resource | ||
func (kf *KinesisFirehose) ResourceName() string { | ||
return "kinesis-firehose" | ||
} | ||
|
||
// ResourceIdentifiers - The names of the Kinesis Streams | ||
func (kf *KinesisFirehose) ResourceIdentifiers() []string { | ||
return kf.Names | ||
} | ||
|
||
func (kf *KinesisFirehose) MaxBatchSize() int { | ||
// Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so | ||
// we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web | ||
// console will do. We pick a conservative number here to avoid hitting AWS API rate limits. | ||
return 35 | ||
} | ||
|
||
func (kf *KinesisFirehose) GetAndSetResourceConfig(configObj config.Config) config.ResourceType { | ||
return configObj.KinesisFirehose | ||
} | ||
|
||
func (kf *KinesisFirehose) GetAndSetIdentifiers(c context.Context, configObj config.Config) ([]string, error) { | ||
identifiers, err := kf.getAll(c, configObj) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
kf.Names = aws.StringValueSlice(identifiers) | ||
return kf.Names, nil | ||
} | ||
|
||
// Nuke - nuke 'em all!!! | ||
func (kf *KinesisFirehose) Nuke(identifiers []string) error { | ||
if err := kf.nukeAll(aws.StringSlice(identifiers)); err != nil { | ||
return errors.WithStackTrace(err) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters