Skip to content

Commit

Permalink
use goroutines to monitor unhealthy apis
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Kolberg <[email protected]>
  • Loading branch information
amdprophet committed Feb 25, 2019
1 parent 022ef68 commit 14c3086
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
31 changes: 31 additions & 0 deletions uchiwa/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (f *DatacenterFetcher) Fetch() {
// goroutines have properly returned.
go func() {
wg.Wait()
d.checkAPIHealth()
cancel()
}()

Expand Down Expand Up @@ -242,6 +243,36 @@ func (d *DatacenterSnapshotFetcher) determineHealth() structs.SensuHealth {
return structs.SensuHealth{Output: "ok", Status: 0}
}

// checkAPIHealth will find unhealthy APIs and spawn a goroutine to check their
// health if not already checking
func (d *DatacenterSnapshotFetcher) checkAPIHealth() {
for i, api := range d.datacenter.APIs {
if !api.Healthy && !api.CheckingHealth {
go d.startAPIHealthChecker(i)
}
}
}

func (d *DatacenterSnapshotFetcher) startAPIHealthChecker(i int) {
d.datacenter.APIs[i].CheckingHealth = true
api := d.datacenter.APIs[i]
logger.Warningf("sensu api is unhealthy: %s (datacenter: %s)", api.URL, d.datacenter.Name)
for {
timer := time.NewTimer(time.Second * 10)
select {
case <-timer.C:
logger.Warningf("checking health of sensu api %s (datacenter: %s)", api.URL, d.datacenter.Name)
_, err := d.datacenter.GetInfoFromAPI(i)
if err == nil {
logger.Warningf("sensu api is healthy again: %s (datacenter: %s)", api.URL, d.datacenter.Name)
d.datacenter.APIs[i].CheckingHealth = false
return
}
logger.Warningf("sensu api is still unhealthy: %s (datacenter: %s)", api.URL, d.datacenter.Name)
}
}
}

func (d *DatacenterSnapshotFetcher) fetchStashes(ctx context.Context, errCh chan error) {
defer d.wg.Done()

Expand Down
17 changes: 17 additions & 0 deletions uchiwa/sensu/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@ func (s *Sensu) GetInfo() (*structs.Info, error) {

return &info, nil
}

// GetInfo returns a pointer to a structs.Info struct containing the
// Sensu version and the transport and Redis connection information
func (s *Sensu) GetInfoFromAPI(i int) (*structs.Info, error) {
api := &s.APIs[i]
body, _, err := s.getBytesFromAPI(api, "info")
if err != nil {
return nil, err
}

var info structs.Info
if err := json.Unmarshal(body, &info); err != nil {
return nil, fmt.Errorf("Parsing JSON-encoded response body: %v", err)
}

return &info, nil
}
43 changes: 31 additions & 12 deletions uchiwa/sensu/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,27 @@ func (s *Sensu) getBytes(endpoint string) ([]byte, *http.Response, error) {
shuffledRange := shuffle(makeRange(len(apis)))

for _, i := range shuffledRange {
logger.Debugf("GET %s/%s (%s)", s.APIs[i].URL, endpoint, s.Name)
bytes, res, err = apis[i].getBytes(endpoint)
bytes, res, err = s.getBytesFromAPI(apis[i], endpoint)
if err == nil {
return bytes, res, err
return bytes, res, nil
}
s.APIs[i].Healthy = false
logger.Debugf("GET %s/%s (%s) returned: %v", s.APIs[i].URL, endpoint, s.Name, err)
}

return nil, nil, err
}

func (s *Sensu) getBytesFromAPI(api *API, endpoint string) ([]byte, *http.Response, error) {
logger.Debugf("GET %s/%s (%s)", api.URL, endpoint, s.Name)
bytes, res, err := api.getBytes(endpoint)
if err != nil {
api.Healthy = false
return bytes, res, err
}
api.Healthy = true
logger.Debugf("GET %s/%s (%s) returned: %v", api.URL, endpoint, s.Name, err)
return bytes, res, err
}

func (s *Sensu) getSlice(ctx context.Context, endpoint string, limit int) ([]interface{}, error) {
var err error
var slice []interface{}
Expand Down Expand Up @@ -89,18 +98,28 @@ func (s *Sensu) getMap(endpoint string) (map[string]interface{}, error) {
shuffledRange := shuffle(makeRange(len(apis)))

for _, i := range shuffledRange {
logger.Debugf("GET %s/%s (%s)", s.APIs[i].URL, endpoint, s.Name)
logger.Debugf("GET %s/%s (%s)", apis[i].URL, endpoint, s.Name)
m, err = apis[i].getMap(endpoint)
if err == nil {
return m, err
}
s.APIs[i].Healthy = false
logger.Debugf("GET %s/%s (%s) returned: %v", s.APIs[i].URL, endpoint, s.Name, err)
apis[i].Healthy = false
logger.Debugf("GET %s/%s (%s) returned: %v", apis[i].URL, endpoint, s.Name, err)
}

return nil, err
}

func (s *Sensu) getMapFromAPI(api *API, endpoint string) (map[string]interface{}, error) {

m, err := api.getMap(endpoint)
if err != nil {
api.Healthy = false

}
return m, err
}

func (s *Sensu) postPayload(endpoint string, payload string) (map[string]interface{}, error) {
var err error
var m map[string]interface{}
Expand All @@ -125,10 +144,10 @@ func (s *Sensu) postPayload(endpoint string, payload string) (map[string]interfa

// healthyAPIs returns a list of APIs with Healthy set to true or returns an error when there are
// no healthy APIs
func (s *Sensu) healthyAPIs() ([]API, error) {
var healthyAPIs []API
for _, api := range s.APIs {
logger.Debugf("API %s is healthy? %t", api.URL, api.Healthy)
func (s *Sensu) healthyAPIs() ([]*API, error) {
var healthyAPIs []*API
for i := range s.APIs {
api := &s.APIs[i]
if api.Healthy {
healthyAPIs = append(healthyAPIs, api)
}
Expand Down
1 change: 1 addition & 0 deletions uchiwa/sensu/sensu.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type API struct {
URL string
User string
Healthy bool
CheckingHealth bool

Client http.Client
}
Expand Down

0 comments on commit 14c3086

Please sign in to comment.