Skip to content

Commit

Permalink
=v1.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed Jun 18, 2017
1 parent 42e6b9b commit fa2a54f
Show file tree
Hide file tree
Showing 29 changed files with 844 additions and 539 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ QQ交流群 :463735103
go get github.com/gorilla/websocket
go get github.com/streadway/amqp
go get github.com/golang/protobuf
go get github.com/golang/net/context
go get github.com/gogo/protobuf
go get github.com/opentracing/basictracer-go
go get github.com/opentracing/opentracing-go

# 文档

Expand Down Expand Up @@ -59,13 +63,19 @@ QQ交流群 :463735103

[框架架构](https://github.com/liangdas/mqant/wiki/mqant%E6%A1%86%E6%9E%B6%E6%A6%82%E8%BF%B0)



## 下一步计划
1. 分布式架构管理模块(Master)
1. 模块发现
2. 模块管理
1. 模块动态添加删除
2. 模块状态监控
2. 异常日志监控和汇报
1. 异常日志分类汇总
2. 定时将异常日志发送到Email
3. 定时将异常日志通过webhook发送到团队协作工具中(钉钉,worktile等)
3. rpc添加track分布式跟踪系统的接口[Appdash,用Go实现的分布式系统跟踪神器](http://tonybai.com/2015/06/17/appdash-distributed-systems-tracing-in-go/)
3. 【已完成】rpc添加track分布式跟踪系统的接口[Appdash,用Go实现的分布式系统跟踪神器](http://tonybai.com/2015/06/17/appdash-distributed-systems-tracing-in-go/)

## 贡献者

Expand All @@ -82,6 +92,8 @@ bug请直接通过issue提交

## 版本日志

### [v1.4.0新特性](https://github.com/liangdas/mqant/wiki/v1.4.0)

### [v1.3.0新特性](https://github.com/liangdas/mqant/wiki/v1.3.0)

### [v1.2.0新特性](https://github.com/liangdas/mqant/wiki/v1.2.0)
Expand Down
12 changes: 12 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strings"
"github.com/liangdas/mqant/rpc/base"
"github.com/liangdas/mqant/module/modules"
opentracing "github.com/opentracing/opentracing-go"
)


Expand Down Expand Up @@ -59,6 +60,7 @@ type DefaultApp struct {
routes map[string]func(app module.App, Type string, hash string) module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) module.ServerSession
rpcserializes map[string]module.RPCSerialize
getTracer func ()opentracing.Tracer
}

func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
Expand Down Expand Up @@ -258,3 +260,13 @@ func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType strin
}
return server.CallNRArgs(_func, ArgsType,args)
}
func (app *DefaultApp)DefaultTracer(_func func ()opentracing.Tracer) error{
app.getTracer=_func
return nil
}
func (app *DefaultApp)GetTracer()opentracing.Tracer{
if app.getTracer!=nil{
return app.getTracer()
}
return nil
}
16 changes: 12 additions & 4 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ import (

var (
LenStackBuf = 1024
LogLevel = "debug"
LogPath = ""
LogFlag = 0
RpcExpired = 5 //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答

Conf = Config{}
)

Expand All @@ -42,9 +39,20 @@ func LoadConfig(Path string) {
}

type Config struct {
Rpc Rpc
Module map[string][]*ModuleSettings
Mqtt Mqtt
Master Master
Tracing Tracing
}

type Tracing struct {
Addr string //collector server eg 127.0.0.1:7701
Enable bool
}

type Rpc struct {
RpcExpired int //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答
}

type Rabbitmq struct {
Expand Down
78 changes: 43 additions & 35 deletions gate/gate_handler.go → gate/base/gate_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gate
package basegate

import (
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/utils"
)

type handler struct {
AgentLearner
GateHandler
gate.AgentLearner
gate.GateHandler
gate *Gate
sessions *utils.BeeMap //连接列表
}
Expand All @@ -34,97 +35,104 @@ func NewGateHandler(gate *Gate) *handler {
}

//当连接建立 并且MQTT协议握手成功
func (h *handler) Connect(a Agent) {
func (h *handler) Connect(a gate.Agent) {
if a.GetSession() != nil {
h.sessions.Set(a.GetSession().GetSessionid(), a)
}
}

//当连接关闭 或者客户端主动发送MQTT DisConnect命令
func (h *handler) DisConnect(a Agent) {
func (h *handler) DisConnect(a gate.Agent) {
if a.GetSession() != nil {
h.sessions.Delete(a.GetSession().GetSessionid())
}
}

func (h *handler)OnDestroy(){
for _,v:=range h.sessions.Items(){
v.(gate.Agent).Close()
}
h.sessions.DeleteAll()
}

/**
*更新整个Session 通常是其他模块拉取最新数据
*/
func (h *handler) Update(Sessionid string) (result Session, err string) {
func (h *handler) Update(Sessionid string) (result gate.Session, err string) {
agent := h.sessions.Get(Sessionid)
if agent == nil {
err = "No Sesssion found"
return
}
result = agent.(Agent).GetSession()
result = agent.(gate.Agent).GetSession()
return
}

/**
*Bind the session with the the Userid.
*/
func (h *handler) Bind(Sessionid string, Userid string) (result Session, err string) {
func (h *handler) Bind(Sessionid string, Userid string) (result gate.Session, err string) {
agent := h.sessions.Get(Sessionid)
if agent == nil {
err = "No Sesssion found"
return
}
agent.(Agent).GetSession().SetUserid(Userid)
agent.(gate.Agent).GetSession().SetUserid(Userid)

if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" {
if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" {
//可以持久化
settings, err := h.gate.storage.Query(Userid)
if err == nil && settings != nil {
//有已持久化的数据,可能是上一次连接保存的
if agent.(Agent).GetSession().GetSettings() == nil {
agent.(Agent).GetSession().SetSettings(settings)
if agent.(gate.Agent).GetSession().GetSettings() == nil {
agent.(gate.Agent).GetSession().SetSettings(settings)
} else {
//合并两个map 并且以 agent.(Agent).GetSession().Settings 已有的优先
for k, v := range settings {
if _, ok := agent.(Agent).GetSession().GetSettings()[k]; ok {
if _, ok := agent.(gate.Agent).GetSession().GetSettings()[k]; ok {
//不用替换
} else {
agent.(Agent).GetSession().GetSettings()[k] = v
agent.(gate.Agent).GetSession().GetSettings()[k] = v
}
}
//数据持久化
h.gate.storage.Storage(Userid, agent.(Agent).GetSession().GetSettings())
h.gate.storage.Storage(Userid, agent.(gate.Agent).GetSession().GetSettings())

}
}
}

result = agent.(Agent).GetSession()
result = agent.(gate.Agent).GetSession()
return
}

/**
*UnBind the session with the the Userid.
*/
func (h *handler) UnBind(Sessionid string) (result Session, err string) {
func (h *handler) UnBind(Sessionid string) (result gate.Session, err string) {
agent := h.sessions.Get(Sessionid)
if agent == nil {
err = "No Sesssion found"
return
}
agent.(Agent).GetSession().SetUserid("")
result = agent.(Agent).GetSession()
agent.(gate.Agent).GetSession().SetUserid("")
result = agent.(gate.Agent).GetSession()
return
}

/**
*Push the session with the the Userid.
*/
func (h *handler) Push(Sessionid string, Settings map[string]string) (result Session, err string) {
func (h *handler) Push(Sessionid string, Settings map[string]string) (result gate.Session, err string) {
agent := h.sessions.Get(Sessionid)
if agent == nil {
err = "No Sesssion found"
return
}
agent.(Agent).GetSession().SetSettings(Settings)
result = agent.(Agent).GetSession()
if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" {
err := h.gate.storage.Storage(agent.(Agent).GetSession().GetUserid(), agent.(Agent).GetSession().GetSettings())
agent.(gate.Agent).GetSession().SetSettings(Settings)
result = agent.(gate.Agent).GetSession()
if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" {
err := h.gate.storage.Storage(agent.(gate.Agent).GetSession().GetUserid(), agent.(gate.Agent).GetSession().GetSettings())
if err != nil {
log.Error("gate session storage failure")
}
Expand All @@ -136,17 +144,17 @@ func (h *handler) Push(Sessionid string, Settings map[string]string) (result Ses
/**
*Set values (one or many) for the session.
*/
func (h *handler) Set(Sessionid string, key string, value string) (result Session, err string) {
func (h *handler) Set(Sessionid string, key string, value string) (result gate.Session, err string) {
agent := h.sessions.Get(Sessionid)
if agent == nil {
err = "No Sesssion found"
return
}
agent.(Agent).GetSession().GetSettings()[key] = value
result = agent.(Agent).GetSession()
agent.(gate.Agent).GetSession().GetSettings()[key] = value
result = agent.(gate.Agent).GetSession()

if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" {
err := h.gate.storage.Storage(agent.(Agent).GetSession().GetUserid(), agent.(Agent).GetSession().GetSettings())
if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" {
err := h.gate.storage.Storage(agent.(gate.Agent).GetSession().GetUserid(), agent.(gate.Agent).GetSession().GetSettings())
if err != nil {
log.Error("gate session storage failure")
}
Expand All @@ -164,11 +172,11 @@ func (h *handler) Remove(Sessionid string, key string) (result interface{}, err
err = "No Sesssion found"
return
}
delete(agent.(Agent).GetSession().GetSettings(), key)
result = agent.(Agent).GetSession()
delete(agent.(gate.Agent).GetSession().GetSettings(), key)
result = agent.(gate.Agent).GetSession()

if h.gate.storage != nil && agent.(Agent).GetSession().GetUserid() != "" {
err := h.gate.storage.Storage(agent.(Agent).GetSession().GetUserid(), agent.(Agent).GetSession().GetSettings())
if h.gate.storage != nil && agent.(gate.Agent).GetSession().GetUserid() != "" {
err := h.gate.storage.Storage(agent.(gate.Agent).GetSession().GetUserid(), agent.(gate.Agent).GetSession().GetSettings())
if err != nil {
log.Error("gate session storage failure")
}
Expand All @@ -186,7 +194,7 @@ func (h *handler) Send(Sessionid string, topic string, body []byte) (result inte
err = "No Sesssion found"
return
}
e := agent.(Agent).WriteMsg(topic, body)
e := agent.(gate.Agent).WriteMsg(topic, body)
if e != nil {
err = e.Error()
} else {
Expand All @@ -204,6 +212,6 @@ func (h *handler) Close(Sessionid string) (result interface{}, err string) {
err = "No Sesssion found"
return
}
agent.(Agent).Close()
agent.(gate.Agent).Close()
return
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
16 changes: 10 additions & 6 deletions gate/mqtt_agent.go → gate/base/mqtt_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gate
package basegate

import (
"bufio"
"encoding/json"
"fmt"
"github.com/liangdas/mqant/conf"
"github.com/liangdas/mqant/gate/mqtt"
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/gate/base/mqtt"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/network"
"github.com/liangdas/mqant/utils/uuid"
Expand All @@ -34,8 +35,8 @@ type resultInfo struct {
}

type agent struct {
Agent
session Session
gate.Agent
session gate.Session
conn network.Conn
r *bufio.Reader
w *bufio.Writer
Expand All @@ -51,7 +52,7 @@ func (a *agent) IsClosed() bool {
return a.isclose
}

func (a *agent) GetSession() Session {
func (a *agent) GetSession() gate.Session {
return a.session
}

Expand Down Expand Up @@ -97,6 +98,9 @@ func (a *agent) Run() (err error) {
log.Error("gate create agent fail",err.Error())
return
}



a.gate.agentLearner.Connect(a) //发送连接成功的事件

//回复客户端 CONNECT
Expand Down Expand Up @@ -185,7 +189,7 @@ func (a *agent) OnRecover(pack *mqtt.Pack) {
} else {
hash = a.gate.GetServerId()
}

a.session.CreateRootSpan("gate")
serverSession, err := a.gate.GetRouteServers(topics[0], hash)
if err != nil {
if msgid != "" {
Expand Down
Loading

0 comments on commit fa2a54f

Please sign in to comment.