Skip to content

Commit

Permalink
=v1.6.5
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed Dec 5, 2017
1 parent ea86901 commit 469d0a8
Show file tree
Hide file tree
Showing 19 changed files with 1,717 additions and 355 deletions.
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (app *DefaultApp) OnInit(settings conf.Config) error {

func (app *DefaultApp) OnDestroy() error {
for id, session := range app.serverList {
log.Info("RPCClient closeing type(%s) id(%s)", session.GetType(), id)
err := session.GetRpc().Done()
if err != nil {
log.Warning("RPCClient close fail type(%s) id(%s)", session.GetType(), id)
Expand Down
11 changes: 7 additions & 4 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func LoadConfig(Path string) {
if err := readFileInto(Path); err != nil {
panic(err)
}

if Conf.Rpc.MaxCoroutine == 0 {
Conf.Rpc.MaxCoroutine = 100
}
}

type Config struct {
Expand All @@ -48,9 +50,10 @@ type Config struct {
}

type Rpc struct {
RpcExpired int //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答
LogSuccess bool //是否打印请求处理成功的日志
Log bool //是否打印RPC的日志
MaxCoroutine int //模块同时可以创建的最大协程数量默认是100
RpcExpired int //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答
LogSuccess bool //是否打印请求处理成功的日志
Log bool //是否打印RPC的日志
}

type Rabbitmq struct {
Expand Down
12 changes: 6 additions & 6 deletions gate/base/gate_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ func (h *handler) Bind(Sessionid string, Userid string) (result gate.Session, er
data, err := h.gate.GetStorageHandler().Query(Userid)
if err == nil && data != nil {
//有已持久化的数据,可能是上一次连接保存的
impSession,err:=h.gate.NewSession(data)
if err==nil{
impSession, err := h.gate.NewSession(data)
if err == nil {
if agent.(gate.Agent).GetSession().GetSettings() == nil {
agent.(gate.Agent).GetSession().SetSettings(impSession.GetSettings())
} else {
//合并两个map 并且以 agent.(Agent).GetSession().Settings 已有的优先
settings:=impSession.GetSettings()
if settings!=nil{
settings := impSession.GetSettings()
if settings != nil {
for k, v := range settings {
if _, ok := agent.(gate.Agent).GetSession().GetSettings()[k]; ok {
//不用替换
Expand All @@ -110,9 +110,9 @@ func (h *handler) Bind(Sessionid string, Userid string) (result gate.Session, er
//数据持久化
h.gate.GetStorageHandler().Storage(Userid, agent.(gate.Agent).GetSession())
}
}else{
} else {
//解析持久化数据失败
log.Error("Sesssion Resolve fail",err.Error())
log.Error("Sesssion Resolve fail %s", err.Error())
}
}
}
Expand Down
29 changes: 15 additions & 14 deletions gate/base/mqtt_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/gate/base/mqtt"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module"
"github.com/liangdas/mqant/network"
"github.com/liangdas/mqant/rpc/util"
"github.com/liangdas/mqant/utils/uuid"
"math/rand"
"runtime"
"strings"
"time"
"github.com/liangdas/mqant/module"
)

type resultInfo struct {
Expand All @@ -39,7 +39,7 @@ type resultInfo struct {

type agent struct {
gate.Agent
module module.RPCModule
module module.RPCModule
session gate.Session
conn network.Conn
r *bufio.Reader
Expand All @@ -51,20 +51,21 @@ type agent struct {
rev_num int64
send_num int64
}
func NewMqttAgent(module module.RPCModule)*agent{

func NewMqttAgent(module module.RPCModule) *agent {
a := &agent{
module:module,
module: module,
}
return a
}
func (this *agent) OnInit(gate gate.Gate,conn network.Conn)error{
this.conn=conn
this.gate=gate
this.r=bufio.NewReaderSize(conn,256)
this.w=bufio.NewWriterSize(conn,256)
this.isclose=false
this.rev_num=0
this.send_num=0
func (this *agent) OnInit(gate gate.Gate, conn network.Conn) error {
this.conn = conn
this.gate = gate
this.r = bufio.NewReaderSize(conn, 256)
this.w = bufio.NewWriterSize(conn, 256)
this.isclose = false
this.rev_num = 0
this.send_num = 0
return nil
}
func (a *agent) IsClosed() bool {
Expand Down Expand Up @@ -177,7 +178,7 @@ func (a *agent) OnRecover(pack *mqtt.Pack) {
topics := strings.Split(*pub.GetTopic(), "/")
var msgid string
if len(topics) < 2 {
errorstr:="Topic must be [moduleType@moduleID]/[handler]|[moduleType@moduleID]/[handler]/[msgid]"
errorstr := "Topic must be [moduleType@moduleID]/[handler]|[moduleType@moduleID]/[handler]/[msgid]"
log.Error(errorstr)
toResult(a, *pub.GetTopic(), nil, errorstr)
return
Expand Down Expand Up @@ -208,7 +209,7 @@ func (a *agent) OnRecover(pack *mqtt.Pack) {
} else {
hash = a.module.GetServerId()
}
if (a.gate.GetTracingHandler() != nil) && a.gate.GetTracingHandler().OnRequestTracing(a.session,*pub.GetTopic(),pub.GetMsg()) {
if (a.gate.GetTracingHandler() != nil) && a.gate.GetTracingHandler().OnRequestTracing(a.session, *pub.GetTopic(), pub.GetMsg()) {
a.session.CreateRootSpan("gate")
}

Expand Down
36 changes: 18 additions & 18 deletions gate/base/mqtt_gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type Gate struct {
storage gate.StorageHandler
tracing gate.TracingHandler

createAgent func () gate.Agent
createAgent func() gate.Agent
}

func (this *Gate) defaultCreateAgentd() gate.Agent{
func (this *Gate) defaultCreateAgentd() gate.Agent {
a := NewMqttAgent(this.GetModule())
return a
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (this *Gate) SetTracingHandler(tracing gate.TracingHandler) error {
/**
设置创建客户端Agent的函数
*/
func (this *Gate) SetCreateAgent(cfunc func () gate.Agent) error {
func (this *Gate) SetCreateAgent(cfunc func() gate.Agent) error {
this.createAgent = cfunc
return nil
}
Expand All @@ -111,15 +111,15 @@ func (this *Gate) GetTracingHandler() gate.TracingHandler {
return this.tracing
}

func (this *Gate) GetModule() module.RPCModule{
func (this *Gate) GetModule() module.RPCModule {
return this.GetSubclass()
}

func (this *Gate) NewSession(data []byte) (gate.Session, error){
return NewSession(this.App,data)
func (this *Gate) NewSession(data []byte) (gate.Session, error) {
return NewSession(this.App, data)
}
func (this *Gate) NewSessionByMap(data map[string]interface{}) (gate.Session, error){
return NewSessionByMap(this.App,data)
func (this *Gate) NewSessionByMap(data map[string]interface{}) (gate.Session, error) {
return NewSessionByMap(this.App, data)
}

func (this *Gate) OnConfChanged(settings *conf.ModuleSettings) {
Expand Down Expand Up @@ -223,12 +223,12 @@ func (this *Gate) Run(closeSig chan bool) {
wsServer.Tls = this.Tls
wsServer.CertFile = this.CertFile
wsServer.KeyFile = this.KeyFile
wsServer.NewAgent =func(conn *network.WSConn) network.Agent{
if this.createAgent==nil{
this.createAgent=this.defaultCreateAgentd
wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
if this.createAgent == nil {
this.createAgent = this.defaultCreateAgentd
}
agent:= this.createAgent()
agent.OnInit(this,conn)
agent := this.createAgent()
agent.OnInit(this, conn)
return agent
}
}
Expand All @@ -241,12 +241,12 @@ func (this *Gate) Run(closeSig chan bool) {
tcpServer.Tls = this.Tls
tcpServer.CertFile = this.CertFile
tcpServer.KeyFile = this.KeyFile
tcpServer.NewAgent =func(conn *network.TCPConn) network.Agent{
if this.createAgent==nil{
this.createAgent=this.defaultCreateAgentd
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
if this.createAgent == nil {
this.createAgent = this.defaultCreateAgentd
}
agent:= this.createAgent()
agent.OnInit(this,conn)
agent := this.createAgent()
agent.OnInit(this, conn)
return agent
}
}
Expand Down
5 changes: 3 additions & 2 deletions gate/base/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,14 @@ func (this *sessionagent) TracCarrier() map[string]string {
return this.session.Carrier
}
func (this *sessionagent) TracId() string {
if this.TracCarrier()!=nil{
if tid,ok:=this.TracCarrier()["ot-tracer-traceid"];ok{
if this.TracCarrier() != nil {
if tid, ok := this.TracCarrier()["ot-tracer-traceid"]; ok {
return tid
}
}
return ""
}

/**
从Session的 Span继承一个新的Span
*/
Expand Down
10 changes: 5 additions & 5 deletions gate/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package gate

import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/liangdas/mqant/network"
opentracing "github.com/opentracing/opentracing-go"
)

/**
Expand Down Expand Up @@ -106,7 +106,7 @@ type StorageHandler interface {
获取用户Session信息
Bind Userid时会调用Query获取最新信息
*/
Query(Userid string) ( data []byte, err error)
Query(Userid string) (data []byte, err error)
/**
用户心跳,一般用户在线时1s发送一次
可以用来延长Session信息过期时间
Expand All @@ -118,7 +118,7 @@ type TracingHandler interface {
/**
是否需要对本次客户端请求进行跟踪
*/
OnRequestTracing(session Session,topic string,msg []byte) bool
OnRequestTracing(session Session, topic string, msg []byte) bool
}

type AgentLearner interface {
Expand All @@ -132,7 +132,7 @@ type SessionLearner interface {
}

type Agent interface {
OnInit(gate Gate,conn network.Conn)error
OnInit(gate Gate, conn network.Conn) error
WriteMsg(topic string, body []byte) error
Close()
Run() (err error)
Expand All @@ -153,4 +153,4 @@ type Gate interface {
GetTracingHandler() TracingHandler
NewSession(data []byte) (Session, error)
NewSessionByMap(data map[string]interface{}) (Session, error)
}
}
1 change: 1 addition & 0 deletions module/base/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (s *rpcserver) OnInit(module module.Module, app module.App, settings *conf.
}
func (s *rpcserver) OnDestroy() {
if s.server != nil {
log.Info("RPCServer closeing id(%s)", s.settings.Id)
err := s.server.Done()
if err != nil {
log.Warning("RPCServer close fail id(%s) error(%s)", s.settings.Id, err)
Expand Down
Loading

0 comments on commit 469d0a8

Please sign in to comment.