Skip to content

Commit

Permalink
Refactor to improve variable naming and code clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuacolvin0 authored and hkalodner committed Jul 22, 2022
1 parent 6f6d121 commit 3d4353e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 28 deletions.
4 changes: 2 additions & 2 deletions packages/arb-rpc-node/cmd/arb-node/arb-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func startup() error {
}

// Message count is 1 based, seqNum is 0 based, so next seqNum to request is same as current message count
seqNumToRequest, err := mon.Core.GetMessageCount()
currentMessageCount, err := mon.Core.GetMessageCount()
if err != nil {
return errors.Wrap(err, "can't get message count")
}
Expand All @@ -294,7 +294,7 @@ func startup() error {
broadcastClient := broadcastclient.NewBroadcastClient(
url,
config.Node.ChainID,
seqNumToRequest,
currentMessageCount,
config.Feed.Input.Timeout,
broadcastClientErrChan,
)
Expand Down
42 changes: 20 additions & 22 deletions packages/arb-util/broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ import (
type BroadcastClient struct {
websocketUrl string

// sequence number of the previous message
lastInboxSeqNum *big.Int
mostRecentSeqNum *big.Int

chainId uint64

Expand All @@ -64,25 +63,25 @@ var logger = arblog.Logger.With().Str("component", "broadcaster").Logger()
func NewBroadcastClient(
websocketUrl string,
chainId uint64,
requestedInboxSeqNum *big.Int,
currentMessageCount *big.Int,
idleTimeout time.Duration,
broadcastClientErrChan chan error,
) *BroadcastClient {
var lastSeqNum *big.Int
if requestedInboxSeqNum == nil || requestedInboxSeqNum.Cmp(big.NewInt(0)) <= 0 {
lastSeqNum = big.NewInt(0)
var mostRecentSeqNum *big.Int
if currentMessageCount == nil || currentMessageCount.Cmp(big.NewInt(0)) <= 0 {
mostRecentSeqNum = nil
} else {
lastSeqNum = new(big.Int).Sub(requestedInboxSeqNum, big.NewInt(1))
mostRecentSeqNum = new(big.Int).Sub(currentMessageCount, big.NewInt(1))
}

return &BroadcastClient{
websocketUrl: websocketUrl,
chainId: chainId,
lastInboxSeqNum: lastSeqNum,
connMutex: &sync.Mutex{},
errChan: broadcastClientErrChan,
retryMutex: &sync.Mutex{},
idleTimeout: idleTimeout,
websocketUrl: websocketUrl,
chainId: chainId,
mostRecentSeqNum: mostRecentSeqNum,
connMutex: &sync.Mutex{},
errChan: broadcastClientErrChan,
retryMutex: &sync.Mutex{},
idleTimeout: idleTimeout,
}
}

Expand All @@ -97,7 +96,7 @@ func (bc *BroadcastClient) Connect(ctx context.Context) (chan broadcaster.Broadc
}

func (bc *BroadcastClient) ConnectWithChannel(ctx context.Context, messageReceiver chan broadcaster.BroadcastFeedMessage) error {
earlyFrameData, _, err := bc.connect(ctx, messageReceiver, bc.lastInboxSeqNum)
earlyFrameData, _, err := bc.connect(ctx, messageReceiver, bc.mostRecentSeqNum)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,19 +127,18 @@ func (bc *BroadcastClient) ConnectInBackground(ctx context.Context, messageRecei
var ErrIncorrectFeedServerVersion = errors.New("incorrect feed server version")
var ErrIncorrectChainId = errors.New("incorrect chain id")

func (bc *BroadcastClient) connect(ctx context.Context, messageReceiver chan broadcaster.BroadcastFeedMessage, previousSequenceNumber *big.Int) (io.Reader, chan broadcaster.BroadcastFeedMessage, error) {
func (bc *BroadcastClient) connect(ctx context.Context, messageReceiver chan broadcaster.BroadcastFeedMessage, mostRecentSequenceNumber *big.Int) (io.Reader, chan broadcaster.BroadcastFeedMessage, error) {

if len(bc.websocketUrl) == 0 {
// Nothing to do
return nil, nil, nil
}

var requestedSequenceNumber string
if previousSequenceNumber.Cmp(big.NewInt(0)) > 0 {
// previousSequenceNumber is 1 before current, and we want 1 after current, so add 2.
requestedSequenceNumber = new(big.Int).Add(previousSequenceNumber, big.NewInt(2)).String()
} else {
if mostRecentSequenceNumber == nil {
requestedSequenceNumber = "0"
} else {
requestedSequenceNumber = new(big.Int).Add(mostRecentSequenceNumber, big.NewInt(1)).String()
}
header := ws.HandshakeHeaderHTTP(http.Header{
wsbroadcastserver.HTTPHeaderFeedClientVersion: []string{strconv.Itoa(wsbroadcastserver.FeedClientVersion)},
Expand Down Expand Up @@ -261,7 +259,7 @@ func (bc *BroadcastClient) startBackgroundReader(ctx context.Context, messageRec
currentLastSeqNum = message.FeedItem.BatchItem.LastSeqNum
messageReceiver <- *message
}
bc.lastInboxSeqNum = new(big.Int).Add(currentLastSeqNum, big.NewInt(1))
bc.mostRecentSeqNum = new(big.Int).Add(currentLastSeqNum, big.NewInt(1))

if res.ConfirmedAccumulator.IsConfirmed && bc.ConfirmedAccumulatorListener != nil {
bc.ConfirmedAccumulatorListener <- res.ConfirmedAccumulator.Accumulator
Expand Down Expand Up @@ -300,7 +298,7 @@ func (bc *BroadcastClient) RetryConnect(ctx context.Context, messageReceiver cha
}

bc.retryCount++
earlyFrameData, _, err := bc.connect(ctx, messageReceiver, bc.lastInboxSeqNum)
earlyFrameData, _, err := bc.connect(ctx, messageReceiver, bc.mostRecentSeqNum)
if err == nil {
bc.retrying = false
return earlyFrameData
Expand Down
9 changes: 5 additions & 4 deletions packages/arb-util/broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,9 @@ func TestBadBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) {

func connectAndGetCachedMessages(ctx context.Context, t *testing.T, clientIndex int, wg *sync.WaitGroup, chainId uint64, connectShouldFail bool) error {
broadcastClientErrChan := make(chan error)
requestedSeqNum := big.NewInt(42)
broadcastClient := NewBroadcastClient("ws://127.0.0.1:9842/", chainId, requestedSeqNum, 60*time.Second, broadcastClientErrChan)
startingMessageCount := big.NewInt(43)
expectedNewLastSeqNum := new(big.Int).Sub(startingMessageCount, big.NewInt(1))
broadcastClient := NewBroadcastClient("ws://127.0.0.1:9842/", chainId, startingMessageCount, 60*time.Second, broadcastClientErrChan)
testClient, err := broadcastClient.Connect(ctx)
if err != nil {
if !connectShouldFail {
Expand All @@ -331,8 +332,8 @@ func connectAndGetCachedMessages(ctx context.Context, t *testing.T, clientIndex
case err := <-broadcastClientErrChan:
t.Errorf("broadcast client error: %s", err.Error())
case receivedMsg := <-testClient:
if receivedMsg.FeedItem.BatchItem.LastSeqNum.Cmp(requestedSeqNum) != 0 {
t.Errorf("expected seqnum %d but got %d instead", requestedSeqNum, receivedMsg.FeedItem.BatchItem.LastSeqNum)
if receivedMsg.FeedItem.BatchItem.LastSeqNum.Cmp(expectedNewLastSeqNum) != 0 {
t.Errorf("expected seqnum %d but got %d instead", expectedNewLastSeqNum, receivedMsg.FeedItem.BatchItem.LastSeqNum)
}
t.Logf("client %d received first message: (%v) %v\n", clientIndex, receivedMsg.FeedItem.BatchItem.LastSeqNum, receivedMsg.FeedItem.BatchItem.SequencerMessage)
case <-time.After(10 * time.Second):
Expand Down

0 comments on commit 3d4353e

Please sign in to comment.