Skip to content
This repository has been archived by the owner on Aug 23, 2022. It is now read-only.

Commit

Permalink
Redis connection changes for go1.0.3 compatibility
Browse files Browse the repository at this point in the history
We now create new connections for every redis request.  Since the
connections are cached, this should not impact performance with network
overhead.  This helps us by not having the requirement of having locks
on the shared connection.  Furthermore, we can process more concurrent
requests this way.  Finally, we don't run up on the issue of go1.0.3
processing global scopes slightly differently.
  • Loading branch information
Micha Gorelick committed Jan 28, 2013
1 parent 2710113 commit 0e46b58
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 49 deletions.
13 changes: 3 additions & 10 deletions goforget/forget.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var (
VERSION = "0.1"
VERSION = "0.2"
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http", ":8080", "HTTP service address (e.g., ':8080')")
redisHost = flag.String("redis-host", "", "Redis host in the form host:port:db.")
Expand Down Expand Up @@ -217,22 +217,15 @@ func main() {
return
}

var err error
rdb, err = ConnectRedis(*redisHost)
if err != nil {
log.Printf("Could not connect to redis host: %s: %s", *redisHost, err)
return
} else {
log.Printf("Connected to %s", *redisHost)
}
redisServer = NewRedisServer(*redisHost)

log.Printf("Starting %d update worker(s)", *nWorkers)
workerWaitGroup := sync.WaitGroup{}
updateChan = make(chan *Distribution, 10) //25 * *nWorkers)
for i := 0; i < *nWorkers; i++ {
workerWaitGroup.Add(1)
go func() {
UpdateRedis(*redisHost, updateChan)
UpdateRedis(updateChan)
workerWaitGroup.Done()
}()
}
Expand Down
102 changes: 63 additions & 39 deletions goforget/redis_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,59 @@ import (
"log"
"math"
"strings"
"sync"
"time"
)

var rdb redis.Conn
var rLock sync.RWMutex
type RedisServer struct {
Raw string
Host string
Port string
Db string

func UpdateRedis(redisHost string, readChan chan *Distribution) {
lredis, err := ConnectRedis(redisHost)
hostname string
}

func NewRedisServer(rawString string) *RedisServer {
parts := strings.Split(rawString, ":")
if len(parts) != 3 {
log.Panicf("redis-host must be in the form host:port:db")
}
return &RedisServer{
Raw: rawString,
Host: parts[0],
Port: parts[1],
Db: parts[2],
hostname: parts[0] + ":" + parts[1],
}
}

func (rs *RedisServer) Connect() (redis.Conn, error) {
rdb, err := redis.Dial("tcp", rs.hostname)
if err == nil {
ok, err := rdb.Do("SELECT", rs.Db)
if ok != "OK" || err != nil {
return nil, err
}
} else {
return nil, err
}
return rdb, nil
}

var redisServer *RedisServer

func UpdateRedis(readChan chan *Distribution) error {
lredis, err := redisServer.Connect()
if err != nil {
log.Printf("Could not connect to redis host: %s: %s", redisHost, err)
return
return err
}

for dist := range readChan {
log.Printf("Updating distribution: %s", dist.Name)
ok := UpdateDistribution(lredis, dist)
if !ok {
log.Println("Failed to update: %s", dist.Name)
log.Printf("Failed to update: %s", dist.Name)
}
}
}
Expand All @@ -39,7 +73,7 @@ func UpdateDistribution(rconn redis.Conn, dist *Distribution) bool {
if dist.Full() == false {
err := dist.Fill()
if err != nil {
log.Printf("Could not update %s: %s", dist.Name, err)
log.Printf("Could not fill %s: %s", dist.Name, err)
return false
}
dist.Decay()
Expand All @@ -51,12 +85,10 @@ func UpdateDistribution(rconn redis.Conn, dist *Distribution) bool {
return true
}

rLock.Lock()
rconn.Send("MULTI")

if dist.Z == 0 {
rconn.Send("DISCARD")
rLock.Unlock()
return false
}

Expand All @@ -83,7 +115,6 @@ func UpdateDistribution(rconn redis.Conn, dist *Distribution) bool {
rconn.Send("EXPIRE", TName, expTime)

_, err := rconn.Do("EXEC")
rLock.Unlock()
if err != nil {
log.Printf("Could not update %s: %s", dist.Name, err)
return false
Expand All @@ -92,65 +123,58 @@ func UpdateDistribution(rconn redis.Conn, dist *Distribution) bool {
}

func GetField(distribution, field string) ([]interface{}, error) {
rLock.RLock()
rdb, err := redisServer.Connect()
if err != nil {
return nil, err
}

rdb.Send("MULTI")
rdb.Send("ZSCORE", distribution, field)
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_Z"))
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T"))
data, err := redis.MultiBulk(rdb.Do("EXEC"))
rLock.RUnlock()
return data, err
}

func GetNMostProbable(distribution string, N int) ([]interface{}, error) {
rLock.RLock()
rdb, err := redisServer.Connect()
if err != nil {
return nil, err
}

rdb.Send("MULTI")
rdb.Send("ZREVRANGEBYSCORE", distribution, "+INF", "-INF", "WITHSCORES", "LIMIT", 0, N)
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_Z"))
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T"))
data, err := redis.MultiBulk(rdb.Do("EXEC"))
rLock.RUnlock()
return data, err
}

func IncrField(distribution string, fields []string, N int) error {
rLock.Lock()
rdb, err := redisServer.Connect()
if err != nil {
return err
}

rdb.Send("MULTI")
for _, field := range fields {
rdb.Send("ZINCRBY", distribution, N, field)
}
rdb.Send("INCRBY", fmt.Sprintf("%s.%s", distribution, "_Z"), N)
rdb.Send("SETNX", fmt.Sprintf("%s.%s", distribution, "_T"), int(time.Now().Unix()))
_, err := rdb.Do("EXEC")
rLock.Unlock()
_, err = rdb.Do("EXEC")
return err
}

func GetDistribution(distribution string) ([]interface{}, error) {
rLock.RLock()
rdb, err := redisServer.Connect()
if err != nil {
return nil, err
}

rdb.Send("MULTI")
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T"))
rdb.Send("ZRANGE", distribution, 0, -1, "WITHSCORES")
data, err := redis.MultiBulk(rdb.Do("EXEC"))
rLock.RUnlock()
return data, err
}

func ConnectRedis(host string) (redis.Conn, error) {
parts := strings.Split(host, ":")

if len(parts) != 3 {
log.Panicf("redis-host must be in the form host:port:db")
}

rdb, err := redis.Dial("tcp", fmt.Sprintf("%s:%s", parts[0], parts[1]))
if err == nil {
ok, err := rdb.Do("SELECT", parts[2])
if ok != "OK" || err != nil {
return nil, err
}
} else {
return nil, err
}
return rdb, nil
}

0 comments on commit 0e46b58

Please sign in to comment.