Skip to content

Commit

Permalink
Improve keepalive functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Feb 26, 2018
1 parent 01e7d5e commit 176b5c4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 59 deletions.
5 changes: 0 additions & 5 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,4 @@ func loadApps(specs []*APISpec, muxer *mux.Router) {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Info("Initialised API Definitions")

if config.Global.SlaveOptions.UseRPC {
startRPCKeepaliveWatcher(rpcAuthStore)
startRPCKeepaliveWatcher(rpcOrgStore)
}
}
33 changes: 1 addition & 32 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,38 +991,6 @@ func getHostDetails() {
}
}

var KeepaliveRunning bool

func startRPCKeepaliveWatcher(engine *RPCStorageHandler) {
if KeepaliveRunning {
return
}

go func() {
log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Info("[RPC Conn Mgr] Starting keepalive watcher...")
for {
KeepaliveRunning = true
rpcKeepAliveCheck(engine)
if engine == nil {
log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Info("No engine, break")
KeepaliveRunning = false
break
}
if engine.Killed {
log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Debug("[RPC Conn Mgr] this connection killed")
KeepaliveRunning = false
break
}
}
}()
}

func getGlobalStorageHandler(keyPrefix string, hashKeys bool) storage.Handler {
if config.Global.SlaveOptions.UseRPC {
return &RPCStorageHandler{KeyPrefix: keyPrefix, HashKeys: hashKeys, UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}
Expand Down Expand Up @@ -1217,6 +1185,7 @@ func start() {

RPCListener.Connect()
go rpcReloadLoop(config.Global.SlaveOptions.RPCKey)
go RPCListener.StartRPCKeepaliveWatcher()
go RPCListener.StartRPCLoopCheck(config.Global.SlaveOptions.RPCKey)
}

Expand Down
2 changes: 1 addition & 1 deletion rpc_analytics_purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r RPCPurger) PurgeLoop(ticker <-chan time.Time) {
// PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection
func (r *RPCPurger) PurgeCache() {
if _, err := RPCFuncClientSingleton.Call("Ping", nil); err != nil {
log.Error("Failed to ping RPC: ", err)
log.Error("Can't purge cache, failed to ping RPC: ", err)
return
}

Expand Down
50 changes: 29 additions & 21 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,6 @@ var (
GlobalRPCPingTimeout time.Duration
)

func rpcKeepAliveCheck(r *RPCStorageHandler) {
if !RPCClientIsConnected {
return
}

log.Debug("Getting keyspace check test key")
err := r.SetKey("0000", "0000", 10)
// This error message comes from RPC layer
if err != nil && err.Error() != "Write dissallowed for API Tokens" {
log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Warning("Handler seems to have disconnected, attempting reconnect")
} else {
if err != nil {
log.Debug("RPC Still alive")
}
}

time.Sleep(time.Second * 10)
}

// RPCStorageHandler is a storage manager that uses the redis database.
type RPCStorageHandler struct {
KeyPrefix string
Expand Down Expand Up @@ -924,6 +903,35 @@ func (r *RPCStorageHandler) StartRPCLoopCheck(orgId string) {
}
}

func (r *RPCStorageHandler) StartRPCKeepaliveWatcher() {
log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Info("[RPC Conn Mgr] Starting keepalive watcher...")
for {

if err := r.SetKey("0000", "0000", 10); err != nil {
if r.IsAccessError(err) {
if r.Login() {
continue
}
}

if strings.Contains(err.Error(), "Cannot obtain response during timeout") {
log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Info("Can't connect to RPC layer")
continue
}
}

log.WithFields(logrus.Fields{
"prefix": "RPC Conn Mgr",
}).Info("RPC is alive")

time.Sleep(10 * time.Second)
}
}

// CheckForKeyspaceChanges will poll for keysace changes
func (r *RPCStorageHandler) CheckForKeyspaceChanges(orgId string) {
log.Debug("Checking for keyspace changes...")
Expand Down

0 comments on commit 176b5c4

Please sign in to comment.