Skip to content

Commit

Permalink
v1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed Mar 21, 2017
1 parent 167621c commit 16fec55
Show file tree
Hide file tree
Showing 54 changed files with 8,723 additions and 118 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bug请直接通过issue提交


##版本日志
###[v1.2.0新特性](https://github.com/liangdas/mqant/wiki/v1.2.0)
###[v1.1.0新特性](https://github.com/liangdas/mqant/wiki/v1.1.0)

Expand Down
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
ApplicationDir, _ := filepath.Split(ApplicationPath)
defaultPath := fmt.Sprintf("%sconf/server.conf", ApplicationDir)
confPath := flag.String("conf", defaultPath, "Server configuration file path")
ProcessID := flag.String("pid", "development", "Server group?")
ProcessID := flag.String("pid", "development", "Server ProcessID?")
Logdir := flag.String("log", fmt.Sprintf("%slogs", ApplicationDir), "Log file directory?")
flag.Parse() //解析输入的参数

Expand Down
2 changes: 2 additions & 0 deletions gate/mqtt_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (a *agent) OnRecover(pack *mqtt.Pack) {
b, err := json.Marshal(r)
if err == nil {
a.WriteMsg(Topic, b)
}else{
log.Error(err.Error())
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (mer *ModuleManager) Init(app App, ProcessID string) {
m.wg.Add(1)
go run(m)
}
timer.SetTimer(3000, mer.ReportStatistics, nil) //统计汇报定时任务
timer.SetTimer(3, mer.ReportStatistics, nil) //统计汇报定时任务
}

/**
Expand Down Expand Up @@ -234,6 +234,6 @@ func (mer *ModuleManager) ReportStatistics(args interface{}) {
default:
}
}
timer.SetTimer(3000, mer.ReportStatistics, nil)
timer.SetTimer(3, mer.ReportStatistics, nil)
}
}
4 changes: 2 additions & 2 deletions module/modules/timer/timingwheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func step() {
if r := recover(); r != nil {
buf := make([]byte, 1024)
l := runtime.Stack(buf, false)
log.Error("TimerError %v", buf[:l])
log.Error("TimerError %v", r,string(buf[:l]))
}
}()
doSomething(args)
Expand All @@ -105,7 +105,7 @@ func step() {
}

func Run(closeSig chan bool) {
tick := time.NewTicker(1 * time.Millisecond)
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
for {
select {
Expand Down
24 changes: 19 additions & 5 deletions rpc/amqp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *AMQPClient) Call(callInfo CallInfo, callback chan ResultInfo) error {
if c.callinfos == nil {
return fmt.Errorf("AMQPClient is closed")
}

callInfo.ReplyTo=c.Consumer.callback_queue
var correlation_id = callInfo.Cid

clinetCallInfo := &ClinetCallInfo{
Expand Down Expand Up @@ -199,7 +199,7 @@ func (c *AMQPClient) on_timeout_handle(args interface{}) {

}
}
timer.SetTimer(1000, c.on_timeout_handle, nil)
timer.SetTimer(1, c.on_timeout_handle, nil)
}
}

Expand All @@ -220,15 +220,14 @@ func (c *AMQPClient) on_response_handle(deliveries <-chan amqp.Delivery, done ch
// d.Body,
//)
d.Ack(false)
var resultInfo ResultInfo
err := json.Unmarshal(d.Body, &resultInfo)
resultInfo,err := c.UnmarshalResult(d.Body)
if err != nil {
log.Error("Unmarshal faild", err)
} else {
correlation_id := resultInfo.Cid
clinetCallInfo := c.callinfos.Get(correlation_id)
if clinetCallInfo != nil {
clinetCallInfo.(ClinetCallInfo).call <- resultInfo
clinetCallInfo.(ClinetCallInfo).call <- *resultInfo
}
//删除
c.callinfos.Delete(correlation_id)
Expand All @@ -244,6 +243,20 @@ func (c *AMQPClient) on_response_handle(deliveries <-chan amqp.Delivery, done ch
}
}

func (c *AMQPClient) UnmarshalResult(data []byte) (*ResultInfo, error) {
//fmt.Println(msg)
//保存解码后的数据,Value可以为任意数据类型
var resultInfo ResultInfo
err := json.Unmarshal(data, &resultInfo)
if err != nil {
return nil, err
} else {
return &resultInfo, err
}

panic("bug")
}

func (c *AMQPClient) Unmarshal(data []byte) (*CallInfo, error) {
//fmt.Println(msg)
//保存解码后的数据,Value可以为任意数据类型
Expand All @@ -260,6 +273,7 @@ func (c *AMQPClient) Unmarshal(data []byte) (*CallInfo, error) {

// goroutine safe
func (c *AMQPClient) Marshal(callInfo *CallInfo) ([]byte, error) {
//map2:= structs.Map(callInfo)
b, err := json.Marshal(callInfo)
return b, err
}
12 changes: 9 additions & 3 deletions rpc/amqp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *AMQPServer) Shutdown() error {
}

func (s *AMQPServer) Callback(callinfo CallInfo) error {
body, _ := json.Marshal(callinfo.Result)
body, _ := s.MarshalResult(callinfo.Result)
return s.response(callinfo.props, body)
}

Expand Down Expand Up @@ -150,12 +150,11 @@ func (s *AMQPServer) on_request_handle(deliveries <-chan amqp.Delivery, done cha
// d.DeliveryTag,
// d.Body,
//)

d.Ack(false)
callInfo, err := s.Unmarshal(d.Body)
if err == nil {
callInfo.props = map[string]interface{}{
"reply_to": d.Headers["reply_to"],
"reply_to": callInfo.ReplyTo,
}

callInfo.agent = s //设置代理为AMQPServer
Expand Down Expand Up @@ -195,3 +194,10 @@ func (s *AMQPServer) Marshal(callInfo *CallInfo) ([]byte, error) {
b, err := json.Marshal(callInfo)
return b, err
}
// goroutine safe
func (s *AMQPServer) MarshalResult(resultInfo ResultInfo) ([]byte, error) {
//log.Error("",map2)
b, err := json.Marshal(resultInfo)
return b, err
}

23 changes: 2 additions & 21 deletions rpc/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package mqrpc

import (
"encoding/json"
"fmt"
"github.com/liangdas/mqant/module/modules/timer"
"github.com/liangdas/mqant/utils"
Expand Down Expand Up @@ -103,7 +102,7 @@ func (c *LocalClient) CallNR(callInfo CallInfo) (err error) {
}
}()
//发送消息
c.local_server.local_chan <- callInfo
c.local_server.Write(callInfo)

return nil
}
Expand All @@ -130,7 +129,7 @@ func (c *LocalClient) on_timeout_handle(args interface{}) {

}
}
timer.SetTimer(1000, c.on_timeout_handle, nil)
timer.SetTimer(1, c.on_timeout_handle, nil)
}
}

Expand Down Expand Up @@ -161,22 +160,4 @@ func (c *LocalClient) on_response_handle(deliveries <-chan ResultInfo, done chan
}
}

func (c *LocalClient) Unmarshal(data []byte) (*CallInfo, error) {
//fmt.Println(msg)
//保存解码后的数据,Value可以为任意数据类型
var callInfo CallInfo
err := json.Unmarshal(data, &callInfo)
if err != nil {
return nil, err
} else {
return &callInfo, err
}

panic("bug")
}

// goroutine safe
func (c *LocalClient) Marshal(callInfo *CallInfo) ([]byte, error) {
b, err := json.Marshal(callInfo)
return b, err
}
98 changes: 95 additions & 3 deletions rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/utils/uuid"
"time"
"fmt"
"reflect"
"encoding/base64"
)

type RPCClient struct {
Expand Down Expand Up @@ -66,10 +69,53 @@ func (c *RPCClient) Done() (err error) {
消息请求 需要回复
*/
func (c *RPCClient) Call(_func string, params ...interface{}) (interface{}, string) {
var ArgsType []string=nil
var args []interface{}=make([]interface{}, len(params))
if len(params) > 0 {
//prepare in paras
ArgsType = make([]string, len(params))
for k, param := range params {
switch v2:=param.(type) {
case string:
ArgsType[k] = STRING
args[k]=params[k]
case bool:
ArgsType[k] = BOOL
args[k]=params[k]
case int:
ArgsType[k] = INT
args[k]=params[k]
case int64:
ArgsType[k] = LONG
args[k]=params[k]
case float32:
ArgsType[k] = FLOAT
args[k]=params[k]
case float64:
ArgsType[k] = DOUBLE
args[k]=params[k]
case []byte:
//这里要把[]byte转base64
ArgsType[k] = BYTES
uEnc := base64.URLEncoding.EncodeToString(v2)
args[k]=uEnc
case map[string]interface{}:
ArgsType[k] = MAP
args[k]=params[k]
case map[string]string:
ArgsType[k] = MAP
args[k]=params[k]
default:
return nil, fmt.Sprintf( "args[%d] [%s] Types not allowed",k,reflect.TypeOf(param))
}

}
}
var correlation_id = uuid.Rand().Hex()
callInfo := &CallInfo{
Fn: _func,
Args: params,
Args: args,
ArgsType:ArgsType,
Reply: true, //客户端是否需要结果
Expired: (time.Now().UTC().Add(time.Second * time.Duration(conf.RpcExpired)).UnixNano()) / 1000000, //超时日期 unix 时间戳 单位/毫秒 要求服务端与客户端时间精准同步
Cid: correlation_id,
Expand Down Expand Up @@ -104,10 +150,52 @@ func (c *RPCClient) Call(_func string, params ...interface{}) (interface{}, stri
消息请求 不需要回复
*/
func (c *RPCClient) CallNR(_func string, params ...interface{}) (err error) {
var ArgsType []string=nil
var args []interface{}=make([]interface{}, len(params))
if len(params) > 0 {
//prepare in paras
ArgsType = make([]string, len(params))
for k, param := range params {
switch v2:=param.(type) {
case string:
ArgsType[k] = STRING
args[k]=params[k]
case bool:
ArgsType[k] = BOOL
args[k]=params[k]
case int:
ArgsType[k] = INT
args[k]=params[k]
case int64:
ArgsType[k] = LONG
args[k]=params[k]
case float32:
ArgsType[k] = FLOAT
args[k]=params[k]
case float64:
ArgsType[k] = DOUBLE
args[k]=params[k]
case []byte:
//这里要把[]byte转base64
ArgsType[k] = BYTES
uEnc := base64.URLEncoding.EncodeToString(v2)
args[k]=uEnc
case map[string]interface{}:
ArgsType[k] = MAP
args[k]=params[k]
case map[string]string:
ArgsType[k] = MAP
args[k]=params[k]
default:
return fmt.Errorf( "args[%d] [%s] Types not allowed",k,reflect.TypeOf(param))
}

}
}
var correlation_id = uuid.Rand().Hex()
callInfo := &CallInfo{
Fn: _func,
Args: params,
Args: args,
Reply: false, //客户端是否需要结果
Expired: (time.Now().UTC().Add(time.Second * time.Duration(conf.RpcExpired)).UnixNano()) / 1000000, //超时日期 unix 时间戳 单位/毫秒 要求服务端与客户端时间精准同步
Cid: correlation_id,
Expand All @@ -117,7 +205,11 @@ func (c *RPCClient) CallNR(_func string, params ...interface{}) (err error) {
if c.local_client != nil {
err = c.local_client.CallNR(*callInfo)
} else {
err = c.remote_client.CallNR(*callInfo)
if c.remote_client != nil {
err = c.remote_client.CallNR(*callInfo)
} else {
return fmt.Errorf("rpc service connection failed")
}
}

if err != nil {
Expand Down
Loading

0 comments on commit 16fec55

Please sign in to comment.