From 9898086dfc586e33412ce5f07dc5f93ca16a38dc Mon Sep 17 00:00:00 2001 From: jmcshane Date: Wed, 14 Apr 2021 11:28:39 -0500 Subject: [PATCH] Implementing TriggerGroups as inline EventListener resource This feature allows an operator to specify a set of interceptors that will be executed before a group of triggers are selected and executed. This allows common data to be passed from interceptor execution down to multiple triggers to solve a set of common use cases across multiple Triggers. This feature is enabled for now inline in the EventListener spec, but in the future may be enabled only in alpha once the feature gates proposal is implemented within this project. Addresses #945 --- cmd/triggerrun/cmd/root.go | 2 +- docs/eventlisteners.md | 85 ++++++++ .../eventlistener-triggergroup.yaml | 22 ++ examples/v1beta1/triggergroups/rbac.yaml | 1 + examples/v1beta1/triggergroups/trigger.yaml | 55 +++++ pkg/apis/triggers/register.go | 3 + .../triggers/v1beta1/event_listener_types.go | 23 ++- .../v1beta1/event_listener_validation.go | 14 ++ .../v1beta1/event_listener_validation_test.go | 74 ++++++- .../triggers/v1beta1/zz_generated.deepcopy.go | 57 +++++ pkg/interceptors/server/server.go | 1 + pkg/sink/sink.go | 194 ++++++++++++------ pkg/sink/sink_test.go | 63 +++++- test/builder/eventlistener_test.go | 3 +- third_party/LICENSE | 27 --- .../golang-lru/simplelru/lru_interface.go | 4 +- .../vendor/golang.org/x/crypto/LICENSE | 27 --- third_party/vendor/golang.org/x/net/LICENSE | 27 --- third_party/vendor/golang.org/x/text/LICENSE | 27 --- 19 files changed, 525 insertions(+), 184 deletions(-) create mode 100644 examples/v1beta1/triggergroups/eventlistener-triggergroup.yaml create mode 120000 examples/v1beta1/triggergroups/rbac.yaml create mode 100644 examples/v1beta1/triggergroups/trigger.yaml delete mode 100644 third_party/LICENSE delete mode 100644 third_party/vendor/golang.org/x/crypto/LICENSE delete mode 100644 third_party/vendor/golang.org/x/net/LICENSE delete mode 100644 third_party/vendor/golang.org/x/text/LICENSE diff --git a/cmd/triggerrun/cmd/root.go b/cmd/triggerrun/cmd/root.go index 52dcd5c6a..bb6e26a12 100644 --- a/cmd/triggerrun/cmd/root.go +++ b/cmd/triggerrun/cmd/root.go @@ -196,7 +196,7 @@ func processTriggerSpec(kubeClient kubernetes.Interface, client triggersclientse log := eventLog.With(zap.String(triggers.TriggerLabelKey, r.EventListenerName)) - finalPayload, header, iresp, err := r.ExecuteInterceptors(*tri, request, body, log, eventID) + finalPayload, header, iresp, err := r.ExecuteTriggerInterceptors(*tri, request, body, log, eventID, map[string]interface{}{}) if err != nil { log.Error(err) return nil, err diff --git a/docs/eventlisteners.md b/docs/eventlisteners.md index 8dd096958..702e25fb2 100644 --- a/docs/eventlisteners.md +++ b/docs/eventlisteners.md @@ -21,6 +21,7 @@ or more [`Interceptors`](./interceptors.md). - [Structure of an `EventListener`](#structure-of-an-eventlistener) - [Specifying the Kubernetes service account](#specifiying-the-kubernetes-service-account) - [Specifying `Triggers`](#specifying-triggers) +- [Specifying `TriggerGroups`](#specifying-trigger-groups) - [Specifying `Resources`](#specifying-resources) - [Specifying a `kubernetesResource` object](#specifying-a-kubernetesresource-object) - [Specifying `Replicas`](#specifying-replicas) @@ -166,6 +167,90 @@ rules: verbs: ["impersonate"] ``` +## Specifying `TriggerGroups` + +`TriggerGroups` is a feature that allows you to specify a set of interceptors that will process before a set of +`Trigger` resources are processed by the eventlistener. The goal of this feature is described in +[TEP-0053](https://github.com/tektoncd/community/blob/main/teps/0053-nested-triggers.md).TriggerGroups` allow for +a common set of interceptors to be defined inline in the `EventListenerSpec` before `Triggers` are invoked. + +`TriggerGroups` is currently an `alpha` feature. To use it, you use use the v1beta1 API version with the +`enable-api-fields` [feature flag set to `alpha`](./install.md#Customizing-the-Triggers-Controller-behavior). + +You can optionally specify one or more `Triggers` that define the actions to take when the `EventListener` detects a qualifying event. You can specify *either* a reference to an +external `Trigger` object *or* reference/define the `TriggerBindings`, `TriggerTemplates`, and `Interceptors` in the `Trigger` definition. A `TriggerGroup` definition specifies the following fields: + +- `name` - (optional) a valid [Kubernetes name](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set) that uniquely identifies the `TriggerGroup` +- `interceptors` - a list of [`Interceptors`](#specifying-interceptors) that will process event payload data before passing it to the downstream `Triggers` +- `triggerSelector` - a combination of a Kubernetes `labelSelector` and a `namespaceSelector` as defined later [in this document](#constraining-eventlisteners-to-specific-namespaces). These two fields work together to define the `Triggers` that will be processed once `Interceptors` processing completes. + +Below is an example EventListener that defines an inline `triggerGroup`: + +```yaml +apiVersion: triggers.tekton.dev/v1beta1 +kind: EventListener +metadata: + name: eventlistener +spec: + triggerGroups: + - name: github-pr-group + interceptors: + - name: "validate GitHub payload and filter on eventType" + ref: + name: "github" + params: + - name: "secretRef" + value: + secretName: github-secret + secretKey: secretToken + - name: "eventTypes" + value: ["pull_request"] + triggerSelector: + labelSelector: + matchLabels: + type: github-pr +``` + +This configuration would first process any event that is sent to the `EventListener` and determine if it matches +the outlined conditions. If it passes these conditions, it will use the `triggerSelector` matching criteria to determine +the target `Trigger` resources to continue processing. + +Any `extensions` fields added during `triggerGroup` processing are passed to the downstream `Trigger` execution. This allows +for shared data across all Triggers that are processed after group execution completes. As an example, `extensions.myfield` would +be available to all `Trigger` resources matched by this group: + +```yaml +apiVersion: triggers.tekton.dev/v1beta1 +kind: EventListener +metadata: + name: eventlistener +spec: + triggerGroups: + - name: cel-filter-group + interceptors: + - name: "validate body and add field" + ref: + name: "cel" + params: + - name: "filter" + value: "body.action in ['opened', 'reopened']" + - name: "overlays" + value: + - key: myfield + expression: "body.pull_request.head.sha.truncate(7)" + triggerSelector: + namespaceSelector: + matchNames: + - foo + labelSelector: + matchLabels: + type: cel-preprocessed +``` + +At this time, each `TriggerGroup` determines its own downstream Triggers, so if two separate groups select the same +downstream `Trigger` resources, it may be executed multiple times. If you use this feature, ensure that `Trigger` resources +are labeled to be queried by the appropriate set of `TriggerGroups`. + ## Specifying `Resources` You can optionally customize the sink deployment for your `EventListener` using the `resources` field. It accepts the following types of objects: diff --git a/examples/v1beta1/triggergroups/eventlistener-triggergroup.yaml b/examples/v1beta1/triggergroups/eventlistener-triggergroup.yaml new file mode 100644 index 000000000..6d05b15e1 --- /dev/null +++ b/examples/v1beta1/triggergroups/eventlistener-triggergroup.yaml @@ -0,0 +1,22 @@ +apiVersion: triggers.tekton.dev/v1beta1 +kind: EventListener +metadata: + name: listener-triggergroup +spec: + serviceAccountName: tekton-triggers-example-sa + triggerGroups: + - name: github-pr + interceptors: + - ref: + name: "cel" + params: + - name: "filter" + value: "header.match('X-GitHub-Event', 'pull_request')" + - name: "overlays" + value: + - key: truncated_sha + expression: "body.pull_request.head.sha.truncate(7)" + triggerSelector: + labelSelector: + matchLabels: + type: github-pr \ No newline at end of file diff --git a/examples/v1beta1/triggergroups/rbac.yaml b/examples/v1beta1/triggergroups/rbac.yaml new file mode 120000 index 000000000..92dc8540e --- /dev/null +++ b/examples/v1beta1/triggergroups/rbac.yaml @@ -0,0 +1 @@ +../../rbac.yaml \ No newline at end of file diff --git a/examples/v1beta1/triggergroups/trigger.yaml b/examples/v1beta1/triggergroups/trigger.yaml new file mode 100644 index 000000000..8b47a0760 --- /dev/null +++ b/examples/v1beta1/triggergroups/trigger.yaml @@ -0,0 +1,55 @@ +apiVersion: triggers.tekton.dev/v1beta1 +kind: Trigger +metadata: + name: trigger + labels: + type: github-pr +spec: + bindings: + - name: gitrevision + value: $(extensions.truncated_sha) + - name: gitrepositoryurl + value: $(body.repository.url) + - name: contenttype + value: $(header.Content-Type) + template: + ref: pipeline-template +--- +apiVersion: triggers.tekton.dev/v1beta1 +kind: TriggerTemplate +metadata: + name: pipeline-template +spec: + params: + - name: gitrevision + description: The git revision + default: main + - name: gitrepositoryurl + description: The git repository url + - name: message + description: The message to print + default: This is the default message + - name: contenttype + description: The Content-Type of the event + resourcetemplates: + - apiVersion: tekton.dev/v1beta1 + kind: PipelineRun + metadata: + generateName: simple-pipeline-run- + spec: + pipelineRef: + name: simple-pipeline + params: + - name: message + value: $(tt.params.message) + - name: contenttype + value: $(tt.params.contenttype) + resources: + - name: git-source + resourceSpec: + type: git + params: + - name: revision + value: $(tt.params.gitrevision) + - name: url + value: $(tt.params.gitrepositoryurl) diff --git a/pkg/apis/triggers/register.go b/pkg/apis/triggers/register.go index e297afe66..f3b2a0612 100644 --- a/pkg/apis/triggers/register.go +++ b/pkg/apis/triggers/register.go @@ -12,4 +12,7 @@ const ( // TriggerLabelKey is used as the label identifier for a Trigger TriggerLabelKey = "/trigger" + + // TriggerGroupLabelKey is used as a label identifier for a TriggerGroup + TriggerGroupLabelKey = "/triggergroup" ) diff --git a/pkg/apis/triggers/v1beta1/event_listener_types.go b/pkg/apis/triggers/v1beta1/event_listener_types.go index 9289cc487..63457da7f 100644 --- a/pkg/apis/triggers/v1beta1/event_listener_types.go +++ b/pkg/apis/triggers/v1beta1/event_listener_types.go @@ -59,9 +59,11 @@ var _ kmeta.OwnerRefable = (*EventListener)(nil) type EventListenerSpec struct { ServiceAccountName string `json:"serviceAccountName,omitempty"` Triggers []EventListenerTrigger `json:"triggers"` - NamespaceSelector NamespaceSelector `json:"namespaceSelector,omitempty"` - LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` - Resources Resources `json:"resources,omitempty"` + // Trigger groups allow for centralized processing of an interceptor chain + TriggerGroups []EventListenerTriggerGroup `json:"triggerGroups"` + NamespaceSelector NamespaceSelector `json:"namespaceSelector,omitempty"` + LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` + Resources Resources `json:"resources,omitempty"` } type Resources struct { @@ -112,6 +114,19 @@ type EventListenerTrigger struct { ServiceAccountName string `json:"serviceAccountName,omitempty"` } +// EventListenerTriggerGroup defines a group of Triggers that share a common set of interceptors +type EventListenerTriggerGroup struct { + Name string `json:"name"` + Interceptors []*TriggerInterceptor `json:"interceptors"` + TriggerSelector EventListenerTriggerSelector `json:"triggerSelector"` +} + +// EventListenerTriggerSelector defines ways to select a group of triggers using their metadata +type EventListenerTriggerSelector struct { + NamespaceSelector NamespaceSelector `json:"namespaceSelector,omitempty"` + LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` +} + // EventInterceptor provides a hook to intercept and pre-process events type EventInterceptor = TriggerInterceptor @@ -123,7 +138,7 @@ type SecretRef struct { SecretName string `json:"secretName,omitempty"` } -// EventListenerBinding refers to a particular TriggerBinding or ClusterTriggerBindingresource. +// EventListenerBinding refers to a particular TriggerBinding or ClusterTriggerBinding resource. type EventListenerBinding = TriggerSpecBinding // EventListenerTemplate refers to a particular TriggerTemplate resource. diff --git a/pkg/apis/triggers/v1beta1/event_listener_validation.go b/pkg/apis/triggers/v1beta1/event_listener_validation.go index 5bdab0a9d..1c81e9798 100644 --- a/pkg/apis/triggers/v1beta1/event_listener_validation.go +++ b/pkg/apis/triggers/v1beta1/event_listener_validation.go @@ -72,6 +72,20 @@ func (s *EventListenerSpec) validate(ctx context.Context) (errs *apis.FieldError if s.Resources.CustomResource != nil { errs = errs.Also(validateCustomObject(s.Resources.CustomResource).ViaField("spec.resources.customResource")) } + + for i, group := range s.TriggerGroups { + errs = errs.Also(group.validate(ctx).ViaField(fmt.Sprintf("spec.triggerGroups[%d]", i))) + } + return errs +} + +func (g *EventListenerTriggerGroup) validate(ctx context.Context) (errs *apis.FieldError) { + if g.TriggerSelector.LabelSelector == nil && len(g.TriggerSelector.NamespaceSelector.MatchNames) == 0 { + errs = errs.Also(apis.ErrMissingOneOf("triggerSelector.labelSelector", "triggerSelector.namespaceSelector")) + } + if len(g.Interceptors) == 0 { + errs = errs.Also(apis.ErrMissingField("interceptors")) + } return errs } diff --git a/pkg/apis/triggers/v1beta1/event_listener_validation_test.go b/pkg/apis/triggers/v1beta1/event_listener_validation_test.go index 0dab7eb4c..62a12db14 100644 --- a/pkg/apis/triggers/v1beta1/event_listener_validation_test.go +++ b/pkg/apis/triggers/v1beta1/event_listener_validation_test.go @@ -482,7 +482,35 @@ func Test_EventListenerValidate(t *testing.T) { }}, }, }, - }} + }, { + name: "Valid event listener with TriggerGroup and namespaceSelector", + el: &triggersv1beta1.EventListener{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: triggersv1beta1.EventListenerSpec{ + TriggerGroups: []triggersv1beta1.EventListenerTriggerGroup{{ + Name: "my-group", + Interceptors: []*triggersv1beta1.TriggerInterceptor{{ + Ref: triggersv1beta1.InterceptorRef{ + Name: "cel", + }, + Params: []triggersv1beta1.InterceptorParams{{ + Name: "filter", + Value: test.ToV1JSON(t, "has(body.repository)"), + }}, + }}, + TriggerSelector: triggersv1beta1.EventListenerTriggerSelector{ + NamespaceSelector: triggersv1beta1.NamespaceSelector{ + MatchNames: []string{ + "foobar", + }, + }, + }, + }}, + }, + }}} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -1160,6 +1188,50 @@ func TestEventListenerValidate_error(t *testing.T) { }, }, wantErr: apis.ErrMultipleOneOf("spec.triggers[0].template or bindings or interceptors", "spec.triggers[0].triggerRef"), + }, { + name: "missing label and namespace selector", + el: &triggersv1beta1.EventListener{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: triggersv1beta1.EventListenerSpec{ + TriggerGroups: []triggersv1beta1.EventListenerTriggerGroup{{ + Name: "my-group", + Interceptors: []*triggersv1beta1.TriggerInterceptor{{ + Ref: triggersv1beta1.InterceptorRef{ + Name: "cel", + }, + Params: []triggersv1beta1.InterceptorParams{{ + Name: "filter", + Value: test.ToV1JSON(t, "has(body.repository)"), + }}, + }}, + }}, + }, + }, + wantErr: apis.ErrMissingOneOf("spec.triggerGroups[0].triggerSelector.labelSelector", "spec.triggerGroups[0].triggerSelector.namespaceSelector"), + }, { + name: "triggerGroup requires interceptor", + el: &triggersv1beta1.EventListener{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: triggersv1beta1.EventListenerSpec{ + TriggerGroups: []triggersv1beta1.EventListenerTriggerGroup{{ + Name: "my-group", + TriggerSelector: triggersv1beta1.EventListenerTriggerSelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "foo": "bar", + }, + }, + }, + }}, + }, + }, + wantErr: apis.ErrMissingField("spec.triggerGroups[0].interceptors"), }} for _, tc := range tests { diff --git a/pkg/apis/triggers/v1beta1/zz_generated.deepcopy.go b/pkg/apis/triggers/v1beta1/zz_generated.deepcopy.go index 3e3d9de68..78d789b21 100644 --- a/pkg/apis/triggers/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/triggers/v1beta1/zz_generated.deepcopy.go @@ -256,6 +256,13 @@ func (in *EventListenerSpec) DeepCopyInto(out *EventListenerSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TriggerGroups != nil { + in, out := &in.TriggerGroups, &out.TriggerGroups + *out = make([]EventListenerTriggerGroup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) if in.LabelSelector != nil { in, out := &in.LabelSelector, &out.LabelSelector @@ -338,6 +345,56 @@ func (in *EventListenerTrigger) DeepCopy() *EventListenerTrigger { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventListenerTriggerGroup) DeepCopyInto(out *EventListenerTriggerGroup) { + *out = *in + if in.Interceptors != nil { + in, out := &in.Interceptors, &out.Interceptors + *out = make([]*TriggerInterceptor, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(TriggerInterceptor) + (*in).DeepCopyInto(*out) + } + } + } + in.TriggerSelector.DeepCopyInto(&out.TriggerSelector) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventListenerTriggerGroup. +func (in *EventListenerTriggerGroup) DeepCopy() *EventListenerTriggerGroup { + if in == nil { + return nil + } + out := new(EventListenerTriggerGroup) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventListenerTriggerSelector) DeepCopyInto(out *EventListenerTriggerSelector) { + *out = *in + in.NamespaceSelector.DeepCopyInto(&out.NamespaceSelector) + if in.LabelSelector != nil { + in, out := &in.LabelSelector, &out.LabelSelector + *out = new(v1.LabelSelector) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventListenerTriggerSelector. +func (in *EventListenerTriggerSelector) DeepCopy() *EventListenerTriggerSelector { + if in == nil { + return nil + } + out := new(EventListenerTriggerSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GitHubInterceptor) DeepCopyInto(out *GitHubInterceptor) { *out = *in diff --git a/pkg/interceptors/server/server.go b/pkg/interceptors/server/server.go index 27a772dc7..cbf5c5bae 100644 --- a/pkg/interceptors/server/server.go +++ b/pkg/interceptors/server/server.go @@ -119,6 +119,7 @@ func (is *Server) ExecuteInterceptor(r *http.Request) ([]byte, error) { } is.Logger.Debugf("Interceptor Request is: %+v", ireq) iresp := ii.Process(ctx, &ireq) + is.Logger.Infof("Interceptor response is: %+v", iresp) respBytes, err := json.Marshal(iresp) if err != nil { return nil, internal(err) diff --git a/pkg/sink/sink.go b/pkg/sink/sink.go index 916b03710..fc36e2c43 100644 --- a/pkg/sink/sink.go +++ b/pkg/sink/sink.go @@ -44,6 +44,10 @@ import ( "k8s.io/client-go/kubernetes" ) +var ( + emptyExtensions = map[string]interface{}{} +) + // Sink defines the sink resource for processing incoming events for the // EventListener. type Sink struct { @@ -58,7 +62,7 @@ type Sink struct { Recorder *Recorder Auth AuthOverride PayloadValidation bool - // WGProcessTriggers keeps track of triggers currently being processed + // WGProcessTriggers keeps track of triggers or triggerGroups currently being processed // Currently only used in tests to wait for all triggers to finish processing WGProcessTriggers *sync.WaitGroup @@ -113,69 +117,39 @@ func (r Sink) HandleEvent(response http.ResponseWriter, request *http.Request) { eventID := template.UUID() log = log.With(zap.String(triggers.EventIDLabelKey, eventID)) log.Debugf("handling event with path %s, payload: %s and header: %v", request.URL.Path, string(event), request.Header) - var trItems []*triggersv1.Trigger - labelSelector := labels.Everything() - if el.Spec.LabelSelector != nil { - labelSelector, err = metav1.LabelSelectorAsSelector(el.Spec.LabelSelector) - if err != nil { - r.Logger.Errorf("Failed to create label selector: %s", err) - r.recordCountMetrics(failTag) - response.WriteHeader(http.StatusInternalServerError) - return - } - } - var triggerFunc func() ([]*triggersv1.Trigger, error) - switch { - case len(el.Spec.NamespaceSelector.MatchNames) == 1 && el.Spec.NamespaceSelector.MatchNames[0] == "*": - triggerFunc = func() ([]*triggersv1.Trigger, error) { - return r.TriggerLister.List(labelSelector) - } - case len(el.Spec.NamespaceSelector.MatchNames) != 0: - triggerFunc = func() ([]*triggersv1.Trigger, error) { - var trList []*triggersv1.Trigger - for _, v := range el.Spec.NamespaceSelector.MatchNames { - trNsList, err := r.TriggerLister.Triggers(v).List(labelSelector) - if err != nil { - return nil, err - } - trList = append(trList, trNsList...) - } - return trList, nil - } - case len(el.Spec.NamespaceSelector.MatchNames) == 0: - if el.Spec.LabelSelector != nil { - triggerFunc = func() ([]*triggersv1.Trigger, error) { - return r.TriggerLister.Triggers(el.GetNamespace()).List(labelSelector) - } - } - } - if triggerFunc != nil { - trList, err := triggerFunc() - if err != nil { - log.Errorf("Error getting Triggers: %s", err) - r.recordCountMetrics(failTag) - response.WriteHeader(http.StatusInternalServerError) - return - } - trItems = append(trItems, trList...) + trItems, err := r.selectTriggers(el.Spec.NamespaceSelector, el.Spec.LabelSelector) + if err != nil { + r.Logger.Errorf("unable to select configured mergedTriggers: %s", err) + response.WriteHeader(http.StatusInternalServerError) + return } - triggers, err := r.merge(el.Spec.Triggers, trItems) + // Process any ungroupedTriggers + mergedTriggers, err := r.merge(el.Spec.Triggers, trItems) if err != nil { - log.Errorf("Error merging Triggers: %s", err) + log.Errorf("error merging triggers: %s", err) response.WriteHeader(http.StatusInternalServerError) return } - // Execute each Trigger - r.WGProcessTriggers.Add(len(triggers)) - for _, t := range triggers { + r.WGProcessTriggers.Add(len(mergedTriggers)) + for _, t := range mergedTriggers { go func(t triggersv1.Trigger) { defer r.WGProcessTriggers.Done() localRequest := request.Clone(request.Context()) - r.processTrigger(t, localRequest, event, eventID, log) + r.processTrigger(t, localRequest, event, eventID, log, emptyExtensions) }(*t) } + // Process grouped triggers + for _, group := range el.Spec.TriggerGroups { + r.WGProcessTriggers.Add(1) + go func(g triggersv1.EventListenerTriggerGroup) { + defer r.WGProcessTriggers.Done() + localRequest := request.Clone(request.Context()) + r.processTriggerGroups(g, localRequest, event, eventID, log, r.WGProcessTriggers) + }(group) + } + r.recordCountMetrics(successTag) response.WriteHeader(http.StatusAccepted) response.Header().Set("Content-Type", "application/json") @@ -220,10 +194,105 @@ func (r Sink) merge(et []triggersv1.EventListenerTrigger, trItems []*triggersv1. return triggers, nil } -func (r Sink) processTrigger(t triggersv1.Trigger, request *http.Request, event []byte, eventID string, eventLog *zap.SugaredLogger) { +func (r Sink) processTriggerGroups(g triggersv1.EventListenerTriggerGroup, request *http.Request, event []byte, eventID string, eventLog *zap.SugaredLogger, wg *sync.WaitGroup) { + log := eventLog.With(zap.String(triggers.TriggerGroupLabelKey, g.Name)) + + extensions := map[string]interface{}{} + payload, header, resp, err := r.ExecuteInterceptors(g.Interceptors, request, event, log, eventID, fmt.Sprintf("namespaces/%s/triggerGroups/%s", r.EventListenerNamespace, g.Name), r.EventListenerNamespace, extensions) + if err != nil { + log.Error(err) + return + } + if resp != nil { + if resp.Extensions != nil { + for k, v := range resp.Extensions { + extensions[k] = v + } + } + if !resp.Continue { + eventLog.Infof("interceptor stopped trigger processing: %v", resp.Status.Err()) + return + } + } + + trItems, err := r.selectTriggers(g.TriggerSelector.NamespaceSelector, g.TriggerSelector.LabelSelector) + if err != nil { + return + } + + // Create a new HTTP request that contains the body and header from any interceptors in the TriggerGroup + // This request will be passed on to the triggers in this group + triggerReq := request.Clone(request.Context()) + triggerReq.Header = header + triggerReq.Body = ioutil.NopCloser(bytes.NewBuffer(payload)) + + wg.Add(len(trItems)) + for _, t := range trItems { + go func(t triggersv1.Trigger) { + defer wg.Done() + // TODO(dibyom): We might be able to get away with only cloning if necessary + // i.e. if there are interceptors and iff those interceptors will modify the body/header (i.e. webhook) + localRequest := triggerReq.Clone(triggerReq.Context()) + r.processTrigger(t, localRequest, event, eventID, log, extensions) + }(*t) + } + + return +} + +func (r Sink) selectTriggers(namespaceSelector triggersv1.NamespaceSelector, labelSelector *metav1.LabelSelector) ([]*triggersv1.Trigger, error) { + var trItems []*triggersv1.Trigger + var err error + targetLabels := labels.Everything() + if labelSelector != nil { + targetLabels, err = metav1.LabelSelectorAsSelector(labelSelector) + if err != nil { + r.Logger.Errorf("failed to create label selector: %v", err) + return nil, err + } + } + var triggerFunc func() ([]*triggersv1.Trigger, error) + switch { + case len(namespaceSelector.MatchNames) == 1 && namespaceSelector.MatchNames[0] == "*": + triggerFunc = func() ([]*triggersv1.Trigger, error) { + return r.TriggerLister.List(targetLabels) + } + case len(namespaceSelector.MatchNames) != 0: + triggerFunc = func() ([]*triggersv1.Trigger, error) { + var trList []*triggersv1.Trigger + for _, v := range namespaceSelector.MatchNames { + trNsList, err := r.TriggerLister.Triggers(v).List(targetLabels) + if err != nil { + return nil, err + } + trList = append(trList, trNsList...) + } + return trList, nil + } + case len(namespaceSelector.MatchNames) == 0: + if labelSelector != nil { + triggerFunc = func() ([]*triggersv1.Trigger, error) { + return r.TriggerLister.Triggers(r.EventListenerNamespace).List(targetLabels) + } + } + } + if triggerFunc == nil { + return trItems, nil + } + trList, err := triggerFunc() + if err != nil { + r.Logger.Errorf("Error getting Triggers: %v", err) + return nil, err + } + trItems = append(trItems, trList...) + + return trItems, nil +} + +func (r Sink) processTrigger(t triggersv1.Trigger, request *http.Request, event []byte, eventID string, eventLog *zap.SugaredLogger, extensions map[string]interface{}) { log := eventLog.With(zap.String(triggers.TriggerLabelKey, t.Name)) - finalPayload, header, iresp, err := r.ExecuteInterceptors(t, request, event, log, eventID) + finalPayload, header, iresp, err := r.ExecuteTriggerInterceptors(t, request, event, log, eventID, extensions) if err != nil { log.Error(err) return @@ -244,7 +313,6 @@ func (r Sink) processTrigger(t triggersv1.Trigger, request *http.Request, event log.Error(err) return } - extensions := map[string]interface{}{} if iresp != nil && iresp.Extensions != nil { extensions = iresp.Extensions } @@ -264,10 +332,14 @@ func (r Sink) processTrigger(t triggersv1.Trigger, request *http.Request, event go r.recordResourceCreation(resources) } +func (r Sink) ExecuteTriggerInterceptors(t triggersv1.Trigger, in *http.Request, event []byte, log *zap.SugaredLogger, eventID string, extensions map[string]interface{}) ([]byte, http.Header, *triggersv1.InterceptorResponse, error) { + return r.ExecuteInterceptors(t.Spec.Interceptors, in, event, log, eventID, fmt.Sprintf("namespaces/%s/triggers/%s", t.Namespace, t.Name), t.Namespace, extensions) +} + // ExecuteInterceptor executes all interceptors for the Trigger and returns back the body, header, and InterceptorResponse to use. // When TEP-0022 is fully implemented, this function will only return the InterceptorResponse and error. -func (r Sink) ExecuteInterceptors(t triggersv1.Trigger, in *http.Request, event []byte, log *zap.SugaredLogger, eventID string) ([]byte, http.Header, *triggersv1.InterceptorResponse, error) { - if len(t.Spec.Interceptors) == 0 { +func (r Sink) ExecuteInterceptors(trInt []*triggersv1.TriggerInterceptor, in *http.Request, event []byte, log *zap.SugaredLogger, eventID string, triggerID string, namespace string, extensions map[string]interface{}) ([]byte, http.Header, *triggersv1.InterceptorResponse, error) { + if len(trInt) == 0 { return event, in.Header, nil, nil } @@ -276,16 +348,16 @@ func (r Sink) ExecuteInterceptors(t triggersv1.Trigger, in *http.Request, event request := triggersv1.InterceptorRequest{ Body: string(event), Header: in.Header.Clone(), - Extensions: map[string]interface{}{}, // Empty extensions for the first interceptor in chain + Extensions: extensions, Context: &triggersv1.TriggerContext{ EventURL: in.URL.String(), EventID: eventID, // t.Name might not be fully accurate until we get rid of triggers inlined within EventListener - TriggerID: fmt.Sprintf("namespaces/%s/triggers/%s", t.Namespace, t.Name), // TODO: t.Name might be wrong + TriggerID: triggerID, }, } - for _, i := range t.Spec.Interceptors { + for _, i := range trInt { if i.Webhook != nil { // Old style interceptor body, err := extendBodyWithExtensions([]byte(request.Body), request.Extensions) if err != nil { @@ -297,7 +369,7 @@ func (r Sink) ExecuteInterceptors(t triggersv1.Trigger, in *http.Request, event URL: in.URL, Body: ioutil.NopCloser(bytes.NewBuffer(body)), } - interceptor := webhook.NewInterceptor(i.Webhook, r.HTTPClient, t.Namespace, log) + interceptor := webhook.NewInterceptor(i.Webhook, r.HTTPClient, namespace, log) res, err := interceptor.ExecuteTrigger(req) if err != nil { return nil, nil, nil, err diff --git a/pkg/sink/sink_test.go b/pkg/sink/sink_test.go index e2eb15cdb..c8f7732ae 100644 --- a/pkg/sink/sink_test.go +++ b/pkg/sink/sink_test.go @@ -889,6 +889,59 @@ func TestHandleEvent(t *testing.T) { TaskRef: &pipelinev1.TaskRef{Name: "git-clone"}, }, }}, + }, { + name: "single trigger within EventListener triggerGroup", + resources: test.Resources{ + Triggers: []*triggersv1beta1.Trigger{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "git-clone-trigger", + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: triggersv1beta1.TriggerSpec{ + Bindings: []*triggersv1beta1.TriggerSpecBinding{{ + Ref: "git-clone", + Kind: triggersv1beta1.NamespacedTriggerBindingKind, + }}, + Template: triggersv1beta1.TriggerSpecTemplate{ + Ref: ptr.String("git-clone"), + }, + }, + }}, + EventListeners: []*triggersv1beta1.EventListener{{ + ObjectMeta: metav1.ObjectMeta{ + Name: eventListenerName, + Namespace: namespace, + UID: types.UID(elUID), + }, + Spec: triggersv1beta1.EventListenerSpec{ + TriggerGroups: []triggersv1beta1.EventListenerTriggerGroup{{ + Name: "filter-event", + Interceptors: []*triggersv1beta1.TriggerInterceptor{{ + Ref: triggersv1beta1.InterceptorRef{Name: "cel"}, + Params: []triggersv1beta1.InterceptorParams{{ + Name: "filter", + Value: test.ToV1JSON(t, "has(body.head_commit)"), + }}, + }}, + TriggerSelector: triggersv1beta1.EventListenerTriggerSelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "foo": "bar", + }, + }, + }, + }}, + }, + }}, + TriggerBindings: []*triggersv1beta1.TriggerBinding{gitCloneTB}, + TriggerTemplates: []*triggersv1beta1.TriggerTemplate{gitCloneTT}, + ClusterInterceptors: []*triggersv1alpha1.ClusterInterceptor{cel}, + }, + eventBody: eventBody, + want: []pipelinev1.TaskRun{gitCloneTaskRun}, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -897,7 +950,7 @@ func TestHandleEvent(t *testing.T) { sink, dynamicClient := getSinkAssets(t, tc.resources, elName, tc.webhookInterceptor) metricsRecorder := &MetricsHandler{Handler: http.HandlerFunc(sink.HandleEvent)} - ts := httptest.NewServer(http.HandlerFunc(metricsRecorder.Intercept(sink.NewMetricsRecorderInterceptor()))) + ts := httptest.NewServer(metricsRecorder.Intercept(sink.NewMetricsRecorderInterceptor())) defer ts.Close() req, err := http.NewRequest("POST", ts.URL, bytes.NewReader(tc.eventBody)) if err != nil { @@ -1073,7 +1126,7 @@ func TestExecuteInterceptor_Sequential(t *testing.T) { if err != nil { t.Fatalf("http.NewRequest: %v", err) } - resp, header, _, err := r.ExecuteInterceptors(trigger, req, []byte(`{}`), logger.Sugar(), eventID) + resp, header, _, err := r.ExecuteTriggerInterceptors(trigger, req, []byte(`{}`), logger.Sugar(), eventID, map[string]interface{}{}) if err != nil { t.Fatalf("executeInterceptors: %v", err) } @@ -1144,7 +1197,7 @@ func TestExecuteInterceptor_error(t *testing.T) { if err != nil { t.Fatalf("http.NewRequest: %v", err) } - if resp, _, _, err := s.ExecuteInterceptors(trigger, req, nil, logger.Sugar(), eventID); err == nil { + if resp, _, _, err := s.ExecuteTriggerInterceptors(trigger, req, nil, logger.Sugar(), eventID, map[string]interface{}{}); err == nil { t.Errorf("expected error, got: %+v, %v", string(resp), err) } @@ -1169,7 +1222,7 @@ func TestExecuteInterceptor_NotContinue(t *testing.T) { }}}, } url, _ := url.Parse("http://example.com") - _, _, resp, err := s.ExecuteInterceptors(trigger, &http.Request{URL: url}, json.RawMessage(`{"head": "blah"}`), s.Logger, "eventID") + _, _, resp, err := s.ExecuteTriggerInterceptors(trigger, &http.Request{URL: url}, json.RawMessage(`{"head": "blah"}`), s.Logger, "eventID", map[string]interface{}{}) if err != nil { t.Fatalf("ExecuteInterceptor() unexpected error: %v", err) } @@ -1246,7 +1299,7 @@ func TestExecuteInterceptor_ExtensionChaining(t *testing.T) { t.Fatalf("http.NewRequest: %v", err) } body := fmt.Sprintf(`{"sha": "%s"}`, sha) - resp, _, iresp, err := s.ExecuteInterceptors(trigger, req, []byte(body), s.Logger, eventID) + resp, _, iresp, err := s.ExecuteTriggerInterceptors(trigger, req, []byte(body), s.Logger, eventID, map[string]interface{}{}) if err != nil { t.Fatalf("executeInterceptors: %v", err) } diff --git a/test/builder/eventlistener_test.go b/test/builder/eventlistener_test.go index 9d8c60c87..40b677c14 100644 --- a/test/builder/eventlistener_test.go +++ b/test/builder/eventlistener_test.go @@ -593,8 +593,7 @@ func TestEventListenerBuilder(t *testing.T) { ), ), ), - }, - } + }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if diff := cmp.Diff(tt.normal, tt.builder, cmpopts.IgnoreTypes(apis.Condition{}.LastTransitionTime.Inner.Time)); diff != "" { diff --git a/third_party/LICENSE b/third_party/LICENSE deleted file mode 100644 index 6a66aea5e..000000000 --- a/third_party/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/github.com/hashicorp/golang-lru/simplelru/lru_interface.go b/third_party/github.com/hashicorp/golang-lru/simplelru/lru_interface.go index a0b97e3f7..92d70934d 100644 --- a/third_party/github.com/hashicorp/golang-lru/simplelru/lru_interface.go +++ b/third_party/github.com/hashicorp/golang-lru/simplelru/lru_interface.go @@ -34,6 +34,6 @@ type LRUCache interface { // Clears all cache entries. Purge() - // Resizes cache, returning number evicted - Resize(int) int + // Resizes cache, returning number evicted + Resize(int) int } diff --git a/third_party/vendor/golang.org/x/crypto/LICENSE b/third_party/vendor/golang.org/x/crypto/LICENSE deleted file mode 100644 index 6a66aea5e..000000000 --- a/third_party/vendor/golang.org/x/crypto/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/vendor/golang.org/x/net/LICENSE b/third_party/vendor/golang.org/x/net/LICENSE deleted file mode 100644 index 6a66aea5e..000000000 --- a/third_party/vendor/golang.org/x/net/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/vendor/golang.org/x/text/LICENSE b/third_party/vendor/golang.org/x/text/LICENSE deleted file mode 100644 index 6a66aea5e..000000000 --- a/third_party/vendor/golang.org/x/text/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.