Skip to content

Commit

Permalink
Moved analytics to a redis list, much cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed May 15, 2015
1 parent b0b0ea0 commit bcec5ea
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
28 changes: 12 additions & 16 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"encoding/csv"
"fmt"
"github.com/nu7hatch/gouuid"
"gopkg.in/vmihailenco/msgpack.v2"
"labix.org/v2/mgo"
"os"
Expand Down Expand Up @@ -33,6 +32,10 @@ type AnalyticsRecord struct {
ExpireAt time.Time `bson:"expireAt" json:"expireAt"`
}

const (
ANALYTICS_KEYNAME string = "tyk-system-analytics"
)

func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
var expiry time.Duration

Expand Down Expand Up @@ -80,17 +83,14 @@ func (r RedisAnalyticsHandler) RecordHit(thisRecord AnalyticsRecord) error {
thisRecord.APIKey = publicHash(thisRecord.APIKey)

encoded, err := msgpack.Marshal(thisRecord)
u5, _ := uuid.NewV4()

keyName := fmt.Sprintf("%d%d%d%d-%s", thisRecord.Year, thisRecord.Month, thisRecord.Day, thisRecord.Hour, u5.String())

if err != nil {
log.Error("Error encoding analytics data:")
log.Error(err)
return AnalyticsError{}
}

r.Store.SetKey(keyName, string(encoded), 0)
r.Store.AppendToSet(ANALYTICS_KEYNAME, string(encoded))

return nil
}
Expand Down Expand Up @@ -197,32 +197,28 @@ func (m *MongoPurger) PurgeCache() {
m.PurgeCache()
} else {
analyticsCollection := m.dbSession.DB("").C(config.AnalyticsConfig.MongoCollection)
KeyValueMap := m.Store.GetKeysAndValues()

if len(KeyValueMap) > 0 {
keys := make([]interface{}, len(KeyValueMap), len(KeyValueMap))
keyNames := make([]string, len(KeyValueMap), len(KeyValueMap))
AnalyticsValues := m.Store.GetAndDeleteSet(ANALYTICS_KEYNAME)

i := 0
for k, v := range KeyValueMap {
keyNames[i] = k
if len(AnalyticsValues) > 0 {
keys := make([]interface{}, len(AnalyticsValues), len(AnalyticsValues))

for i, v := range AnalyticsValues {
decoded := AnalyticsRecord{}
err := msgpack.Unmarshal([]byte(v), &decoded)
err := msgpack.Unmarshal(v.([]byte), &decoded)
log.Warning("Decoded Record: ", decoded)
if err != nil {
log.Error("Couldn't unmarshal analytics data:")
log.Error(err)
} else {
keys[i] = interface{}(decoded)
}
i++
}

err := analyticsCollection.Insert(keys...)
if err != nil {
log.Error("Problem inserting to mongo collection")
log.Error(err)
} else {
m.Store.DeleteRawKeys(keyNames, "analytics-")
}
}
}
Expand Down
55 changes: 55 additions & 0 deletions storage_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,61 @@ func (r *RedisStorageManager) Publish(channel string, message string) error {
return nil
}

func (r *RedisStorageManager) GetAndDeleteSet(keyName string) []interface{} {
db := r.pool.Get()
defer db.Close()

log.Warning("Getting raw gkey set: ", keyName)
if db == nil {
log.Warning("Connection dropped, connecting..")
r.Connect()
r.GetAndDeleteSet(keyName)
} else {
log.Warning("keyName is: ", keyName)
fixedKey := r.fixKey(keyName)
log.Warning("Fixed keyname is: ", fixedKey)
db.Send("MULTI")
// Get all the elements
db.Send("LRANGE", fixedKey, 0, -1)
// Trim it to zero
db.Send("DEL", fixedKey)
// Execute
r, err := redis.Values(db.Do("EXEC"))

vals := r[0].([]interface{})

log.Warning("Returned: ", vals)

if err != nil {
log.Error("Multi command failed: ", err)
}

return vals
}
return []interface{}{}
}

func (r *RedisStorageManager) AppendToSet(keyName string, value string) {
db := r.pool.Get()
defer db.Close()

log.Warning("Pushing to raw key set: ", keyName)
if db == nil {
log.Warning("Connection dropped, connecting..")
r.Connect()
r.AppendToSet(keyName, value)
} else {
_, err := db.Do("RPUSH", r.fixKey(keyName), value)

if err != nil {
log.Error("Error trying to delete keys:")
log.Error(err)
}

return
}
}

// IncrementWithExpire will increment a key in redis
func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire int64) int {
db := r.pool.Get()
Expand Down

0 comments on commit bcec5ea

Please sign in to comment.