[Hack] Add concurrency to cluster creation in resource generator. (argoproj#11266)

* add threading to cluster creation

Signed-off-by: Dan Garfield <[email protected]>

* Add default values

Signed-off-by: Dan Garfield <[email protected]>

* Cleanup

Signed-off-by: Dan Garfield <[email protected]>

* Move external dependency to internal

Signed-off-by: Dan Garfield <[email protected]>

* ability to run cluster generation in parallel

Signed-off-by: pashavictorovich <[email protected]>

* fix linter

Signed-off-by: pashavictorovich <[email protected]>

Signed-off-by: Dan Garfield <[email protected]>
Signed-off-by: pashavictorovich <[email protected]>
Co-authored-by: pasha-codefresh <[email protected]>
todaywasawesome and pasha-codefresh authored Dec 4, 2022
1 parent 62c85d6 commit 42efcb3
Showing 3 changed files with 183 additions and 54 deletions.
120 changes: 67 additions & 53 deletions hack/gen-resources/generators/cluster_generator.go
@@ -137,7 +137,7 @@ func (cg *ClusterGenerator) getClusterCredentials(namespace string, releaseSuffix
 	return caData, cert, key, nil
 }
 
-//TODO: also should provision service for vcluster pod
+// TODO: also should provision service for vcluster pod
 func (cg *ClusterGenerator) installVCluster(opts *util.GenerateOpts, namespace string, releaseName string) error {
 	cmd, err := helm.NewCmd("/tmp", "v3", "")
 	if err != nil {
@@ -175,71 +175,85 @@ func (cg *ClusterGenerator) retrieveClusterUri(namespace, releaseSuffix string)
 	return "", nil
 }
 
-func (cg *ClusterGenerator) Generate(opts *util.GenerateOpts) error {
-	for i := 1; i <= opts.ClusterOpts.Samples; i++ {
-		log.Printf("Generate cluster #%v of #%v", i, opts.ClusterOpts.Samples)
+func (cg *ClusterGenerator) generate(i int, opts *util.GenerateOpts) error {
+	log.Printf("Generate cluster #%v of #%v", i, opts.ClusterOpts.Samples)
 
-		namespace := opts.ClusterOpts.NamespacePrefix + "-" + util.GetRandomString()
+	namespace := opts.ClusterOpts.NamespacePrefix + "-" + util.GetRandomString()
 
-		log.Printf("Namespace is %s", namespace)
+	log.Printf("Namespace is %s", namespace)
 
-		releaseSuffix := util.GetRandomString()
+	releaseSuffix := util.GetRandomString()
 
-		log.Printf("Release suffix is %s", namespace)
+	log.Printf("Release suffix is %s", namespace)
 
-		err := cg.installVCluster(opts, namespace, POD_PREFIX+"-"+releaseSuffix)
-		if err != nil {
-			log.Printf("Skip cluster installation due error %v", err.Error())
-			continue
-		}
+	err := cg.installVCluster(opts, namespace, POD_PREFIX+"-"+releaseSuffix)
+	if err != nil {
+		log.Printf("Skip cluster installation due error %v", err.Error())
+	}
 
-		log.Print("Get cluster credentials")
-		caData, cert, key, err := cg.getClusterCredentials(namespace, releaseSuffix)
+	log.Print("Get cluster credentials")
+	caData, cert, key, err := cg.getClusterCredentials(namespace, releaseSuffix)
 
-		for o := 0; o < 5; o++ {
-			if err == nil {
-				break
-			}
-			log.Printf("Failed to get cluster credentials %s, retrying...", releaseSuffix)
-			time.Sleep(10 * time.Second)
-			caData, cert, key, err = cg.getClusterCredentials(namespace, releaseSuffix)
-		}
-		if err != nil {
-			return err
-		}
+	for o := 0; o < 5; o++ {
+		if err == nil {
+			break
+		}
+		log.Printf("Failed to get cluster credentials %s, retrying...", releaseSuffix)
+		time.Sleep(10 * time.Second)
+		caData, cert, key, err = cg.getClusterCredentials(namespace, releaseSuffix)
+	}
+	if err != nil {
+		return err
+	}
 
-		log.Print("Get cluster server uri")
+	log.Print("Get cluster server uri")
 
-		uri, err := cg.retrieveClusterUri(namespace, releaseSuffix)
-		if err != nil {
-			return err
-		}
+	uri, err := cg.retrieveClusterUri(namespace, releaseSuffix)
+	if err != nil {
+		return err
+	}
 
-		log.Printf("Cluster server uri is %s", uri)
-
-		log.Print("Create cluster")
-		_, err = cg.db.CreateCluster(context.TODO(), &argoappv1.Cluster{
-			Server: uri,
-			Name:   opts.ClusterOpts.ClusterNamePrefix + "-" + util.GetRandomString(),
-			Config: argoappv1.ClusterConfig{
-				TLSClientConfig: argoappv1.TLSClientConfig{
-					Insecure:   false,
-					ServerName: "kubernetes.default.svc",
-					CAData:     caData,
-					CertData:   cert,
-					KeyData:    key,
-				},
-			},
-			ConnectionState: argoappv1.ConnectionState{},
-			ServerVersion:   "1.18",
-			Namespaces:      []string{opts.ClusterOpts.DestinationNamespace},
-			Labels:          labels,
-		})
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
+	log.Printf("Cluster server uri is %s", uri)
+
+	log.Print("Create cluster")
+	_, err = cg.db.CreateCluster(context.TODO(), &argoappv1.Cluster{
+		Server: uri,
+		Name:   opts.ClusterOpts.ClusterNamePrefix + "-" + util.GetRandomString(),
+		Config: argoappv1.ClusterConfig{
+			TLSClientConfig: argoappv1.TLSClientConfig{
+				Insecure:   false,
+				ServerName: "kubernetes.default.svc",
+				CAData:     caData,
+				CertData:   cert,
+				KeyData:    key,
+			},
+		},
+		ConnectionState: argoappv1.ConnectionState{},
+		ServerVersion:   "1.18",
+		Namespaces:      []string{opts.ClusterOpts.DestinationNamespace},
+		Labels:          labels,
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (cg *ClusterGenerator) Generate(opts *util.GenerateOpts) error {
+	log.Printf("Excute in parallel with %v", opts.ClusterOpts.Concurrency)
+
+	wg := util.New(opts.ClusterOpts.Concurrency)
+	for l := 1; l <= opts.ClusterOpts.Samples; l++ {
+		wg.Add()
+		go func(i int) {
+			defer wg.Done()
+			err := cg.generate(i, opts)
+			if err != nil {
+				log.Printf("Failed to generate cluster #%v due to : %s", i, err.Error())
+			}
+		}(l)
+	}
+	wg.Wait()
+	return nil
+}

10 changes: 9 additions & 1 deletion hack/gen-resources/util/gen_options_parser.go
@@ -2,7 +2,6 @@ package util
 
 import (
 	"os"
-
 	"gopkg.in/yaml.v2"
 )
 
@@ -34,6 +33,7 @@ type ClusterOpts struct {
 	ValuesFilePath       string `yaml:"valuesFilePath"`
 	DestinationNamespace string `yaml:"destinationNamespace"`
 	ClusterNamePrefix    string `yaml:"clusterNamePrefix"`
+	Concurrency          int    `yaml:"parallel"`
 }
 
 type GenerateOpts struct {
@@ -45,6 +45,12 @@ type GenerateOpts struct {
 	Namespace string `yaml:"namespace"`
 }
 
+func setDefaults(opts *GenerateOpts) {
+	if opts.ClusterOpts.Concurrency == 0 {
+		opts.ClusterOpts.Concurrency = 2
+	}
+}
+
 func Parse(opts *GenerateOpts, file string) error {
 	fp, err := os.ReadFile(file)
 	if err != nil {
@@ -55,5 +61,7 @@ func Parse(opts *GenerateOpts, file string) error {
 		return e
 	}
 
+	setDefaults(opts)
+
 	return nil
 }
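For reference, a minimal sketch of how the new option flows through the parser. The main function, the "generate.yaml" file name, and the import path are assumptions for illustration; only util.Parse, GenerateOpts, ClusterOpts.Concurrency, the `parallel` YAML key, and the default of 2 come from the diff above.

package main

import (
	"log"

	// Assumed import path for the util package shown in this diff.
	"github.com/argoproj/argo-cd/v2/hack/gen-resources/util"
)

func main() {
	var opts util.GenerateOpts
	// "generate.yaml" is a hypothetical options file; a `parallel: 5` entry under
	// the cluster options would override the default Concurrency of 2 that
	// setDefaults applies after parsing.
	if err := util.Parse(&opts, "generate.yaml"); err != nil {
		log.Fatalf("failed to parse options: %v", err)
	}
	log.Printf("cluster generation will run with concurrency %d", opts.ClusterOpts.Concurrency)
}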
107 changes: 107 additions & 0 deletions hack/gen-resources/util/sizedwaitgroup.go
@@ -0,0 +1,107 @@
// The MIT License (MIT)

// Copyright (c) 2018 Rémy Mathieu

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
// https://github.com/remeh/sizedwaitgroup

// Based upon sync.WaitGroup, SizedWaitGroup allows to start multiple
// routines and to wait for their end using the simple API.

// SizedWaitGroup adds the feature of limiting the maximum number of
// concurrently started routines. It could for example be used to start
// multiples routines querying a database but without sending too much
// queries in order to not overload the given database.
//
// Rémy Mathieu © 2016
package util

import (
	"context"
	"math"
	"sync"
)

// SizedWaitGroup has the same role and close to the
// same API as the Golang sync.WaitGroup but adds a limit of
// the amount of goroutines started concurrently.
type SizedWaitGroup struct {
	Size int

	current chan struct{}
	wg      sync.WaitGroup
}

// New creates a SizedWaitGroup.
// The limit parameter is the maximum amount of
// goroutines which can be started concurrently.
func New(limit int) SizedWaitGroup {
	size := math.MaxInt32 // 2^31 - 1
	if limit > 0 {
		size = limit
	}
	return SizedWaitGroup{
		Size: size,

		current: make(chan struct{}, size),
		wg:      sync.WaitGroup{},
	}
}

// Add increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {
	_ = s.AddWithContext(context.Background())
}

// AddWithContext increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called, or when the context is canceled. Returns nil on
// success or an error if the context is canceled before the lock
// is acquired.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.current <- struct{}{}:
		break
	}
	s.wg.Add(1)
	return nil
}

// Done decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {
	<-s.current
	s.wg.Done()
}

// Wait blocks until the SizedWaitGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Wait() {
	s.wg.Wait()
}
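A standalone usage sketch of the vendored SizedWaitGroup, which is a buffered-channel semaphore wrapped around sync.WaitGroup. The task bodies, limits, and import path are illustrative; New, Add, AddWithContext, Done, and Wait are the API defined in the file above.

package main

import (
	"context"
	"log"
	"time"

	// Assumed import path for the util package containing sizedwaitgroup.go.
	"github.com/argoproj/argo-cd/v2/hack/gen-resources/util"
)

func main() {
	// Allow at most 2 of the 5 goroutines to run at the same time.
	swg := util.New(2)
	for i := 1; i <= 5; i++ {
		swg.Add() // blocks while 2 tasks are already in flight
		go func(i int) {
			defer swg.Done()
			time.Sleep(100 * time.Millisecond) // stand-in for real work
			log.Printf("task #%d finished", i)
		}(i)
	}
	swg.Wait()

	// AddWithContext stops waiting for a free slot once the context is done.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := swg.AddWithContext(ctx); err != nil {
		log.Printf("no slot acquired: %v", err)
		return
	}
	defer swg.Done()
}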
