Skip to content

Commit

Permalink
Add relay monitor URLs (flashbots#296)
Browse files Browse the repository at this point in the history
* Add relay monitor URLs

* logging fix

* remove channel
  • Loading branch information
avalonche authored Sep 7, 2022
1 parent 5c0d54d commit 6f3cfda
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 21 deletions.
24 changes: 21 additions & 3 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
"flag"
"fmt"
"net/url"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -39,9 +40,10 @@ var (
logJSON = flag.Bool("json", defaultLogJSON, "log in JSON format instead of text")
logLevel = flag.String("loglevel", defaultLogLevel, "minimum loglevel: trace, debug, info, warn/warning, error, fatal, panic")

listenAddr = flag.String("addr", defaultListenAddr, "listen-address for mev-boost server")
relayURLs = flag.String("relays", "", "relay urls - single entry or comma-separated list (scheme://pubkey@host)")
relayCheck = flag.Bool("relay-check", defaultRelayCheck, "check relay status on startup and on the status API call")
listenAddr = flag.String("addr", defaultListenAddr, "listen-address for mev-boost server")
relayURLs = flag.String("relays", "", "relay urls - single entry or comma-separated list (scheme://pubkey@host)")
relayCheck = flag.Bool("relay-check", defaultRelayCheck, "check relay status on startup and on the status API call")
relayMonitorURLs = flag.String("relay-monitors", "", "relay monitor urls - single entry or comma-separated list (scheme://host)")

relayTimeoutMsGetHeader = flag.Int("request-timeout-getheader", defaultTimeoutMsGetHeader, "timeout for getHeader requests to the relay [ms]")
relayTimeoutMsGetPayload = flag.Int("request-timeout-getpayload", defaultTimeoutMsGetPayload, "timeout for getPayload requests to the relay [ms]")
Expand Down Expand Up @@ -114,10 +116,14 @@ func Main() {
}
log.WithField("relays", relays).Infof("using %d relays", len(relays))

relayMonitors := parseRelayMonitorURLs(*relayMonitorURLs)
log.WithField("relay-monitors", relayMonitors).Infof("using %d relay monitors", len(relayMonitors))

opts := server.BoostServiceOpts{
Log: log,
ListenAddr: *listenAddr,
Relays: relays,
RelayMonitors: relayMonitors,
GenesisForkVersionHex: genesisForkVersionHex,
RelayCheck: *relayCheck,
RequestTimeoutGetHeader: time.Duration(*relayTimeoutMsGetHeader) * time.Millisecond,
Expand Down Expand Up @@ -165,3 +171,15 @@ func parseRelayURLs(relayURLs string) []server.RelayEntry {
}
return ret
}

func parseRelayMonitorURLs(relayMonitorURLs string) []*url.URL {
ret := []*url.URL{}
for _, entry := range strings.Split(relayMonitorURLs, ",") {
relay, err := url.Parse(entry)
if err != nil {
log.WithError(err).WithField("relayMonitorURL", entry).Fatal("Invalid relay monitor URL")
}
ret = append(ret, relay)
}
return ret
}
7 changes: 2 additions & 5 deletions server/relay_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ func (r *RelayEntry) String() string {
return r.URL.String()
}

// GetURI returns the full request URI with scheme, host, path and args.
// GetURI returns the full request URI with scheme, host, path and args for the relay.
func (r *RelayEntry) GetURI(path string) string {
u2 := *r.URL
u2.User = nil
u2.Path = path
return u2.String()
return GetURI(r.URL, path)
}

// NewRelayEntry creates a new instance based on an input string
Expand Down
45 changes: 35 additions & 10 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -41,6 +42,7 @@ type BoostServiceOpts struct {
Log *logrus.Entry
ListenAddr string
Relays []RelayEntry
RelayMonitors []*url.URL
GenesisForkVersionHex string
RelayCheck bool

Expand All @@ -51,11 +53,12 @@ type BoostServiceOpts struct {

// BoostService - the mev-boost service
type BoostService struct {
listenAddr string
relays []RelayEntry
log *logrus.Entry
srv *http.Server
relayCheck bool
listenAddr string
relays []RelayEntry
relayMonitors []*url.URL
log *logrus.Entry
srv *http.Server
relayCheck bool

builderSigningDomain types.Domain
httpClientGetHeader http.Client
Expand All @@ -78,11 +81,12 @@ func NewBoostService(opts BoostServiceOpts) (*BoostService, error) {
}

return &BoostService{
listenAddr: opts.ListenAddr,
relays: opts.Relays,
log: opts.Log.WithField("module", "service"),
relayCheck: opts.RelayCheck,
bids: make(map[bidRespKey]bidResp),
listenAddr: opts.ListenAddr,
relays: opts.Relays,
relayMonitors: opts.RelayMonitors,
log: opts.Log.WithField("module", "service"),
relayCheck: opts.RelayCheck,
bids: make(map[bidRespKey]bidResp),

builderSigningDomain: builderSigningDomain,
httpClientGetHeader: http.Client{
Expand Down Expand Up @@ -173,6 +177,25 @@ func (m *BoostService) startBidCacheCleanupTask() {
}
}

func (m *BoostService) sendValidatorRegistrationsToRelayMonitors(payload []types.SignedValidatorRegistration) {
log := m.log.WithField("method", "sendValidatorRegistrationsToRelayMonitors")
for {
log = log.WithField("numRegistrations", len(payload))
for _, relayMonitor := range m.relayMonitors {
go func(relayMonitor *url.URL) {
url := GetURI(relayMonitor, pathRegisterValidator)
log := m.log.WithField("url", url)
_, err := SendHTTPRequest(context.Background(), m.httpClientRegVal, http.MethodPost, url, UserAgent(""), payload, nil)
if err != nil {
log.WithError(err).Warn("error calling registerValidator on relay monitor")
return
}
log.Debug("sent validator registrations to relay monitor")
}(relayMonitor)
}
}
}

func (m *BoostService) handleRoot(w http.ResponseWriter, req *http.Request) {
m.respondOK(w, nilResponse)
}
Expand Down Expand Up @@ -256,6 +279,8 @@ func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.
}(relay)
}

go m.sendValidatorRegistrationsToRelayMonitors(payload)

for i := 0; i < len(m.relays); i++ {
respErr := <-relayRespCh
if respErr == nil {
Expand Down
6 changes: 3 additions & 3 deletions server/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (be *testBackend) request(t *testing.T, method string, path string, payload

func TestNewBoostServiceErrors(t *testing.T) {
t.Run("errors when no relays", func(t *testing.T) {
_, err := NewBoostService(BoostServiceOpts{testLog, ":123", []RelayEntry{}, "0x00000000", true, time.Second, time.Second, time.Second})
_, err := NewBoostService(BoostServiceOpts{testLog, ":123", []RelayEntry{}, []*url.URL{}, "0x00000000", true, time.Second, time.Second, time.Second})
require.Error(t, err)
})
}
Expand Down Expand Up @@ -221,12 +221,12 @@ func TestRegisterValidator(t *testing.T) {
})

t.Run("mev-boost relay timeout works with slow relay", func(t *testing.T) {
backend := newTestBackend(t, 1, 5*time.Millisecond) // 10ms max
backend := newTestBackend(t, 1, 100*time.Millisecond) // 10ms max
rr := backend.request(t, http.MethodPost, path, payload)
require.Equal(t, http.StatusOK, rr.Code)

// Now make the relay return slowly, mev-boost should return an error
backend.relays[0].ResponseDelay = 10 * time.Millisecond
backend.relays[0].ResponseDelay = 150 * time.Millisecond
rr = backend.request(t, http.MethodPost, path, payload)
require.Equal(t, `{"code":502,"message":"no successful relay response"}`+"\n", rr.Body.String())
require.Equal(t, http.StatusBadGateway, rr.Code)
Expand Down
9 changes: 9 additions & 0 deletions server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

Expand Down Expand Up @@ -100,6 +101,14 @@ func DecodeJSON(r io.Reader, dst any) error {
return nil
}

// GetURI returns the full request URI with scheme, host, path and args.
func GetURI(url *url.URL, path string) string {
u2 := *url
u2.User = nil
u2.Path = path
return u2.String()
}

// bidResp are entries in the bids cache
type bidResp struct {
t time.Time
Expand Down

0 comments on commit 6f3cfda

Please sign in to comment.