diff --git a/README.md b/README.md index 5a50a33..592e93b 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,10 @@ QQ交流群 :463735103 go get github.com/gorilla/websocket go get github.com/streadway/amqp go get github.com/golang/protobuf + go get github.com/golang/net/context + go get github.com/gogo/protobuf + go get github.com/opentracing/basictracer-go + go get github.com/opentracing/opentracing-go # 文档 @@ -59,13 +63,19 @@ QQ交流群 :463735103 [框架架构](https://github.com/liangdas/mqant/wiki/mqant%E6%A1%86%E6%9E%B6%E6%A6%82%E8%BF%B0) + + ## 下一步计划 1. 分布式架构管理模块(Master) + 1. 模块发现 + 2. 模块管理 + 1. 模块动态添加删除 + 2. 模块状态监控 2. 异常日志监控和汇报 1. 异常日志分类汇总 2. 定时将异常日志发送到Email 3. 定时将异常日志通过webhook发送到团队协作工具中(钉钉,worktile等) -3. rpc添加track分布式跟踪系统的接口[Appdash,用Go实现的分布式系统跟踪神器](http://tonybai.com/2015/06/17/appdash-distributed-systems-tracing-in-go/) +3. 【已完成】rpc添加track分布式跟踪系统的接口[Appdash,用Go实现的分布式系统跟踪神器](http://tonybai.com/2015/06/17/appdash-distributed-systems-tracing-in-go/) ## 贡献者 @@ -82,6 +92,8 @@ bug请直接通过issue提交 ## 版本日志 +### [v1.4.0新特性](https://github.com/liangdas/mqant/wiki/v1.4.0) + ### [v1.3.0新特性](https://github.com/liangdas/mqant/wiki/v1.3.0) ### [v1.2.0新特性](https://github.com/liangdas/mqant/wiki/v1.2.0) diff --git a/app/app.go b/app/app.go index 1412889..da26ad5 100644 --- a/app/app.go +++ b/app/app.go @@ -30,6 +30,7 @@ import ( "strings" "github.com/liangdas/mqant/rpc/base" "github.com/liangdas/mqant/module/modules" + opentracing "github.com/opentracing/opentracing-go" ) @@ -59,6 +60,7 @@ type DefaultApp struct { routes map[string]func(app module.App, Type string, hash string) module.ServerSession defaultRoutes func(app module.App, Type string, hash string) module.ServerSession rpcserializes map[string]module.RPCSerialize + getTracer func ()opentracing.Tracer } func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { @@ -258,3 +260,13 @@ func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType strin } return server.CallNRArgs(_func, ArgsType,args) } +func (app *DefaultApp)DefaultTracer(_func func ()opentracing.Tracer) error{ + app.getTracer=_func + return nil +} +func (app *DefaultApp)GetTracer()opentracing.Tracer{ + if app.getTracer!=nil{ + return app.getTracer() + } + return nil +} \ No newline at end of file diff --git a/conf/config.go b/conf/config.go index fbf7c86..5ed8ff7 100755 --- a/conf/config.go +++ b/conf/config.go @@ -26,10 +26,7 @@ import ( var ( LenStackBuf = 1024 - LogLevel = "debug" - LogPath = "" - LogFlag = 0 - RpcExpired = 5 //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答 + Conf = Config{} ) @@ -42,9 +39,20 @@ func LoadConfig(Path string) { } type Config struct { + Rpc Rpc Module map[string][]*ModuleSettings Mqtt Mqtt Master Master + Tracing Tracing +} + +type Tracing struct { + Addr string //collector server eg 127.0.0.1:7701 + Enable bool +} + +type Rpc struct { + RpcExpired int //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答 } type Rabbitmq struct { diff --git a/gate/gate_handler.go b/gate/base/gate_handler.go similarity index 61% rename from gate/gate_handler.go rename to gate/base/gate_handler.go index ce5700d..42b0a30 100644 --- a/gate/gate_handler.go +++ b/gate/base/gate_handler.go @@ -11,16 +11,17 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package gate +package basegate import ( + "github.com/liangdas/mqant/gate" "github.com/liangdas/mqant/log" "github.com/liangdas/mqant/utils" ) type handler struct { - AgentLearner - GateHandler + gate.AgentLearner + gate.GateHandler gate *Gate sessions *utils.BeeMap //连接列表 } @@ -34,97 +35,104 @@ func NewGateHandler(gate *Gate) *handler { } //当连接建立 并且MQTT协议握手成功 -func (h *handler) Connect(a Agent) { +func (h *handler) Connect(a gate.Agent) { if a.GetSession() != nil { h.sessions.Set(a.GetSession().GetSessionid(), a) } } //当连接关闭 或者客户端主动发送MQTT DisConnect命令 -func (h *handler) DisConnect(a Agent) { +func (h *handler) DisConnect(a gate.Agent) { if a.GetSession() != nil { h.sessions.Delete(a.GetSession().GetSessionid()) } } +func (h *handler)OnDestroy(){ + for _,v:=range h.sessions.Items(){ + v.(gate.Agent).Close() + } + h.sessions.DeleteAll() +} + /** *更新整个Session 通常是其他模块拉取最新数据 */ -func (h *handler) Update(Sessionid string) (result Session, err string) { +func (h *handler) Update(Sessionid string) (result gate.Session, err string) { agent := h.sessions.Get(Sessionid) if agent == nil { err = "No Sesssion found" return } - result = agent.(Agent).GetSession() + result = agent.(gate.Agent).GetSession() return } /** *Bind the session with the the Userid. */ -func (h *handler) Bind(Sessionid string, Userid string) (result Session, err string) { +func (h *handler) Bind(Sessionid string, Userid string) (result gate.Session, err string) { agent := h.sessions.Get(Sessionid) if agent == nil { err = "No Sesssion found" return } - agent.(Agent).GetSession().SetUserid(Userid) + agent.(gate.Agent).GetSession().SetUserid(Userid) - if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" { + if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" { //可以持久化 settings, err := h.gate.storage.Query(Userid) if err == nil && settings != nil { //有已持久化的数据,可能是上一次连接保存的 - if agent.(Agent).GetSession().GetSettings() == nil { - agent.(Agent).GetSession().SetSettings(settings) + if agent.(gate.Agent).GetSession().GetSettings() == nil { + agent.(gate.Agent).GetSession().SetSettings(settings) } else { //合并两个map 并且以 agent.(Agent).GetSession().Settings 已有的优先 for k, v := range settings { - if _, ok := agent.(Agent).GetSession().GetSettings()[k]; ok { + if _, ok := agent.(gate.Agent).GetSession().GetSettings()[k]; ok { //不用替换 } else { - agent.(Agent).GetSession().GetSettings()[k] = v + agent.(gate.Agent).GetSession().GetSettings()[k] = v } } //数据持久化 - h.gate.storage.Storage(Userid, agent.(Agent).GetSession().GetSettings()) + h.gate.storage.Storage(Userid, agent.(gate.Agent).GetSession().GetSettings()) } } } - result = agent.(Agent).GetSession() + result = agent.(gate.Agent).GetSession() return } /** *UnBind the session with the the Userid. */ -func (h *handler) UnBind(Sessionid string) (result Session, err string) { +func (h *handler) UnBind(Sessionid string) (result gate.Session, err string) { agent := h.sessions.Get(Sessionid) if agent == nil { err = "No Sesssion found" return } - agent.(Agent).GetSession().SetUserid("") - result = agent.(Agent).GetSession() + agent.(gate.Agent).GetSession().SetUserid("") + result = agent.(gate.Agent).GetSession() return } /** *Push the session with the the Userid. */ -func (h *handler) Push(Sessionid string, Settings map[string]string) (result Session, err string) { +func (h *handler) Push(Sessionid string, Settings map[string]string) (result gate.Session, err string) { agent := h.sessions.Get(Sessionid) if agent == nil { err = "No Sesssion found" return } - agent.(Agent).GetSession().SetSettings(Settings) - result = agent.(Agent).GetSession() - if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" { - err := h.gate.storage.Storage(agent.(Agent).GetSession().GetUserid(), agent.(Agent).GetSession().GetSettings()) + agent.(gate.Agent).GetSession().SetSettings(Settings) + result = agent.(gate.Agent).GetSession() + if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" { + err := h.gate.storage.Storage(agent.(gate.Agent).GetSession().GetUserid(), agent.(gate.Agent).GetSession().GetSettings()) if err != nil { log.Error("gate session storage failure") } @@ -136,17 +144,17 @@ func (h *handler) Push(Sessionid string, Settings map[string]string) (result Ses /** *Set values (one or many) for the session. */ -func (h *handler) Set(Sessionid string, key string, value string) (result Session, err string) { +func (h *handler) Set(Sessionid string, key string, value string) (result gate.Session, err string) { agent := h.sessions.Get(Sessionid) if agent == nil { err = "No Sesssion found" return } - agent.(Agent).GetSession().GetSettings()[key] = value - result = agent.(Agent).GetSession() + agent.(gate.Agent).GetSession().GetSettings()[key] = value + result = agent.(gate.Agent).GetSession() - if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" { - err := h.gate.storage.Storage(agent.(Agent).GetSession().GetUserid(), agent.(Agent).GetSession().GetSettings()) + if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" { + err := h.gate.storage.Storage(agent.(gate.Agent).GetSession().GetUserid(), agent.(gate.Agent).GetSession().GetSettings()) if err != nil { log.Error("gate session storage failure") } @@ -164,11 +172,11 @@ func (h *handler) Remove(Sessionid string, key string) (result interface{}, err err = "No Sesssion found" return } - delete(agent.(Agent).GetSession().GetSettings(), key) - result = agent.(Agent).GetSession() + delete(agent.(gate.Agent).GetSession().GetSettings(), key) + result = agent.(gate.Agent).GetSession() - if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" { - err := h.gate.storage.Storage(agent.(Agent).GetSession().GetUserid(), agent.(Agent).GetSession().GetSettings()) + if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" { + err := h.gate.storage.Storage(agent.(gate.Agent).GetSession().GetUserid(), agent.(gate.Agent).GetSession().GetSettings()) if err != nil { log.Error("gate session storage failure") } @@ -186,7 +194,7 @@ func (h *handler) Send(Sessionid string, topic string, body []byte) (result inte err = "No Sesssion found" return } - e := agent.(Agent).WriteMsg(topic, body) + e := agent.(gate.Agent).WriteMsg(topic, body) if e != nil { err = e.Error() } else { @@ -204,6 +212,6 @@ func (h *handler) Close(Sessionid string) (result interface{}, err string) { err = "No Sesssion found" return } - agent.(Agent).Close() + agent.(gate.Agent).Close() return } diff --git a/gate/mqtt/mqtt.go b/gate/base/mqtt/mqtt.go similarity index 100% rename from gate/mqtt/mqtt.go rename to gate/base/mqtt/mqtt.go diff --git a/gate/mqtt/mqtt_client_server.go b/gate/base/mqtt/mqtt_client_server.go similarity index 100% rename from gate/mqtt/mqtt_client_server.go rename to gate/base/mqtt/mqtt_client_server.go diff --git a/gate/mqtt/mqtt_test.go b/gate/base/mqtt/mqtt_test.go similarity index 100% rename from gate/mqtt/mqtt_test.go rename to gate/base/mqtt/mqtt_test.go diff --git a/gate/mqtt/pack_queue.go b/gate/base/mqtt/pack_queue.go similarity index 100% rename from gate/mqtt/pack_queue.go rename to gate/base/mqtt/pack_queue.go diff --git a/gate/mqtt_agent.go b/gate/base/mqtt_agent.go similarity index 96% rename from gate/mqtt_agent.go rename to gate/base/mqtt_agent.go index a34b456..40f0342 100644 --- a/gate/mqtt_agent.go +++ b/gate/base/mqtt_agent.go @@ -11,14 +11,15 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package gate +package basegate import ( "bufio" "encoding/json" "fmt" "github.com/liangdas/mqant/conf" - "github.com/liangdas/mqant/gate/mqtt" + "github.com/liangdas/mqant/gate" + "github.com/liangdas/mqant/gate/base/mqtt" "github.com/liangdas/mqant/log" "github.com/liangdas/mqant/network" "github.com/liangdas/mqant/utils/uuid" @@ -34,8 +35,8 @@ type resultInfo struct { } type agent struct { - Agent - session Session + gate.Agent + session gate.Session conn network.Conn r *bufio.Reader w *bufio.Writer @@ -51,7 +52,7 @@ func (a *agent) IsClosed() bool { return a.isclose } -func (a *agent) GetSession() Session { +func (a *agent) GetSession() gate.Session { return a.session } @@ -97,6 +98,9 @@ func (a *agent) Run() (err error) { log.Error("gate create agent fail",err.Error()) return } + + + a.gate.agentLearner.Connect(a) //发送连接成功的事件 //回复客户端 CONNECT @@ -185,7 +189,7 @@ func (a *agent) OnRecover(pack *mqtt.Pack) { } else { hash = a.gate.GetServerId() } - + a.session.CreateRootSpan("gate") serverSession, err := a.gate.GetRouteServers(topics[0], hash) if err != nil { if msgid != "" { diff --git a/gate/mqtt_gate.go b/gate/base/mqtt_gate.go similarity index 55% rename from gate/mqtt_gate.go rename to gate/base/mqtt_gate.go index c2c3d3d..e5fed62 100644 --- a/gate/mqtt_gate.go +++ b/gate/base/mqtt_gate.go @@ -11,10 +11,11 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package gate +package basegate import ( "bufio" + "github.com/liangdas/mqant/gate" "github.com/liangdas/mqant/conf" "github.com/liangdas/mqant/network" "time" @@ -44,32 +45,32 @@ type Gate struct { CertFile string KeyFile string // - handler GateHandler - agentLearner AgentLearner - storage StorageHandler + handler gate.GateHandler + agentLearner gate.AgentLearner + storage gate.StorageHandler } /** 设置Session信息持久化接口 */ -func (gate *Gate) SetStorageHandler(storage StorageHandler) error { - gate.storage = storage +func (this *Gate) SetStorageHandler(storage gate.StorageHandler) error { + this.storage = storage return nil } -func (gate *Gate) GetStorageHandler() (storage StorageHandler) { - return gate.storage +func (this *Gate) GetStorageHandler() (storage gate.StorageHandler) { + return this.storage } -func (gate *Gate)OnConfChanged(settings *conf.ModuleSettings) { +func (this *Gate)OnConfChanged(settings *conf.ModuleSettings) { } /** 自定义rpc参数序列化反序列化 Session */ -func (gate *Gate)Serialize(param interface{})(ptype string,p []byte, err error){ +func (this *Gate)Serialize(param interface{})(ptype string,p []byte, err error){ switch v2:=param.(type) { - case Session: + case gate.Session: bytes,err:=v2.Serializable() if err != nil{ return RPC_PARAM_SESSION_TYPE,nil,err @@ -80,10 +81,10 @@ func (gate *Gate)Serialize(param interface{})(ptype string,p []byte, err error){ } } -func (gate *Gate)Deserialize(ptype string,b []byte)(param interface{},err error){ +func (this *Gate)Deserialize(ptype string,b []byte)(param interface{},err error){ switch ptype { case RPC_PARAM_SESSION_TYPE: - mps,errs:= NewSession(gate.App,b) + mps,errs:= NewSession(this.App,b) if errs!=nil{ return nil,errs } @@ -93,76 +94,76 @@ func (gate *Gate)Deserialize(ptype string,b []byte)(param interface{},err error) } } -func (gate *Gate)GetTypes()([]string){ +func (this *Gate)GetTypes()([]string){ return []string{RPC_PARAM_SESSION_TYPE} } -func (gate *Gate) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings) { - gate.BaseModule.OnInit(subclass, app, settings) //这是必须的 +func (this *Gate) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings) { + this.BaseModule.OnInit(subclass, app, settings) //这是必须的 //添加Session结构体的序列化操作类 - err:=app.AddRPCSerialize("gate",gate) + err:=app.AddRPCSerialize("gate",this) if err!=nil{ log.Warning("Adding session structures failed to serialize interfaces",err.Error()) } - gate.MaxConnNum = int(settings.Settings["MaxConnNum"].(float64)) - gate.MaxMsgLen = uint32(settings.Settings["MaxMsgLen"].(float64)) - gate.WSAddr = settings.Settings["WSAddr"].(string) - gate.HTTPTimeout = time.Second * time.Duration(settings.Settings["HTTPTimeout"].(float64)) - gate.TCPAddr = settings.Settings["TCPAddr"].(string) + this.MaxConnNum = int(settings.Settings["MaxConnNum"].(float64)) + this.MaxMsgLen = uint32(settings.Settings["MaxMsgLen"].(float64)) + this.WSAddr = settings.Settings["WSAddr"].(string) + this.HTTPTimeout = time.Second * time.Duration(settings.Settings["HTTPTimeout"].(float64)) + this.TCPAddr = settings.Settings["TCPAddr"].(string) if Tls, ok := settings.Settings["Tls"]; ok { - gate.Tls = Tls.(bool) + this.Tls = Tls.(bool) } else { - gate.Tls = false + this.Tls = false } if CertFile, ok := settings.Settings["CertFile"]; ok { - gate.CertFile = CertFile.(string) + this.CertFile = CertFile.(string) } else { - gate.CertFile = "" + this.CertFile = "" } if KeyFile, ok := settings.Settings["KeyFile"]; ok { - gate.KeyFile = KeyFile.(string) + this.KeyFile = KeyFile.(string) } else { - gate.KeyFile = "" + this.KeyFile = "" } if MinHBStorage, ok := settings.Settings["MinHBStorage"]; ok { - gate.MinStorageHeartbeat = int64(MinHBStorage.(float64)) + this.MinStorageHeartbeat = int64(MinHBStorage.(float64)) } else { - gate.MinStorageHeartbeat = 60 + this.MinStorageHeartbeat = 60 } - handler := NewGateHandler(gate) + handler := NewGateHandler(this) - gate.agentLearner = handler - gate.handler = handler + this.agentLearner = handler + this.handler = handler - gate.GetServer().Register("Update", gate.handler.Update) - gate.GetServer().Register("Bind", gate.handler.Bind) - gate.GetServer().Register("UnBind", gate.handler.UnBind) - gate.GetServer().Register("Push", gate.handler.Push) - gate.GetServer().Register("Set", gate.handler.Set) - gate.GetServer().Register("Remove", gate.handler.Remove) - gate.GetServer().Register("Send", gate.handler.Send) - gate.GetServer().Register("Close", gate.handler.Close) + this.GetServer().Register("Update", this.handler.Update) + this.GetServer().Register("Bind", this.handler.Bind) + this.GetServer().Register("UnBind", this.handler.UnBind) + this.GetServer().Register("Push", this.handler.Push) + this.GetServer().Register("Set", this.handler.Set) + this.GetServer().Register("Remove", this.handler.Remove) + this.GetServer().Register("Send", this.handler.Send) + this.GetServer().Register("Close", this.handler.Close) } -func (gate *Gate) Run(closeSig chan bool) { +func (this *Gate) Run(closeSig chan bool) { var wsServer *network.WSServer - if gate.WSAddr != "" { + if this.WSAddr != "" { wsServer = new(network.WSServer) - wsServer.Addr = gate.WSAddr - wsServer.MaxConnNum = gate.MaxConnNum - wsServer.MaxMsgLen = gate.MaxMsgLen - wsServer.HTTPTimeout = gate.HTTPTimeout - wsServer.Tls = gate.Tls - wsServer.CertFile = gate.CertFile - wsServer.KeyFile = gate.KeyFile + wsServer.Addr = this.WSAddr + wsServer.MaxConnNum = this.MaxConnNum + wsServer.MaxMsgLen = this.MaxMsgLen + wsServer.HTTPTimeout = this.HTTPTimeout + wsServer.Tls = this.Tls + wsServer.CertFile = this.CertFile + wsServer.KeyFile = this.KeyFile wsServer.NewAgent = func(conn *network.WSConn) network.Agent { a := &agent{ conn: conn, - gate: gate, + gate: this, r: bufio.NewReader(conn), w: bufio.NewWriter(conn), isclose: false, @@ -174,17 +175,17 @@ func (gate *Gate) Run(closeSig chan bool) { } var tcpServer *network.TCPServer - if gate.TCPAddr != "" { + if this.TCPAddr != "" { tcpServer = new(network.TCPServer) - tcpServer.Addr = gate.TCPAddr - tcpServer.MaxConnNum = gate.MaxConnNum - tcpServer.Tls = gate.Tls - tcpServer.CertFile = gate.CertFile - tcpServer.KeyFile = gate.KeyFile + tcpServer.Addr = this.TCPAddr + tcpServer.MaxConnNum = this.MaxConnNum + tcpServer.Tls = this.Tls + tcpServer.CertFile = this.CertFile + tcpServer.KeyFile = this.KeyFile tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent { a := &agent{ conn: conn, - gate: gate, + gate: this, r: bufio.NewReader(conn), w: bufio.NewWriter(conn), isclose: false, @@ -202,6 +203,9 @@ func (gate *Gate) Run(closeSig chan bool) { tcpServer.Start() } <-closeSig + if this.handler!=nil{ + this.handler.OnDestroy() + } if wsServer != nil { wsServer.Close() } @@ -210,6 +214,6 @@ func (gate *Gate) Run(closeSig chan bool) { } } -func (gate *Gate) OnDestroy() { - gate.BaseModule.OnDestroy() //这是必须的 +func (this *Gate) OnDestroy() { + this.BaseModule.OnDestroy() //这是必须的 } diff --git a/gate/base/session.go b/gate/base/session.go new file mode 100644 index 0000000..c1593b8 --- /dev/null +++ b/gate/base/session.go @@ -0,0 +1,445 @@ +// Copyright 2014 mqant Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package basegate + +import ( + "fmt" + "github.com/golang/protobuf/proto" + "github.com/liangdas/mqant/module" + "github.com/liangdas/mqant/gate" + opentracing "github.com/opentracing/opentracing-go" + "github.com/liangdas/mqant/log" +) + +type sessionagent struct { + app module.App + session *session + span opentracing.Span +} + +func NewSession(app module.App, data []byte) (gate.Session,error) { + agent:=&sessionagent{ + app:app, + } + se := &session{} + err := proto.Unmarshal(data, se) + if err != nil { + return nil,err + } // 测试结果 + agent.session=se + return agent,nil +} + +func NewSessionByMap(app module.App, data map[string]interface{}) (gate.Session,error) { + agent:=&sessionagent{ + app:app, + session:new(session), + } + err:=agent.updateMap(data) + if err!=nil{ + return nil,err + } + return agent,nil +} + +func (this *sessionagent) GetIP() string { + return this.session.GetIP() +} + +func (this *sessionagent) GetNetwork() string { + return this.session.GetNetwork() +} + +func (this *sessionagent) GetUserid() string { + return this.session.GetUserid() +} + +func (this *sessionagent) GetSessionid() string { + return this.session.GetSessionid() +} + +func (this *sessionagent) GetServerid() string { + return this.session.GetServerid() +} + +func (this *sessionagent) GetSettings() map[string]string { + return this.session.GetSettings() +} + + +func (this *sessionagent)SetIP(ip string){ + this.session.IP=ip +} +func (this *sessionagent)SetNetwork(network string){ + this.session.Network=network +} +func (this *sessionagent)SetUserid(userid string){ + this.session.Userid=userid +} +func (this *sessionagent)SetSessionid(sessionid string){ + this.session.Sessionid=sessionid +} +func (this *sessionagent)SetServerid(serverid string){ + this.session.Serverid=serverid +} +func (this *sessionagent)SetSettings(settings map[string]string){ + this.session.Settings=settings +} + +func (this *sessionagent) updateMap(s map[string]interface{})error { + Userid := s["Userid"] + if Userid != nil { + this.session.Userid = Userid.(string) + } + IP := s["IP"] + if IP != nil { + this.session.IP = IP.(string) + } + Network := s["Network"] + if Network != nil { + this.session.Network = Network.(string) + } + Sessionid := s["Sessionid"] + if Sessionid != nil { + this.session.Sessionid = Sessionid.(string) + } + Serverid := s["Serverid"] + if Serverid != nil { + this.session.Serverid = Serverid.(string) + } + Settings := s["Settings"] + if Settings != nil { + this.session.Settings = Settings.(map[string]string) + } + return nil +} + +func (this *sessionagent) update(s gate.Session)error { + Userid := s.GetUserid() + this.session.Userid = Userid + IP := s.GetIP() + this.session.IP = IP + Network := s.GetNetwork() + this.session.Network = Network + Sessionid := s.GetSessionid() + this.session.Sessionid = Sessionid + Serverid := s.GetServerid() + this.session.Serverid = Serverid + Settings := s.GetSettings() + this.session.Settings = Settings + return nil +} + +func (this *sessionagent)Serializable()([]byte,error){ + data, err := proto.Marshal(this.session) + if err != nil { + return nil,err + } // 进行解码 + return data,nil +} + + +func (this *sessionagent) Update() (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + err = fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + return + } + result, err := server.Call("Update", this.session.Sessionid) + if err == "" { + if result != nil { + //绑定成功,重新更新当前Session + this.update(result.(gate.Session)) + } + } + return +} + +func (this *sessionagent) Bind(Userid string) (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + err = fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + return + } + result, err := server.Call("Bind", this.session.Sessionid, Userid) + if err == "" { + if result != nil { + //绑定成功,重新更新当前Session + this.update(result.(gate.Session)) + } + } + return +} + +func (this *sessionagent) UnBind() (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + err = fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + return + } + result, err := server.Call("UnBind", this.session.Sessionid) + if err == "" { + if result != nil { + //绑定成功,重新更新当前Session + this.update(result.(gate.Session)) + } + } + return +} + +func (this *sessionagent) Push() (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + err = fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + return + } + result, err := server.Call("Push", this.session.Sessionid, this.session.Settings) + if err == "" { + if result != nil { + //绑定成功,重新更新当前Session + this.update(result.(gate.Session)) + } + } + return +} + +func (this *sessionagent) Set(key string, value string) (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + if this.session.Settings == nil { + this.session.Settings=map[string]string{} + } + this.session.Settings[key] = value + //server,e:=session.app.GetServersById(session.Serverid) + //if e!=nil{ + // err=fmt.Sprintf("Service not found id(%s)",session.Serverid) + // return + //} + //result,err:=server.Call("Set",session.Sessionid,key,value) + //if err==""{ + // if result!=nil{ + // //绑定成功,重新更新当前Session + // session.update(result.(map[string]interface {})) + // } + //} + return +} + +func (this *sessionagent) Get(key string) (result string) { + if this.session.Settings == nil { + return + } + result = this.session.Settings[key] + return +} + +func (this *sessionagent) Remove(key string) (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + if this.session.Settings == nil { + this.session.Settings=map[string]string{} + } + delete(this.session.Settings, key) + //server,e:=session.app.GetServersById(session.Serverid) + //if e!=nil{ + // err=fmt.Sprintf("Service not found id(%s)",session.Serverid) + // return + //} + //result,err:=server.Call("Remove",session.Sessionid,key) + //if err==""{ + // if result!=nil{ + // //绑定成功,重新更新当前Session + // session.update(result.(map[string]interface {})) + // } + //} + return +} +func (this *sessionagent) Send(topic string, body []byte) (string) { + if this.app == nil { + return fmt.Sprintf("Module.App is nil") + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + return fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + } + _, err := server.Call("Send", this.session.Sessionid, topic, body) + //span:=this.ExtractSpan(topic) + //if span!=nil{ + // span.LogEventWithPayload("SendToClient",map[string]string{ + // "topic":topic, + // "err":err, + // }) + // span.Finish() + //} + return err +} + +func (this *sessionagent) SendNR(topic string, body []byte) (string) { + if this.app == nil { + return fmt.Sprintf("Module.App is nil") + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + return fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + } + e = server.CallNR("Send", this.session.Sessionid, topic, body) + if e != nil { + return e.Error() + } + //span:=this.ExtractSpan(topic) + //if span!=nil{ + // span.LogEventWithPayload("SendToClient",map[string]string{ + // "topic":topic, + // }) + // span.Finish() + //} + return "" +} + +func (this *sessionagent) Close() (err string) { + if this.app == nil { + err = fmt.Sprintf("Module.App is nil") + return + } + server, e := this.app.GetServersById(this.session.Serverid) + if e != nil { + err = fmt.Sprintf("Service not found id(%s)", this.session.Serverid) + return + } + _, err = server.Call("Close", this.session.Sessionid) + return +} + +/** +每次rpc调用都拷贝一份新的Session进行传输 + */ +func (this *sessionagent) Clone()gate.Session{ + agent:=&sessionagent{ + app:this.app, + span:this.Span(), + } + se := &session{ + IP :this.session.IP, + Network :this.session.Network, + Userid :this.session.Userid, + Sessionid :this.session.Sessionid, + Serverid :this.session.Serverid, + Settings :this.session.Settings, + } + //这个要换成本次RPC调用的新Span + se.Carrier=this.inject() + + agent.session=se + return agent +} + +func (this *sessionagent)inject()map[string]string{ + if this.app.GetTracer()==nil{ + return nil + } + if this.Span()==nil{ + return nil + } + carrier := &opentracing.TextMapCarrier{} + err := this.app.GetTracer().Inject( + this.Span().Context(), + opentracing.TextMap, + carrier) + if err!=nil{ + log.Warning("session.session.Carrier Inject Fail",err.Error()) + return nil + }else{ + m:=map[string]string{} + carrier.ForeachKey(func(key, val string) error{ + m[key]=val + return nil + }) + return m + } +} +func (this *sessionagent)extract(gCarrier map[string]string)(opentracing.SpanContext, error){ + carrier := &opentracing.TextMapCarrier{} + for v,k:=range gCarrier{ + carrier.Set(v,k) + } + return this.app.GetTracer().Extract(opentracing.TextMap, carrier) +} +func (this *sessionagent)LoadSpan(operationName string)opentracing.Span{ + if this.app.GetTracer()==nil{ + return nil + } + if this.span==nil{ + if this.session.Carrier!=nil{ + //从已有记录恢复 + clientContext, err := this.extract(this.session.Carrier) + if err == nil { + this.span = this.app.GetTracer().StartSpan( + operationName, opentracing.ChildOf(clientContext)) + } else { + log.Warning("session.session.Carrier Extract Fail",err.Error()) + } + } + } + return this.span +} +func (this *sessionagent)CreateRootSpan(operationName string)opentracing.Span{ + if this.app.GetTracer()==nil{ + return nil + } + this.span = this.app.GetTracer().StartSpan(operationName) + this.session.Carrier=this.inject() + return this.span +} +func (this *sessionagent)Span()opentracing.Span{ + return this.span +} + +func (this *sessionagent)TracCarrier()map[string]string{ + return this.session.Carrier +} +/** +从Session的 Span继承一个新的Span + */ +func (this *sessionagent)ExtractSpan(operationName string)opentracing.Span{ + if this.app.GetTracer()==nil{ + return nil + } + if this.Span()!=nil{ + span := this.app.GetTracer().StartSpan(operationName,opentracing.ChildOf(this.Span().Context())) + return span + } + return nil +} + + diff --git a/gate/session.pb.go b/gate/base/session.pb.go similarity index 78% rename from gate/session.pb.go rename to gate/base/session.pb.go index f1b0dae..ada14ad 100644 --- a/gate/session.pb.go +++ b/gate/base/session.pb.go @@ -10,7 +10,7 @@ It is generated from these files: It has these top-level messages: Session */ -package gate +package basegate import proto "github.com/golang/protobuf/proto" import fmt "fmt" @@ -36,6 +36,7 @@ type session struct { Sessionid string `protobuf:"bytes,4,opt,name=Sessionid" json:"Sessionid,omitempty"` Serverid string `protobuf:"bytes,5,opt,name=Serverid" json:"Serverid,omitempty"` Settings map[string]string `protobuf:"bytes,6,rep,name=Settings" json:"Settings,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Carrier map[string]string `protobuf:"bytes,7,rep,name=Carrier" json:"Carrier,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` } func (m *session) Reset() { *m = session{} } @@ -85,6 +86,13 @@ func (m *session) GetSettings() map[string]string { return nil } +func (m *session) GetCarrier() map[string]string { + if m != nil { + return m.Carrier + } + return nil +} + func init() { proto.RegisterType((*session)(nil), "gate.Session") } @@ -92,18 +100,20 @@ func init() { func init() { proto.RegisterFile("session/session.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 207 bytes of a gzipped FileDescriptorProto + // 239 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2d, 0x4e, 0x2d, 0x2e, 0xce, 0xcc, 0xcf, 0xd3, 0x87, 0xd2, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0xe9, 0x89, - 0x25, 0xa9, 0x4a, 0xbf, 0x19, 0xb9, 0xd8, 0x83, 0x21, 0xe2, 0x42, 0x7c, 0x5c, 0x4c, 0x9e, 0x01, + 0x25, 0xa9, 0x4a, 0x2f, 0x99, 0xb8, 0xd8, 0x83, 0x21, 0xe2, 0x42, 0x7c, 0x5c, 0x4c, 0x9e, 0x01, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x4c, 0x9e, 0x01, 0x42, 0x12, 0x5c, 0xec, 0x7e, 0xa9, 0x25, 0xe5, 0xf9, 0x45, 0xd9, 0x12, 0x4c, 0x60, 0x41, 0x18, 0x57, 0x48, 0x8c, 0x8b, 0x2d, 0xb4, 0x38, 0xb5, 0x28, 0x33, 0x45, 0x82, 0x19, 0x2c, 0x01, 0xe5, 0x09, 0xc9, 0x70, 0x71, 0x42, 0x0d, 0xcb, 0x4c, 0x91, 0x60, 0x01, 0x4b, 0x21, 0x04, 0x84, 0xa4, 0xb8, 0x38, 0x82, 0x53, 0x8b, 0xca, 0xc0, 0xfa, 0x58, 0xc1, 0x92, 0x70, 0xbe, 0x90, 0x39, 0x48, 0xae, 0xa4, 0x24, 0x33, 0x2f, 0xbd, 0x58, 0x82, 0x4d, 0x81, 0x59, 0x83, 0xdb, 0x48, 0x5a, 0x0f, 0xe4, 0x40, 0x3d, 0xa8, 0x76, 0x3d, - 0x98, 0xac, 0x6b, 0x5e, 0x49, 0x51, 0x65, 0x10, 0x5c, 0xb1, 0x94, 0x35, 0x17, 0x2f, 0x8a, 0x94, - 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0x25, 0xd4, 0x1b, 0x20, 0xa6, 0x90, 0x08, 0x17, 0x6b, 0x59, - 0x62, 0x4e, 0x69, 0x2a, 0xd4, 0x17, 0x10, 0x8e, 0x15, 0x93, 0x05, 0x63, 0x12, 0x1b, 0x38, 0x28, - 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x8e, 0xe7, 0xbc, 0x38, 0x23, 0x01, 0x00, 0x00, + 0x98, 0xac, 0x6b, 0x5e, 0x49, 0x51, 0x65, 0x10, 0x5c, 0xb1, 0x90, 0x21, 0x17, 0x5b, 0x48, 0x51, + 0x62, 0x72, 0x6a, 0x91, 0x04, 0x3b, 0x58, 0x9b, 0x24, 0xaa, 0x36, 0x88, 0x1c, 0x44, 0x13, 0x54, + 0xa1, 0x94, 0x35, 0x17, 0x2f, 0x8a, 0x69, 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x50, 0x9f, + 0x83, 0x98, 0x42, 0x22, 0x5c, 0xac, 0x65, 0x89, 0x39, 0xa5, 0xa9, 0x50, 0x8f, 0x43, 0x38, 0x56, + 0x4c, 0x16, 0x8c, 0x52, 0x96, 0x5c, 0xdc, 0x48, 0x66, 0x92, 0xa2, 0x35, 0x89, 0x0d, 0x1c, 0xf0, + 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x93, 0xd1, 0x71, 0xc8, 0x91, 0x01, 0x00, 0x00, } diff --git a/gate/session.proto b/gate/base/session.proto similarity index 85% rename from gate/session.proto rename to gate/base/session.proto index e8f30f2..ee59448 100644 --- a/gate/session.proto +++ b/gate/base/session.proto @@ -7,4 +7,5 @@ message session { string Sessionid = 4; string Serverid = 5; map Settings = 6; + map Tracer=7; } \ No newline at end of file diff --git a/gate/session_test.go b/gate/base/session_test.go similarity index 98% rename from gate/session_test.go rename to gate/base/session_test.go index 70cc51f..ff6fd8e 100644 --- a/gate/session_test.go +++ b/gate/base/session_test.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package gate +package basegate import ( "github.com/golang/protobuf/proto" "testing" diff --git a/gate/define.go b/gate/define.go index 4c501d9..421a22d 100644 --- a/gate/define.go +++ b/gate/define.go @@ -13,6 +13,9 @@ // limitations under the License. package gate +import ( + opentracing "github.com/opentracing/opentracing-go" +) /** net代理服务 处理器 */ @@ -25,6 +28,7 @@ type GateHandler interface { Send(Sessionid string, topic string, body []byte) (result interface{}, err string) //Send message to the session. Close(Sessionid string) (result interface{}, err string) //主动关闭连接 Update(Sessionid string) (result Session, err string) //更新整个Session 通常是其他模块拉取最新数据 + OnDestroy() //退出事件,主动关闭所有的连接 } type Session interface { @@ -51,6 +55,27 @@ type Session interface { Send(topic string, body []byte) (err string) SendNR(topic string, body []byte) (err string) Close() (err string) + Clone()Session + /** + 通过Carrier数据构造本次rpc调用的tracing Span,如果没有就创建一个新的 + */ + CreateRootSpan(operationName string)opentracing.Span + /** + 通过Carrier数据构造本次rpc调用的tracing Span,如果没有就返回nil + */ + LoadSpan(operationName string)opentracing.Span + /** + 获取本次rpc调用的tracing Span + */ + Span()opentracing.Span + /** + 从Session的 Span继承一个新的Span + */ + ExtractSpan(operationName string)opentracing.Span + /** + 获取Tracing的Carrier 可能为nil + */ + TracCarrier()map[string]string } /** diff --git a/gate/session.go b/gate/session.go deleted file mode 100644 index 57a40cd..0000000 --- a/gate/session.go +++ /dev/null @@ -1,327 +0,0 @@ -// Copyright 2014 mqant Author. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package gate - -import ( - "fmt" - "github.com/golang/protobuf/proto" - "github.com/liangdas/mqant/module" -) - -type sessionagent struct { - app module.App - session *session -} - -func NewSession(app module.App, data []byte) (Session,error) { - agent:=&sessionagent{ - app:app, - } - se := &session{} - err := proto.Unmarshal(data, se) - if err != nil { - return nil,err - } // 测试结果 - agent.session=se - return agent,nil -} - -func NewSessionByMap(app module.App, data map[string]interface{}) (Session,error) { - agent:=&sessionagent{ - app:app, - session:new(session), - } - err:=agent.updateMap(data) - if err!=nil{ - return nil,err - } - return agent,nil -} - -func (session *sessionagent) GetIP() string { - return session.session.GetIP() -} - -func (session *sessionagent) GetNetwork() string { - return session.session.GetNetwork() -} - -func (session *sessionagent) GetUserid() string { - return session.session.GetUserid() -} - -func (session *sessionagent) GetSessionid() string { - return session.session.GetSessionid() -} - -func (session *sessionagent) GetServerid() string { - return session.session.GetServerid() -} - -func (session *sessionagent) GetSettings() map[string]string { - return session.session.GetSettings() -} - - -func (session *sessionagent)SetIP(ip string){ - session.session.IP=ip -} -func (session *sessionagent)SetNetwork(network string){ - session.session.Network=network -} -func (session *sessionagent)SetUserid(userid string){ - session.session.Userid=userid -} -func (session *sessionagent)SetSessionid(sessionid string){ - session.session.Sessionid=sessionid -} -func (session *sessionagent)SetServerid(serverid string){ - session.session.Serverid=serverid -} -func (session *sessionagent)SetSettings(settings map[string]string){ - session.session.Settings=settings -} - -func (session *sessionagent) updateMap(s map[string]interface{})error { - Userid := s["Userid"] - if Userid != nil { - session.session.Userid = Userid.(string) - } - IP := s["IP"] - if IP != nil { - session.session.IP = IP.(string) - } - Network := s["Network"] - if Network != nil { - session.session.Network = Network.(string) - } - Sessionid := s["Sessionid"] - if Sessionid != nil { - session.session.Sessionid = Sessionid.(string) - } - Serverid := s["Serverid"] - if Serverid != nil { - session.session.Serverid = Serverid.(string) - } - Settings := s["Settings"] - if Settings != nil { - session.session.Settings = Settings.(map[string]string) - } - return nil -} - -func (session *sessionagent) update(s Session)error { - Userid := s.GetUserid() - session.session.Userid = Userid - IP := s.GetIP() - session.session.IP = IP - Network := s.GetNetwork() - session.session.Network = Network - Sessionid := s.GetSessionid() - session.session.Sessionid = Sessionid - Serverid := s.GetServerid() - session.session.Serverid = Serverid - Settings := s.GetSettings() - session.session.Settings = Settings - return nil -} - -func (session *sessionagent)Serializable()([]byte,error){ - data, err := proto.Marshal(session.session) - if err != nil { - return nil,err - } // 进行解码 - return data,nil -} - - -func (session *sessionagent) Update() (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - result, err := server.Call("Update", session.session.Sessionid) - if err == "" { - if result != nil { - //绑定成功,重新更新当前Session - session.update(result.(Session)) - } - } - return -} - -func (session *sessionagent) Bind(Userid string) (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - result, err := server.Call("Bind", session.session.Sessionid, Userid) - if err == "" { - if result != nil { - //绑定成功,重新更新当前Session - session.update(result.(Session)) - } - } - return -} - -func (session *sessionagent) UnBind() (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - result, err := server.Call("UnBind", session.session.Sessionid) - if err == "" { - if result != nil { - //绑定成功,重新更新当前Session - session.update(result.(Session)) - } - } - return -} - -func (session *sessionagent) Push() (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - result, err := server.Call("Push", session.session.Sessionid, session.session.Settings) - if err == "" { - if result != nil { - //绑定成功,重新更新当前Session - session.update(result.(Session)) - } - } - return -} - -func (session *sessionagent) Set(key string, value string) (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - if session.session.Settings == nil { - session.session.Settings=map[string]string{} - } - session.session.Settings[key] = value - //server,e:=session.app.GetServersById(session.Serverid) - //if e!=nil{ - // err=fmt.Sprintf("Service not found id(%s)",session.Serverid) - // return - //} - //result,err:=server.Call("Set",session.Sessionid,key,value) - //if err==""{ - // if result!=nil{ - // //绑定成功,重新更新当前Session - // session.update(result.(map[string]interface {})) - // } - //} - return -} - -func (session *sessionagent) Get(key string) (result string) { - if session.session.Settings == nil { - return - } - result = session.session.Settings[key] - return -} - -func (session *sessionagent) Remove(key string) (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - if session.session.Settings == nil { - session.session.Settings=map[string]string{} - } - delete(session.session.Settings, key) - //server,e:=session.app.GetServersById(session.Serverid) - //if e!=nil{ - // err=fmt.Sprintf("Service not found id(%s)",session.Serverid) - // return - //} - //result,err:=server.Call("Remove",session.Sessionid,key) - //if err==""{ - // if result!=nil{ - // //绑定成功,重新更新当前Session - // session.update(result.(map[string]interface {})) - // } - //} - return -} -func (session *sessionagent) Send(topic string, body []byte) (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - _, err = server.Call("Send", session.session.Sessionid, topic, body) - return -} - -func (session *sessionagent) SendNR(topic string, body []byte) (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - e = server.CallNR("Send", session.session.Sessionid, topic, body) - if e != nil { - err = e.Error() - } - return "" -} - -func (session *sessionagent) Close() (err string) { - if session.app == nil { - err = fmt.Sprintf("Module.App is nil") - return - } - server, e := session.app.GetServersById(session.session.Serverid) - if e != nil { - err = fmt.Sprintf("Service not found id(%s)", session.session.Serverid) - return - } - _, err = server.Call("Close", session.session.Sessionid) - return -} diff --git a/module/base/base_module.go b/module/base/base_module.go index 7cbca6e..08ea4b4 100644 --- a/module/base/base_module.go +++ b/module/base/base_module.go @@ -80,7 +80,7 @@ func (m *BaseModule) OnInit(subclass module.RPCModule, app module.App, settings m.settings = settings m.statistical = map[string]*StatisticalMethod{} //创建一个远程调用的RPC - m.GetServer().OnInit(app, settings) + m.GetServer().OnInit(subclass,app, settings) m.GetServer().GetRPCServer().SetListener(m) } diff --git a/module/base/rpcserver.go b/module/base/rpcserver.go index 47eefe7..ea5923c 100644 --- a/module/base/rpcserver.go +++ b/module/base/rpcserver.go @@ -29,9 +29,9 @@ type rpcserver struct { func (s *rpcserver) GetId() string { return s.settings.Id } -func (s *rpcserver) OnInit(app module.App, settings *conf.ModuleSettings) { +func (s *rpcserver) OnInit(module module.Module,app module.App, settings *conf.ModuleSettings) { s.settings = settings - server, err := defaultrpc.NewRPCServer(app) //默认会创建一个本地的RPC + server, err := defaultrpc.NewRPCServer(app,module) //默认会创建一个本地的RPC if err != nil { log.Warning("Dial: %s", err) } diff --git a/module/module.go b/module/module.go index 0285a4f..9a65139 100644 --- a/module/module.go +++ b/module/module.go @@ -16,6 +16,7 @@ package module import ( "github.com/liangdas/mqant/conf" "github.com/liangdas/mqant/rpc" + opentracing "github.com/opentracing/opentracing-go" ) type ServerSession interface { GetId()string @@ -55,6 +56,10 @@ type App interface { AddRPCSerialize(name string, Interface RPCSerialize) error GetRPCSerialize()(map[string]RPCSerialize) + + DefaultTracer(func ()opentracing.Tracer) error + + GetTracer() opentracing.Tracer } type Module interface { diff --git a/module/modules/master_module.go b/module/modules/master_module.go index a888dd1..df0a548 100644 --- a/module/modules/master_module.go +++ b/module/modules/master_module.go @@ -6,7 +6,6 @@ package modules import ( "encoding/json" "github.com/liangdas/mqant/conf" - "github.com/liangdas/mqant/log" "github.com/liangdas/mqant/module/modules/master" "io" "io/ioutil" @@ -119,26 +118,26 @@ func (m *Master) OnInit(app module.App, settings *conf.ModuleSettings) { } func (m *Master) Run(closeSig chan bool) { - if m.app.GetSettings().Master.WebHost != "" { - //app := golf.New() - //app.Static("/", m.app.GetSettings().Master.WebRoot) - //app.Run(m.app.GetSettings().Master.WebHost) - l, _ := net.Listen("tcp", m.app.GetSettings().Master.WebHost) - m.listener = l - go func() { - log.Info("Master web server Listen : %s", m.app.GetSettings().Master.WebHost) - http.Handle("/", http.StripPrefix("/", http.FileServer(http.Dir(m.app.GetSettings().Master.WebRoot)))) - http.HandleFunc("/api/process/list.json", m.ProcessList) - http.HandleFunc("/api/process/state/update.json", m.UpdateProcessState) - http.HandleFunc("/api/process/start.json", m.StartProcess) - http.HandleFunc("/api/process/stop.json", m.StopProcess) - http.HandleFunc("/api/module/list.json", m.ModuleList) - http.Serve(m.listener, nil) - }() - <-closeSig - log.Info("Master web server Shutting down...") - m.listener.Close() - } + //if m.app.GetSettings().Master.WebHost != "" { + // //app := golf.New() + // //app.Static("/", m.app.GetSettings().Master.WebRoot) + // //app.Run(m.app.GetSettings().Master.WebHost) + // l, _ := net.Listen("tcp", m.app.GetSettings().Master.WebHost) + // m.listener = l + // go func() { + // log.Info("Master web server Listen : %s", m.app.GetSettings().Master.WebHost) + // http.Handle("/", http.StripPrefix("/", http.FileServer(http.Dir(m.app.GetSettings().Master.WebRoot)))) + // http.HandleFunc("/api/process/list.json", m.ProcessList) + // http.HandleFunc("/api/process/state/update.json", m.UpdateProcessState) + // http.HandleFunc("/api/process/start.json", m.StartProcess) + // http.HandleFunc("/api/process/stop.json", m.StopProcess) + // http.HandleFunc("/api/module/list.json", m.ModuleList) + // http.Serve(m.listener, nil) + // }() + // <-closeSig + // log.Info("Master web server Shutting down...") + // m.listener.Close() + //} } diff --git a/network/tcp_server.go b/network/tcp_server.go index 4846b6a..215c5bc 100644 --- a/network/tcp_server.go +++ b/network/tcp_server.go @@ -29,7 +29,6 @@ type TCPServer struct { MaxConnNum int NewAgent func(*TCPConn) Agent ln net.Listener - conns ConnSet mutexConns sync.Mutex wgLn sync.WaitGroup wgConns sync.WaitGroup @@ -47,10 +46,6 @@ func (server *TCPServer) init() { log.Warning("%v", err) } - if server.MaxConnNum <= 0 { - server.MaxConnNum = 10000 - log.Warning("invalid MaxConnNum, reset to %v", server.MaxConnNum) - } if server.NewAgent == nil { log.Warning("NewAgent must not be nil") } @@ -67,7 +62,6 @@ func (server *TCPServer) init() { } server.ln = ln - server.conns = make(ConnSet) } func (server *TCPServer) run() { server.wgLn.Add(1) @@ -94,15 +88,6 @@ func (server *TCPServer) run() { } tempDelay = 0 - server.mutexConns.Lock() - if len(server.conns) >= server.MaxConnNum { - server.mutexConns.Unlock() - conn.Close() - log.Warning("too many connections") - continue - } - server.conns[conn] = struct{}{} - server.mutexConns.Unlock() server.wgConns.Add(1) tcpConn := newTCPConn(conn) @@ -112,9 +97,6 @@ func (server *TCPServer) run() { // cleanup tcpConn.Close() - server.mutexConns.Lock() - delete(server.conns, conn) - server.mutexConns.Unlock() agent.OnClose() server.wgConns.Done() @@ -125,12 +107,5 @@ func (server *TCPServer) run() { func (server *TCPServer) Close() { server.ln.Close() server.wgLn.Wait() - - server.mutexConns.Lock() - for conn := range server.conns { - conn.Close() - } - server.conns = nil - server.mutexConns.Unlock() server.wgConns.Wait() } diff --git a/network/ws_server.go b/network/ws_server.go index 8db3f98..ed8a967 100644 --- a/network/ws_server.go +++ b/network/ws_server.go @@ -41,7 +41,6 @@ type WSHandler struct { maxMsgLen uint32 newAgent func(*WSConn) Agent upgrader websocket.Upgrader - conns WebsocketConnSet mutexConns sync.Mutex wg sync.WaitGroup } @@ -61,20 +60,6 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { handler.wg.Add(1) defer handler.wg.Done() - handler.mutexConns.Lock() - if handler.conns == nil { - handler.mutexConns.Unlock() - conn.Close() - return - } - if len(handler.conns) >= handler.maxConnNum { - handler.mutexConns.Unlock() - conn.Close() - log.Warning("too many connections") - return - } - handler.conns[conn] = struct{}{} - handler.mutexConns.Unlock() wsConn := newWSConn(conn) agent := handler.newAgent(wsConn) @@ -83,7 +68,6 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // cleanup wsConn.Close() handler.mutexConns.Lock() - delete(handler.conns, conn) handler.mutexConns.Unlock() agent.OnClose() } @@ -94,14 +78,6 @@ func (server *WSServer) Start() { log.Warning("%v", err) } - if server.MaxConnNum <= 0 { - server.MaxConnNum = 10000 - log.Warning("invalid MaxConnNum, reset to %v", server.MaxConnNum) - } - if server.MaxMsgLen <= 0 { - server.MaxMsgLen = 4096 - log.Warning("invalid MaxMsgLen, reset to %v", server.MaxMsgLen) - } if server.HTTPTimeout <= 0 { server.HTTPTimeout = 10 * time.Second log.Warning("invalid HTTPTimeout, reset to %v", server.HTTPTimeout) @@ -125,7 +101,6 @@ func (server *WSServer) Start() { maxConnNum: server.MaxConnNum, maxMsgLen: server.MaxMsgLen, newAgent: server.NewAgent, - conns: make(WebsocketConnSet), upgrader: websocket.Upgrader{ HandshakeTimeout: server.HTTPTimeout, Subprotocols: []string{"mqttv3.1"}, @@ -147,12 +122,5 @@ func (server *WSServer) Start() { func (server *WSServer) Close() { server.ln.Close() - server.handler.mutexConns.Lock() - for conn := range server.handler.conns { - conn.Close() - } - server.handler.conns = nil - server.handler.mutexConns.Unlock() - server.handler.wg.Wait() } diff --git a/rpc/base/amqp_client.go b/rpc/base/amqp_client.go index aa7a8d5..d057ac3 100644 --- a/rpc/base/amqp_client.go +++ b/rpc/base/amqp_client.go @@ -207,6 +207,9 @@ func (c *AMQPClient) on_timeout_handle(args interface{}) { } } + + + /** 接收应答信息 */ diff --git a/rpc/base/rpc_client.go b/rpc/base/rpc_client.go index f149cd4..b7d9ee9 100644 --- a/rpc/base/rpc_client.go +++ b/rpc/base/rpc_client.go @@ -24,6 +24,7 @@ import ( "github.com/liangdas/mqant/rpc/util" "github.com/liangdas/mqant/rpc" "github.com/liangdas/mqant/module" + "github.com/liangdas/mqant/gate" ) type RPCClient struct { @@ -72,12 +73,13 @@ func (c *RPCClient) Done() (err error) { return } + func (c *RPCClient) CallArgs(_func string, ArgsType []string,args [][]byte ) (interface{}, string) { var correlation_id = uuid.Rand().Hex() rpcInfo := &rpcpb.RPCInfo{ Fn: *proto.String(_func), Reply: *proto.Bool(true), - Expired: *proto.Int64((time.Now().UTC().Add(time.Second * time.Duration(conf.RpcExpired)).UnixNano()) / 1000000), + Expired: *proto.Int64((time.Now().UTC().Add(time.Second * time.Duration(c.app.GetSettings().Rpc.RpcExpired)).UnixNano()) / 1000000), Cid: *proto.String(correlation_id), Args: args, ArgsType:ArgsType, @@ -117,7 +119,7 @@ func (c *RPCClient) CallNRArgs(_func string, ArgsType []string,args [][]byte ) ( rpcInfo := &rpcpb.RPCInfo{ Fn: *proto.String(_func), Reply: *proto.Bool(false), - Expired: *proto.Int64((time.Now().UTC().Add(time.Second * time.Duration(conf.RpcExpired)).UnixNano()) / 1000000), + Expired: *proto.Int64((time.Now().UTC().Add(time.Second * time.Duration(c.app.GetSettings().Rpc.RpcExpired)).UnixNano()) / 1000000), Cid: *proto.String(correlation_id), Args: args, ArgsType:ArgsType, @@ -155,6 +157,12 @@ func (c *RPCClient) Call(_func string, params ...interface{}) (interface{}, stri if err != nil{ return nil, fmt.Sprintf( "args[%d] error %s",k,err.Error()) } + + switch v2:=param.(type) { //多选语句switch + case gate.Session: + //如果参数是这个需要拷贝一份新的再传 + param=v2.Clone() + } } return c.CallArgs(_func,ArgsType,args) } @@ -170,6 +178,12 @@ func (c *RPCClient) CallNR(_func string, params ...interface{}) (err error) { if err != nil{ return fmt.Errorf( "args[%d] error %s",k,err.Error()) } + + switch v2:=param.(type) { //多选语句switch + case gate.Session: + //如果参数是这个需要拷贝一份新的再传 + param=v2.Clone() + } } return c.CallNRArgs(_func,ArgsType,args) } diff --git a/rpc/base/rpc_server.go b/rpc/base/rpc_server.go index 24fab14..47860c7 100644 --- a/rpc/base/rpc_server.go +++ b/rpc/base/rpc_server.go @@ -25,11 +25,14 @@ import ( "github.com/liangdas/mqant/rpc/util" "github.com/liangdas/mqant/module" "github.com/liangdas/mqant/rpc" + "github.com/liangdas/mqant/gate" + opentracing "github.com/opentracing/opentracing-go" ) type RPCServer struct { + module module.Module app module.App functions map[string]mqrpc.FunctionInfo remote_server *AMQPServer @@ -43,9 +46,10 @@ type RPCServer struct { } -func NewRPCServer(app module.App) (mqrpc.RPCServer, error) { +func NewRPCServer(app module.App,module module.Module) (mqrpc.RPCServer, error) { rpc_server := new(RPCServer) rpc_server.app=app + rpc_server.module=module rpc_server.call_chan_done = make(chan error) rpc_server.functions = make(map[string]mqrpc.FunctionInfo) rpc_server.mq_chan = make(chan mqrpc.CallInfo,50) @@ -200,7 +204,6 @@ func (s *RPCServer) runFunc(callInfo mqrpc.CallInfo, callbacks chan<- mqrpc.Call resultInfo := rpcpb.NewResultInfo(Cid,Error,argsutil.NULL,nil) callInfo.Result = *resultInfo callbacks <- callInfo - if s.listener != nil { s.listener.OnError(callInfo.RpcInfo.Fn, &callInfo, fmt.Errorf(Error)) } @@ -240,21 +243,12 @@ func (s *RPCServer) runFunc(callInfo mqrpc.CallInfo, callbacks chan<- mqrpc.Call //} //typ := reflect.TypeOf(_func) - var in []reflect.Value - if len(ArgsType)>0{ - in = make([]reflect.Value, len(params)) - for k,v:=range ArgsType{ - v,err:=argsutil.Bytes2Args(s.app,v,params[k]) - if err!=nil{ - _errorCallback(callInfo.RpcInfo.Cid,fmt.Sprintf("args[%d] [%s] Types not allowed",k,reflect.TypeOf(params[k]))) - return - } - in[k] = reflect.ValueOf(v) - } - } + s.wg.Add(1) s.executing++ _runFunc := func() { + var span opentracing.Span=nil + defer func() { if r := recover(); r != nil { var rn = "" @@ -271,6 +265,11 @@ func (s *RPCServer) runFunc(callInfo mqrpc.CallInfo, callbacks chan<- mqrpc.Call log.Error("rpc func(%s) error %s\n ----Stack----\n%s",callInfo.RpcInfo.Fn,rn,errstr) _errorCallback(callInfo.RpcInfo.Cid,rn) } + + if span!=nil{ + span.Finish() + } + s.wg.Add(-1) s.executing-- }() @@ -278,9 +277,35 @@ func (s *RPCServer) runFunc(callInfo mqrpc.CallInfo, callbacks chan<- mqrpc.Call //t:=RandInt64(2,3) //time.Sleep(time.Second*time.Duration(t)) // f 为函数地址 + + var in []reflect.Value + if len(ArgsType)>0{ + in = make([]reflect.Value, len(params)) + for k,v:=range ArgsType{ + v,err:=argsutil.Bytes2Args(s.app,v,params[k]) + if err!=nil{ + _errorCallback(callInfo.RpcInfo.Cid,fmt.Sprintf("args[%d] [%s] Types not allowed",k,reflect.TypeOf(params[k]))) + return + } + switch v2:=v.(type) { //多选语句switch + case gate.Session: + //尝试加载Span + span=v2.LoadSpan(fmt.Sprintf("%s/%s",s.module.GetType(),callInfo.RpcInfo.Fn)) + if span!=nil{ + span.SetTag("UserId",v2.GetUserid()) + span.SetTag("Func",callInfo.RpcInfo.Fn) + } + } + in[k] = reflect.ValueOf(v) + } + } + out := f.Call(in) var rs []interface{} if len(out) != 2 { + if span!=nil{ + span.LogEventWithPayload("Error","The number of prepare is not adapted.") + } _errorCallback(callInfo.RpcInfo.Cid,"The number of prepare is not adapted.") return } @@ -292,6 +317,9 @@ func (s *RPCServer) runFunc(callInfo mqrpc.CallInfo, callbacks chan<- mqrpc.Call } argsType,args,err:=argsutil.ArgsTypeAnd2Bytes(s.app,rs[0]) if err!=nil{ + if span!=nil{ + span.LogEventWithPayload("Error",err.Error()) + } _errorCallback(callInfo.RpcInfo.Cid,err.Error()) return } @@ -303,6 +331,12 @@ func (s *RPCServer) runFunc(callInfo mqrpc.CallInfo, callbacks chan<- mqrpc.Call ) callInfo.Result = *resultInfo callbacks <- callInfo + + if span!=nil{ + span.LogEventWithPayload("Result.Type",argsType) + span.LogEventWithPayload("Result",string(args)) + } + if s.listener != nil { s.listener.OnComplete(callInfo.RpcInfo.Fn, &callInfo, resultInfo, time.Now().UnixNano()-exec_time) } diff --git a/utils/base62.go b/utils/base62.go new file mode 100644 index 0000000..f0a7dca --- /dev/null +++ b/utils/base62.go @@ -0,0 +1,52 @@ +// Copyright 2014 loolgame Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package utils +import ( + "math" + "strings" +) + +const CODE62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const CODE_LENTH = 62 +var EDOC = map[string]int64{"0":0,"1":1,"2":2,"3":3,"4":4,"5":5,"6":6,"7":7,"8":8,"9":9,"a":10,"b":11,"c":12,"d":13,"e":14,"f":15,"g":16,"h":17,"i":18,"j":19,"k":20,"l":21,"m":22,"n":23,"o":24,"p":25,"q":26,"r":27,"s":28,"t":29,"u":30,"v":31,"w":32,"x":33,"y":34,"z":35,"A":36,"B":37,"C":38,"D":39,"E":40,"F":41,"G":42,"H":43,"I":44,"J":45,"K":46,"L":47,"M":48,"N":49,"O":50,"P":51,"Q":52,"R":53,"S":54,"T":55,"U":56,"V":57,"W":58,"X":59,"Y":60,"Z":61, } + +/** + * 编码 整数 为 base62 字符串 + */ +func IntToBase62(number int64) string { + if number == 0 { + return "0" + } + result := make([]byte , 0) + for number > 0 { + round := number / CODE_LENTH + remain := number % CODE_LENTH + result = append(result,CODE62[remain]) + number = round + } + return string(result) +} + + +/** + * 解码字符串为整数 + */ +func Base62ToInt(str string) int64 { + str = strings.TrimSpace(str) + var result int64 = 0 + for index,char := range []byte(str){ + result += EDOC[string(char)] * int64(math.Pow(CODE_LENTH,float64(index))) + } + return result +} diff --git a/utils/base62_test.go b/utils/base62_test.go new file mode 100644 index 0000000..463e5fa --- /dev/null +++ b/utils/base62_test.go @@ -0,0 +1,44 @@ +// Copyright 2014 loolgame Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package utils +import ( + "testing" +) +func TestBase62ToInt(t *testing.T) { + i:=Base62ToInt("LLqbOL1") + assertEqual(t,int64(100600020001),i) + + i1:=Base62ToInt("eg") + assertEqual(t,int64(1006),i1) + + i2:=Base62ToInt("2cq") + assertEqual(t,int64(100690),i2) + + i3:=Base62ToInt("mim3") + assertEqual(t,int64(800690),i3) +} + +func TestIntToBase62(t *testing.T) { + b:=IntToBase62(100600020001) + assertEqual(t,"LLqbOL1",b) + + b1:=IntToBase62(1006) + assertEqual(t,"eg",b1) + + b2:=IntToBase62(100690) + assertEqual(t,"2cq",b2) + + b3:=IntToBase62(800690) + assertEqual(t,"mim3",b3) +} \ No newline at end of file diff --git a/utils/safemap.go b/utils/safemap.go index 3b3ff5c..3b3dade 100644 --- a/utils/safemap.go +++ b/utils/safemap.go @@ -78,6 +78,15 @@ func (m *BeeMap) Delete(k interface{}) { m.lock.Unlock() } +func (m *BeeMap) DeleteAll() { + m.lock.Lock() + for k, _ := range m.bm { + delete(m.bm, k) + } + + m.lock.Unlock() +} + // Items returns all items in safemap. func (m *BeeMap) Items() map[interface{}]interface{} { m.lock.RLock() diff --git a/version.go b/version.go index 64c9746..eacc640 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package mqant -const Version = "1.2.0" +const Version = "1.4.0"