Skip to content

Commit

Permalink
=v1.2.0版本 极大的优化了mqant性能,跳调整了框架目录结构
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed May 19, 2017
1 parent 1535f61 commit 30f4a23
Show file tree
Hide file tree
Showing 43 changed files with 2,184 additions and 1,262 deletions.
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
# mqant
mqant是一款基于Golang语言的简洁,高效,高性能的分布式游戏服务器框架,研发的初衷是要实现一款能支持高并发,高性能,高实时性,的游戏服务器框架,也希望mqant未来能够做即时通讯和物联网方面的应用

# pymqant
pymqant是已经mqant相同的设计原理用python实现的,python版本设计的初衷并不是替代golang语言版本的mqant,而是希望mqant能充分利用python语言的优势(丰富的开源库),因此python是golang语言版本的mqant辅助版本

[https://github.com/liangdas/pymqant](https://github.com/liangdas/pymqant)

## 可以用pymqant来干什么?
理论上pymqant也实现了mqant完全相同的功能,只是缺少一个网关模块,可以用pymqant的flask实现游戏的web api模块,mqant实现对性能要求较高的游戏核心逻辑模块,pymqant模块与mqant模块之间RPC方法可以无缝相互调用,实现双向通信。



Expand Down Expand Up @@ -78,6 +71,8 @@ bug请直接通过issue提交


##版本日志
###[v1.3.0新特性](https://github.com/liangdas/mqant/wiki/v1.3.0)

###[v1.2.0新特性](https://github.com/liangdas/mqant/wiki/v1.2.0)

###[v1.1.0新特性](https://github.com/liangdas/mqant/wiki/v1.1.0)
Expand Down
90 changes: 61 additions & 29 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package app
package defaultApp

import (
"flag"
"fmt"
"github.com/liangdas/mqant/conf"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module/base"
"github.com/liangdas/mqant/module"
"github.com/liangdas/mqant/rpc"
"hash/crc32"
Expand All @@ -27,13 +28,16 @@ import (
"os/signal"
"path/filepath"
"strings"
"github.com/liangdas/mqant/rpc/base"
"github.com/liangdas/mqant/module/modules"
)


func NewApp(version string) module.App {
app := new(DefaultApp)
app.routes = map[string]func(app module.App, Type string, hash string) *module.ServerSession{}
app.serverList = map[string]*module.ServerSession{}
app.defaultRoutes = func(app module.App, Type string, hash string) *module.ServerSession {
app.routes = map[string]func(app module.App, Type string, hash string) module.ServerSession{}
app.serverList = map[string]module.ServerSession{}
app.defaultRoutes = func(app module.App, Type string, hash string) module.ServerSession {
//默认使用第一个Server
servers := app.GetServersByType(Type)
if len(servers) == 0 {
Expand All @@ -42,17 +46,19 @@ func NewApp(version string) module.App {
index := int(math.Abs(float64(crc32.ChecksumIEEE([]byte(hash))))) % len(servers)
return servers[index]
}
app.rpcserializes=map[string]module.RPCSerialize{}
app.version = version
return app
}

type DefaultApp struct {
module.App
version string
serverList map[string]*module.ServerSession
serverList map[string]module.ServerSession
settings conf.Config
routes map[string]func(app module.App, Type string, hash string) *module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) *module.ServerSession
routes map[string]func(app module.App, Type string, hash string) module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) module.ServerSession
rpcserializes map[string]module.RPCSerialize
}

func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
Expand Down Expand Up @@ -84,8 +90,8 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
app.Configure(conf.Conf) //配置信息

log.Info("mqant %v starting up", app.version)
manager := module.NewModuleManager()
manager.RegisterRunMod(module.TimerModule()) //注册时间轮模块 每一个进程都默认运行
manager := basemodule.NewModuleManager()
manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
// module
for i := 0; i < len(mods); i++ {
manager.Register(mods[i])
Expand All @@ -101,11 +107,11 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
log.Info("mqant closing down (signal: %v)", sig)
return nil
}
func (app *DefaultApp) Route(moduleType string, fn func(app module.App, Type string, hash string) *module.ServerSession) error {
func (app *DefaultApp) Route(moduleType string, fn func(app module.App, Type string, hash string) module.ServerSession) error {
app.routes[moduleType] = fn
return nil
}
func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type string, hash string) *module.ServerSession {
func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type string, hash string) module.ServerSession {
fn := app.routes[moduleType]
if fn == nil {
//如果没有设置的路由,则使用默认的
Expand All @@ -114,6 +120,19 @@ func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type str
return fn
}


func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerialize) error{
if _,ok:=app.rpcserializes[name];ok{
return fmt.Errorf("The name(%s) has been occupied",name)
}
app.rpcserializes[name]=Interface
return nil
}

func (app *DefaultApp)GetRPCSerialize()(map[string]module.RPCSerialize){
return app.rpcserializes
}

func (app *DefaultApp) Configure(settings conf.Config) error {
app.settings = settings
return nil
Expand All @@ -122,27 +141,23 @@ func (app *DefaultApp) Configure(settings conf.Config) error {
/**
*/
func (app *DefaultApp) OnInit(settings conf.Config) error {
app.serverList = make(map[string]*module.ServerSession)
app.serverList = make(map[string]module.ServerSession)
for Type, ModuleInfos := range settings.Module {
for _, moduel := range ModuleInfos {
m := app.serverList[moduel.Id]
if m != nil {
//如果Id已经存在,说明有两个相同Id的模块,这种情况不能被允许,这里就直接抛异常 强制崩溃以免以后调试找不到问题
panic(fmt.Sprintf("ServerId (%s) Type (%s) of the modules already exist Can not be reused ServerId (%s) Type (%s)", m.Id, m.Stype, moduel.Id, Type))
panic(fmt.Sprintf("ServerId (%s) Type (%s) of the modules already exist Can not be reused ServerId (%s) Type (%s)", m.GetId(), m.GetType(), moduel.Id, Type))
}
client, err := mqrpc.NewRPCClient()
client, err := defaultrpc.NewRPCClient(app,moduel.Id)
if err != nil {
continue
}
if moduel.Rabbitmq != nil {
//如果远程的rpc存在则创建一个对应的客户端
client.NewRemoteClient(moduel.Rabbitmq)
}
session := &module.ServerSession{
Id: moduel.Id,
Stype: Type,
Rpc: client,
}
session := basemodule.NewServerSession(moduel.Id,Type,client)
app.serverList[moduel.Id] = session
log.Info("RPCClient create success type(%s) id(%s)", Type, moduel.Id)
}
Expand All @@ -152,44 +167,44 @@ func (app *DefaultApp) OnInit(settings conf.Config) error {

func (app *DefaultApp) OnDestroy() error {
for id, session := range app.serverList {
err := session.Rpc.Done()
err := session.GetRpc().Done()
if err != nil {
log.Warning("RPCClient close fail type(%s) id(%s)", session.Stype, id)
log.Warning("RPCClient close fail type(%s) id(%s)", session.GetType(), id)
} else {
log.Info("RPCClient close success type(%s) id(%s)", session.Stype, id)
log.Info("RPCClient close success type(%s) id(%s)", session.GetType(), id)
}
}
return nil
}

func (app *DefaultApp) RegisterLocalClient(serverId string, server *mqrpc.RPCServer) error {
func (app *DefaultApp) RegisterLocalClient(serverId string, server mqrpc.RPCServer) error {
if session, ok := app.serverList[serverId]; ok {
return session.Rpc.NewLocalClient(server)
return session.GetRpc().NewLocalClient(server)
} else {
return fmt.Errorf("Server(%s) Not Found", serverId)
}
return nil
}

func (app *DefaultApp) GetServersById(serverId string) (*module.ServerSession, error) {
func (app *DefaultApp) GetServersById(serverId string) (module.ServerSession, error) {
if session, ok := app.serverList[serverId]; ok {
return session, nil
} else {
return nil, fmt.Errorf("Server(%s) Not Found", serverId)
}
}

func (app *DefaultApp) GetServersByType(Type string) []*module.ServerSession {
sessions := make([]*module.ServerSession, 0)
func (app *DefaultApp) GetServersByType(Type string) []module.ServerSession {
sessions := make([]module.ServerSession, 0)
for _, session := range app.serverList {
if session.Stype == Type {
if session.GetType() == Type {
sessions = append(sessions, session)
}
}
return sessions
}

func (app *DefaultApp) GetRouteServers(filter string, hash string) (s *module.ServerSession, err error) {
func (app *DefaultApp) GetRouteServers(filter string, hash string) (s module.ServerSession, err error) {
sl := strings.Split(filter, "@")
if len(sl) == 2 {
moduleID := sl[1]
Expand Down Expand Up @@ -226,3 +241,20 @@ func (app *DefaultApp) RpcInvokeNR(module module.RPCModule, moduleType string, _
}
return server.CallNR(_func, params...)
}

func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string,args [][]byte) (result interface{}, err string) {
server, e := app.GetRouteServers(moduleType, module.GetServerId())
if e != nil {
err = e.Error()
return
}
return server.CallArgs(_func, ArgsType,args)
}

func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string,args [][]byte) (err error) {
server, err := app.GetRouteServers(moduleType, module.GetServerId())
if err != nil {
return
}
return server.CallNRArgs(_func, ArgsType,args)
}
44 changes: 36 additions & 8 deletions gate/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,40 @@ package gate
net代理服务 处理器
*/
type GateHandler interface {
Bind(Sessionid string, Userid string) (result interface{}, err string) //Bind the session with the the Userid.
UnBind(Sessionid string) (result interface{}, err string) //UnBind the session with the the Userid.
Set(Sessionid string, key string, value interface{}) (result interface{}, err string) //Set values (one or many) for the session.
Bind(Sessionid string, Userid string) (result Session, err string) //Bind the session with the the Userid.
UnBind(Sessionid string) (result Session, err string) //UnBind the session with the the Userid.
Set(Sessionid string, key string, value string) (result Session, err string) //Set values (one or many) for the session.
Remove(Sessionid string, key string) (result interface{}, err string) //Remove value from the session.
Push(Sessionid string, Settings map[string]interface{}) (result interface{}, err string) //推送信息给Session
Push(Sessionid string, Settings map[string]string) (result Session, err string) //推送信息给Session
Send(Sessionid string, topic string, body []byte) (result interface{}, err string) //Send message to the session.
Close(Sessionid string) (result interface{}, err string) //主动关闭连接
Update(Sessionid string) (result interface{}, err string) //更新整个Session 通常是其他模块拉取最新数据
Update(Sessionid string) (result Session, err string) //更新整个Session 通常是其他模块拉取最新数据
}

type Session interface {
GetIP() string
GetNetwork() string
GetUserid() string
GetSessionid() string
GetServerid() string
GetSettings() map[string]string
SetIP(ip string)
SetNetwork(network string)
SetUserid(userid string)
SetSessionid(sessionid string)
SetServerid(serverid string)
SetSettings(settings map[string]string)
Serializable()([]byte,error)
Update() (err string)
Bind(Userid string) (err string)
UnBind() (err string)
Push() (err string)
Set(key string, value string) (err string)
Get(key string) (result string)
Remove(key string) (err string)
Send(topic string, body []byte) (err string)
SendNR(topic string, body []byte) (err string)
Close() (err string)
}

/**
Expand All @@ -35,7 +61,7 @@ type StorageHandler interface {
存储用户的Session信息
Session Bind Userid以后每次设置 settings都会调用一次Storage
*/
Storage(Userid string, settings map[string]interface{}) (err error)
Storage(Userid string, settings map[string]string) (err error)
/**
强制删除Session信息
*/
Expand All @@ -44,7 +70,7 @@ type StorageHandler interface {
获取用户Session信息
Bind Userid时会调用Query获取最新信息
*/
Query(Userid string) (settings map[string]interface{}, err error)
Query(Userid string) (settings map[string]string, err error)
/**
用户心跳,一般用户在线时1s发送一次
可以用来延长Session信息过期时间
Expand All @@ -60,6 +86,8 @@ type Agent interface {
WriteMsg(topic string, body []byte) error
Close()
Destroy()
RevNum() int64
SendNum() int64
IsClosed() bool
GetSession() *Session
GetSession() Session
}
Loading

0 comments on commit 30f4a23

Please sign in to comment.