Skip to content

Commit

Permalink
Add test for event source failover.
Browse files Browse the repository at this point in the history
  • Loading branch information
xperimental committed Nov 10, 2016
1 parent 71130f8 commit be7ac71
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 9 deletions.
24 changes: 15 additions & 9 deletions main/bamboo/bamboo.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,23 @@ func listenToZookeeper(conf configuration.Configuration, eventBus *event_bus.Eve
}

func listenToMarathonEventStream(conf *configuration.Configuration, sub api.EventSubscriptionAPI) {
go func() {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
for _, marathon := range conf.Marathon.Endpoints() {
ch := connectToMarathonEventStream(marathon, conf.Marathon.User, conf.Marathon.Password)
for payload := range ch {
sub.Notify(payload)
}
ticker := time.NewTicker(1 * time.Second)
go listenToMarathonEventStreamLoop(conf, &sub, ticker.C)
}

type eventBusSink interface {
Notify(payload []byte)
}

func listenToMarathonEventStreamLoop(conf *configuration.Configuration, sink eventBusSink, ticker <-chan time.Time) {
for _ = range ticker {
for _, marathon := range conf.Marathon.Endpoints() {
ch := connectToMarathonEventStream(marathon, conf.Marathon.User, conf.Marathon.Password)
for payload := range ch {
sink.Notify(payload)
}
}
}()
}
}

func connectToMarathonEventStream(marathon, user, password string) <-chan []byte {
Expand Down
88 changes: 88 additions & 0 deletions main/bamboo/bamboo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"

"github.com/QubitProducts/bamboo/configuration"
)

func eventSourceHandler(lines ...string) http.Handler {
Expand Down Expand Up @@ -117,3 +121,87 @@ func TestConnectToMarathonEventStream(t *testing.T) {
}
})
}

type stubEventSink struct {
payloads [][]byte
}

func (s *stubEventSink) Notify(payload []byte) {
s.payloads = append(s.payloads, payload)
}

func TestListenToMarathonEventStream(t *testing.T) {
for _, test := range []struct {
name string
handlers []http.Handler
ticks []time.Time
payloads [][]byte
}{
{
name: "no-ticks",
},
{
name: "invalid-server",
ticks: []time.Time{
time.Unix(0, 0),
},
},
{
name: "single server",
handlers: []http.Handler{
eventSourceHandler("data: payload"),
},
ticks: []time.Time{
time.Unix(0, 0),
},
payloads: [][]byte{
[]byte("payload\n"),
},
},
{
name: "failover",
handlers: []http.Handler{
eventSourceHandler("data: payload", "data: payload2"),
eventSourceHandler("data: payload3", "data: payload4"),
},
ticks: []time.Time{
time.Unix(0, 0),
},
payloads: [][]byte{
[]byte("payload\n"),
[]byte("payload2\n"),
[]byte("payload3\n"),
[]byte("payload4\n"),
},
},
} {
t.Run(test.name, func(t *testing.T) {
stubSink := &stubEventSink{}
ticker := make(chan time.Time)

urls := []string{}
for _, h := range test.handlers {
s := httptest.NewServer(h)
urls = append(urls, s.URL)
}
conf := &configuration.Configuration{
Marathon: configuration.Marathon{
Endpoint: strings.Join(urls, ","),
},
}

go func() {
for _, t := range test.ticks {
ticker <- t
}
close(ticker)
}()

listenToMarathonEventStreamLoop(conf, stubSink, ticker)

if !reflect.DeepEqual(stubSink.payloads, test.payloads) {
t.Errorf("got payloads %s, wanted %s", stubSink.payloads, test.payloads)
}
})
}
}

0 comments on commit be7ac71

Please sign in to comment.