Skip to content

Commit

Permalink
v1.6.7网关新增批量发送消息接口,移除时间轮功能
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed Feb 27, 2018
1 parent e83767b commit 02ddeb3
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 12 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ bug请直接通过issue提交

## 版本日志

### [v1.6.7新特性](https://github.com/liangdas/mqant/wiki/v1.6.7)

### [v1.6.6新特性](https://github.com/liangdas/mqant/wiki/v1.6.6)

### [v1.6.5新特性](https://github.com/liangdas/mqant/wiki/v1.6.5)
Expand Down
3 changes: 1 addition & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module"
"github.com/liangdas/mqant/module/base"
"github.com/liangdas/mqant/module/modules"
"github.com/liangdas/mqant/rpc"
"github.com/liangdas/mqant/rpc/base"
opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -145,7 +144,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
}

manager := basemodule.NewModuleManager()
manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
//manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
// module
for i := 0; i < len(mods); i++ {
mods[i].OnAppConfigurationLoaded(app)
Expand Down
35 changes: 35 additions & 0 deletions gate/base/gate_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/utils"
"strings"
)

type handler struct {
Expand Down Expand Up @@ -233,6 +234,40 @@ func (h *handler) Send(Sessionid string, topic string, body []byte) (result inte
return
}

/**
*批量发送消息,sessionid之间用,分割
*/
func (h *handler) SendBatch(SessionidStr string, topic string, body []byte) (int64,string) {
sessionids:=strings.Split(SessionidStr,",")
var count int64=0
for _,sessionid:=range sessionids{
agent := h.sessions.Get(sessionid)
if agent == nil {
//log.Warning("No Sesssion found")
continue
}
e := agent.(gate.Agent).WriteMsg(topic, body)
if e != nil {
log.Warning("WriteMsg error:",e.Error())
} else {
count++
}
}
return count,""
}
func (h *handler) BroadCast(topic string, body []byte) (int64,string) {
var count int64=0
for _,agent:=range h.sessions.Items(){
e := agent.(gate.Agent).WriteMsg(topic, body)
if e != nil {
log.Warning("WriteMsg error:",e.Error())
} else {
count++
}
}
return count,""
}

/**
*主动关闭连接
*/
Expand Down
2 changes: 1 addition & 1 deletion gate/base/mqtt/mqtt_client_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *Client) waitPack(pAndErr *packAndErr) (err error) {
c.recover.OnRecover(pAndErr.pack)
case PINGREQ:
// Reply the heart beat
log.Debug("hb msg")
//log.Debug("hb msg")
err = c.queue.WritePack(GetPingResp(0, pAndErr.pack.GetDup()))
c.recover.OnRecover(pAndErr.pack)
default:
Expand Down
2 changes: 2 additions & 0 deletions gate/base/mqtt_gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func (this *Gate) OnInit(subclass module.RPCModule, app module.App, settings *co
this.GetServer().RegisterGO("Set", this.handler.Set)
this.GetServer().RegisterGO("Remove", this.handler.Remove)
this.GetServer().RegisterGO("Send", this.handler.Send)
this.GetServer().RegisterGO("SendBatch", this.handler.SendBatch)
this.GetServer().RegisterGO("BroadCast", this.handler.BroadCast)
this.GetServer().RegisterGO("IsConnect", this.handler.IsConnect)
this.GetServer().RegisterGO("Close", this.handler.Close)
}
Expand Down
16 changes: 16 additions & 0 deletions gate/base/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,22 @@ func (this *sessionagent) Send(topic string, body []byte) string {
_, err := server.Call("Send", this.session.Sessionid, topic, body)
return err
}

func (this *sessionagent) SendBatch(Sessionids string, topic string, body []byte) (int64,string) {
if this.app == nil {
return 0,fmt.Sprintf("Module.App is nil")
}
server, e := this.app.GetServerById(this.session.Serverid)
if e != nil {
return 0,fmt.Sprintf("Service not found id(%s)", this.session.Serverid)
}
count, err := server.Call("SendBatch", Sessionids, topic, body)
if err!=""{
return 0,err
}
return count.(int64),err
}

func (this *sessionagent) IsConnect(userId string) (bool, string) {
if this.app == nil {
return false, fmt.Sprintf("Module.App is nil")
Expand Down
3 changes: 3 additions & 0 deletions gate/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type GateHandler interface {
Remove(Sessionid string, key string) (result interface{}, err string) //Remove value from the session.
Push(Sessionid string, Settings map[string]string) (result Session, err string) //推送信息给Session
Send(Sessionid string, topic string, body []byte) (result interface{}, err string) //Send message
SendBatch(Sessionids string, topic string, body []byte) (int64, string) //批量发送
BroadCast(topic string, body []byte) (int64, string) //广播消息给网关所有在连客户端
//查询某一个userId是否连接中,这里只是查询这一个网关里面是否有userId客户端连接,如果有多个网关就需要遍历了
IsConnect(Sessionid string, Userid string) (result bool, err string)
Close(Sessionid string) (result interface{}, err string) //主动关闭连接
Expand Down Expand Up @@ -59,6 +61,7 @@ type Session interface {
Remove(key string) (err string)
Send(topic string, body []byte) (err string)
SendNR(topic string, body []byte) (err string)
SendBatch(Sessionids string, topic string, body []byte) (int64, string) //想该客户端的网关批量发送消息
//查询某一个userId是否连接中,这里只是查询这一个网关里面是否有userId客户端连接,如果有多个网关就需要遍历了
IsConnect(Userid string) (result bool, err string)
//是否是访客(未登录) ,默认判断规则为 userId==""代表访客
Expand Down
5 changes: 2 additions & 3 deletions module/base/ModuleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/liangdas/mqant/conf"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module"
"github.com/liangdas/mqant/module/modules/timer"
)

func NewModuleManager() (m *ModuleManager) {
Expand Down Expand Up @@ -78,7 +77,7 @@ func (mer *ModuleManager) Init(app module.App, ProcessID string) {
m.wg.Add(1)
go run(m)
}
timer.SetTimer(3, mer.ReportStatistics, nil) //统计汇报定时任务
//timer.SetTimer(3, mer.ReportStatistics, nil) //统计汇报定时任务
}

/**
Expand Down Expand Up @@ -133,6 +132,6 @@ func (mer *ModuleManager) ReportStatistics(args interface{}) {
default:
}
}
timer.SetTimer(3, mer.ReportStatistics, nil)
//timer.SetTimer(3, mer.ReportStatistics, nil)
}
}
4 changes: 2 additions & 2 deletions network/ws_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func (wsConn *WSConn) Close() error {
return wsConn.conn.Close()
}

func (wsConn *WSConn) Write(p []byte) (n int, err error) {
err = wsConn.conn.WriteMessage(websocket.BinaryMessage, p)
func (wsConn *WSConn) Write(p []byte) (int, error) {
err := wsConn.conn.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
}
Expand Down
6 changes: 3 additions & 3 deletions rpc/base/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ func (s *RPCServer) on_callback_handle(callbacks <-chan mqrpc.CallInfo, done cha
//}
} else {
//对于不需要回复的消息,可以判断一下是否出现错误,打印一些警告
if callInfo.Result.Error != "" {
log.Warning("rpc callback erro :\n%s", callInfo.Result.Error)
}
//if callInfo.Result.Error != "" {
// log.Warning("rpc callback erro :\n%s", callInfo.Result.Error)
//}
}
}
case <-done:
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package mqant

const Version = "1.6.6"
const Version = "1.6.7"

0 comments on commit 02ddeb3

Please sign in to comment.