Skip to content

Commit

Permalink
Refactor webhook route config (argoproj#243)
Browse files Browse the repository at this point in the history
* refactor(): webhook route config

* refactor(): webhook route config and removing trello gateway

* refactor(): route config

* fix(): bug introduced from argoproj#221

* chore(): fix calendar gateway example

* fix(): sns gateway test

* test(): recover test

* fix(): makefile

* chore(): updating comments

* chore(): updating makefile image version

* refactor(): rename RouteConfig interface to RouteManager
  • Loading branch information
VaibhavPage authored Mar 21, 2019
1 parent cb6aa09 commit 5084e6d
Show file tree
Hide file tree
Showing 50 changed files with 657 additions and 526 deletions.
1 change: 1 addition & 0 deletions examples/gateways/github-gateway-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: github-gateway-configmap
data:
project_1: |-
id: 1234
owner: "owner-example"
repository: "repo-example"
hook:
Expand Down
1 change: 1 addition & 0 deletions examples/gateways/gitlab-gateway-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: gitlab-gateway-configmap
data:
project_1: |-
id: 1234
projectId: "1"
hook:
endpoint: "/push"
Expand Down
27 changes: 23 additions & 4 deletions gateways/common/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,35 @@ func (f *FakeHttpWriter) WriteHeader(status int) {
f.HeaderStatus = status
}

func GetFakeRouteConfig() *RouteConfig {
return &RouteConfig{
type FakeRouteConfig struct {
route *Route
}

func (f *FakeRouteConfig) GetRoute() *Route {
return f.route
}

func (f *FakeRouteConfig) RouteHandler(writer http.ResponseWriter, request *http.Request) {
}

func (f *FakeRouteConfig) PostStart() error {
return nil
}

func (f *FakeRouteConfig) PostStop() error {
return nil
}

func GetFakeRoute() *Route {
logger := common.GetLoggerContext(common.LoggerConf()).Logger()
return &Route{
Webhook: Hook,
EventSource: &gateways.EventSource{
Name: "fake-event-source",
Data: "hello",
Id: "123",
},
Log: common.GetLoggerContext(common.LoggerConf()).Logger(),
Configs: make(map[string]interface{}),
Logger: &logger,
StartCh: make(chan struct{}),
}
}
Expand Down
158 changes: 97 additions & 61 deletions gateways/common/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ type WebhookHelper struct {
// ActiveEndpoints keep track of endpoints that are already registered with server and their status active or inactive
ActiveEndpoints map[string]*Endpoint
// RouteActivateChan handles assigning new route to server.
RouteActivateChan chan *RouteConfig
RouteActivateChan chan RouteManager
// RouteDeactivateChan handles deactivating existing route
RouteDeactivateChan chan *RouteConfig
RouteDeactivateChan chan RouteManager
}

// HTTP Muxer
Expand All @@ -73,16 +73,20 @@ type activeServer struct {
errChan chan error
}

// RouteConfig contains configuration about an http route
type RouteConfig struct {
Webhook *Webhook
Configs map[string]interface{}
EventSource *gateways.EventSource
Log zerolog.Logger
StartCh chan struct{}
RouteActiveHandler func(writer http.ResponseWriter, request *http.Request, rc *RouteConfig)
PostActivate func(rc *RouteConfig) error
PostStop func(rc *RouteConfig) error
// Route contains common information for a route
type Route struct {
Webhook *Webhook
Logger *zerolog.Logger
StartCh chan struct{}
EventSource *gateways.EventSource
}

// RouteManager is an interface to manage the configuration for a route
type RouteManager interface {
GetRoute() *Route
RouteHandler(writer http.ResponseWriter, request *http.Request)
PostStart() error
PostStop() error
}

// endpoint contains state of an http endpoint
Expand All @@ -93,14 +97,14 @@ type Endpoint struct {
DataCh chan []byte
}

// NewWebhookHelper returns new webhook helper
// NewWebhookHelper returns new Webhook helper
func NewWebhookHelper() *WebhookHelper {
return &WebhookHelper{
ActiveEndpoints: make(map[string]*Endpoint),
ActiveServers: make(map[string]*activeServer),
Mutex: sync.Mutex{},
RouteActivateChan: make(chan *RouteConfig),
RouteDeactivateChan: make(chan *RouteConfig),
RouteActivateChan: make(chan RouteManager),
RouteDeactivateChan: make(chan RouteManager),
}
}

Expand All @@ -110,11 +114,12 @@ func InitRouteChannels(helper *WebhookHelper) {
select {
case config := <-helper.RouteActivateChan:
// start server if it has not been started on this port
config.startHttpServer(helper)
config.StartCh <- struct{}{}
startHttpServer(config, helper)
startCh := config.GetRoute().StartCh
startCh <- struct{}{}

case config := <-helper.RouteDeactivateChan:
webhook := config.Webhook
webhook := config.GetRoute().Webhook
_, ok := helper.ActiveServers[webhook.Port]
if ok {
helper.ActiveEndpoints[webhook.Endpoint].Active = false
Expand All @@ -129,33 +134,34 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// starts a http server
func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
func startHttpServer(routeManager RouteManager, helper *WebhookHelper) {
// start a http server only if no other configuration previously started the server on given port
helper.Mutex.Lock()
if _, ok := helper.ActiveServers[rc.Webhook.Port]; !ok {
r := routeManager.GetRoute()
if _, ok := helper.ActiveServers[r.Webhook.Port]; !ok {
s := &server{
mux: http.NewServeMux(),
}
rc.Webhook.mux = s.mux
rc.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", rc.Webhook.Port),
r.Webhook.mux = s.mux
r.Webhook.srv = &http.Server{
Addr: ":" + fmt.Sprintf("%s", r.Webhook.Port),
Handler: s,
}
errChan := make(chan error, 1)
helper.ActiveServers[rc.Webhook.Port] = &activeServer{
helper.ActiveServers[r.Webhook.Port] = &activeServer{
srv: s.mux,
errChan: errChan,
}

// start http server
go func() {
var err error
if rc.Webhook.ServerCertPath == "" || rc.Webhook.ServerKeyPath == "" {
err = rc.Webhook.srv.ListenAndServe()
if r.Webhook.ServerCertPath == "" || r.Webhook.ServerKeyPath == "" {
err = r.Webhook.srv.ListenAndServe()
} else {
err = rc.Webhook.srv.ListenAndServeTLS(rc.Webhook.ServerCertPath, rc.Webhook.ServerKeyPath)
err = r.Webhook.srv.ListenAndServeTLS(r.Webhook.ServerCertPath, r.Webhook.ServerKeyPath)
}
rc.Log.Error().Err(err).Str("event-source", rc.EventSource.Name).Str("port", rc.Webhook.Port).Msg("http server stopped")
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Str("port", r.Webhook.Port).Msg("http server stopped")
if err != nil {
errChan <- err
}
Expand All @@ -165,76 +171,87 @@ func (rc *RouteConfig) startHttpServer(helper *WebhookHelper) {
}

// activateRoute activates route
func (rc *RouteConfig) activateRoute(helper *WebhookHelper) {
helper.RouteActivateChan <- rc
func activateRoute(routeManager RouteManager, helper *WebhookHelper) {
r := routeManager.GetRoute()
helper.RouteActivateChan <- routeManager

<-rc.StartCh
<-r.StartCh

if rc.Webhook.mux == nil {
if r.Webhook.mux == nil {
helper.Mutex.Lock()
rc.Webhook.mux = helper.ActiveServers[rc.Webhook.Port].srv
r.Webhook.mux = helper.ActiveServers[r.Webhook.Port].srv
helper.Mutex.Unlock()
}

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("adding route handler")
if _, ok := helper.ActiveEndpoints[rc.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[rc.Webhook.Endpoint] = &Endpoint{
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("adding route handler")

if _, ok := helper.ActiveEndpoints[r.Webhook.Endpoint]; !ok {
helper.ActiveEndpoints[r.Webhook.Endpoint] = &Endpoint{
Active: true,
DataCh: make(chan []byte),
}
rc.Webhook.mux.HandleFunc(rc.Webhook.Endpoint, func(writer http.ResponseWriter, request *http.Request) {
rc.RouteActiveHandler(writer, request, rc)
})
r.Webhook.mux.HandleFunc(r.Webhook.Endpoint, routeManager.RouteHandler)
}
helper.ActiveEndpoints[rc.Webhook.Endpoint].Active = true
helper.ActiveEndpoints[r.Webhook.Endpoint].Active = true

rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Str("port", rc.Webhook.Port).Str("endpoint", rc.Webhook.Endpoint).Msg("route handler added")
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Str("port", r.Webhook.Port).Str("endpoint", r.Webhook.Endpoint).Msg("route handler added")
}

func (rc *RouteConfig) processChannels(helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
func processChannels(routeManager RouteManager, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
r := routeManager.GetRoute()

for {
select {
case data := <-helper.ActiveEndpoints[rc.Webhook.Endpoint].DataCh:
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("new event received, dispatching to gateway client")
case data := <-helper.ActiveEndpoints[r.Webhook.Endpoint].DataCh:
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("new event received, dispatching to gateway client")
err := eventStream.Send(&gateways.Event{
Name: rc.EventSource.Name,
Name: r.EventSource.Name,
Payload: data,
})
if err != nil {
rc.Log.Error().Err(err).Str("event-source-name", rc.EventSource.Name).Msg("failed to send event")
r.Logger.Error().Err(err).Str("event-source-name", r.EventSource.Name).Msg("failed to send event")
return err
}

case <-eventStream.Context().Done():
rc.Log.Info().Str("event-source-name", rc.EventSource.Name).Msg("connection is closed by client")
helper.RouteDeactivateChan <- rc
r.Logger.Info().Str("event-source-name", r.EventSource.Name).Msg("connection is closed by client")
helper.RouteDeactivateChan <- routeManager
return nil

// this error indicates that the server has stopped running
case err := <-helper.ActiveServers[rc.Webhook.Port].errChan:
case err := <-helper.ActiveServers[r.Webhook.Port].errChan:
return err
}
}
}

func DefaultPostActivate(rc *RouteConfig) error {
return nil
}
func ProcessRoute(routeManager RouteManager, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
r := routeManager.GetRoute()

func DefaultPostStop(rc *RouteConfig) error {
return nil
}
r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("validating the route")
if err := validateRoute(routeManager.GetRoute()); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred validating route")
return err
}

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("activating the route")
activateRoute(routeManager, helper)

func ProcessRoute(rc *RouteConfig, helper *WebhookHelper, eventStream gateways.Eventing_StartEventSourceServer) error {
rc.activateRoute(helper)
if err := rc.PostActivate(rc); err != nil {
r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post start")
if err := routeManager.PostStart(); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post start")
return err
}
if err := rc.processChannels(helper, eventStream); err != nil {

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("processing channels")
if err := processChannels(routeManager, helper, eventStream); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in process channel")
return err
}
if err := rc.PostStop(rc); err != nil {
rc.Log.Error().Err(err).Msg("error occurred while executing post stop logic")

r.Logger.Info().Str("event-source", r.EventSource.Name).Msg("running post stop")
if err := routeManager.PostStop(); err != nil {
r.Logger.Error().Err(err).Str("event-source", r.EventSource.Name).Msg("error occurred in post stop")
}
return nil
}
Expand All @@ -258,6 +275,25 @@ func ValidateWebhook(w *Webhook) error {
return nil
}

func validateRoute(r *Route) error {
if r == nil {
return fmt.Errorf("route can't be nil")
}
if r.Webhook == nil {
return fmt.Errorf("webhook can't be nil")
}
if r.StartCh == nil {
return fmt.Errorf("start channel can't be nil")
}
if r.EventSource == nil {
return fmt.Errorf("event source can't be nil")
}
if r.Logger == nil {
return fmt.Errorf("logger can't be nil")
}
return nil
}

func FormatWebhookEndpoint(endpoint string) string {
if !strings.HasPrefix(endpoint, "/") {
return fmt.Sprintf("/%s", endpoint)
Expand Down
Loading

0 comments on commit 5084e6d

Please sign in to comment.