Skip to content

Commit

Permalink
Try to reconnect ethereum RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
martinboehm committed Jun 3, 2019
1 parent 5c2b9f7 commit c409a35
Showing 1 changed file with 49 additions and 11 deletions.
60 changes: 49 additions & 11 deletions bchain/coins/eth/ethrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,8 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
if c.BlockAddressesToKeep < 100 {
c.BlockAddressesToKeep = 100
}
rc, err := rpc.Dial(c.RPCURL)
if err != nil {
return nil, err
}
ec := ethclient.NewClient(rc)

rc, ec, err := openRPC(c.RPCURL)

s := &EthereumRPC{
BaseChain: &bchain.BaseChain{},
Expand Down Expand Up @@ -133,6 +130,15 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
return s, nil
}

func openRPC(url string) (*rpc.Client, *ethclient.Client, error) {
rc, err := rpc.Dial(url)
if err != nil {
return nil, nil, err
}
ec := ethclient.NewClient(rc)
return rc, ec, nil
}

// Initialize initializes ethereum rpc interface
func (b *EthereumRPC) Initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
Expand Down Expand Up @@ -187,6 +193,16 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu

b.Mempool.OnNewTxAddr = onNewTxAddr

if err = b.subscribeEvents(); err != nil {
return err
}

b.mempoolInitialized = true

return nil
}

func (b *EthereumRPC) subscribeEvents() error {
if b.isETC {
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
} else {
Expand Down Expand Up @@ -224,8 +240,6 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
return err
}

b.mempoolInitialized = true

return nil
}

Expand All @@ -246,7 +260,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
}
glog.Error("Subscription error ", e)
timer := time.NewTimer(time.Second * 2)
// try in 1 second interval to resubscribe
// try in 2 second interval to resubscribe
for {
select {
case e = <-s.Err():
Expand All @@ -260,7 +274,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
s = ns
continue Loop
}
glog.Error("Resubscribe error ", e)
glog.Error("Resubscribe error ", err)
timer.Reset(time.Second * 2)
}
}
Expand All @@ -269,8 +283,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
return nil
}

// Shutdown cleans up rpc interface to ethereum
func (b *EthereumRPC) Shutdown(ctx context.Context) error {
func (b *EthereumRPC) closeRPC() {
if b.newBlockSubscription != nil {
b.newBlockSubscription.Unsubscribe()
}
Expand All @@ -280,6 +293,23 @@ func (b *EthereumRPC) Shutdown(ctx context.Context) error {
if b.rpc != nil {
b.rpc.Close()
}
}

func (b *EthereumRPC) reconnectRPC() error {
glog.Info("Reconnecting RPC")
b.closeRPC()
rc, ec, err := openRPC(b.ChainConfig.RPCURL)
if err != nil {
return err
}
b.rpc = rc
b.client = ec
return b.subscribeEvents()
}

// Shutdown cleans up rpc interface to ethereum
func (b *EthereumRPC) Shutdown(ctx context.Context) error {
b.closeRPC()
close(b.chanNewBlock)
glog.Info("rpc: shutdown")
return nil
Expand Down Expand Up @@ -339,6 +369,14 @@ func (b *EthereumRPC) getBestHeader() (*ethtypes.Header, error) {
b.bestHeader = nil
}
}
// if the best header was not updated for 15 minutes, there could be a subscription problem, reconnect RPC
if b.bestHeaderTime.Add(15 * time.Minute).Before(time.Now()) {
err := b.reconnectRPC()
if err != nil {
return nil, err
}
b.bestHeader = nil
}
if b.bestHeader == nil {
var err error
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
Expand Down

0 comments on commit c409a35

Please sign in to comment.