Analytics recorded and purged
lonelycode committed Oct 27, 2015
1 parent ce1de28 commit 7ac7c91
Showing 7 changed files with 218 additions and 18 deletions.
90 changes: 86 additions & 4 deletions analytics.go
@@ -178,8 +178,10 @@ func (c CSVPurger) PurgeCache() {
// MongoPurger will purge analytics data into a Mongo database, requires that the Mongo DB string is specified
// in the Config object
type MongoPurger struct {
Store *RedisClusterStorageManager
dbSession *mgo.Session
Store *RedisClusterStorageManager
dbSession *mgo.Session
CollectionName string
SetKeyName string
}

// Connect Connects to Mongo
@@ -208,9 +210,17 @@ func (m *MongoPurger) PurgeCache() {
m.Connect()
m.PurgeCache()
} else {
analyticsCollection := m.dbSession.DB("").C(config.AnalyticsConfig.MongoCollection)
collectionName := config.AnalyticsConfig.MongoCollection
if m.CollectionName != "" {
collectionName = m.CollectionName
}
analyticsCollection := m.dbSession.DB("").C(collectionName)

AnalyticsValues := m.Store.GetAndDeleteSet(ANALYTICS_KEYNAME)
analyticsKeyName := ANALYTICS_KEYNAME
if m.SetKeyName != "" {
analyticsKeyName = m.SetKeyName
}
AnalyticsValues := m.Store.GetAndDeleteSet(analyticsKeyName)

if len(AnalyticsValues) > 0 {
keys := make([]interface{}, len(AnalyticsValues), len(AnalyticsValues))
@@ -236,6 +246,78 @@ func (m *MongoPurger) PurgeCache() {

}

// MongoUptimePurger will purge uptime analytics data into a Mongo database; it requires that the Mongo DB
// string is specified in the Config object
type MongoUptimePurger struct {
Store *RedisClusterStorageManager
dbSession *mgo.Session
CollectionName string
SetKeyName string
}

// Connect Connects to Mongo
func (m *MongoUptimePurger) Connect() {
var err error
m.dbSession, err = mgo.Dial(config.AnalyticsConfig.MongoURL)
if err != nil {
log.Error("Mongo connection failed:", err)
time.Sleep(5 * time.Second)
m.Connect()
}
}

// StartPurgeLoop starts the loop that will be started as a goroutine and pull data out of the in-memory
// store and into MongoDB
func (m MongoUptimePurger) StartPurgeLoop(nextCount int) {
time.Sleep(time.Duration(nextCount) * time.Second)
m.PurgeCache()
m.StartPurgeLoop(nextCount)
}

// PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection
func (m *MongoUptimePurger) PurgeCache() {
if m.dbSession == nil {
log.Debug("Connecting to analytics store")
m.Connect()
m.PurgeCache()
} else {
collectionName := config.AnalyticsConfig.MongoCollection
if m.CollectionName != "" {
collectionName = m.CollectionName
}
analyticsCollection := m.dbSession.DB("").C(collectionName)

analyticsKeyName := ANALYTICS_KEYNAME
if m.SetKeyName != "" {
analyticsKeyName = m.SetKeyName
}
log.Debug("Getting from: ", analyticsKeyName)
AnalyticsValues := m.Store.GetAndDeleteSet(analyticsKeyName)

if len(AnalyticsValues) > 0 {
keys := make([]interface{}, len(AnalyticsValues), len(AnalyticsValues))

for i, v := range AnalyticsValues {
decoded := UptimeReportData{}
err := msgpack.Unmarshal(v.([]byte), &decoded)
log.Debug("Decoded Record: ", decoded)
if err != nil {
log.Error("Couldn't unmarshal analytics data:")
log.Error(err)
} else {
keys[i] = interface{}(decoded)
}
}

err := analyticsCollection.Insert(keys...)
if err != nil {
log.Error("Problem inserting to mongo collection: ", err)
}
}
}

}

type MockPurger struct {
Store *RedisClusterStorageManager
}
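The new MongoUptimePurger mirrors the existing MongoPurger, but with a configurable target collection and Redis set key. For context, here is a minimal wiring sketch based on how main.go (further down in this commit) constructs it; the collection name and key prefix are taken from that file, while the 10-second interval is illustrative only:

// Hypothetical wiring sketch, not part of the commit; every identifier except
// the interval appears elsewhere in this diff.
healthCheckStore := &RedisClusterStorageManager{KeyPrefix: "host-checker:"}

uptimePurger := &MongoUptimePurger{
	Store:          healthCheckStore,
	CollectionName: "tyk_uptime_analytics",
	SetKeyName:     UptimeAnalytics_KEYNAME,
}

// Option 1: drive the purger directly, purging every 10 seconds.
go uptimePurger.StartPurgeLoop(10)

// Option 2 (what main.go actually does): hand it to the host check manager,
// whose UptimePurgeLoop calls PurgeCache() on the analytics purge delay.
GlobalHostChecker.Clean = uptimePurger
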
7 changes: 4 additions & 3 deletions config.go
@@ -109,9 +109,10 @@ type Config struct {
UptimeTests struct {
Disable bool `json:"disable"`
Config struct {
FailureTriggerSampleSize int `json:"failure_trigger_sample_size"`
TimeWait int `json:"time_wait"`
CheckerPoolSize int `json:"checker_pool_size"`
FailureTriggerSampleSize int `json:"failure_trigger_sample_size"`
TimeWait int `json:"time_wait"`
CheckerPoolSize int `json:"checker_pool_size"`
EnableUptimeAnalytics bool `json:"enable_uptime_analytics"`
} `json:"config"`
} `json:"uptime_tests"`
}
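The only functional change to the config here is the new enable_uptime_analytics flag. A hypothetical tyk.conf fragment, decoded through the struct above (the values are examples, not defaults; the snippet assumes encoding/json and the gateway's Config type):

// Illustrative only; the JSON keys come from the struct tags above.
raw := []byte(`{
  "uptime_tests": {
    "disable": false,
    "config": {
      "failure_trigger_sample_size": 3,
      "time_wait": 60,
      "checker_pool_size": 50,
      "enable_uptime_analytics": true
    }
  }
}`)

var conf Config
if err := json.Unmarshal(raw, &conf); err != nil {
	panic(err)
}
// conf.UptimeTests.Config.EnableUptimeAnalytics is now true; this is the flag
// host_checker_manager.go checks before recording and purging uptime data.
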
2 changes: 1 addition & 1 deletion event_handler_webhooks.go
@@ -87,7 +87,7 @@ func (w WebHookHandler) New(handlerConf interface{}) (TykEventHandler, error) {
// Pre-load template on init
webHookTemplate, tErr := template.ParseFiles(thisHandler.conf.TemplatePath)
if tErr != nil {
log.Error("Failed to load webhook template! Using default. Error was: ", tErr)
log.Warning("[WEBHOOK] template failure, using default: ", tErr)
defaultPath := path.Join(config.TemplatePath, "default_webhook.json")
webHookTemplate, _ = template.ParseFiles(defaultPath)
}
9 changes: 8 additions & 1 deletion host_checker.go
@@ -34,6 +34,7 @@ type HostHealthReport struct {
type HostUptimeChecker struct {
failureCallback func(HostHealthReport)
upCallback func(HostHealthReport)
pingCallback func(HostHealthReport)
workerPoolSize int
sampleTriggerLimit int
checkTimout int
@@ -98,6 +99,7 @@ func (h *HostUptimeChecker) HostReporter() {
h.upCallback(okHost)
delete(h.unHealthyList, okHost.ID)
}
go h.pingCallback(okHost)

case failedHost := <-h.errorChan:

@@ -120,6 +122,7 @@
go h.failureCallback(failedHost)
}
}
go h.pingCallback(failedHost)

case <-h.stopPollingChan:
log.Debug("[HOST CHECKER] Received kill signal")
@@ -180,7 +183,7 @@ func (h *HostUptimeChecker) CheckHost(toCheck HostData) {
h.okChan <- report
}

func (h *HostUptimeChecker) Init(workers, triggerLimit, timeout int, hostList map[string]HostData, failureCallback func(HostHealthReport), upCallback func(HostHealthReport)) {
func (h *HostUptimeChecker) Init(workers, triggerLimit, timeout int, hostList map[string]HostData, failureCallback func(HostHealthReport), upCallback func(HostHealthReport), pingCallback func(HostHealthReport)) {
h.sampleCache = cache.New(30*time.Second, 5*time.Second)
h.stopPollingChan = make(chan bool)
h.errorChan = make(chan HostHealthReport)
@@ -189,6 +192,7 @@ func (h *HostUptimeChecker) Init(workers, triggerLimit, timeout int, hostList ma
h.unHealthyList = make(map[string]bool)
h.failureCallback = failureCallback
h.upCallback = upCallback
h.pingCallback = pingCallback

h.workerPoolSize = workers
if workers == 0 {
@@ -277,6 +281,9 @@ func hostcheck_example() {
// On success
func(fr HostHealthReport) {
log.Info("Host is back up! URL: ", fr.CheckURL)
},
func(fr HostHealthReport) {
log.Info("Host report, URL: ", fr.CheckURL)
})

// Start the check loop
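Init gains a third callback, dispatched by HostReporter after every check result on both the healthy and failed paths. Purely as an illustration (the real callback wired in by this commit is HostCheckerManager.OnHostReport), a minimal ping callback could look like:

// Illustrative only; the report fields are the ones used elsewhere in this diff.
pingCallback := func(report HostHealthReport) {
	log.Debug("Ping for ", report.CheckURL,
		" code: ", report.ResponseCode,
		" TCP error: ", report.IsTCPError)
}
// pingCallback would be passed as the final argument to HostUptimeChecker.Init.
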
116 changes: 112 additions & 4 deletions host_checker_manager.go
@@ -4,6 +4,7 @@ import (
b64 "encoding/base64"
"github.com/lonelycode/go-uuid/uuid"
"github.com/lonelycode/tykcommon"
"gopkg.in/vmihailenco/msgpack.v2"
"net/url"
"time"
)
@@ -12,12 +13,44 @@ var GlobalHostChecker HostCheckerManager

type HostCheckerManager struct {
Id string
store StorageHandler
store *RedisClusterStorageManager
checker *HostUptimeChecker
stopLoop bool
pollerStarted bool
unhealthyHostList map[string]bool
currentHostList map[string]HostData
Clean Purger
}

type UptimeReportData struct {
URL string
RequestTime int64
ResponseCode int
TCPError bool
ServerError bool
Day int
Month time.Month
Year int
Hour int
TimeStamp time.Time
ExpireAt time.Time `bson:"expireAt" json:"expireAt"`
APIID string
OrgID string
}

func (u *UptimeReportData) SetExpiry(expiresInSeconds int64) {
var expiry time.Duration

expiry = time.Duration(expiresInSeconds) * time.Second

if expiresInSeconds == 0 {
// Expiry is set to 100 years
expiry = (24 * time.Hour) * (365 * 100)
}

t := time.Now()
t2 := t.Add(expiry)
u.ExpireAt = t2
}

const (
@@ -26,9 +59,11 @@ const (
UnHealthyHostMetaDataHostKey string = "host_name"
PollerCacheKey string = "PollerActiveInstanceID"
PoolerHostSentinelKeyPrefix string = "PollerCheckerInstance:"

UptimeAnalytics_KEYNAME string = "tyk-uptime-analytics"
)

func (hc *HostCheckerManager) Init(store StorageHandler) {
func (hc *HostCheckerManager) Init(store *RedisClusterStorageManager) {
hc.store = store
hc.unhealthyHostList = make(map[string]bool)
// Generate a new ID for ourselves
@@ -39,6 +74,9 @@ func (hc *HostCheckerManager) Start() {
// Start loop to check if we are active instance
if hc.Id != "" {
go hc.CheckActivePollerLoop()
if config.UptimeTests.Config.EnableUptimeAnalytics {
go hc.UptimePurgeLoop()
}
}
}

@@ -72,6 +110,24 @@ func (hc *HostCheckerManager) CheckActivePollerLoop() {
}
}

func (hc *HostCheckerManager) UptimePurgeLoop() {
if config.AnalyticsConfig.PurgeDelay == -1 {
log.Warning("Analytics purge turned off, you are responsible for Redis storage maintenance.")
return
}
log.Debug("[HOST CHECK MANAGER] Started analytics purge loop")
for {
if hc.pollerStarted {
if hc.Clean != nil {
log.Debug("[HOST CHECK MANAGER] Purging uptime analytics")
hc.Clean.PurgeCache()
}

}
time.Sleep(time.Duration(config.AnalyticsConfig.PurgeDelay) * time.Second)
}
}

func (hc *HostCheckerManager) AmIPolling() bool {
if hc.store == nil {
log.Error("[HOST CHECK MANAGER] No storage instance set for uptime tests! Disabling poller...")
@@ -110,7 +166,8 @@ func (hc *HostCheckerManager) StartPoller() {
config.UptimeTests.Config.TimeWait,
hc.currentHostList,
hc.OnHostDown, // On failure
hc.OnHostBackUp) // On success
hc.OnHostBackUp, // On success
hc.OnHostReport) // All reports

// Start the check loop
log.Debug("---> Starting checker")
@@ -128,6 +185,12 @@ func (hc *HostCheckerManager) getHostKey(report HostHealthReport) string {
return PoolerHostSentinelKeyPrefix + report.MetaData[UnHealthyHostMetaDataHostKey]
}

func (hc *HostCheckerManager) OnHostReport(report HostHealthReport) {
if config.UptimeTests.Config.EnableUptimeAnalytics {
go hc.RecordUptimeAnalytics(report)
}
}

func (hc *HostCheckerManager) OnHostDown(report HostHealthReport) {
log.Debug("Update key: ", hc.getHostKey(report))
hc.store.SetKey(hc.getHostKey(report), "1", int64(config.UptimeTests.Config.TimeWait))
@@ -234,8 +297,53 @@ func (hc *HostCheckerManager) UpdateTrackingList(hd []HostData) {
}
}

func InitHostCheckManager(store StorageHandler) {
// RecordUptimeAnalytics will store an UptimeReportData record in Redis for later purging into MongoDB
func (hc HostCheckerManager) RecordUptimeAnalytics(thisReport HostHealthReport) error {

thisSpec, found := ApiSpecRegister[thisReport.MetaData[UnHealthyHostMetaDataAPIKey]]
thisOrg := ""
if found {
thisOrg = thisSpec.OrgID
}

t := time.Now()

var serverError bool
if thisReport.ResponseCode > 200 {
serverError = true
}
newAnalyticsRecord := UptimeReportData{
URL: thisReport.CheckURL,
RequestTime: int64(thisReport.Latency),
ResponseCode: thisReport.ResponseCode,
TCPError: thisReport.IsTCPError,
ServerError: serverError,
Day: t.Day(),
Month: t.Month(),
Year: t.Year(),
Hour: t.Hour(),
TimeStamp: t,
APIID: thisReport.MetaData[UnHealthyHostMetaDataAPIKey],
OrgID: thisOrg,
}

newAnalyticsRecord.SetExpiry(thisSpec.UptimeTests.Config.ExpireUptimeAnalyticsAfter)

encoded, err := msgpack.Marshal(newAnalyticsRecord)

if err != nil {
log.Error("Error encoding uptime data:", err)
return err
}

hc.store.AppendToSet(UptimeAnalytics_KEYNAME, string(encoded))
return nil
}

func InitHostCheckManager(store *RedisClusterStorageManager, purger Purger) {
GlobalHostChecker = HostCheckerManager{}
GlobalHostChecker.Clean = purger
GlobalHostChecker.Init(store)
GlobalHostChecker.Start()
}
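A note on the new UptimeReportData type: SetExpiry stores an absolute expiry time in ExpireAt, and the bson:"expireAt" tag suggests the target collection is expected to carry a MongoDB TTL index on that field (an assumption; the index itself is not created in this commit). A small worked example of the zero-value behaviour, with illustrative inputs (RecordUptimeAnalytics passes the spec's ExpireUptimeAnalyticsAfter value):

rec := UptimeReportData{URL: "http://example.com/health"}

rec.SetExpiry(3600) // ExpireAt = now + 1 hour
rec.SetExpiry(0)    // ExpireAt = now + 100 years, i.e. effectively never expires
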
10 changes: 6 additions & 4 deletions main.go
@@ -62,6 +62,10 @@ func setupGlobals() {
log.Panic("Analytics requires Redis Storage backend, please enable Redis in the tyk.conf file.")
}

// Initialise our Host Checker
HealthCheckStore := &RedisClusterStorageManager{KeyPrefix: "host-checker:"}
InitHostCheckManager(HealthCheckStore, nil)

if config.EnableAnalytics {
config.loadIgnoredIPs()
AnalyticsStore := RedisClusterStorageManager{KeyPrefix: "analytics-"}
@@ -77,7 +81,8 @@

} else if config.AnalyticsConfig.Type == "mongo" {
log.Debug("Using MongoDB cache purge")
analytics.Clean = &MongoPurger{&AnalyticsStore, nil}
analytics.Clean = &MongoPurger{&AnalyticsStore, nil, "", ""}
GlobalHostChecker.Clean = &MongoUptimePurger{HealthCheckStore, nil, "tyk_uptime_analytics", UptimeAnalytics_KEYNAME}
} else if config.AnalyticsConfig.Type == "rpc" {
log.Debug("Using RPC cache purge")
thisPurger := RPCPurger{Store: &AnalyticsStore, Address: config.SlaveOptions.ConnectionString}
@@ -807,9 +812,6 @@ func main() {
go RPCListener.StartRPCLoopCheck(config.SlaveOptions.RPCKey)
}

// Initialise our Host Checker
InitHostCheckManager(GetGlobalStorageHandler("host-checker:", false))

// Handle reload when SIGUSR2 is received
l, err := goagain.Listener()
if nil != err {