Skip to content

Commit

Permalink
feat: add timeout for update cluster info (argoproj#14511)
Browse files Browse the repository at this point in the history
* chore: simplified parsing of startup parameters

Signed-off-by: yyzxw <[email protected]>

* feat: add timeout for update cluster info

Signed-off-by: yyzxw <[email protected]>

---------

Signed-off-by: yyzxw <[email protected]>
  • Loading branch information
yyzxw authored Aug 17, 2023
1 parent f9961a0 commit 2dbc6c7
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ import (
argosettings "github.com/argoproj/argo-cd/v2/util/settings"
)

// TODO: load this using Cobra.
func getSubmoduleEnabled() bool {
return env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)
}
var gitSubmoduleEnabled = env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)

func NewCommand() *cobra.Command {
var (
Expand Down Expand Up @@ -156,7 +153,7 @@ func NewCommand() *cobra.Command {
}

repoClientset := apiclient.NewRepoServerClientset(argocdRepoServer, repoServerTimeoutSeconds, tlsConfig)
argoCDService, err := services.NewArgoCDService(argoCDDB, getSubmoduleEnabled(), repoClientset, enableNewGitFileGlobbing)
argoCDService, err := services.NewArgoCDService(argoCDDB, gitSubmoduleEnabled, repoClientset, enableNewGitFileGlobbing)
errors.CheckError(err)

terminalGenerators := map[string]generators.Generator{
Expand Down
47 changes: 15 additions & 32 deletions cmd/argocd-repo-server/commands/argocd_repo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,16 @@ import (

const (
// CLIName is the name of the CLI
cliName = "argocd-repo-server"
gnuPGSourcePath = "/app/config/gpg/source"

defaultPauseGenerationAfterFailedGenerationAttempts = 3
defaultPauseGenerationOnFailureForMinutes = 60
defaultPauseGenerationOnFailureForRequests = 0
cliName = "argocd-repo-server"
)

func getGnuPGSourcePath() string {
return env.StringFromEnv(common.EnvGPGDataPath, gnuPGSourcePath)
}

func getPauseGenerationAfterFailedGenerationAttempts() int {
return env.ParseNumFromEnv(common.EnvPauseGenerationAfterFailedAttempts, defaultPauseGenerationAfterFailedGenerationAttempts, 0, math.MaxInt32)
}

func getPauseGenerationOnFailureForMinutes() int {
return env.ParseNumFromEnv(common.EnvPauseGenerationMinutes, defaultPauseGenerationOnFailureForMinutes, 0, math.MaxInt32)
}

func getPauseGenerationOnFailureForRequests() int {
return env.ParseNumFromEnv(common.EnvPauseGenerationRequests, defaultPauseGenerationOnFailureForRequests, 0, math.MaxInt32)
}

func getSubmoduleEnabled() bool {
return env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)
}
var (
gnuPGSourcePath = env.StringFromEnv(common.EnvGPGDataPath, "/app/config/gpg/source")
pauseGenerationAfterFailedGenerationAttempts = env.ParseNumFromEnv(common.EnvPauseGenerationAfterFailedAttempts, 3, 0, math.MaxInt32)
pauseGenerationOnFailureForMinutes = env.ParseNumFromEnv(common.EnvPauseGenerationMinutes, 60, 0, math.MaxInt32)
pauseGenerationOnFailureForRequests = env.ParseNumFromEnv(common.EnvPauseGenerationRequests, 0, 0, math.MaxInt32)
gitSubmoduleEnabled = env.ParseBoolFromEnv(common.EnvGitSubmoduleEnabled, true)
)

func NewCommand() *cobra.Command {
var (
Expand Down Expand Up @@ -124,10 +107,10 @@ func NewCommand() *cobra.Command {
cacheutil.CollectMetrics(redisClient, metricsServer)
server, err := reposerver.NewServer(metricsServer, cache, tlsConfigCustomizer, repository.RepoServerInitConstants{
ParallelismLimit: parallelismLimit,
PauseGenerationAfterFailedGenerationAttempts: getPauseGenerationAfterFailedGenerationAttempts(),
PauseGenerationOnFailureForMinutes: getPauseGenerationOnFailureForMinutes(),
PauseGenerationOnFailureForRequests: getPauseGenerationOnFailureForRequests(),
SubmoduleEnabled: getSubmoduleEnabled(),
PauseGenerationAfterFailedGenerationAttempts: pauseGenerationAfterFailedGenerationAttempts,
PauseGenerationOnFailureForMinutes: pauseGenerationOnFailureForMinutes,
PauseGenerationOnFailureForRequests: pauseGenerationOnFailureForRequests,
SubmoduleEnabled: gitSubmoduleEnabled,
MaxCombinedDirectoryManifestsSize: maxCombinedDirectoryManifestsQuantity,
CMPTarExcludedGlobs: cmpTarExcludedGlobs,
AllowOutOfBoundsSymlinks: allowOutOfBoundsSymlinks,
Expand Down Expand Up @@ -181,12 +164,12 @@ func NewCommand() *cobra.Command {
err = gpg.InitializeGnuPG()
errors.CheckError(err)

log.Infof("Populating GnuPG keyring with keys from %s", getGnuPGSourcePath())
added, removed, err := gpg.SyncKeyRingFromDirectory(getGnuPGSourcePath())
log.Infof("Populating GnuPG keyring with keys from %s", gnuPGSourcePath)
added, removed, err := gpg.SyncKeyRingFromDirectory(gnuPGSourcePath)
errors.CheckError(err)
log.Infof("Loaded %d (and removed %d) keys from keyring", len(added), len(removed))

go func() { errors.CheckError(reposerver.StartGPGWatcher(getGnuPGSourcePath())) }()
go func() { errors.CheckError(reposerver.StartGPGWatcher(gnuPGSourcePath)) }()
}

log.Infof("argocd-repo-server is listening on %s", listener.Addr())
Expand Down
9 changes: 2 additions & 7 deletions cmd/argocd-server/commands/argocd_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,10 @@ const (
)

var (
failureRetryCount = 0
failureRetryPeriodMilliSeconds = 100
failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, 0, 0, 10)
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, 100, 0, 1000)
)

func init() {
failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10)
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000)
}

// NewCommand returns a new instance of an argocd command
func NewCommand() *cobra.Command {
var (
Expand Down
35 changes: 27 additions & 8 deletions controller/clusterinfoupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"time"

"github.com/argoproj/argo-cd/v2/controller/metrics"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
Expand All @@ -19,7 +21,13 @@ import (
)

const (
secretUpdateInterval = 10 * time.Second
defaultSecretUpdateInterval = 10 * time.Second

EnvClusterInfoTimeout = "ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT"
)

var (
clusterInfoTimeout = env.ParseDurationFromEnv(EnvClusterInfoTimeout, defaultSecretUpdateInterval, defaultSecretUpdateInterval, 1*time.Minute)
)

type clusterInfoUpdater struct {
Expand All @@ -30,6 +38,7 @@ type clusterInfoUpdater struct {
clusterFilter func(cluster *appv1.Cluster) bool
projGetter func(app *appv1.Application) (*appv1.AppProject, error)
namespace string
lastUpdated time.Time
}

func NewClusterInfoUpdater(
Expand All @@ -41,12 +50,12 @@ func NewClusterInfoUpdater(
projGetter func(app *appv1.Application) (*appv1.AppProject, error),
namespace string) *clusterInfoUpdater {

return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace}
return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace, time.Time{}}
}

func (c *clusterInfoUpdater) Run(ctx context.Context) {
c.updateClusters()
ticker := time.NewTicker(secretUpdateInterval)
ticker := time.NewTicker(clusterInfoTimeout)
for {
select {
case <-ctx.Done():
Expand All @@ -59,13 +68,23 @@ func (c *clusterInfoUpdater) Run(ctx context.Context) {
}

func (c *clusterInfoUpdater) updateClusters() {
if time.Since(c.lastUpdated) < clusterInfoTimeout {
return
}

ctx, cancel := context.WithTimeout(context.Background(), clusterInfoTimeout)
defer func() {
cancel()
c.lastUpdated = time.Now()
}()

infoByServer := make(map[string]*cache.ClusterInfo)
clustersInfo := c.infoSource.GetClustersInfo()
for i := range clustersInfo {
info := clustersInfo[i]
infoByServer[info.Server] = &info
}
clusters, err := c.db.ListClusters(context.Background())
clusters, err := c.db.ListClusters(ctx)
if err != nil {
log.Warnf("Failed to save clusters info: %v", err)
return
Expand All @@ -82,15 +101,15 @@ func (c *clusterInfoUpdater) updateClusters() {
}
_ = kube.RunAllAsync(len(clustersFiltered), func(i int) error {
cluster := clustersFiltered[i]
if err := c.updateClusterInfo(cluster, infoByServer[cluster.Server]); err != nil {
if err := c.updateClusterInfo(ctx, cluster, infoByServer[cluster.Server]); err != nil {
log.Warnf("Failed to save clusters info: %v", err)
}
return nil
})
log.Debugf("Successfully saved info of %d clusters", len(clustersFiltered))
}

func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cache.ClusterInfo) error {
func (c *clusterInfoUpdater) updateClusterInfo(ctx context.Context, cluster appv1.Cluster, info *cache.ClusterInfo) error {
apps, err := c.appLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error while fetching the apps list: %w", err)
Expand All @@ -103,7 +122,7 @@ func (c *clusterInfoUpdater) updateClusterInfo(cluster appv1.Cluster, info *cach
continue
}
}
if err := argo.ValidateDestination(context.Background(), &a.Spec.Destination, c.db); err != nil {
if err := argo.ValidateDestination(ctx, &a.Spec.Destination, c.db); err != nil {
continue
}
if a.Spec.Destination.Server == cluster.Server {
Expand Down
2 changes: 1 addition & 1 deletion controller/clusterinfoupdater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestClusterSecretUpdater(t *testing.T) {
lister := applisters.NewApplicationLister(appInformer.GetIndexer()).Applications(fakeNamespace)
updater := NewClusterInfoUpdater(nil, argoDB, lister, appCache, nil, nil, fakeNamespace)

err = updater.updateClusterInfo(*cluster, info)
err = updater.updateClusterInfo(context.Background(), *cluster, info)
assert.NoError(t, err, "Invoking updateClusterInfo failed.")

var clusterInfo v1alpha1.ClusterInfo
Expand Down
2 changes: 2 additions & 0 deletions docs/operator-manual/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ controller replicas. To enable sharding increase the number of replicas in `argo
and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below
demonstrates changes required to configure two controller replicas.

* By default, the controller will update the cluster information every 10 seconds. If there is a problem with your cluster network environment that is causing the update time to take a long time, you can try modifying the environment variable `ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT` to increase the timeout (the unit is seconds).

```yaml
apiVersion: apps/v1
kind: StatefulSet
Expand Down
8 changes: 1 addition & 7 deletions util/config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@ func unmarshalObject(data []byte, obj interface{}) error {
if err != nil {
return err
}

err = json.Unmarshal(jsonData, &obj)
if err != nil {
return err
}

return err
return json.Unmarshal(jsonData, &obj)
}

// MarshalLocalYAMLFile writes JSON or YAML to a file on disk.
Expand Down

0 comments on commit 2dbc6c7

Please sign in to comment.