Skip to content

Commit

Permalink
=gofmt
Browse files Browse the repository at this point in the history
  • Loading branch information
liangdas committed Sep 30, 2017
1 parent 07a00ca commit 2dbacdc
Show file tree
Hide file tree
Showing 47 changed files with 882 additions and 888 deletions.
119 changes: 58 additions & 61 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@ import (
"flag"
"fmt"
"github.com/liangdas/mqant/conf"
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module/base"
"github.com/liangdas/mqant/module"
"github.com/liangdas/mqant/module/base"
"github.com/liangdas/mqant/module/modules"
"github.com/liangdas/mqant/rpc"
"github.com/liangdas/mqant/rpc/base"
opentracing "github.com/opentracing/opentracing-go"
"hash/crc32"
"math"
"os"
"os/signal"
"strings"
"github.com/liangdas/mqant/rpc/base"
"github.com/liangdas/mqant/module/modules"
opentracing "github.com/opentracing/opentracing-go"
"os/exec"
"os/signal"
"path/filepath"
"github.com/liangdas/mqant/gate"
"strings"
)


func NewApp(version string) module.App {
app := new(DefaultApp)
app.routes = map[string]func(app module.App, Type string, hash string) module.ServerSession{}
Expand All @@ -48,24 +47,24 @@ func NewApp(version string) module.App {
index := int(math.Abs(float64(crc32.ChecksumIEEE([]byte(hash))))) % len(servers)
return servers[index]
}
app.rpcserializes=map[string]module.RPCSerialize{}
app.rpcserializes = map[string]module.RPCSerialize{}
app.version = version
return app
}

type DefaultApp struct {
module.App
version string
serverList map[string]module.ServerSession
settings conf.Config
routes map[string]func(app module.App, Type string, hash string) module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) module.ServerSession
rpcserializes map[string]module.RPCSerialize
getTracer func ()opentracing.Tracer
configurationLoaded func (app module.App)
startup func (app module.App)
moduleInited func (app module.App,module module.Module)
judgeGuest func(session gate.Session)bool
version string
serverList map[string]module.ServerSession
settings conf.Config
routes map[string]func(app module.App, Type string, hash string) module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) module.ServerSession
rpcserializes map[string]module.RPCSerialize
getTracer func() opentracing.Tracer
configurationLoaded func(app module.App)
startup func(app module.App)
moduleInited func(app module.App, module module.Module)
judgeGuest func(session gate.Session) bool
}

func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
Expand All @@ -75,17 +74,17 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
Logdir := flag.String("log", "", "Log file directory?")
flag.Parse() //解析输入的参数

