Skip to content

Commit

Permalink
internal: add event broadcaster to subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Jun 4, 2024
1 parent 0c004f1 commit 0e19777
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 7 deletions.
2 changes: 1 addition & 1 deletion alerts/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var _ webhooks.WebhookStore = (*testWebhookStore)(nil)

func TestWebhooks(t *testing.T) {
store := &testWebhookStore{}
mgr, err := webhooks.NewManager(zap.NewNop().Sugar(), store)
mgr, err := webhooks.NewManager(store, zap.NewNop())
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ var ForEachHostAnnouncement = chain.ForEachHostAnnouncement

type (
ChainManager interface {
Tip() types.ChainIndex
Block(id types.BlockID) (types.Block, bool)
OnReorg(fn func(types.ChainIndex)) (cancel func())
Tip() types.ChainIndex
UpdatesSince(index types.ChainIndex, max int) (rus []chain.RevertUpdate, aus []chain.ApplyUpdate, err error)
}

Expand Down
31 changes: 31 additions & 0 deletions internal/bus/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package bus

import (
"context"
"time"

"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
)

type (
EventBroadcaster struct {
broadcaster webhooks.Broadcaster
logger *zap.SugaredLogger
}
)

func NewEventBroadcaster(b webhooks.Broadcaster, l *zap.Logger) EventBroadcaster {
return EventBroadcaster{
broadcaster: b,
logger: l.Named("events").Sugar(),
}
}

func (b EventBroadcaster) BroadcastEvent(e webhooks.WebhookEvent) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err := b.broadcaster.BroadcastAction(ctx, e.Event()); err != nil {
b.logger.Errorw("failed to broadcast event", "event", e, "error", err)
}
cancel()
}
7 changes: 6 additions & 1 deletion internal/node/chainsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/chain"
"go.sia.tech/renterd/internal/bus"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)
Expand All @@ -29,6 +30,7 @@ type (
ChainSubscriber struct {
cm chain.ChainManager
cs chain.ChainStore
events bus.EventBroadcaster
logger *zap.SugaredLogger

announcementMaxAge time.Duration
Expand Down Expand Up @@ -66,7 +68,7 @@ type (
// NewChainSubscriber creates a new chain subscriber that will sync with the
// given chain manager and chain store. The returned subscriber is already
// running and can be shut down by calling the Close method.
func NewChainSubscriber(cm chain.ChainManager, cs chain.ChainStore, walletAddress types.Address, announcementMaxAge time.Duration, logger *zap.Logger) (_ *ChainSubscriber, err error) {
func NewChainSubscriber(cm chain.ChainManager, cs chain.ChainStore, events bus.EventBroadcaster, walletAddress types.Address, announcementMaxAge time.Duration, logger *zap.Logger) (_ *ChainSubscriber, err error) {
if announcementMaxAge == 0 {
return nil, errors.New("announcementMaxAge must be non-zero")
}
Expand All @@ -76,6 +78,7 @@ func NewChainSubscriber(cm chain.ChainManager, cs chain.ChainStore, walletAddres
subscriber := &ChainSubscriber{
cm: cm,
cs: cs,
events: events,
logger: logger.Sugar(),

announcementMaxAge: announcementMaxAge,
Expand Down Expand Up @@ -246,6 +249,8 @@ func (s *ChainSubscriber) sync() error {
}
s.logger.Debugw("processed updates successfully", "new_height", index.Height, "new_block_id", index.ID, "ms", time.Since(istart).Milliseconds())
cnt++

// TODO: broadcast consensus update
}

s.logger.Debugw("sync completed", "start_height", index.Height, "block_id", index.ID, "ms", time.Since(start).Milliseconds(), "iterations", cnt)
Expand Down
8 changes: 6 additions & 2 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.sia.tech/renterd/bus"
"go.sia.tech/renterd/chain"
"go.sia.tech/renterd/config"
ibus "go.sia.tech/renterd/internal/bus"
"go.sia.tech/renterd/stores"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/renterd/worker"
Expand Down Expand Up @@ -133,14 +134,17 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
}

// create webhooks manager
wh, err := webhooks.NewManager(logger.Named("webhooks").Sugar(), sqlStore)
wh, err := webhooks.NewManager(sqlStore, logger)
if err != nil {
return nil, nil, nil, nil, err
}

// hookup webhooks <-> alerts
alertsMgr.RegisterWebhookBroadcaster(wh)

// create event broadcaster
events := ibus.NewEventBroadcaster(wh, logger)

// create chain manager
store, state, err := chain.NewDBStore(bdb, cfg.Network, cfg.Genesis)
if err != nil {
Expand All @@ -149,7 +153,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
cm := chain.NewManager(store, state)

// create chain subscriber
cs, err := NewChainSubscriber(cm, sqlStore, types.StandardUnlockHash(seed.PublicKey()), time.Duration(cfg.AnnouncementMaxAgeHours)*time.Hour, logger.Named("chainsubscriber"))
cs, err := NewChainSubscriber(cm, sqlStore, events, types.StandardUnlockHash(seed.PublicKey()), time.Duration(cfg.AnnouncementMaxAgeHours)*time.Hour, logger.Named("chainsubscriber"))
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions webhooks/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type (
Event string `json:"event"`
Payload interface{} `json:"payload,omitempty"`
}

WebhookEvent interface {
Event() Event
}
)

type Manager struct {
Expand Down Expand Up @@ -213,10 +217,10 @@ func (w Webhook) String() string {
return fmt.Sprintf("%v.%v.%v", w.URL, w.Module, w.Event)
}

func NewManager(logger *zap.SugaredLogger, store WebhookStore) (*Manager, error) {
func NewManager(store WebhookStore, logger *zap.Logger) (*Manager, error) {
shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())
m := &Manager{
logger: logger.Named("webhooks"),
logger: logger.Named("webhooks").Sugar(),
store: store,

shutdownCtx: shutdownCtx,
Expand Down

0 comments on commit 0e19777

Please sign in to comment.