Skip to content

Commit

Permalink
Improve async session updates (TykTechnologies#1757)
Browse files Browse the repository at this point in the history
Fixes TykTechnologies#1758 

Current solution for session updates when async is enabled is just to spawn a new goroutine per redis operation. 

This change allows setting a configurable pool of workers to handle the session updates - with the slower parts of the session update such as json marshalling moved into the pool. Default value if unset is currently 15 and worker pool will gracefully exit on gateway shutdown.

Also changed the dummySessionManager in policy_test.go pass by ref to stop go vet failing because of passed mutex.
  • Loading branch information
joshblakeley authored and buger committed Aug 10, 2018
1 parent 0a6d68e commit 54fb5fa
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 13 deletions.
5 changes: 5 additions & 0 deletions api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,11 @@ func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore storage.Ha
a.OrgSessionManager.Init(orgStore)
}

func (a *APISpec) StopSessionManagerPool() {
a.SessionManager.Stop()
a.OrgSessionManager.Stop()
}

func (a *APISpec) getURLStatus(stat URLStatus) RequestStatus {
switch stat {
case Ignored:
Expand Down
116 changes: 105 additions & 11 deletions auth_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/base64"
"encoding/json"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/satori/go.uuid"
Expand Down Expand Up @@ -35,17 +37,35 @@ type SessionHandler interface {
Sessions(filter string) []string
Store() storage.Handler
ResetQuota(string, *user.SessionState)
Stop()
}

const sessionPoolDefaultSize = 50
const sessionBufferDefaultSize = 1000

// DefaultAuthorisationManager implements AuthorisationHandler,
// requires a storage.Handler to interact with key store
type DefaultAuthorisationManager struct {
store storage.Handler
}

type DefaultSessionManager struct {
store storage.Handler
asyncWrites bool
store storage.Handler
asyncWrites bool
disableCacheSessionState bool
updateChan chan *SessionUpdate
poolSize int
shouldStop uint32
poolWG sync.WaitGroup
bufferSize int
keyPrefix string
}

type SessionUpdate struct {
isHashed bool
keyVal string
session *user.SessionState
ttl int64
}

func (b *DefaultAuthorisationManager) Init(store storage.Handler) {
Expand Down Expand Up @@ -86,6 +106,71 @@ func (b *DefaultSessionManager) Init(store storage.Handler) {
b.asyncWrites = config.Global().UseAsyncSessionWrite
b.store = store
b.store.Connect()

if b.asyncWrites {
// check pool size in config and set to 50 if unset
b.poolSize = config.Global().SessionUpdatePoolSize
if b.poolSize <= 0 {
b.poolSize = sessionPoolDefaultSize
}
//check size for channel buffer and set to 1000 if unset
b.bufferSize = config.Global().SessionUpdateBufferSize
if b.bufferSize <= 0 {
b.bufferSize = sessionBufferDefaultSize
}

log.WithField("SessionManager poolsize", b.poolSize).Debug("Session update async pool size")

b.updateChan = make(chan *SessionUpdate, b.bufferSize)

b.keyPrefix = b.store.GetKeyPrefix()

//start worker pool
atomic.SwapUint32(&b.shouldStop, 0)
for i := 0; i < b.poolSize; i++ {
b.poolWG.Add(1)
go b.updateWorker()
}
}
}

func (b *DefaultSessionManager) updateWorker() {
defer b.poolWG.Done()

for u := range b.updateChan {

v, err := json.Marshal(u.session)
if err != nil {
log.WithError(err).Error("Error marshalling session for async session update")
continue
}

if u.isHashed {
u.keyVal = b.keyPrefix + u.keyVal
err := b.store.SetRawKey(u.keyVal, string(v), u.ttl)
if err != nil {
log.WithError(err).Error("Error updating hashed key")
}
continue

}

err = b.store.SetKey(u.keyVal, string(v), u.ttl)
if err != nil {
log.WithError(err).Error("Error updating key")
}
}
}

func (b *DefaultSessionManager) Stop() {
if atomic.LoadUint32(&b.shouldStop) == 0 {
// flag to stop adding data to chan
atomic.SwapUint32(&b.shouldStop, 1)
// close update channel
close(b.updateChan)
// wait for workers to finish
b.poolWG.Wait()
}
}

func (b *DefaultSessionManager) Store() storage.Handler {
Expand All @@ -111,26 +196,35 @@ func (b *DefaultSessionManager) ResetQuota(keyName string, session *user.Session
// UpdateSession updates the session state in the storage engine
func (b *DefaultSessionManager) UpdateSession(keyName string, session *user.SessionState,
resetTTLTo int64, hashed bool) error {
v, _ := json.Marshal(session)

if hashed {
keyName = b.store.GetKeyPrefix() + keyName
}

// async update and return if needed
if b.asyncWrites {
if hashed {
go b.store.SetRawKey(keyName, string(v), resetTTLTo)
if atomic.LoadUint32(&b.shouldStop) > 0 {
return nil
}

go b.store.SetKey(keyName, string(v), resetTTLTo)
sessionUpdate := &SessionUpdate{
isHashed: hashed,
keyVal: keyName,
session: session,
ttl: resetTTLTo,
}

// send sessionupdate object through channel to pool
b.updateChan <- sessionUpdate

return nil
}

v, err := json.Marshal(session)
if err != nil {
log.Error("Error marshalling session for sync update")
return err
}

// sync update
var err error
if hashed {
keyName = b.store.GetKeyPrefix() + keyName
err = b.store.SetRawKey(keyName, string(v), resetTTLTo)
} else {
err = b.store.SetKey(keyName, string(v), resetTTLTo)
Expand Down
6 changes: 6 additions & 0 deletions cli/lint/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,12 @@ const confSchema = `{
"optimisations_use_async_session_write": {
"type": "boolean"
},
"session_update_pool_size":{
"type": "integer"
},
"session_update_buffer_size":{
"type": "integer"
},
"pid_file_location": {
"type": "string"
},
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ type Config struct {
AnalyticsConfig AnalyticsConfigConfig `json:"analytics_config"`
HealthCheck HealthCheckConfig `json:"health_check"`
UseAsyncSessionWrite bool `json:"optimisations_use_async_session_write"`
SessionUpdatePoolSize int `json:"session_update_pool_size"`
SessionUpdateBufferSize int `json:"session_update_buffer_size"`
AllowMasterKeys bool `json:"allow_master_keys"`
HashKeys bool `json:"hash_keys"`
HashKeyFunction string `json:"hash_key_function"`
Expand Down
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,15 @@ func main() {
analytics.Stop()
}

// if using async session writes stop workers
if config.Global().UseAsyncSessionWrite {
DefaultOrgStore.Stop()
for i := range apiSpecs {
apiSpecs[i].StopSessionManagerPool()
}

}

// write pprof profiles
writeProfiles()

Expand Down
2 changes: 1 addition & 1 deletion policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type dummySessionManager struct {
DefaultSessionManager
}

func (dummySessionManager) UpdateSession(key string, sess *user.SessionState, ttl int64, hashed bool) error {
func (*dummySessionManager) UpdateSession(key string, sess *user.SessionState, ttl int64, hashed bool) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe
// and another subtraction because of the preemptive limit
subtractor = 2
}

// The test TestRateLimitForAPIAndRateLimitAndQuotaCheck
// will only work with ththese two lines here
//log.Info("break: ", (int(currentSession.Rate) - subtractor))
if ratePerPeriodNow > int(currentSession.Rate)-subtractor {
// Set a sentinel value with expire
Expand Down

0 comments on commit 54fb5fa

Please sign in to comment.