Skip to content

Commit

Permalink
TT-72 when worker node in emergency mode, dont search keys in rpc (Ty…
Browse files Browse the repository at this point in the history
…kTechnologies#3368)

* on rpc down, dont search keys in rpc

* fixed rpc tests partially

* add debug lines

* fix TestSyncAPISpecsRPCSuccess

* clean rpc tests

* better name to var

* remove debug lines

* better name to function

* enable rpc tests in ci-tests.sh

* remove ToDo

* update rpc test to check that we do not call getKeys when rpc is down

* add test function in rpc client

* checking if authmanager is RPC before the emergency check

* fixing rpc emergency mode logic in middleware

* fmting middleware

* removing cli.HTTPProfile

Co-authored-by: tbuchaillot <[email protected]>
  • Loading branch information
sredxny and tbuchaillot authored Feb 17, 2021
1 parent 8650e18 commit 17b0f15
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 9 deletions.
4 changes: 1 addition & 3 deletions bin/ci-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,5 @@ for pkg in ${PKGS}; do
done

# run rpc tests separately
# TODO: fix rpc tests and enable this
#rpc_tests='SyncAPISpecsRPC|OrgSessionWithRPCDown'
rpc_tests='OrgSessionWithRPCDown'
rpc_tests='SyncAPISpecsRPC|OrgSessionWithRPCDown'
show go test -v -timeout ${TEST_TIMEOUT} -coverprofile=test.cov github.com/TykTechnologies/tyk/gateway -p 1 -run '"'${rpc_tests}'"' || fatal "Test Failed"
5 changes: 5 additions & 0 deletions gateway/auth_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type AuthorisationHandler interface {
Init(storage.Handler)
KeyAuthorised(string) (user.SessionState, bool)
KeyExpired(*user.SessionState) bool
Store() storage.Handler
}

// SessionHandler handles all update/create/access session functions and deals exclusively with
Expand Down Expand Up @@ -81,6 +82,10 @@ func (b *DefaultAuthorisationManager) KeyAuthorised(keyName string) (user.Sessio
return newSession.Clone(), true
}

func (b *DefaultAuthorisationManager) Store() storage.Handler {
return b.store
}

// KeyExpired checks if a key has expired, if the value of user.SessionState.Expires is 0, it will be ignored
func (b *DefaultAuthorisationManager) KeyExpired(newSession *user.SessionState) bool {
if newSession.Expires >= 1 {
Expand Down
5 changes: 5 additions & 0 deletions gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,11 @@ func (t BaseMiddleware) CheckSessionAndIdentityForValidKey(originalKey *string,
return session.Clone(), true
}

if _, ok := t.Spec.AuthManager.Store().(*RPCStorageHandler); ok && rpc.IsEmergencyMode() {
return session.Clone(), false
}

// Only search in RPC if it's not in emergency mode
t.Logger().Debug("Querying authstore")
// 2. If not there, get it from the AuthorizationHandler
session, found = t.Spec.AuthManager.KeyAuthorised(key)
Expand Down
28 changes: 24 additions & 4 deletions gateway/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,14 @@ func TestSyncAPISpecsRPCFailure_CheckGlobals(t *testing.T) {
return `[]`, nil
})

rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)
rpcMock := startRPCMock(dispatcher)
defer stopRPCMock(rpcMock)

store := RPCStorageHandler{}
store.Connect()
rpc.ForceConnected(t)

// Three cases: 1 API, 2 APIs and Malformed data
exp := []int{0, 1, 2, 2}
for _, e := range exp {
DoReload()
Expand All @@ -166,6 +171,8 @@ func TestSyncAPISpecsRPCFailure_CheckGlobals(t *testing.T) {

func TestSyncAPISpecsRPCSuccess(t *testing.T) {
// Test RPC
rpc.UseSyncLoginRPC = true
var GetKeyCounter int
dispatcher := gorpc.NewDispatcher()
dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *apidef.DefRequest) (string, error) {
return jsonMarshalString(BuildAPI(func(spec *APISpec) {
Expand All @@ -179,12 +186,14 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
return true
})
dispatcher.AddFunc("GetKey", func(clientAddr, key string) (string, error) {
GetKeyCounter++
return jsonMarshalString(CreateStandardSession()), nil
})

t.Run("RPC is live", func(t *testing.T) {
rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)
GetKeyCounter = 0
rpcMock := startRPCMock(dispatcher)
defer stopRPCMock(rpcMock)
ts := StartTest()
defer ts.Close()

Expand All @@ -207,10 +216,15 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
if count != 1 {
t.Error("Should return array with one spec", apiSpecs)
}

if GetKeyCounter != 2 {
t.Error("getKey should have been called 2 times")
}
})

t.Run("RPC down, cold start, load backup", func(t *testing.T) {
// Point rpc to non existent address
GetKeyCounter = 0
globalConf := config.Global()
globalConf.SlaveOptions.ConnectionString = testHttpFailure
globalConf.SlaveOptions.UseRPC = true
Expand All @@ -227,6 +241,7 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
time.Sleep(100 * time.Millisecond)
DoReload()

rpc.SetEmergencyMode(t, true)
cachedAuth := map[string]string{"Authorization": "test"}
notCachedAuth := map[string]string{"Authorization": "nope1"}
// Stil works, since it knows about cached key
Expand All @@ -235,6 +250,11 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
{Path: "/sample", Headers: notCachedAuth, Code: 403},
}...)

// when rpc in emergency mode, then we must not
// request keys in rpc
if GetKeyCounter != 0 {
t.Error("getKey should have been called 0 times")
}
stopRPCMock(nil)
})

Expand Down
26 changes: 24 additions & 2 deletions rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -42,6 +43,9 @@ var (
rpcLoginMu sync.Mutex

rpcConnectMu sync.Mutex

// UseSyncLoginRPC for tests where we dont need to execute as a goroutine
UseSyncLoginRPC bool
)

// ErrRPCIsDown this is returned when we can't reach rpc server.
Expand Down Expand Up @@ -288,15 +292,22 @@ func Connect(connConfig Config, suppressRegister bool, dispatcherFuncs map[strin
funcClientSingleton = dispatcher.NewFuncClient(clientSingleton)
}

go Login()

handleLogin()
if !suppressRegister {
register()
go checkDisconnect()
}
return true
}

func handleLogin() {
if UseSyncLoginRPC == true {
Login()
return
}
go Login()
}

// Login tries to login to the rpc sever. Returns true if it succeeds and false
// if it fails.
func Login() bool {
Expand Down Expand Up @@ -490,3 +501,14 @@ func loadDispatcher(dispatcherFuncs map[string]interface{}) {
addedFuncs[funcName] = true
}
}

// ForceConnected only intended to be used in tests
// do not use it for any other thing
func ForceConnected(t *testing.T) {
values.clientIsConnected.Store(true)
}

// SetEmergencyMode used in tests to force emergency mode
func SetEmergencyMode(t *testing.T, value bool) {
values.SetEmergencyMode(value)
}

0 comments on commit 17b0f15

Please sign in to comment.