Skip to content

Commit

Permalink
all: reorganize code a bit
Browse files Browse the repository at this point in the history
* Move BaseMiddleware to middleware.go
* Merge util_http_helpers into reverse_proxy (the file got small, and
  it's mostly used and fitting there)
* Merge redis_notifier_outbound into redis_signals, now the file holds
  both outgoing and incoming redis notifications

Updates TykTechnologies#334.
  • Loading branch information
mvdan authored and buger committed Sep 15, 2017
1 parent 57ba65f commit f4c2cd2
Show file tree
Hide file tree
Showing 14 changed files with 359 additions and 380 deletions.
2 changes: 1 addition & 1 deletion handler_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (e *ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, errMs

rawRequest := ""
rawResponse := ""
if RecordDetail(r) {
if recordDetail(r) {
requestCopy := copyRequest(r)
// Get the wire format representation
var wireFormatReq bytes.Buffer
Expand Down
195 changes: 23 additions & 172 deletions handler_success.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"
"time"

"github.com/pmylund/go-cache"
cache "github.com/pmylund/go-cache"
)

// Enums for keys to be stored in a session context - this is how gorilla expects
Expand All @@ -36,172 +36,6 @@ type ReturningHttpHandler interface {
CopyResponse(io.Writer, io.Reader)
}

// BaseMiddleware wraps up the ApiSpec and Proxy objects to be included in a
// middleware handler, this can probably be handled better.
type BaseMiddleware struct {
Spec *APISpec
Proxy ReturningHttpHandler
}

func (t *BaseMiddleware) Base() *BaseMiddleware { return t }

func (t *BaseMiddleware) Init() {}
func (t *BaseMiddleware) IsEnabledForSpec() bool {
return true
}
func (t *BaseMiddleware) Config() (interface{}, error) {
return nil, nil
}

func (t *BaseMiddleware) OrgSession(key string) (SessionState, bool) {
// Try and get the session from the session store
session, found := t.Spec.OrgSessionManager.SessionDetail(key)
if found && globalConf.EnforceOrgDataAge {
// If exists, assume it has been authorized and pass on
// We cache org expiry data
log.Debug("Setting data expiry: ", session.OrgID)
go t.SetOrgExpiry(session.OrgID, session.DataExpires)
}
return session, found
}

func (t *BaseMiddleware) SetOrgExpiry(orgid string, expiry int64) {
ExpiryCache.Set(orgid, expiry, cache.DefaultExpiration)
}

func (t *BaseMiddleware) OrgSessionExpiry(orgid string) int64 {
log.Debug("Checking: ", orgid)
cachedVal, found := ExpiryCache.Get(orgid)
if !found {
go t.OrgSession(orgid)
log.Debug("no cached entry found, returning 7 days")
return 604800
}

return cachedVal.(int64)
}

// ApplyPolicyIfExists will check if a policy is loaded, if it is, it will overwrite the session state to use the policy values
func (t *BaseMiddleware) ApplyPolicyIfExists(key string, session *SessionState) {
if session.ApplyPolicyID == "" {
return
}
policiesMu.RLock()
policy, ok := policiesByID[session.ApplyPolicyID]
policiesMu.RUnlock()
if !ok {
return
}
// Check ownership, policy org owner must be the same as API,
// otherwise youcould overwrite a session key with a policy from a different org!
if policy.OrgID != t.Spec.OrgID {
log.Error("Attempting to apply policy from different organisation to key, skipping")
return
}

if policy.Partitions.Quota || policy.Partitions.RateLimit || policy.Partitions.Acl {
// This is a partitioned policy, only apply what is active
if policy.Partitions.Quota {
// Quotas
session.QuotaMax = policy.QuotaMax
session.QuotaRenewalRate = policy.QuotaRenewalRate
}

if policy.Partitions.RateLimit {
// Rate limting
session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
session.Rate = policy.Rate
session.Per = policy.Per
if policy.LastUpdated != "" {
session.LastUpdated = policy.LastUpdated
}
}

if policy.Partitions.Acl {
// ACL
session.AccessRights = policy.AccessRights
session.HMACEnabled = policy.HMACEnabled
}

} else {
// This is not a partitioned policy, apply everything
// Quotas
session.QuotaMax = policy.QuotaMax
session.QuotaRenewalRate = policy.QuotaRenewalRate

// Rate limting
session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
session.Rate = policy.Rate
session.Per = policy.Per
if policy.LastUpdated != "" {
session.LastUpdated = policy.LastUpdated
}

// ACL
session.AccessRights = policy.AccessRights
session.HMACEnabled = policy.HMACEnabled
}

// Required for all
session.IsInactive = policy.IsInactive
session.Tags = policy.Tags

// Update the session in the session manager in case it gets called again
t.Spec.SessionManager.UpdateSession(key, session, getLifetime(t.Spec, session))
}

// CheckSessionAndIdentityForValidKey will check first the Session store for a valid key, if not found, it will try
// the Auth Handler, if not found it will fail
func (t *BaseMiddleware) CheckSessionAndIdentityForValidKey(key string) (SessionState, bool) {
// Try and get the session from the session store
log.Debug("Querying local cache")
// Check in-memory cache
if !globalConf.LocalSessionCache.DisableCacheSessionState {
cachedVal, found := SessionCache.Get(key)
if found {
log.Debug("--> Key found in local cache")
session := cachedVal.(SessionState)
t.ApplyPolicyIfExists(key, &session)
return session, true
}
}

// Check session store
log.Debug("Querying keystore")
session, found := t.Spec.SessionManager.SessionDetail(key)
if found {
// If exists, assume it has been authorized and pass on
// cache it
go SessionCache.Set(key, session, cache.DefaultExpiration)

// Check for a policy, if there is a policy, pull it and overwrite the session values
t.ApplyPolicyIfExists(key, &session)
log.Debug("--> Got key")
return session, true
}

log.Debug("Querying authstore")
// 2. If not there, get it from the AuthorizationHandler
session, found = t.Spec.AuthManager.IsKeyAuthorised(key)
if found {
// If not in Session, and got it from AuthHandler, create a session with a new TTL
log.Info("Recreating session for key: ", key)

// cache it
go SessionCache.Set(key, session, cache.DefaultExpiration)

// Check for a policy, if there is a policy, pull it and overwrite the session values
t.ApplyPolicyIfExists(key, &session)

log.Debug("Lifetime is: ", getLifetime(t.Spec, &session))
// Need to set this in order for the write to work!
session.LastUpdated = time.Now().String()
t.Spec.SessionManager.UpdateSession(key, &session, getLifetime(t.Spec, &session))
}

return session, found
}

// SuccessHandler represents the final ServeHTTP() request for a proxied API request
type SuccessHandler struct {
*BaseMiddleware
Expand Down Expand Up @@ -241,7 +75,7 @@ func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requ

rawRequest := ""
rawResponse := ""
if RecordDetail(r) {
if recordDetail(r) {
// Get the wire format representation
var wireFormatReq bytes.Buffer
requestCopy.Write(&wireFormatReq)
Expand Down Expand Up @@ -316,6 +150,23 @@ func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requ
}
}

func recordDetail(r *http.Request) bool {
// Are we even checking?
if !globalConf.EnforceOrgDataDeailLogging {
return globalConf.AnalyticsConfig.EnableDetailedRecording
}

// We are, so get session data
ses := r.Context().Value(OrgSessionContext)
if ses == nil {
// no session found, use global config
return globalConf.AnalyticsConfig.EnableDetailedRecording
}

// Session found
return ses.(SessionState).EnableDetailedRecording
}

// ServeHTTP will store the request details in the analytics store if necessary and proxy the request to it's
// final destination, this is invoked by the ProxyHandler or right at the start of a request chain if the URL
// Spec states the path is Ignored
Expand All @@ -329,7 +180,7 @@ func (s *SuccessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) *http
}

var copiedRequest *http.Request
if RecordDetail(r) {
if recordDetail(r) {
copiedRequest = copyRequest(r)
}

Expand All @@ -342,7 +193,7 @@ func (s *SuccessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) *http

if resp != nil {
var copiedResponse *http.Response
if RecordDetail(r) {
if recordDetail(r) {
copiedResponse = copyResponse(resp)
}
s.RecordHit(r, int64(millisec), resp.StatusCode, copiedRequest, copiedResponse)
Expand All @@ -361,7 +212,7 @@ func (s *SuccessHandler) ServeHTTPWithCache(w http.ResponseWriter, r *http.Reque
}

var copiedRequest *http.Request
if RecordDetail(r) {
if recordDetail(r) {
copiedRequest = copyRequest(r)
}

Expand All @@ -370,7 +221,7 @@ func (s *SuccessHandler) ServeHTTPWithCache(w http.ResponseWriter, r *http.Reque
t2 := time.Now()

var copiedResponse *http.Response
if RecordDetail(r) {
if recordDetail(r) {
copiedResponse = copyResponse(inRes)
}

Expand Down
2 changes: 1 addition & 1 deletion host_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/jeffail/tunny"
"github.com/pmylund/go-cache"
cache "github.com/pmylund/go-cache"
)

const (
Expand Down
Loading

0 comments on commit f4c2cd2

Please sign in to comment.