Skip to content

Commit

Permalink
Merge branch 'master' into chris/alert-webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Aug 18, 2023
2 parents b348416 + 9bb5aae commit fccf6e0
Show file tree
Hide file tree
Showing 17 changed files with 371 additions and 192 deletions.
64 changes: 52 additions & 12 deletions alerts/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package alerts

import (
"context"
"errors"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -33,6 +34,11 @@ const (
)

type (
// Alerter is the interface for registering and dismissing alerts. In this
// file it is implemented by Manager and by the wrapper returned from
// WithOrigin.
Alerter interface {
// RegisterAlert registers the given alert and reports any failure as an
// error.
RegisterAlert(_ context.Context, a Alert) error
// DismissAlerts dismisses the alerts identified by ids and reports any
// failure as an error.
DismissAlerts(_ context.Context, ids ...types.Hash256) error
}

// Severity indicates the severity of an alert.
Severity uint8

Expand Down Expand Up @@ -98,27 +104,33 @@ func (s *Severity) UnmarshalJSON(b []byte) error {
return nil
}

// Register registers a new alert with the manager
func (m *Manager) Register(ctx context.Context, a Alert) {
if a.ID == (types.Hash256{}) {
panic("cannot register alert with empty ID") // developer error
} else if a.Timestamp.IsZero() {
panic("cannot register alert with zero timestamp") // developer error
// RegisterAlert implements the Alerter interface.
func (m *Manager) RegisterAlert(ctx context.Context, alert Alert) error {
// Reject incomplete alerts before touching any state. Every required field
// has its own error so the caller can tell which one is missing.
if alert.ID == (types.Hash256{}) {
	return errors.New("cannot register alert with zero id")
} else if alert.Timestamp.IsZero() {
	return errors.New("cannot register alert with zero timestamp")
} else if alert.Severity == 0 {
	return errors.New("cannot register alert without severity")
} else if alert.Message == "" {
	return errors.New("cannot register alert without a message")
} else if alert.Data == nil || alert.Data["origin"] == "" {
	// The "origin" entry is what WithOrigin attaches to an alert.
	return errors.New("cannot register alert without origin")
}

m.mu.Lock()
m.alerts[a.ID] = a
m.alerts[alert.ID] = alert
m.mu.Unlock()

m.webhookBroadcaster.BroadcastAction(ctx, webhooks.Action{
return m.webhookBroadcaster.BroadcastAction(ctx, webhooks.Action{
Module: webhookModule,
Event: webhookEventRegister,
Payload: a,
Payload: alert,
})
}

// Dismiss removes the alerts with the given IDs.
func (m *Manager) Dismiss(ctx context.Context, ids ...types.Hash256) {
// DismissAlerts implements the Alerter interface.
func (m *Manager) DismissAlerts(ctx context.Context, ids ...types.Hash256) error {
m.mu.Lock()
for _, id := range ids {
delete(m.alerts, id)
Expand All @@ -128,7 +140,7 @@ func (m *Manager) Dismiss(ctx context.Context, ids ...types.Hash256) {
}
m.mu.Unlock()

m.webhookBroadcaster.BroadcastAction(ctx, webhooks.Action{
return m.webhookBroadcaster.BroadcastAction(ctx, webhooks.Action{
Module: webhookModule,
Event: webhookEventDismiss,
Payload: ids,
Expand Down Expand Up @@ -157,3 +169,31 @@ func NewManager(b webhooks.Broadcaster) *Manager {
webhookBroadcaster: b,
}
}

// originAlerter wraps an Alerter together with an origin string; it is
// constructed by WithOrigin.
type originAlerter struct {
// alerter is the wrapped Alerter that all calls are forwarded to.
alerter Alerter
// origin is the value stored under the "origin" key of registered alerts.
origin string
}

// WithOrigin decorates the given Alerter. The returned Alerter sets the
// "origin" data field of every alert registered through it to origin before
// passing the alert on to the wrapped alerter.
func WithOrigin(alerter Alerter, origin string) Alerter {
	wrapped := originAlerter{alerter: alerter, origin: origin}
	return &wrapped
}

// RegisterAlert implements the Alerter interface. It tags the alert with the
// wrapper's origin and forwards it to the wrapped Alerter, returning that
// call's error.
//
// The alert's Data map is copied before the "origin" key is written. Alert is
// passed by value, but its Data map is still shared with the caller, so
// writing to it directly would modify the caller's map.
func (a *originAlerter) RegisterAlert(ctx context.Context, alert Alert) error {
	data := make(map[string]any, len(alert.Data)+1)
	for k, v := range alert.Data {
		data[k] = v
	}
	data["origin"] = a.origin
	alert.Data = data
	return a.alerter.RegisterAlert(ctx, alert)
}

// DismissAlerts implements the Alerter interface by handing the IDs unchanged
// to the wrapped Alerter and returning its error.
func (a *originAlerter) DismissAlerts(ctx context.Context, ids ...types.Hash256) error {
	inner := a.alerter
	return inner.DismissAlerts(ctx, ids...)
}
14 changes: 10 additions & 4 deletions alerts/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ func TestWebhooks(t *testing.T) {
Severity: SeverityWarning,
Timestamp: time.Unix(0, 0),
}
alerts.Register(context.Background(), a)
alerts.Dismiss(context.Background(), types.Hash256{1})
if err := alerts.RegisterAlert(context.Background(), a); err != nil {
t.Fatal(err)
}
if err := alerts.DismissAlerts(context.Background(), types.Hash256{1}); err != nil {
t.Fatal(err)
}

// list hooks
hooks, _ := mgr.Info()
Expand All @@ -78,11 +82,13 @@ func TestWebhooks(t *testing.T) {
}

// perform an action that should not trigger the endpoint
alerts.Register(context.Background(), Alert{
if err := alerts.RegisterAlert(context.Background(), Alert{
ID: types.Hash256{2},
Severity: SeverityWarning,
Timestamp: time.Now(),
})
}); err != nil {
t.Fatal(err)
}

// check events
for i := 0; i < 10; i++ {
Expand Down
13 changes: 13 additions & 0 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ type ContractAcquireResponse struct {
LockID uint64 `json:"lockID"`
}

// ContractsPrunableDataResponse is the response type for the
// /contracts/prunable endpoint.
type ContractsPrunableDataResponse struct {
// Contracts lists the per-contract entries.
Contracts []ContractPrunableData `json:"contracts"`
// TotalPrunable and TotalSize are presumably the sums of the per-contract
// Prunable and Size values -- TODO confirm against the code that fills in
// this response.
TotalPrunable uint64 `json:"totalPrunable"`
TotalSize uint64 `json:"totalSize"`
}

// ContractPrunableData pairs a contract ID with its ContractSize. Because
// ContractSize is embedded without a JSON tag, its fields are encoded next to
// "id" rather than in a nested object.
type ContractPrunableData struct {
ID types.FileContractID `json:"id"`
ContractSize
}

// HostsRemoveRequest is the request type for the /hosts/remove endpoint.
type HostsRemoveRequest struct {
MaxDowntimeHours ParamDurationHour `json:"maxDowntimeHours"`
Expand Down
7 changes: 7 additions & 0 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ type (
Revision *types.FileContractRevision `json:"revision"`
}

// ContractSize contains information about the size of the contract and
// about how much of the contract data can be pruned.
ContractSize struct {
// Prunable is the amount of contract data that can be pruned
// (presumably in bytes -- TODO confirm).
Prunable uint64 `json:"prunable"`
// Size is the size of the contract (presumably in bytes -- TODO confirm).
Size uint64 `json:"size"`
}

// ContractMetadata contains all metadata for a contract.
ContractMetadata struct {
ID types.FileContractID `json:"id"`
Expand Down
13 changes: 8 additions & 5 deletions autopilot/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
l = a.l
}
rCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
accountID, refilled, err := refillWorkerAccount(rCtx, a.a, a.ap.alerts, w, workerID, contract, l)
accountID, refilled, err := refillWorkerAccount(rCtx, a.a, a.ap.bus, w, workerID, contract, l)
if err == nil && refilled {
a.l.Infow("Successfully funded account",
"account", accountID,
Expand All @@ -172,7 +172,7 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
// handle registering alert.
alertID := types.HashBytes(append(alertAccountRefillID[:], accountID[:]...))
if err != nil && inSet {
a.ap.alerts.Register(ctx, alerts.Alert{
rerr := a.ap.alerts.RegisterAlert(ctx, alerts.Alert{
ID: alertID,
Severity: alerts.SeverityError,
Message: fmt.Sprintf("failed to refill account: %v", err),
Expand All @@ -183,8 +183,11 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
},
Timestamp: time.Now(),
})
} else {
a.ap.alerts.Dismiss(ctx, alertID)
// Log the error returned by RegisterAlert (rerr). err is the refill failure,
// which is already part of the alert message itself.
if rerr != nil {
	a.ap.logger.Errorf("failed to register alert: %v", rerr)
}
} else if err := a.ap.alerts.DismissAlerts(ctx, alertID); err != nil {
a.ap.logger.Errorf("failed to dismiss alert: %v", err)
}
a.markRefillDone(workerID, contract.HostKey)
cancel()
Expand All @@ -193,7 +196,7 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
}
}

func refillWorkerAccount(ctx context.Context, a AccountStore, am *alerts.Manager, w Worker, workerID string, contract api.ContractMetadata, logger *zap.SugaredLogger) (accountID rhpv3.Account, refilled bool, err error) {
func refillWorkerAccount(ctx context.Context, a AccountStore, am alerts.Alerter, w Worker, workerID string, contract api.ContractMetadata, logger *zap.SugaredLogger) (accountID rhpv3.Account, refilled bool, err error) {
// add tracing
ctx, span := tracing.Tracer.Start(ctx, "refillAccount")
span.SetAttributes(attribute.Stringer("host", contract.HostKey))
Expand Down
76 changes: 33 additions & 43 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

type Bus interface {
webhooks.Broadcaster
alerts.Alerter

// Accounts
Account(ctx context.Context, id rhpv3.Account, host types.PublicKey) (account api.Account, err error)
Expand Down Expand Up @@ -99,18 +100,18 @@ type Worker interface {
type Autopilot struct {
id string

alerts alerts.Alerter
bus Bus
logger *zap.SugaredLogger
workers *workerPool

mu sync.Mutex
state state

alerts *alerts.Manager
a *accounts
c *contractor
m *migrator
s *scanner
a *accounts
c *contractor
m *migrator
s *scanner

tickerDuration time.Duration
wg sync.WaitGroup
Expand Down Expand Up @@ -168,15 +169,13 @@ func (wp *workerPool) withWorkers(workerFunc func([]Worker)) {
// Handler returns an HTTP handler that serves the autopilot api.
func (ap *Autopilot) Handler() http.Handler {
return jape.Mux(tracing.TracedRoutes(api.DefaultAutopilotID, map[string]jape.Handler{
"GET /alerts": ap.handleGETAlerts,
"POST /alerts/dismiss": ap.handlePOSTAlertsDismiss,
"GET /config": ap.configHandlerGET,
"PUT /config": ap.configHandlerPUT,
"POST /debug/trigger": ap.triggerHandlerPOST,
"POST /hosts": ap.hostsHandlerPOST,
"GET /host/:hostKey": ap.hostHandlerGET,
"GET /status": ap.statusHandlerGET,
"GET /state": ap.stateHandlerGET,
"GET /config": ap.configHandlerGET,
"PUT /config": ap.configHandlerPUT,
"POST /debug/trigger": ap.triggerHandlerPOST,
"POST /hosts": ap.hostsHandlerPOST,
"GET /host/:hostKey": ap.hostHandlerGET,
"GET /status": ap.statusHandlerGET,
"GET /state": ap.stateHandlerGET,
}))
}

Expand All @@ -199,6 +198,7 @@ func (ap *Autopilot) Run() error {
var launchAccountRefillsOnce sync.Once
for {
ap.logger.Info("autopilot iteration starting")
tickerFired := make(chan struct{})
ap.workers.withWorker(func(w Worker) {
defer ap.logger.Info("autopilot iteration ended")
ctx, span := tracing.Tracer.Start(context.Background(), "Autopilot Iteration")
Expand All @@ -208,18 +208,23 @@ func (ap *Autopilot) Run() error {
ap.s.tryUpdateTimeout()
ap.s.tryPerformHostScan(ctx, w, forceScan)

// reset forceScan
forceScan = false

// block until the autopilot is configured
if !ap.blockUntilConfigured(ap.ticker.C) {
if !ap.isStopped() {
if configured, interrupted := ap.blockUntilConfigured(ap.ticker.C); !configured {
if interrupted {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
return
}

// block until consensus is synced
if !ap.blockUntilSynced(ap.ticker.C) {
if !ap.isStopped() {
if synced, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
if interrupted {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before consensus was synced")
Expand Down Expand Up @@ -287,7 +292,7 @@ func (ap *Autopilot) Run() error {
ap.logger.Info("autopilot iteration triggered")
ap.ticker.Reset(ap.tickerDuration)
case <-ap.ticker.C:
forceScan = false
case <-tickerFired:
}
}
}
Expand Down Expand Up @@ -340,7 +345,7 @@ func (ap *Autopilot) Uptime() (dur time.Duration) {
return
}

func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) bool {
func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configured, interrupted bool) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Expand All @@ -361,18 +366,18 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) bool {
if err != nil {
select {
case <-ap.stopChan:
return false
return false, false
case <-interrupt:
return false
return false, true
case <-ticker.C:
continue
}
}
return true
return true, false
}
}

func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) bool {
func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, interrupted bool) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Expand All @@ -389,14 +394,14 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) bool {
if err != nil || !cs.Synced {
select {
case <-ap.stopChan:
return false
return false, false
case <-interrupt:
return false
return false, true
case <-ticker.C:
continue
}
}
return true
return true, false
}
}

Expand Down Expand Up @@ -486,21 +491,6 @@ func (ap *Autopilot) isStopped() bool {
}
}

func (ap *Autopilot) handleGETAlerts(c jape.Context) {
c.Encode(ap.alerts.Active())
}

func (ap *Autopilot) handlePOSTAlertsDismiss(jc jape.Context) {
var ids []types.Hash256
if jc.Decode(&ids) != nil {
return
} else if len(ids) == 0 {
jc.Error(errors.New("no alerts to dismiss"), http.StatusBadRequest)
return
}
ap.alerts.Dismiss(jc.Request.Context(), ids...)
}

func (ap *Autopilot) configHandlerGET(jc jape.Context) {
autopilot, err := ap.bus.Autopilot(jc.Request.Context(), ap.id)
if err != nil && strings.Contains(err.Error(), api.ErrAutopilotNotFound.Error()) {
Expand Down Expand Up @@ -553,7 +543,7 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) {
// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerMinRecentFailures, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
ap := &Autopilot{
alerts: alerts.NewManager(bus),
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
bus: bus,
logger: logger.Sugar().Named(api.DefaultAutopilotID),
Expand Down
Loading

0 comments on commit fccf6e0

Please sign in to comment.