Skip to content

Commit

Permalink
*: pass context.Context to client-go functions
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier committed Sep 3, 2020
1 parent d1e9fc7 commit 053da63
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 181 deletions.
25 changes: 16 additions & 9 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,29 +244,36 @@ func Main() int {
cfg.Namespaces.ThanosRulerAllowList = cfg.Namespaces.AllowList
}

ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)
r := prometheus.NewRegistry()
po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)

po, err := prometheuscontroller.New(ctx, cfg, log.With(logger, "component", "prometheusoperator"), r)
if err != nil {
fmt.Fprint(os.Stderr, "instantiating prometheus controller failed: ", err)
cancel()
return 1
}

ao, err := alertmanagercontroller.New(cfg, log.With(logger, "component", "alertmanageroperator"), r)
ao, err := alertmanagercontroller.New(ctx, cfg, log.With(logger, "component", "alertmanageroperator"), r)
if err != nil {
fmt.Fprint(os.Stderr, "instantiating alertmanager controller failed: ", err)
cancel()
return 1
}

to, err := thanoscontroller.New(cfg, log.With(logger, "component", "thanosoperator"), r)
to, err := thanoscontroller.New(ctx, cfg, log.With(logger, "component", "thanosoperator"), r)
if err != nil {
fmt.Fprint(os.Stderr, "instantiating thanos controller failed: ", err)
cancel()
return 1
}

mux := http.NewServeMux()
web, err := api.New(cfg, log.With(logger, "component", "api"))
if err != nil {
fmt.Fprint(os.Stderr, "instantiating api failed: ", err)
cancel()
return 1
}
admit := admission.New(log.With(logger, "component", "admissionwebhook"))
Expand All @@ -276,6 +283,7 @@ func Main() int {
l, err := net.Listen("tcp", cfg.ListenAddress)
if err != nil {
fmt.Fprint(os.Stderr, "listening failed", cfg.ListenAddress, err)
cancel()
return 1
}

Expand All @@ -288,6 +296,7 @@ func Main() int {
cfg.ServerTLSConfig.ClientCAFile, cfg.ServerTLSConfig.MinVersion, cfg.ServerTLSConfig.CipherSuites)
if tlsConfig == nil || err != nil {
fmt.Fprint(os.Stderr, "invalid TLS config", err)
cancel()
return 1
}
}
Expand Down Expand Up @@ -321,12 +330,9 @@ func Main() int {
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))

ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)

wg.Go(func() error { return po.Run(ctx.Done()) })
wg.Go(func() error { return ao.Run(ctx.Done()) })
wg.Go(func() error { return to.Run(ctx.Done()) })
wg.Go(func() error { return po.Run(ctx) })
wg.Go(func() error { return ao.Run(ctx) })
wg.Go(func() error { return to.Run(ctx) })

if tlsConfig != nil {
r, err := rbacproxytls.NewCertReloader(
Expand All @@ -336,6 +342,7 @@ func Main() int {
)
if err != nil {
fmt.Fprint(os.Stderr, "failed to initialize certificate reloader", err)
cancel()
return 1
}

Expand Down
44 changes: 22 additions & 22 deletions pkg/alertmanager/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Config struct {
}

// New creates a new controller.
func New(c prometheusoperator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
func New(ctx context.Context, c prometheusoperator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
cfg, err := k8sutil.NewClusterConfig(c.Host, c.TLSInsecure, &c.TLSConfig)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
Expand Down Expand Up @@ -123,11 +123,11 @@ func New(c prometheusoperator.Config, logger log.Logger, r prometheus.Registerer
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).List(context.TODO(), options)
return o.mclient.MonitoringV1().Alertmanagers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).Watch(context.TODO(), options)
return o.mclient.MonitoringV1().Alertmanagers(namespace).Watch(ctx, options)
},
}
}),
Expand Down Expand Up @@ -189,7 +189,7 @@ func (c *Operator) addHandlers() {
}

// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
func (c *Operator) Run(ctx context.Context) error {
defer c.queue.ShutDown()

errChan := make(chan error)
Expand All @@ -209,20 +209,20 @@ func (c *Operator) Run(stopc <-chan struct{}) error {
return err
}
level.Info(c.logger).Log("msg", "CRD API endpoints ready")
case <-stopc:
case <-ctx.Done():
return nil
}

go c.worker()
go c.worker(ctx)

go c.alrtInf.Run(stopc)
go c.ssetInf.Run(stopc)
if err := c.waitForCacheSync(stopc); err != nil {
go c.alrtInf.Run(ctx.Done())
go c.ssetInf.Run(ctx.Done())
if err := c.waitForCacheSync(ctx.Done()); err != nil {
return err
}
c.addHandlers()

<-stopc
<-ctx.Done()
return nil
}

Expand Down Expand Up @@ -270,20 +270,20 @@ func (c *Operator) enqueue(obj interface{}) {
// worker runs a worker thread that just dequeues items, processes them
// and marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {
for c.processNextWorkItem() {
func (c *Operator) worker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}

func (c *Operator) processNextWorkItem() bool {
func (c *Operator) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

c.metrics.ReconcileCounter().Inc()
err := c.sync(key.(string))
err := c.sync(ctx, key.(string))
if err == nil {
c.queue.Forget(key)
return true
Expand Down Expand Up @@ -397,7 +397,7 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
}
}

func (c *Operator) sync(key string) error {
func (c *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := c.alrtInf.GetIndexer().GetByKey(key)
if err != nil {
return err
Expand All @@ -420,7 +420,7 @@ func (c *Operator) sync(key string) error {

// Create governing service if it doesn't exist.
svcClient := c.kclient.CoreV1().Services(am.Namespace)
if err = k8sutil.CreateOrUpdateService(svcClient, makeStatefulSetService(am, c.config)); err != nil {
if err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(am, c.config)); err != nil {
return errors.Wrap(err, "synchronizing governing service failed")
}

Expand All @@ -437,7 +437,7 @@ func (c *Operator) sync(key string) error {
return errors.Wrap(err, "making the statefulset, to create, failed")
}
operator.SanitizeSTS(sset)
if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
if _, err := ssetClient.Create(ctx, sset, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "creating statefulset failed")
}
return nil
Expand All @@ -449,14 +449,14 @@ func (c *Operator) sync(key string) error {
}

operator.SanitizeSTS(sset)
_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})
_, err = ssetClient.Update(ctx, sset, metav1.UpdateOptions{})
sErr, ok := err.(*apierrors.StatusError)

if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
c.metrics.StsDeleteCreateCounter().Inc()
level.Info(c.logger).Log("msg", "resolving illegal update of Alertmanager StatefulSet", "details", sErr.ErrStatus.Details)
propagationPolicy := metav1.DeletePropagationForeground
if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
if err := ssetClient.Delete(ctx, sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
}
return nil
Expand Down Expand Up @@ -492,14 +492,14 @@ func ListOptions(name string) metav1.ListOptions {
}
}

func AlertmanagerStatus(kclient kubernetes.Interface, a *monitoringv1.Alertmanager) (*monitoringv1.AlertmanagerStatus, []v1.Pod, error) {
func AlertmanagerStatus(ctx context.Context, kclient kubernetes.Interface, a *monitoringv1.Alertmanager) (*monitoringv1.AlertmanagerStatus, []v1.Pod, error) {
res := &monitoringv1.AlertmanagerStatus{Paused: a.Spec.Paused}

pods, err := kclient.CoreV1().Pods(a.Namespace).List(context.TODO(), ListOptions(a.Name))
pods, err := kclient.CoreV1().Pods(a.Namespace).List(ctx, ListOptions(a.Name))
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving pods of failed")
}
sset, err := kclient.AppsV1().StatefulSets(a.Namespace).Get(context.TODO(), statefulSetNameFromAlertmanagerName(a.Name), metav1.GetOptions{})
sset, err := kclient.AppsV1().StatefulSets(a.Namespace).Get(ctx, statefulSetNameFromAlertmanagerName(a.Name), metav1.GetOptions{})
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving stateful set failed")
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package api

import (
"context"
"encoding/json"
"net/http"
"regexp"
Expand Down Expand Up @@ -100,7 +99,7 @@ func parsePrometheusStatusUrl(path string) objectReference {
func (api *API) prometheusStatus(w http.ResponseWriter, req *http.Request) {
or := parsePrometheusStatusUrl(req.URL.Path)

p, err := api.mclient.MonitoringV1().Prometheuses(or.namespace).Get(context.TODO(), or.name, metav1.GetOptions{})
p, err := api.mclient.MonitoringV1().Prometheuses(or.namespace).Get(req.Context(), or.name, metav1.GetOptions{})
if err != nil {
if k8sutil.IsResourceNotFoundError(err) {
w.WriteHeader(404)
Expand All @@ -109,7 +108,7 @@ func (api *API) prometheusStatus(w http.ResponseWriter, req *http.Request) {
return
}

p.Status, _, err = prometheus.PrometheusStatus(api.kclient, p)
p.Status, _, err = prometheus.PrometheusStatus(req.Context(), api.kclient, p)
if err != nil {
api.logger.Log("error", err)
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,21 @@ func IsResourceNotFoundError(err error) bool {
return false
}

func CreateOrUpdateService(sclient clientv1.ServiceInterface, svc *v1.Service) error {
service, err := sclient.Get(context.TODO(), svc.Name, metav1.GetOptions{})
func CreateOrUpdateService(ctx context.Context, sclient clientv1.ServiceInterface, svc *v1.Service) error {
service, err := sclient.Get(ctx, svc.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving service object failed")
}

if apierrors.IsNotFound(err) {
_, err = sclient.Create(context.TODO(), svc, metav1.CreateOptions{})
_, err = sclient.Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
return errors.Wrap(err, "creating service object failed")
}
} else {
svc.ResourceVersion = service.ResourceVersion
svc.SetOwnerReferences(mergeOwnerReferences(service.GetOwnerReferences(), svc.GetOwnerReferences()))
_, err := sclient.Update(context.TODO(), svc, metav1.UpdateOptions{})
_, err := sclient.Update(ctx, svc, metav1.UpdateOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "updating service object failed")
}
Expand All @@ -128,20 +128,20 @@ func CreateOrUpdateService(sclient clientv1.ServiceInterface, svc *v1.Service) e
return nil
}

func CreateOrUpdateEndpoints(eclient clientv1.EndpointsInterface, eps *v1.Endpoints) error {
endpoints, err := eclient.Get(context.TODO(), eps.Name, metav1.GetOptions{})
func CreateOrUpdateEndpoints(ctx context.Context, eclient clientv1.EndpointsInterface, eps *v1.Endpoints) error {
endpoints, err := eclient.Get(ctx, eps.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving existing kubelet endpoints object failed")
}

if apierrors.IsNotFound(err) {
_, err = eclient.Create(context.TODO(), eps, metav1.CreateOptions{})
_, err = eclient.Create(ctx, eps, metav1.CreateOptions{})
if err != nil {
return errors.Wrap(err, "creating kubelet endpoints object failed")
}
} else {
eps.ResourceVersion = endpoints.ResourceVersion
_, err = eclient.Update(context.TODO(), eps, metav1.UpdateOptions{})
_, err = eclient.Update(ctx, eps, metav1.UpdateOptions{})
if err != nil {
return errors.Wrap(err, "updating kubelet endpoints object failed")
}
Expand Down
Loading

0 comments on commit 053da63

Please sign in to comment.