Skip to content

Commit

Permalink
Add a state in cloud account config (#286)
Browse files Browse the repository at this point in the history
* Add a state in cloud account config

The cloud account config contains the service configs and cloud
api clients. Whenever cloud account config update fails, reset the
cloud account config state to invalid.

Any consumer of cloud account config which make the cloud API calls
should check the cloud account config state and proceed only if
its in a valid state.

Signed-off-by: Anand Kumar <[email protected]>

* Fix an issue in tracker module.

Consider a scenario where the following sequence of events are received.
An ANP with Ipaddress block and ATGroup is realized -> Secret delete ->
Sync with cloud finds diff and updates member -> AT Modify -> AT Modofy
-> NP delete -> ProcessTrackers -> AT delete.

This results in a scenario where a tracker has one appliedToSgs, there are
no nps in npIndexers, prevAppliedToSgs is nil, computing NP status will
result in an empty appliedToToNpMap and there by an error.

Fix is to add an additional check to compute Npstatus from
appliedToToNpMap and appliedToSgs.

Signed-off-by: Anand Kumar <[email protected]>

---------

Signed-off-by: Anand Kumar <[email protected]>
  • Loading branch information
Anandkumar26 authored Aug 14, 2023
1 parent c930f97 commit 4cae82e
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 115 deletions.
7 changes: 4 additions & 3 deletions pkg/accountmanager/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,16 @@ func (p *accountPoller) doAccountPolling() {
defer p.mutex.Unlock()

p.pollDone = false
// Ignoring error since it is captured in the CloudProviderAccount CR's status field.
_ = p.cloudInterface.DoInventoryPoll(p.accountNamespacedName)

defer func() {
p.pollDone = true
// Update status on CPA CR after polling.
p.updateAccountStatus(p.cloudInterface)
}()

if err := p.cloudInterface.DoInventoryPoll(p.accountNamespacedName); err != nil {
return
}

cloudInventory, err := p.cloudInterface.GetCloudInventory(p.accountNamespacedName)
if err != nil {
p.log.Error(err, "failed to fetch cloud inventory from internal snapshot", "account",
Expand Down
12 changes: 10 additions & 2 deletions pkg/apiserver/registry/virtualmachinepolicy/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func (r *REST) Get(ctx context.Context, name string, _ *metav1.GetOptions) (runt
return nil, errors.NewNotFound(runtimev1alpha1.Resource("virtualmachinepolicy"), name)
}
// For a given vm namespaced name there should be only one matching tracker.
return r.convertToVmp(objs[0]), nil
if vmp := r.convertToVmp(objs[0]); vmp != nil {
return vmp, nil
}
return nil, nil
}

func (r *REST) List(ctx context.Context, _ *internalversion.ListOptions) (runtime.Object, error) {
Expand All @@ -97,7 +100,9 @@ func (r *REST) List(ctx context.Context, _ *internalversion.ListOptions) (runtim
vmpList := &runtimev1alpha1.VirtualMachinePolicyList{}
for _, obj := range objs {
vmp := r.convertToVmp(obj)
vmpList.Items = append(vmpList.Items, *vmp)
if vmp != nil {
vmpList.Items = append(vmpList.Items, *vmp)
}
}
return vmpList, nil
}
Expand Down Expand Up @@ -135,6 +140,9 @@ func (r *REST) ConvertToTable(_ context.Context, obj runtime.Object, _ runtime.O
func (r *REST) convertToVmp(obj interface{}) *runtimev1alpha1.VirtualMachinePolicy {
failed, inProgress := false, false
tracker := obj.(*networkpolicy.CloudResourceNpTracker)
if len(tracker.NpStatus) == 0 {
return nil
}
for _, status := range tracker.NpStatus {
if status.Realization == runtimev1alpha1.Failed {
failed = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/plugins/aws/aws_account_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func extractSecret(c client.Client, s *crdv1alpha1.SecretReference) (*crdv1alpha
}

if err = json.Unmarshal(decode, cred); err != nil {
return cred, fmt.Errorf("error unmarshalling credentials: %v/%v", s.Namespace, s.Name)
return cred, fmt.Errorf("%v, error unmarshalling credentials: %v/%v", util.ErrorMsgSecretReference, s.Namespace, s.Name)
}

if (cred.AccessKeyID == "" || cred.AccessKeySecret == "") && cred.RoleArn == "" {
Expand Down
35 changes: 20 additions & 15 deletions pkg/cloudprovider/plugins/aws/aws_security_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (
// CreateSecurityGroup invokes cloud api and creates the cloud security group based on securityGroupIdentifier.
func (c *awsCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) (*string, error) {
vpcID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return nil, fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if err != nil {
return nil, fmt.Errorf("%v, virtual private cloud: %v", err, vpcID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand All @@ -54,10 +55,11 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou
rmIRule, rmERule := utils.SplitCloudRulesByDirection(rmRules)

vpcID := appliedToGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&appliedToGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&appliedToGroupIdentifier.AccountID)
if err != nil {
return fmt.Errorf("%v, virtual private cloud: %v", err, vpcID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand Down Expand Up @@ -145,10 +147,11 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou
func (c *awsCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudresource.CloudResource,
cloudResourceIdentifiers []*cloudresource.CloudResource, membershipOnly bool) error {
vpcID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if err != nil {
return fmt.Errorf("%v, virtual private cloud: %v", err, vpcID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand Down Expand Up @@ -178,10 +181,11 @@ func (c *awsCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudreso
// DeleteSecurityGroup invokes cloud api and deletes the cloud security group. Any attached resource will be moved to default sg.
func (c *awsCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) error {
vpcID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if err != nil {
return fmt.Errorf("%v, virtual private cloud: %v", err, vpcID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand Down Expand Up @@ -238,11 +242,12 @@ func (c *awsCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent
go func(name *types.NamespacedName, sendCh chan<- []cloudresource.SynchronizationContent) {
defer wg.Done()

accCfg, found := c.cloudCommon.GetCloudAccountByName(name)
if !found {
awsPluginLogger().Info("Enforced-security-cloud-view GET for account skipped (account no longer exists)", "account", name)
accCfg, err := c.cloudCommon.GetCloudAccountByName(name)
if err != nil {
awsPluginLogger().Info("Enforced-security-cloud-view GET for account skipped", "err", err)
return
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand Down
41 changes: 23 additions & 18 deletions pkg/cloudprovider/plugins/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package aws

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"antrea.io/nephe/apis/crd/v1alpha1"
"antrea.io/nephe/pkg/cloudprovider/plugins/internal"
)

var (
Expand Down Expand Up @@ -153,8 +154,8 @@ var _ = Describe("AWS cloud", func() {

err := c.AddProviderAccount(fakeClient, account)
Expect(err).Should(BeNil())
accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(found).To(BeTrue())
accCfg, err := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))

errPolAdd := c.DoInventoryPoll(&testAccountNamespacedName)
Expand Down Expand Up @@ -188,8 +189,8 @@ var _ = Describe("AWS cloud", func() {

err := c.AddProviderAccount(fakeClient, account)
Expect(err).Should(BeNil())
accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(found).To(BeTrue())
accCfg, err := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))

errPolAdd := c.DoInventoryPoll(&testAccountNamespacedName)
Expand Down Expand Up @@ -224,8 +225,8 @@ var _ = Describe("AWS cloud", func() {

err := c.AddProviderAccount(fakeClient, account)
Expect(err).Should(BeNil())
accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(found).To(BeTrue())
accCfg, err := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))

errPolAdd := c.DoInventoryPoll(&testAccountNamespacedName)
Expand Down Expand Up @@ -282,8 +283,8 @@ var _ = Describe("AWS cloud", func() {
err := c.AddProviderAccount(fakeClient, account)

Expect(err).Should(BeNil())
accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(found).To(BeTrue())
accCfg, err := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))

errSelAdd := c.AddAccountResourceSelector(&testAccountNamespacedName, selector)
Expand All @@ -306,8 +307,8 @@ var _ = Describe("AWS cloud", func() {
c := newAWSCloud(mockawsCloudHelper)
err := c.AddProviderAccount(fakeClient, account)
Expect(err).Should(BeNil())
accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(found).To(BeTrue())
accCfg, err := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))

errSelAdd := c.AddAccountResourceSelector(&testAccountNamespacedName, selector)
Expand Down Expand Up @@ -698,9 +699,9 @@ func createVpcObject(vpcIDs []string) *ec2.DescribeVpcsOutput {
func checkAccountAddSuccessCondition(c *awsCloud, namespacedName types.NamespacedName, selectorNamespacedName types.NamespacedName,
ids []string) error {
conditionFunc := func() (done bool, e error) {
accCfg, found := c.cloudCommon.GetCloudAccountByName(&namespacedName)
if !found {
return true, errors.New("failed to find account")
accCfg, err := c.cloudCommon.GetCloudAccountByName(&namespacedName)
if err != nil && strings.Contains(err.Error(), internal.AccountConfigNotFound) {
return true, err
}

instances := accCfg.GetServiceConfig().(*ec2ServiceConfig).getCachedInstances(&selectorNamespacedName)
Expand All @@ -723,9 +724,9 @@ func checkAccountAddSuccessCondition(c *awsCloud, namespacedName types.Namespace

func checkVpcPollResult(c *awsCloud, namespacedName types.NamespacedName, ids []string) error {
conditionFunc := func() (done bool, e error) {
accCfg, found := c.cloudCommon.GetCloudAccountByName(&namespacedName)
if !found {
return true, errors.New("failed to find account")
accCfg, err := c.cloudCommon.GetCloudAccountByName(&namespacedName)
if err != nil && strings.Contains(err.Error(), internal.AccountConfigNotFound) {
return true, err
}

vpcs := accCfg.GetServiceConfig().(*ec2ServiceConfig).getCachedVpcs()
Expand All @@ -748,7 +749,11 @@ func checkVpcPollResult(c *awsCloud, namespacedName types.NamespacedName, ids []
}

func getFilters(c *awsCloud, selectorNamespacedName types.NamespacedName) [][]*ec2.Filter {
accCfg, _ := c.cloudCommon.GetCloudAccountByName(&types.NamespacedName{Namespace: "namespace01", Name: "account01"})
accCfg, err := c.cloudCommon.GetCloudAccountByName(&types.NamespacedName{Namespace: "namespace01",
Name: "account01"})
if err != nil && strings.Contains(err.Error(), internal.AccountConfigNotFound) {
return nil
}
if obj, found := accCfg.GetServiceConfig().(*ec2ServiceConfig).instanceFilters[selectorNamespacedName]; found {
return obj
}
Expand Down
44 changes: 24 additions & 20 deletions pkg/cloudprovider/plugins/azure/azure_security_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func (c *azureCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.
var cloudSecurityGroupID string
// find account managing the vnet
vnetID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
azurePluginLogger().Info("Azure account not found managing virtual network", vnetID, "vnetID")
return nil, fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if err != nil {
return nil, fmt.Errorf("%v, virtual network: %v", err, vnetID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand Down Expand Up @@ -85,10 +85,11 @@ func (c *azureCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudres
addRules, rmRules []*cloudresource.CloudRule) error {
// find account managing the vnet and get compute service config
vnetID := appliedToGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&appliedToGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&appliedToGroupIdentifier.AccountID)
if err != nil {
return fmt.Errorf("%v, virtual network: %v", err, vnetID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand Down Expand Up @@ -151,10 +152,11 @@ func (c *azureCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudres
func (c *azureCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudresource.CloudResource,
computeResourceIdentifier []*cloudresource.CloudResource, membershipOnly bool) error {
vnetID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if err != nil {
return fmt.Errorf("%v, virtual network: %v", err, vnetID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

Expand All @@ -165,19 +167,18 @@ func (c *azureCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudre
// DeleteSecurityGroup invokes cloud api and deletes the cloud application security group.
func (c *azureCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) error {
vnetID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
accCfg, err := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if err != nil {
return fmt.Errorf("%v, virtual network: %v", err, vnetID)
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

computeService := accCfg.GetServiceConfig().(*computeServiceConfig)
location := computeService.credentials.region

_ = computeService.updateSecurityGroupMembers(&securityGroupIdentifier.CloudResourceID, nil, membershipOnly)

var rgName string
_, rgName, _, err := extractFieldsFromAzureResourceID(securityGroupIdentifier.Vpc)
if err != nil {
return err
Expand All @@ -196,9 +197,8 @@ func (c *azureCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.
} else {
cloudAsgName = securityGroupIdentifier.GetCloudName(membershipOnly)
}
err = computeService.asgAPIClient.delete(context.Background(), rgName, cloudAsgName)

return err
return computeService.asgAPIClient.delete(context.Background(), rgName, cloudAsgName)
}

func (c *azureCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent {
Expand Down Expand Up @@ -226,12 +226,16 @@ func (c *azureCloud) GetEnforcedSecurity() []cloudresource.SynchronizationConten
go func(name *types.NamespacedName, sendCh chan<- []cloudresource.SynchronizationContent) {
defer wg.Done()

accCfg, found := c.cloudCommon.GetCloudAccountByName(name)
if !found {
azurePluginLogger().Info("Enforced-security-cloud-view GET for account skipped (account no longer exists)", "account", name)
accCfg, err := c.cloudCommon.GetCloudAccountByName(name)
if err != nil {
azurePluginLogger().Info("Enforced-security-cloud-view GET for account skipped", "err",
err)
return
}

accCfg.LockMutex()
defer accCfg.UnlockMutex()

computeService := accCfg.GetServiceConfig().(*computeServiceConfig)
if err := computeService.waitForInventoryInit(internal.InventoryInitWaitDuration); err != nil {
azurePluginLogger().Error(err, "enforced-security-cloud-view GET for account skipped", "account", accCfg.GetNamespacedName())
Expand Down
8 changes: 6 additions & 2 deletions pkg/cloudprovider/plugins/azure/azure_security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ var _ = Describe("Azure Cloud Security", func() {
err = c.AddAccountResourceSelector(testAccountNamespacedName, selector)
Expect(err).Should(BeNil())

accCfg, _ := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName)
accCfg, err := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))
serviceConfig := accCfg.GetServiceConfig()
selectorNamespacedName := types.NamespacedName{Namespace: selector.Namespace, Name: selector.Name}
inventory := serviceConfig.(*computeServiceConfig).GetCloudInventory()
Expand Down Expand Up @@ -895,7 +897,9 @@ var _ = Describe("Azure Cloud Security", func() {
VnetID: &testVnetID03,
})

accCfg, _ := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName)
accCfg, err := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName)
Expect(err).To(BeNil())
Expect(accCfg).To(Not(BeNil()))
serviceConfig := accCfg.GetServiceConfig()
selectorNamespacedName := types.NamespacedName{Namespace: selector.Namespace, Name: selector.Name}
snapshot := serviceConfig.(*computeServiceConfig).resourcesCache.GetSnapshot()
Expand Down
Loading

0 comments on commit 4cae82e

Please sign in to comment.