Skip to content

Commit

Permalink
=redis切换回redigo,redis-go断线重连有问题
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed Aug 20, 2017
1 parent 233073b commit 03b7ed1
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 247 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ QQ交流群 :463735103
go get github.com/golang/net/context
go get github.com/opentracing/basictracer-go
go get github.com/opentracing/opentracing-go
go get github.com/go-redis/redis
go get github.com/garyburd/redigo

# 文档

Expand Down
81 changes: 49 additions & 32 deletions rpc/base/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/liangdas/mqant/rpc/pb"
"github.com/liangdas/mqant/rpc"
"github.com/liangdas/mqant/rpc/util"
"github.com/garyburd/redigo/redis"
)

type RedisClient struct {
//callinfos map[string]*ClinetCallInfo
callinfos *utils.BeeMap
cmutex sync.Mutex //操作callinfos的锁
psc redis.PubSubConn
info *conf.Redis
queueName string
callbackqueueName string
Expand All @@ -42,20 +44,23 @@ func createQueueName()string{
return fmt.Sprintf("callbackqueueName:%d",time.Now().Nanosecond())
}
func NewRedisClient(info *conf.Redis) (client *RedisClient, err error) {
var url = info.Uri
client = new(RedisClient)
client.callinfos = utils.NewBeeMap()
client.info=info
client.callbackqueueName = createQueueName()
client.queueName = info.Queue
client.done = make(chan error)
psc := redis.PubSubConn{Conn: utils.GetRedisFactory().GetPool(url).Get()}
psc.Subscribe(client.callbackqueueName)

pool,errs:=utils.GetRedisFactory().GetPool(info.Uri)
if errs!=nil{
return nil,err
}
if errs := pool.Expire(client.callbackqueueName,time.Second*60).Err(); err != nil {
pool:=utils.GetRedisFactory().GetPool(info.Uri).Get()
defer pool.Close()
_, errs:=pool.Do("EXPIRE", client.callbackqueueName, 60)
if errs != nil {
log.Warning(errs.Error())
}
client.psc = psc
go client.on_response_handle(client.done)
client.on_timeout_handle(nil) //处理超时请求的协程
return client, nil
Expand All @@ -66,15 +71,13 @@ func NewRedisClient(info *conf.Redis) (client *RedisClient, err error) {
//}
}

func (c *RedisClient) Done() (error) {
func (c *RedisClient) Done() (err error) {
pool:=utils.GetRedisFactory().GetPool(c.info.Uri).Get()
defer pool.Close()
//关闭amqp链接通道
c.psc.Unsubscribe(c.callbackqueueName)
//删除临时通道
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
return errs
}
if errs := pool.Del(c.callbackqueueName).Err(); errs != nil {
log.Warning(errs.Error())
}
pool.Do("DEL", c.callbackqueueName)
//err = c.psc.Close()
//清理 callinfos 列表
for key, clinetCallInfo := range c.callinfos.Items() {
Expand All @@ -86,13 +89,15 @@ func (c *RedisClient) Done() (error) {
}
}
c.callinfos = nil
return nil
return
}

/**
消息请求
*/
func (c *RedisClient) Call(callInfo mqrpc.CallInfo, callback chan rpcpb.ResultInfo) error {
pool:=utils.GetRedisFactory().GetPool(c.info.Uri).Get()
defer pool.Close()
var err error
if c.callinfos == nil {
return fmt.Errorf("RedisClient is closed")
Expand All @@ -111,11 +116,9 @@ func (c *RedisClient) Call(callInfo mqrpc.CallInfo, callback chan rpcpb.ResultIn
if err != nil {
return err
}
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
return err
}
if err := pool.LPush(c.queueName, body).Err(); err != nil {

_, err = pool.Do("PUBLISH", c.queueName, body)
if err != nil {
log.Warning("Publish: %s", err)
return err
}
Expand All @@ -126,17 +129,16 @@ func (c *RedisClient) Call(callInfo mqrpc.CallInfo, callback chan rpcpb.ResultIn
消息请求 不需要回复
*/
func (c *RedisClient) CallNR(callInfo mqrpc.CallInfo) error {
pool:=utils.GetRedisFactory().GetPool(c.info.Uri).Get()
defer pool.Close()
var err error

body, err := c.Marshal(&callInfo.RpcInfo)
if err != nil {
return err
}
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
return err
}
if err := pool.LPush(c.queueName, body).Err(); err != nil {
_, err = pool.Do("PUBLISH", c.queueName, body)
if err != nil {
log.Warning("Publish: %s", err)
return err
}
Expand Down Expand Up @@ -178,14 +180,9 @@ func (c *RedisClient) on_timeout_handle(args interface{}) {
*/
func (c *RedisClient) on_response_handle(done chan error) {
for {
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
log.Error(errs.Error())
return
}
result, err := pool.BRPop(1*time.Second, c.callbackqueueName).Result()
if err == nil {
resultInfo,err := c.UnmarshalResult([]byte(result[1]))
switch v := c.psc.Receive().(type) {
case redis.Message:
resultInfo,err := c.UnmarshalResult(v.Data)
if err != nil {
log.Error("Unmarshal faild", err)
} else {
Expand All @@ -197,6 +194,26 @@ func (c *RedisClient) on_response_handle(done chan error) {
//删除
c.callinfos.Delete(correlation_id)
}
case redis.PMessage:
resultInfo,err := c.UnmarshalResult(v.Data)
if err != nil {
log.Error("Unmarshal faild", err)
} else {
correlation_id := resultInfo.Cid
clinetCallInfo := c.callinfos.Get(correlation_id)
if clinetCallInfo != nil {
clinetCallInfo.(ClinetCallInfo).call <- *resultInfo
}
//删除
c.callinfos.Delete(correlation_id)
}
case redis.Subscription:
log.Info("%s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Error("on_response_handle",v.Error())
return
default:

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ import (
"github.com/liangdas/mqant/rpc/pb"
"github.com/liangdas/mqant/rpc"
"github.com/liangdas/mqant/rpc/util"
"github.com/garyburd/redigo/redis"
)

type RedisClient struct {
//callinfos map[string]*ClinetCallInfo
callinfos *utils.BeeMap
cmutex sync.Mutex //操作callinfos的锁
psc redis.PubSubConn
info *conf.Redis
queueName string
callbackqueueName string
Expand All @@ -44,23 +42,20 @@ func createQueueName()string{
return fmt.Sprintf("callbackqueueName:%d",time.Now().Nanosecond())
}
func NewRedisClient(info *conf.Redis) (client *RedisClient, err error) {
var url = info.Uri
client = new(RedisClient)
client.callinfos = utils.NewBeeMap()
client.info=info
client.callbackqueueName = createQueueName()
client.queueName = info.Queue
client.done = make(chan error)
psc := redis.PubSubConn{Conn: utils.GetRedisFactory().GetPool(url).Get()}
psc.Subscribe(client.callbackqueueName)

pool:=utils.GetRedisFactory().GetPool(info.Uri).Get()
defer pool.Close()
_, errs:=pool.Do("EXPIRE", client.callbackqueueName, 60)
if errs != nil {
pool,errs:=utils.GetRedisFactory().GetPool(info.Uri)
if errs!=nil{
return nil,err
}
if errs := pool.Expire(client.callbackqueueName,time.Second*60).Err(); err != nil {
log.Warning(errs.Error())
}
client.psc = psc
go client.on_response_handle(client.done)
client.on_timeout_handle(nil) //处理超时请求的协程
return client, nil
Expand All @@ -71,13 +66,15 @@ func NewRedisClient(info *conf.Redis) (client *RedisClient, err error) {
//}
}

func (c *RedisClient) Done() (err error) {
pool:=utils.GetRedisFactory().GetPool(c.info.Uri).Get()
defer pool.Close()
//关闭amqp链接通道
c.psc.Unsubscribe(c.callbackqueueName)
func (c *RedisClient) Done() (error) {
//删除临时通道
pool.Do("DEL", c.callbackqueueName)
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
return errs
}
if errs := pool.Del(c.callbackqueueName).Err(); errs != nil {
log.Warning(errs.Error())
}
//err = c.psc.Close()
//清理 callinfos 列表
for key, clinetCallInfo := range c.callinfos.Items() {
Expand All @@ -89,15 +86,13 @@ func (c *RedisClient) Done() (err error) {
}
}
c.callinfos = nil
return
return nil
}

/**
消息请求
*/
func (c *RedisClient) Call(callInfo mqrpc.CallInfo, callback chan rpcpb.ResultInfo) error {
pool:=utils.GetRedisFactory().GetPool(c.info.Uri).Get()
defer pool.Close()
var err error
if c.callinfos == nil {
return fmt.Errorf("RedisClient is closed")
Expand All @@ -116,9 +111,11 @@ func (c *RedisClient) Call(callInfo mqrpc.CallInfo, callback chan rpcpb.ResultIn
if err != nil {
return err
}

_, err = pool.Do("PUBLISH", c.queueName, body)
if err != nil {
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
return err
}
if err := pool.LPush(c.queueName, body).Err(); err != nil {
log.Warning("Publish: %s", err)
return err
}
Expand All @@ -129,16 +126,17 @@ func (c *RedisClient) Call(callInfo mqrpc.CallInfo, callback chan rpcpb.ResultIn
消息请求 不需要回复
*/
func (c *RedisClient) CallNR(callInfo mqrpc.CallInfo) error {
pool:=utils.GetRedisFactory().GetPool(c.info.Uri).Get()
defer pool.Close()
var err error

body, err := c.Marshal(&callInfo.RpcInfo)
if err != nil {
return err
}
_, err = pool.Do("PUBLISH", c.queueName, body)
if err != nil {
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
return err
}
if err := pool.LPush(c.queueName, body).Err(); err != nil {
log.Warning("Publish: %s", err)
return err
}
Expand Down Expand Up @@ -180,22 +178,14 @@ func (c *RedisClient) on_timeout_handle(args interface{}) {
*/
func (c *RedisClient) on_response_handle(done chan error) {
for {
switch v := c.psc.Receive().(type) {
case redis.Message:
resultInfo,err := c.UnmarshalResult(v.Data)
if err != nil {
log.Error("Unmarshal faild", err)
} else {
correlation_id := resultInfo.Cid
clinetCallInfo := c.callinfos.Get(correlation_id)
if clinetCallInfo != nil {
clinetCallInfo.(ClinetCallInfo).call <- *resultInfo
}
//删除
c.callinfos.Delete(correlation_id)
}
case redis.PMessage:
resultInfo,err := c.UnmarshalResult(v.Data)
pool,errs:=utils.GetRedisFactory().GetPool(c.info.Uri)
if errs!=nil{
log.Error(errs.Error())
return
}
result, err := pool.BRPop(1*time.Second, c.callbackqueueName).Result()
if err == nil {
resultInfo,err := c.UnmarshalResult([]byte(result[1]))
if err != nil {
log.Error("Unmarshal faild", err)
} else {
Expand All @@ -207,13 +197,6 @@ func (c *RedisClient) on_response_handle(done chan error) {
//删除
c.callinfos.Delete(correlation_id)
}
case redis.Subscription:
log.Info("%s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Error("on_response_handle",v.Error())
return
default:

}

}
Expand Down
Loading

0 comments on commit 03b7ed1

Please sign in to comment.