Skip to content

Commit

Permalink
Alertmanager: Increase likehood of useful results from /grafana/recei…
Browse files Browse the repository at this point in the history
…vers. (grafana#8540)

* Alertmanager: Increase likehood of useful results from /grafana/receivers.

The experimental, extended /receivers API provides information about the last
notification attempt for receivers, including the last error and duration.

Currently, because of how the Alertmanager ring is configured, we only require
a successful response from one replica. For the receivers API however this is
insufficient, because any of the 3 replicas could have sent notifications
recently. It's more likely that one of the replicas will (the replica in
"position 0"), but any replica could send any given notification, so asking
all replicas is arguably more robust.

* Review comments
  • Loading branch information
stevesg authored Jun 27, 2024
1 parent 045798d commit ca80697
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
if strings.HasSuffix(path.Dir(p), "/v2/silence") {
return true, merger.V2SilenceID{}
}
return false, nil
}

func (d *Distributor) isAllReadPath(p string) (bool, merger.Merger) {
// When querying receivers status, we need to wait for responses from all three replicas
// for best quality results. This is because any of the three could have sent a
// notification. This does mean that some requests might fail if a replica is unavailable
// but it has not left the ring or been deemed Unhealthy yet.
if strings.HasSuffix(p, "/api/v1/grafana/receivers") {
return true, merger.ExperimentalReceivers{}
}
Expand Down Expand Up @@ -136,6 +144,10 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
d.doQuorum(userID, w, r, logger, m)
return
}
if ok, m := d.isAllReadPath(r.URL.Path); ok {
d.doAll(userID, w, r, logger, m)
return
}

// All other paths are just passed to a random replica.
// This is primarily used for serving the web user interface.
Expand All @@ -146,6 +158,76 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
http.Error(w, "route not supported by distributor", http.StatusNotFound)
}

func (d *Distributor) doAll(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger, m merger.Merger) {
var body []byte
var err error
if r.Body != nil {
body, err = io.ReadAll(http.MaxBytesReader(w, r.Body, d.maxRecvMsgSize))
if err != nil {
if util.IsRequestBodyTooLarge(err) {
http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
return
}
level.Error(logger).Log("msg", "failed to read the request body during write", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

grpcHeaders := httpToHttpgrpcHeaders(r.Header)

// Only get the set of replicas which contain the specified user.
key := shardByUser(userID)
replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
if err != nil {
respondFromError(err, w, logger)
return
}

// Force waiting for responses from all healthy replicas.
replicationSet.MaxErrors = 0

results, err := replicationSet.Do(r.Context(), 0, func(ctx context.Context, instance *ring.InstanceDesc) (any, error) {
ctx = user.InjectOrgID(ctx, userID)
sp, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.doAll")
defer sp.Finish()

resp, err := d.doRequest(ctx, *instance, &httpgrpc.HTTPRequest{
Method: r.Method,
Url: r.RequestURI,
Body: body,
Headers: grpcHeaders,
})
if err != nil {
return nil, err
}

if resp.Code/100 != 2 {
return nil, httpgrpc.ErrorFromHTTPResponse(resp)
}

return resp, nil
})

if err != nil {
respondFromError(err, w, logger)
return
}

responses := make([]*httpgrpc.HTTPResponse, len(results))
for i := range results {
responses[i] = results[i].(*httpgrpc.HTTPResponse)
}

if len(results) > 0 {
respondFromMultipleHTTPGRPCResponses(w, logger, responses, m)
} else {
// This should not happen.
level.Error(logger).Log("msg", "distributor did not receive any response from alertmanagers, but there were no errors")
w.WriteHeader(http.StatusInternalServerError)
}
}

func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger, m merger.Merger) {
var body []byte
var err error
Expand Down

0 comments on commit ca80697

Please sign in to comment.