From 03b54f48ee16cb5ea285b0896180279f9185534b Mon Sep 17 00:00:00 2001 From: James Kwon <96548424+hongil0316@users.noreply.github.com> Date: Tue, 11 Jun 2024 18:36:56 -0400 Subject: [PATCH] Implement nuking for Kinesis firehose (#719) * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose * implement kinesis firehose --- README.md | 2 + aws/resource_registry.go | 1 + aws/resources/kinesis_firehose.go | 64 ++++++++++++++++++ aws/resources/kinesis_firehose_test.go | 86 +++++++++++++++++++++++++ aws/resources/kinesis_firehose_types.go | 63 ++++++++++++++++++ config/config.go | 1 + config/config_test.go | 1 + 7 files changed, 218 insertions(+) create mode 100644 aws/resources/kinesis_firehose.go create mode 100644 aws/resources/kinesis_firehose_test.go create mode 100644 aws/resources/kinesis_firehose_types.go diff --git a/README.md b/README.md index b4fe3040..3123272a 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ Cloud-nuke suppports 🔎 inspecting and 🔥💀 deleting the following AWS res | Macie | Member accounts | | SageMaker | Notebook instances | | Kinesis | Streams | +| Kinesis | Firehose | | API Gateway | Gateways (v1 and v2) | | EFS | File systems | | SNS | Topics | @@ -587,6 +588,7 @@ of the file that are supported are listed here. | egress-only-internet-gateway| EgressOnlyInternetGateway | ✅ (Gateway name) | ✅ (Creation Time) | ✅ | ✅ | | kmscustomerkeys | KMSCustomerKeys | ✅ (Key Name) | ✅ (Creation Time) | ❌ | ❌ | | kinesis-stream | KinesisStream | ✅ (Stream Name) | ❌ | ❌ | ✅ | +| kinesis-firehose | KinesisFirehose | ✅ (Delivery Stream Name) | ❌ | ❌ | ✅ | | lambda | LambdaFunction | ✅ (Function Name) | ✅ (Last Modified Time) | ❌ | ✅ | | lc | LaunchConfiguration | ✅ (Launch Configuration Name) | ✅ (Created Time) | ❌ | ✅ | | lt | LaunchTemplate | ✅ (Launch Template Name) | ✅ (Created Time) | ❌ | ✅ | diff --git a/aws/resource_registry.go b/aws/resource_registry.go index d6e85517..a8549fb5 100644 --- a/aws/resource_registry.go +++ b/aws/resource_registry.go @@ -93,6 +93,7 @@ func getRegisteredRegionalResources() []AwsResource { &resources.LoadBalancers{}, &resources.LoadBalancersV2{}, &resources.GuardDuty{}, + &resources.KinesisFirehose{}, &resources.KinesisStreams{}, &resources.KmsCustomerKeys{}, &resources.LambdaFunctions{}, diff --git a/aws/resources/kinesis_firehose.go b/aws/resources/kinesis_firehose.go new file mode 100644 index 00000000..20be21b8 --- /dev/null +++ b/aws/resources/kinesis_firehose.go @@ -0,0 +1,64 @@ +package resources + +import ( + "context" + + awsgo "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/firehose" + "github.com/gruntwork-io/cloud-nuke/config" + "github.com/gruntwork-io/cloud-nuke/logging" + "github.com/gruntwork-io/cloud-nuke/report" + "github.com/gruntwork-io/go-commons/errors" +) + +func (kf *KinesisFirehose) getAll(_ context.Context, configObj config.Config) ([]*string, error) { + allStreams := []*string{} + output, err := kf.Client.ListDeliveryStreamsWithContext(kf.Context, &firehose.ListDeliveryStreamsInput{}) + + for _, stream := range output.DeliveryStreamNames { + if configObj.KinesisFirehose.ShouldInclude(config.ResourceValue{ + Name: stream, + }) { + allStreams = append(allStreams, stream) + } + } + + if err != nil { + return nil, errors.WithStackTrace(err) + } + + return allStreams, nil +} + +func (kf *KinesisFirehose) nukeAll(identifiers []*string) error { + if len(identifiers) == 0 { + logging.Debugf("No Kinesis Firhose to nuke in region %s", kf.Region) + return nil + } + + logging.Debugf("Deleting all Kinesis Firhose in region %s", kf.Region) + var deleted []*string + for _, id := range identifiers { + _, err := kf.Client.DeleteDeliveryStreamWithContext(kf.Context, &firehose.DeleteDeliveryStreamInput{ + AllowForceDelete: awsgo.Bool(true), + DeliveryStreamName: id, + }) + e := report.Entry{ + Identifier: awsgo.StringValue(id), + ResourceType: "Kinesis Firehose", + Error: err, + } + report.Record(e) + + if err != nil { + logging.Debugf("[Failed] %s", err) + } else { + deleted = append(deleted, id) + logging.Debugf("Deleted Kinesis Firehose: %s", *id) + } + } + + logging.Debugf("[OK] %d Kinesis Firehose(s) deleted in %s", len(deleted), kf.Region) + + return nil +} diff --git a/aws/resources/kinesis_firehose_test.go b/aws/resources/kinesis_firehose_test.go new file mode 100644 index 00000000..30905cb9 --- /dev/null +++ b/aws/resources/kinesis_firehose_test.go @@ -0,0 +1,86 @@ +package resources + +import ( + "context" + "regexp" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/firehose" + "github.com/aws/aws-sdk-go/service/firehose/firehoseiface" + "github.com/gruntwork-io/cloud-nuke/config" + "github.com/stretchr/testify/require" +) + +type mockedKinesisFirehose struct { + firehoseiface.FirehoseAPI + ListDeliveryStreamsOutput firehose.ListDeliveryStreamsOutput + DeleteDeliveryStreamOutput firehose.DeleteDeliveryStreamOutput +} + +func (m mockedKinesisFirehose) ListDeliveryStreamsWithContext(aws.Context, *firehose.ListDeliveryStreamsInput, ...request.Option) (*firehose.ListDeliveryStreamsOutput, error) { + return &m.ListDeliveryStreamsOutput, nil +} + +func (m mockedKinesisFirehose) DeleteDeliveryStreamWithContext(aws.Context, *firehose.DeleteDeliveryStreamInput, ...request.Option) (*firehose.DeleteDeliveryStreamOutput, error) { + return &m.DeleteDeliveryStreamOutput, nil +} + +func TestKinesisFirehoseStreams_GetAll(t *testing.T) { + + t.Parallel() + + testName1 := "stream1" + testName2 := "stream2" + ks := KinesisFirehose{ + Client: mockedKinesisFirehose{ + ListDeliveryStreamsOutput: firehose.ListDeliveryStreamsOutput{ + DeliveryStreamNames: []*string{aws.String(testName1), aws.String(testName2)}, + }, + }, + } + + tests := map[string]struct { + configObj config.ResourceType + expected []string + }{ + "emptyFilter": { + configObj: config.ResourceType{}, + expected: []string{testName1, testName2}, + }, + "nameExclusionFilter": { + configObj: config.ResourceType{ + ExcludeRule: config.FilterRule{ + NamesRegExp: []config.Expression{{ + RE: *regexp.MustCompile(testName1), + }}}, + }, + expected: []string{testName2}, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + names, err := ks.getAll(context.Background(), config.Config{ + KinesisFirehose: tc.configObj, + }) + require.NoError(t, err) + require.Equal(t, tc.expected, aws.StringValueSlice(names)) + }) + } + +} + +func TestKinesisFirehose_NukeAll(t *testing.T) { + + t.Parallel() + + ks := KinesisFirehose{ + Client: mockedKinesisFirehose{ + DeleteDeliveryStreamOutput: firehose.DeleteDeliveryStreamOutput{}, + }, + } + + err := ks.nukeAll([]*string{aws.String("test")}) + require.NoError(t, err) +} diff --git a/aws/resources/kinesis_firehose_types.go b/aws/resources/kinesis_firehose_types.go new file mode 100644 index 00000000..bc00a394 --- /dev/null +++ b/aws/resources/kinesis_firehose_types.go @@ -0,0 +1,63 @@ +package resources + +import ( + "context" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/firehose" + "github.com/aws/aws-sdk-go/service/firehose/firehoseiface" + "github.com/gruntwork-io/cloud-nuke/config" + "github.com/gruntwork-io/go-commons/errors" +) + +type KinesisFirehose struct { + BaseAwsResource + Client firehoseiface.FirehoseAPI + Region string + Names []string +} + +func (kf *KinesisFirehose) Init(session *session.Session) { + kf.Client = firehose.New(session) +} + +// ResourceName - The simple name of the AWS resource +func (kf *KinesisFirehose) ResourceName() string { + return "kinesis-firehose" +} + +// ResourceIdentifiers - The names of the Kinesis Streams +func (kf *KinesisFirehose) ResourceIdentifiers() []string { + return kf.Names +} + +func (kf *KinesisFirehose) MaxBatchSize() int { + // Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so + // we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web + // console will do. We pick a conservative number here to avoid hitting AWS API rate limits. + return 35 +} + +func (kf *KinesisFirehose) GetAndSetResourceConfig(configObj config.Config) config.ResourceType { + return configObj.KinesisFirehose +} + +func (kf *KinesisFirehose) GetAndSetIdentifiers(c context.Context, configObj config.Config) ([]string, error) { + identifiers, err := kf.getAll(c, configObj) + if err != nil { + return nil, err + } + + kf.Names = aws.StringValueSlice(identifiers) + return kf.Names, nil +} + +// Nuke - nuke 'em all!!! +func (kf *KinesisFirehose) Nuke(identifiers []string) error { + if err := kf.nukeAll(aws.StringSlice(identifiers)); err != nil { + return errors.WithStackTrace(err) + } + + return nil +} diff --git a/config/config.go b/config/config.go index 4018b337..a09dc1dd 100644 --- a/config/config.go +++ b/config/config.go @@ -68,6 +68,7 @@ type Config struct { IAMUsers ResourceType `yaml:"IAMUsers"` KMSCustomerKeys KMSCustomerKeyResourceType `yaml:"KMSCustomerKeys"` KinesisStream ResourceType `yaml:"KinesisStream"` + KinesisFirehose ResourceType `yaml:"KinesisFirehose"` LambdaFunction ResourceType `yaml:"LambdaFunction"` LambdaLayer ResourceType `yaml:"LambdaLayer"` LaunchConfiguration ResourceType `yaml:"LaunchConfiguration"` diff --git a/config/config_test.go b/config/config_test.go index 5b508fa7..9ffe1dbe 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -97,6 +97,7 @@ func emptyConfig() *Config { ResourceType{FilterRule{}, FilterRule{}, ""}, ResourceType{FilterRule{}, FilterRule{}, ""}, ResourceType{FilterRule{}, FilterRule{}, ""}, + ResourceType{FilterRule{}, FilterRule{}, ""}, EC2ResourceType{false, ResourceType{FilterRule{}, FilterRule{}, ""}}, ResourceType{FilterRule{}, FilterRule{}, ""}, ResourceType{FilterRule{}, FilterRule{}, ""},