ApplicationDir:=""
if *wdPath!=""{
ApplicationDir := ""
if *wdPath != "" {
_, err := os.Open(*wdPath)
if err != nil {
panic(err)
}
os.Chdir(*wdPath)
ApplicationDir,err=os.Getwd()
}else{
ApplicationDir, err = os.Getwd()
} else {
var err error
ApplicationDir,err=os.Getwd()
ApplicationDir, err = os.Getwd()
if err != nil {
file, _ := exec.LookPath(os.Args[0])
ApplicationPath, _ := filepath.Abs(file)
Expand All @@ -95,14 +94,14 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
}

defaultConfPath := fmt.Sprintf("%s/bin/conf/server.json", ApplicationDir)
defaultLogPath :=fmt.Sprintf("%s/bin/logs", ApplicationDir)
defaultLogPath := fmt.Sprintf("%s/bin/logs", ApplicationDir)

if *confPath==""{
*confPath=defaultConfPath
if *confPath == "" {
*confPath = defaultConfPath
}

if *Logdir==""{
*Logdir=defaultLogPath
if *Logdir == "" {
*Logdir = defaultLogPath
}

f, err := os.Open(*confPath)
Expand All @@ -121,12 +120,11 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
fmt.Println("Server configuration file path :", *confPath)
conf.LoadConfig(f.Name()) //加载配置文件
app.Configure(conf.Conf) //配置信息
log.InitBeego(debug, *ProcessID, *Logdir,conf.Conf.Log)

log.InitBeego(debug, *ProcessID, *Logdir, conf.Conf.Log)

log.Info("mqant %v starting up", app.version)

if app.configurationLoaded!=nil{
if app.configurationLoaded != nil {
app.configurationLoaded(app)
}

Expand All @@ -139,7 +137,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
}
app.OnInit(app.settings)
manager.Init(app, *ProcessID)
if app.startup!=nil{
if app.startup != nil {
app.startup(app)
}
// close
Expand All @@ -164,16 +162,15 @@ func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type str
return fn
}


func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerialize) error{
if _,ok:=app.rpcserializes[name];ok{
return fmt.Errorf("The name(%s) has been occupied",name)
func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerialize) error {
if _, ok := app.rpcserializes[name]; ok {
return fmt.Errorf("The name(%s) has been occupied", name)
}
app.rpcserializes[name]=Interface
app.rpcserializes[name] = Interface
return nil
}

func (app *DefaultApp)GetRPCSerialize()(map[string]module.RPCSerialize){
func (app *DefaultApp) GetRPCSerialize() map[string]module.RPCSerialize {
return app.rpcserializes
}

Expand All @@ -193,7 +190,7 @@ func (app *DefaultApp) OnInit(settings conf.Config) error {
//如果Id已经存在,说明有两个相同Id的模块,这种情况不能被允许,这里就直接抛异常 强制崩溃以免以后调试找不到问题
panic(fmt.Sprintf("ServerId (%s) Type (%s) of the modules already exist Can not be reused ServerId (%s) Type (%s)", m.GetId(), m.GetType(), moduel.Id, Type))
}
client, err := defaultrpc.NewRPCClient(app,moduel.Id)
client, err := defaultrpc.NewRPCClient(app, moduel.Id)
if err != nil {
continue
}
Expand All @@ -205,7 +202,7 @@ func (app *DefaultApp) OnInit(settings conf.Config) error {
//如果远程的rpc存在则创建一个对应的客户端
client.NewRedisClient(moduel.Redis)
}
session := basemodule.NewServerSession(moduel.Id,Type,client)
session := basemodule.NewServerSession(moduel.Id, Type, client)
app.serverList[moduel.Id] = session
log.Info("RPCClient create success type(%s) id(%s)", Type, moduel.Id)
}
Expand Down Expand Up @@ -290,57 +287,57 @@ func (app *DefaultApp) RpcInvokeNR(module module.RPCModule, moduleType string, _
return server.CallNR(_func, params...)
}

func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string,args [][]byte) (result interface{}, err string) {
func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string, args [][]byte) (result interface{}, err string) {
server, e := app.GetRouteServers(moduleType, module.GetServerId())
if e != nil {
err = e.Error()
return
}
return server.CallArgs(_func, ArgsType,args)
return server.CallArgs(_func, ArgsType, args)
}

func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string,args [][]byte) (err error) {
func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string, args [][]byte) (err error) {
server, err := app.GetRouteServers(moduleType, module.GetServerId())
if err != nil {
return
}
return server.CallNRArgs(_func, ArgsType,args)
return server.CallNRArgs(_func, ArgsType, args)
}
func (app *DefaultApp)DefaultTracer(_func func ()opentracing.Tracer) error{
app.getTracer=_func
func (app *DefaultApp) DefaultTracer(_func func() opentracing.Tracer) error {
app.getTracer = _func
return nil
}
func (app *DefaultApp)GetTracer()opentracing.Tracer{
if app.getTracer!=nil{
func (app *DefaultApp) GetTracer() opentracing.Tracer {
if app.getTracer != nil {
return app.getTracer()
}
return nil
}

func (app *DefaultApp) GetModuleInited()func (app module.App,module module.Module){
func (app *DefaultApp) GetModuleInited() func(app module.App, module module.Module) {
return app.moduleInited
}

func (app *DefaultApp) GetJudgeGuest()func(session gate.Session)bool{
func (app *DefaultApp) GetJudgeGuest() func(session gate.Session) bool {
return app.judgeGuest
}

func (app *DefaultApp)OnConfigurationLoaded(_func func (app module.App)) error{
app.configurationLoaded=_func
func (app *DefaultApp) OnConfigurationLoaded(_func func(app module.App)) error {
app.configurationLoaded = _func
return nil
}

func (app *DefaultApp)OnModuleInited(_func func (app module.App,module module.Module)) error{
app.moduleInited=_func
func (app *DefaultApp) OnModuleInited(_func func(app module.App, module module.Module)) error {
app.moduleInited = _func
return nil
}

func (app *DefaultApp)OnStartup(_func func (app module.App)) error{
app.startup=_func
func (app *DefaultApp) OnStartup(_func func(app module.App)) error {
app.startup = _func
return nil
}

func (app *DefaultApp)SetJudgeGuest(_func func(session gate.Session)bool) error{
app.judgeGuest=_func
func (app *DefaultApp) SetJudgeGuest(_func func(session gate.Session) bool) error {
app.judgeGuest = _func
return nil
}
}
26 changes: 12 additions & 14 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
var (
LenStackBuf = 1024

Conf = Config{}
Conf = Config{}
)

func LoadConfig(Path string) {
Expand All @@ -39,21 +39,19 @@ func LoadConfig(Path string) {
}

type Config struct {
Log map[string]interface{}
Rpc Rpc
Module map[string][]*ModuleSettings
Mqtt Mqtt
Master Master
Settings map[string]interface{}
Log map[string]interface{}
Rpc Rpc
Module map[string][]*ModuleSettings
Mqtt Mqtt
Master Master
Settings map[string]interface{}
}


type Rpc struct {
RpcExpired int //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答
LogSuccess bool //是否打印请求处理成功的日志
RpcExpired int //远程访问最后期限值 单位秒[默认5秒] 这个值指定了在客户端可以等待服务端多长时间来应答
LogSuccess bool //是否打印请求处理成功的日志
}


type Rabbitmq struct {
Uri string
Exchange string
Expand All @@ -64,8 +62,8 @@ type Rabbitmq struct {
}

type Redis struct {
Uri string //redis://:[password]@[ip]:[port]/[db]
Queue string
Uri string //redis://:[password]@[ip]:[port]/[db]
Queue string
}

type ModuleSettings struct {
Expand All @@ -74,7 +72,7 @@ type ModuleSettings struct {
ProcessID string
Settings map[string]interface{}
Rabbitmq *Rabbitmq
Redis *Redis
Redis *Redis
}

type Mqtt struct {
Expand Down
22 changes: 12 additions & 10 deletions gate/base/gate_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package basegate

import (
"fmt"
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/utils"
"fmt"
)

type handler struct {
Expand All @@ -40,7 +40,7 @@ func (h *handler) Connect(a gate.Agent) {
if a.GetSession() != nil {
h.sessions.Set(a.GetSession().GetSessionid(), a)
}
if h.gate.sessionLearner!=nil{
if h.gate.sessionLearner != nil {
h.gate.sessionLearner.Connect(a.GetSession())
}
}
Expand All @@ -50,13 +50,13 @@ func (h *handler) DisConnect(a gate.Agent) {
if a.GetSession() != nil {
h.sessions.Delete(a.GetSession().GetSessionid())
}
if h.gate.sessionLearner!=nil{
if h.gate.sessionLearner != nil {
h.gate.sessionLearner.DisConnect(a.GetSession())
}
}

func (h *handler)OnDestroy(){
for _,v:=range h.sessions.Items(){
func (h *handler) OnDestroy() {
for _, v := range h.sessions.Items() {
v.(gate.Agent).Close()
}
h.sessions.DeleteAll()
Expand Down Expand Up @@ -112,19 +112,21 @@ func (h *handler) Bind(Sessionid string, Userid string) (result gate.Session, er
result = agent.(gate.Agent).GetSession()
return
}

/**
*查询某一个userId是否连接中,这里只是查询这一个网关里面是否有userId客户端连接,如果有多个网关就需要遍历了
*/
func (h *handler) IsConnect(Sessionid string, Userid string) ( bool, string){
func (h *handler) IsConnect(Sessionid string, Userid string) (bool, string) {

for _,agent:=range h.sessions.Items(){
if agent.(gate.Agent).GetSession().GetUserid()==Userid{
return !agent.(gate.Agent).IsClosed(),""
for _, agent := range h.sessions.Items() {
if agent.(gate.Agent).GetSession().GetUserid() == Userid {
return !agent.(gate.Agent).IsClosed(), ""
}
}

return false,fmt.Sprintf("The gateway did not find the corresponding userId 【%s】",Userid)
return false, fmt.Sprintf("The gateway did not find the corresponding userId 【%s】", Userid)
}

/**
*UnBind the session with the the Userid.
*/
Expand Down
Loading

0 comments on commit 2dbacdc

Please sign in to comment.