diff --git a/event.go b/event.go index bb3d9e24..e29fccb3 100644 --- a/event.go +++ b/event.go @@ -2,11 +2,11 @@ package cellnet // 接收到消息 type RecvMsgEvent struct { - Ses BaseSession + Ses Session Msg interface{} } -func (self *RecvMsgEvent) BaseSession() BaseSession { +func (self *RecvMsgEvent) Session() Session { return self.Ses } @@ -20,7 +20,7 @@ func (self *RecvMsgEvent) Send(msg interface{}) { // 会话开始发送数据事件 type SendMsgEvent struct { - Ses BaseSession + Ses Session Msg interface{} // 用户需要发送的消息 } @@ -28,6 +28,6 @@ func (self *SendMsgEvent) Message() interface{} { return self.Msg } -func (self *SendMsgEvent) BaseSession() BaseSession { +func (self *SendMsgEvent) Session() Session { return self.Ses } diff --git a/eventproc.go b/eventproc.go index 33d2cca8..75220488 100644 --- a/eventproc.go +++ b/eventproc.go @@ -1,14 +1,14 @@ package cellnet type Event interface { - BaseSession() BaseSession + Session() Session Message() interface{} } // 消息处理器 type MessageProcessor interface { - OnRecvMessage(ses BaseSession) (raw interface{}, err error) - OnSendMessage(ses BaseSession, raw interface{}) error + OnRecvMessage(ses Session) (raw interface{}, err error) + OnSendMessage(ses Session, raw interface{}) error } // 处理钩子 @@ -35,7 +35,7 @@ type UserMessageHandlerQueued func(ev Event) func (self UserMessageHandlerQueued) OnEvent(ev Event) { - SessionQueuedCall(ev.BaseSession(), func() { + SessionQueuedCall(ev.Session(), func() { self(ev) }) diff --git a/meta.go b/meta.go index 0a12ce48..8bfa4052 100644 --- a/meta.go +++ b/meta.go @@ -88,12 +88,12 @@ func RegisterMessageMeta(meta *MessageMeta) { if meta.ID == 0 { panic("message meta require 'ID' field: " + meta.TypeName()) + } - if _, ok := metaByID[meta.ID]; ok { - panic(fmt.Sprintf("Duplicate message meta register by id: %d", meta.ID)) - } else { - metaByID[meta.ID] = meta - } + if _, ok := metaByID[meta.ID]; ok { + panic(fmt.Sprintf("Duplicate message meta register by id: %d", meta.ID)) + } else { + metaByID[meta.ID] = meta } } diff --git a/peer/http/connector.go b/peer/http/connector.go index 62117cd7..db81d408 100644 --- a/peer/http/connector.go +++ b/peer/http/connector.go @@ -11,6 +11,10 @@ import ( "reflect" ) +type HttpRequestor interface { + Request(method string, raw interface{}) (interface{}, error) +} + type httpConnector struct { peer.CorePeerProperty peer.CoreProcessorBundle diff --git a/peer/http/session.go b/peer/http/session.go index 5d21aaa0..5484da80 100644 --- a/peer/http/session.go +++ b/peer/http/session.go @@ -29,6 +29,14 @@ func (self *httpSession) Raw() interface{} { return nil } +func (self *httpSession) ID() int64 { + return 0 +} + +// 取原始连接 +func (self *httpSession) Close() { +} + // 取会话归属的通讯端 func (self *httpSession) Peer() cellnet.Peer { return self.peerInterface @@ -40,7 +48,7 @@ func (self *httpSession) Send(raw interface{}) { self.SendMessage(&cellnet.SendMsgEvent{self, raw}) } -func newHttpSession(peerIns cellnet.Peer, req *http.Request, response http.ResponseWriter) cellnet.BaseSession { +func newHttpSession(peerIns cellnet.Peer, req *http.Request, response http.ResponseWriter) cellnet.Session { return &httpSession{ req: req, diff --git a/peer/procbundle.go b/peer/procbundle.go index 5b188d30..26492c82 100644 --- a/peer/procbundle.go +++ b/peer/procbundle.go @@ -33,7 +33,7 @@ func (self *CoreProcessorBundle) SetEventHandler(v cellnet.EventHandler) { var notHandled = errors.New("not handled") -func (self *CoreProcessorBundle) ReadMessage(ses cellnet.BaseSession) (msg interface{}, err error) { +func (self *CoreProcessorBundle) ReadMessage(ses cellnet.Session) (msg interface{}, err error) { if self.proc != nil { return self.proc.OnRecvMessage(ses) @@ -49,7 +49,7 @@ func (self *CoreProcessorBundle) SendMessage(ev cellnet.Event) { } if self.proc != nil { - self.proc.OnSendMessage(ev.BaseSession(), ev.Message()) + self.proc.OnSendMessage(ev.Session(), ev.Message()) } } diff --git a/proc/http/logger.go b/proc/http/logger.go index 5a8e8c24..e10975fc 100644 --- a/proc/http/logger.go +++ b/proc/http/logger.go @@ -11,7 +11,7 @@ type LogHooker struct { func (LogHooker) OnInboundEvent(raw cellnet.Event) { msg := raw.Message() - ses := raw.BaseSession() + ses := raw.Session() if msglog.IsBlockedMessageByID(cellnet.MessageToID(msg)) { return @@ -35,7 +35,7 @@ func (LogHooker) OnInboundEvent(raw cellnet.Event) { func (LogHooker) OnOutboundEvent(raw cellnet.Event) { msg := raw.Message() - ses := raw.BaseSession() + ses := raw.Session() if msglog.IsBlockedMessageByID(cellnet.MessageToID(msg)) { return diff --git a/proc/http/setup.go b/proc/http/setup.go index 865582a5..2216ca69 100644 --- a/proc/http/setup.go +++ b/proc/http/setup.go @@ -21,7 +21,7 @@ type MessageProc struct { var errNotHandled = errors.New("request not handled") -func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) { +func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) { httpContext := ses.(HttpContext) req := httpContext.Request() @@ -42,7 +42,7 @@ func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err return nil, errNotHandled } -func (MessageProc) OnSendMessage(ses cellnet.BaseSession, raw interface{}) error { +func (MessageProc) OnSendMessage(ses cellnet.Session, raw interface{}) error { httpContext := ses.(HttpContext) resp := httpContext.Response() diff --git a/proc/kcp/context.go b/proc/kcp/context.go index 7e3ba03c..7c6bc410 100644 --- a/proc/kcp/context.go +++ b/proc/kcp/context.go @@ -16,7 +16,7 @@ type kcpContext struct { readSignal chan struct{} exitTickSignal chan struct{} - ses cellnet.BaseSession + ses cellnet.Session closed bool } @@ -44,7 +44,7 @@ func (self *kcpContext) tickLoop() { } -func newContext(ses cellnet.BaseSession) *kcpContext { +func newContext(ses cellnet.Session) *kcpContext { var self *kcpContext diff --git a/proc/kcp/setup.go b/proc/kcp/setup.go index e699fba7..6c49ce69 100644 --- a/proc/kcp/setup.go +++ b/proc/kcp/setup.go @@ -9,7 +9,7 @@ import ( const kcpTag = "kcp" -func mustKCPContext(ses cellnet.BaseSession) (ctx *kcpContext) { +func mustKCPContext(ses cellnet.Session) (ctx *kcpContext) { if ses.(cellnet.PropertySet).GetProperty(kcpTag, &ctx) { return } else { @@ -20,7 +20,7 @@ func mustKCPContext(ses cellnet.BaseSession) (ctx *kcpContext) { type MessageProc struct { } -func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) { +func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) { ctx := mustKCPContext(ses) @@ -41,7 +41,7 @@ func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err return } -func (MessageProc) OnSendMessage(ses cellnet.BaseSession, msg interface{}) error { +func (MessageProc) OnSendMessage(ses cellnet.Session, msg interface{}) error { return mustKCPContext(ses).sendMessage(msg) } @@ -56,9 +56,9 @@ func (self udpEventHooker) OnInboundEvent(ev cellnet.Event) { switch ev.Message().(type) { case *cellnet.SessionInit: - ev.BaseSession().(cellnet.PropertySet).SetProperty(kcpTag, newContext(ev.BaseSession())) + ev.Session().(cellnet.PropertySet).SetProperty(kcpTag, newContext(ev.Session())) case *cellnet.SessionClosed: - mustKCPContext(ev.BaseSession()).Close() + mustKCPContext(ev.Session()).Close() } } diff --git a/proc/msglog/proc.go b/proc/msglog/proc.go index 47f2e84a..a224d79b 100644 --- a/proc/msglog/proc.go +++ b/proc/msglog/proc.go @@ -17,7 +17,7 @@ type LogHooker struct { func (LogHooker) OnInboundEvent(ev cellnet.Event) { msg := ev.Message() - ses := ev.BaseSession().(cellnet.Session) + ses := ev.Session() if IsBlockedMessageByID(cellnet.MessageToID(msg)) { return @@ -46,7 +46,7 @@ func (LogHooker) OnInboundEvent(ev cellnet.Event) { func (LogHooker) OnOutboundEvent(ev cellnet.Event) { msg := ev.Message() - ses := ev.BaseSession().(cellnet.Session) + ses := ev.Session() if rawPkt, ok := msg.(comm.RawPacket); ok { rawMsg, _, err := codec.DecodeMessage(rawPkt.MsgID, rawPkt.MsgData) diff --git a/proc/rpc/event.go b/proc/rpc/event.go index bd93d07c..c790ce5e 100644 --- a/proc/rpc/event.go +++ b/proc/rpc/event.go @@ -11,7 +11,7 @@ type RecvMsgEvent struct { callid int64 } -func (self *RecvMsgEvent) BaseSession() cellnet.BaseSession { +func (self *RecvMsgEvent) Session() cellnet.Session { return self.ses } diff --git a/proc/rpc/setup.go b/proc/rpc/setup.go index bae096d2..e0f184fe 100644 --- a/proc/rpc/setup.go +++ b/proc/rpc/setup.go @@ -29,23 +29,23 @@ func (self RPCHooker) OnInboundEvent(ev cellnet.Event) { return } - peerInfo := ev.BaseSession().Peer().(interface { + peerInfo := ev.Session().Peer().(interface { Name() string }) log.Debugf("#rpc recv(%s)@%d %s(%d) | %s", peerInfo.Name(), - ev.BaseSession().(cellnet.Session).ID(), + ev.Session().ID(), meta.TypeName(), meta.ID, cellnet.MessageToString(msg)) - poster := ev.BaseSession().Peer().(peer.MessagePoster) + poster := ev.Session().Peer().(peer.MessagePoster) switch ev.Message().(type) { case *RemoteCallREQ: // 服务端收到客户端的请求 - poster.PostEvent(&RecvMsgEvent{ev.BaseSession().(cellnet.Session), msg, rpcMsg.GetCallID()}) + poster.PostEvent(&RecvMsgEvent{ev.Session(), msg, rpcMsg.GetCallID()}) case *RemoteCallACK: // 客户端收到服务器的回应 request := getRequest(rpcMsg.GetCallID()) @@ -68,13 +68,13 @@ func (self RPCHooker) OnOutboundEvent(ev cellnet.Event) { return } - peerInfo := ev.BaseSession().Peer().(interface { + peerInfo := ev.Session().Peer().(interface { Name() string }) log.Debugf("#rpc send(%s)@%d %s(%d) | %s", peerInfo.Name(), - ev.BaseSession().(cellnet.Session).ID(), + ev.Session().ID(), meta.TypeName(), meta.ID, cellnet.MessageToString(msg)) diff --git a/proc/tcp/setup.go b/proc/tcp/setup.go index 2e9ae994..e4f43ddf 100644 --- a/proc/tcp/setup.go +++ b/proc/tcp/setup.go @@ -11,7 +11,7 @@ import ( type MessageProc struct { } -func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) { +func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) { reader, ok := ses.Raw().(io.Reader) @@ -23,7 +23,7 @@ func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err return RecvLTVPacket(reader) } -func (MessageProc) OnSendMessage(ses cellnet.BaseSession, msg interface{}) error { +func (MessageProc) OnSendMessage(ses cellnet.Session, msg interface{}) error { writer, ok := ses.Raw().(io.Writer) diff --git a/proc/udp/setup.go b/proc/udp/setup.go index 44cf0a88..7e762f12 100644 --- a/proc/udp/setup.go +++ b/proc/udp/setup.go @@ -10,14 +10,14 @@ import ( type MessageProc struct { } -func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) { +func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) { data := ses.Raw().(udp.DataReader).ReadData() return RecvLTVPacket(data) } -func (MessageProc) OnSendMessage(ses cellnet.BaseSession, msg interface{}) error { +func (MessageProc) OnSendMessage(ses cellnet.Session, msg interface{}) error { writer := ses.(udp.DataWriter) diff --git a/queue.go b/queue.go index b3689598..a997cb20 100644 --- a/queue.go +++ b/queue.go @@ -103,7 +103,7 @@ func NewEventQueue() EventQueue { } } -func SessionQueuedCall(ses BaseSession, callback func()) { +func SessionQueuedCall(ses Session, callback func()) { if ses == nil { return } diff --git a/session.go b/session.go index 35c4c363..f1aaf825 100644 --- a/session.go +++ b/session.go @@ -1,7 +1,7 @@ package cellnet -// 基础会话 -type BaseSession interface { +// 长连接 +type Session interface { // 获得原始的Socket连接 Raw() interface{} @@ -11,11 +11,6 @@ type BaseSession interface { // 发送消息,消息需要以指针格式传入 Send(msg interface{}) -} - -// 长连接 -type Session interface { - BaseSession // 断开 Close() diff --git a/tests/echo_test.go b/tests/echo_test.go index ad05aa04..3d9b58c1 100644 --- a/tests/echo_test.go +++ b/tests/echo_test.go @@ -61,13 +61,13 @@ func echo_StartServer(context *echoContext) { fmt.Printf("server recv %+v\n", msg) - ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{ + ev.Session().Send(&TestEchoACK{ Msg: msg.Msg, Value: msg.Value, }) case *cellnet.SessionClosed: - fmt.Println("session closed: ", ev.BaseSession().(cellnet.Session).ID()) + fmt.Println("session closed: ", ev.Session().ID()) } }) @@ -91,7 +91,7 @@ func echo_StartClient(echoContext *echoContext) { switch msg := ev.Message().(type) { case *cellnet.SessionConnected: fmt.Println("client connected") - ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{ + ev.Session().Send(&TestEchoACK{ Msg: "hello", Value: 1234, }) diff --git a/tests/gracefulexit_test.go b/tests/gracefulexit_test.go index 314d24a3..b120c550 100644 --- a/tests/gracefulexit_test.go +++ b/tests/gracefulexit_test.go @@ -31,7 +31,7 @@ func recreateConn_StartServer() { fmt.Printf("server recv %+v\n", msg) - ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{ + ev.Session().Send(&TestEchoACK{ Msg: msg.Msg, Value: msg.Value, }) diff --git a/tests/http_test.go b/tests/http_test.go index 226b3062..dedcdaaa 100644 --- a/tests/http_test.go +++ b/tests/http_test.go @@ -8,7 +8,7 @@ import ( _ "github.com/davyxu/cellnet/codec/httpform" _ "github.com/davyxu/cellnet/codec/json" "github.com/davyxu/cellnet/peer" - _ "github.com/davyxu/cellnet/peer/http" + httppeer "github.com/davyxu/cellnet/peer/http" "github.com/davyxu/cellnet/proc" _ "github.com/davyxu/cellnet/proc/http" "io/ioutil" @@ -29,9 +29,7 @@ func TestHttp(t *testing.T) { switch raw.Message().(type) { case *HttpEchoREQ: - raw.(interface { - Send(interface{}) - }).Send(&HttpEchoACK{ + raw.Session().Send(&HttpEchoACK{ Status: 0, Token: "ok", }) @@ -50,14 +48,14 @@ func TestHttp(t *testing.T) { } func requestor(t *testing.T) { - p := peer.NewPeer("http.Connector") - pset := p.(cellnet.PropertySet) + peerIns := peer.NewPeer("http.Connector") + pset := peerIns.(cellnet.PropertySet) pset.SetProperty("Name", "httpclient") pset.SetProperty("Address", "127.0.0.1:8081") - raw, err := p.(interface { - Request(method string, raw interface{}) (interface{}, error) - }).Request("GET", &HttpEchoREQ{ + requestor := peerIns.(httppeer.HttpRequestor) + + raw, err := requestor.Request("GET", &HttpEchoREQ{ UserName: "kitty", }) diff --git a/tests/rpc_test.go b/tests/rpc_test.go index 402506eb..4af36a90 100644 --- a/tests/rpc_test.go +++ b/tests/rpc_test.go @@ -54,7 +54,7 @@ func syncRPC_OnClientEvent(ev cellnet.Event) { // 同步阻塞请求必须并发启动,否则客户端无法接收数据 go func(id int) { - result, err := rpc.CallSync(ev.BaseSession(), &TestEchoACK{ + result, err := rpc.CallSync(ev.Session(), &TestEchoACK{ Msg: "sync", Value: 1234, }, time.Second*5) @@ -83,7 +83,7 @@ func asyncRPC_OnClientEvent(ev cellnet.Event) { copy := i + 1 - rpc.Call(ev.BaseSession(), &TestEchoACK{ + rpc.Call(ev.Session(), &TestEchoACK{ Msg: "async", Value: 1234, }, time.Second*5, func(feedback interface{}) { diff --git a/tests/udp_ses_test.go b/tests/udp_ses_test.go index 20d8be18..62f13bbd 100644 --- a/tests/udp_ses_test.go +++ b/tests/udp_ses_test.go @@ -40,7 +40,7 @@ func TestUDPClientPositiveClose(t *testing.T) { switch ev.Message().(type) { case *cellnet.SessionConnected: - ev.BaseSession().(cellnet.Session).Close() + ev.Session().Close() case *cellnet.SessionClosed: signal.Done(1) } @@ -67,7 +67,7 @@ func TestUDPServerPositiveClose(t *testing.T) { switch ev.Message().(type) { case *cellnet.SessionAccepted: - ev.BaseSession().(cellnet.Session).Close() + ev.Session().Close() signal.Done(1) } }) @@ -83,7 +83,7 @@ func TestUDPServerPositiveClose(t *testing.T) { switch ev.Message().(type) { case *cellnet.SessionConnected: - ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{ + ev.Session().Send(&TestEchoACK{ Msg: "hello", Value: 1234, })