Skip to content

Commit

Permalink
去掉BaseSession
Browse files Browse the repository at this point in the history
  • Loading branch information
davyxu committed Feb 8, 2018
1 parent 1adeb4d commit a78e4b1
Show file tree
Hide file tree
Showing 22 changed files with 71 additions and 66 deletions.
8 changes: 4 additions & 4 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package cellnet

// 接收到消息
type RecvMsgEvent struct {
Ses BaseSession
Ses Session
Msg interface{}
}

func (self *RecvMsgEvent) BaseSession() BaseSession {
func (self *RecvMsgEvent) Session() Session {
return self.Ses
}

Expand All @@ -20,14 +20,14 @@ func (self *RecvMsgEvent) Send(msg interface{}) {

// 会话开始发送数据事件
type SendMsgEvent struct {
Ses BaseSession
Ses Session
Msg interface{} // 用户需要发送的消息
}

func (self *SendMsgEvent) Message() interface{} {
return self.Msg
}

func (self *SendMsgEvent) BaseSession() BaseSession {
func (self *SendMsgEvent) Session() Session {
return self.Ses
}
8 changes: 4 additions & 4 deletions eventproc.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package cellnet

type Event interface {
BaseSession() BaseSession
Session() Session
Message() interface{}
}

// 消息处理器
type MessageProcessor interface {
OnRecvMessage(ses BaseSession) (raw interface{}, err error)
OnSendMessage(ses BaseSession, raw interface{}) error
OnRecvMessage(ses Session) (raw interface{}, err error)
OnSendMessage(ses Session, raw interface{}) error
}

// 处理钩子
Expand All @@ -35,7 +35,7 @@ type UserMessageHandlerQueued func(ev Event)

func (self UserMessageHandlerQueued) OnEvent(ev Event) {

SessionQueuedCall(ev.BaseSession(), func() {
SessionQueuedCall(ev.Session(), func() {

self(ev)
})
Expand Down
10 changes: 5 additions & 5 deletions meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ func RegisterMessageMeta(meta *MessageMeta) {

if meta.ID == 0 {
panic("message meta require 'ID' field: " + meta.TypeName())
}

if _, ok := metaByID[meta.ID]; ok {
panic(fmt.Sprintf("Duplicate message meta register by id: %d", meta.ID))
} else {
metaByID[meta.ID] = meta
}
if _, ok := metaByID[meta.ID]; ok {
panic(fmt.Sprintf("Duplicate message meta register by id: %d", meta.ID))
} else {
metaByID[meta.ID] = meta
}

}
Expand Down
4 changes: 4 additions & 0 deletions peer/http/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"reflect"
)

type HttpRequestor interface {
Request(method string, raw interface{}) (interface{}, error)
}

type httpConnector struct {
peer.CorePeerProperty
peer.CoreProcessorBundle
Expand Down
10 changes: 9 additions & 1 deletion peer/http/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ func (self *httpSession) Raw() interface{} {
return nil
}

func (self *httpSession) ID() int64 {
return 0
}

// 取原始连接
func (self *httpSession) Close() {
}

// 取会话归属的通讯端
func (self *httpSession) Peer() cellnet.Peer {
return self.peerInterface
Expand All @@ -40,7 +48,7 @@ func (self *httpSession) Send(raw interface{}) {
self.SendMessage(&cellnet.SendMsgEvent{self, raw})
}

func newHttpSession(peerIns cellnet.Peer, req *http.Request, response http.ResponseWriter) cellnet.BaseSession {
func newHttpSession(peerIns cellnet.Peer, req *http.Request, response http.ResponseWriter) cellnet.Session {

return &httpSession{
req: req,
Expand Down
4 changes: 2 additions & 2 deletions peer/procbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (self *CoreProcessorBundle) SetEventHandler(v cellnet.EventHandler) {

var notHandled = errors.New("not handled")

func (self *CoreProcessorBundle) ReadMessage(ses cellnet.BaseSession) (msg interface{}, err error) {
func (self *CoreProcessorBundle) ReadMessage(ses cellnet.Session) (msg interface{}, err error) {

if self.proc != nil {
return self.proc.OnRecvMessage(ses)
Expand All @@ -49,7 +49,7 @@ func (self *CoreProcessorBundle) SendMessage(ev cellnet.Event) {
}

if self.proc != nil {
self.proc.OnSendMessage(ev.BaseSession(), ev.Message())
self.proc.OnSendMessage(ev.Session(), ev.Message())
}
}

Expand Down
4 changes: 2 additions & 2 deletions proc/http/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type LogHooker struct {
func (LogHooker) OnInboundEvent(raw cellnet.Event) {

msg := raw.Message()
ses := raw.BaseSession()
ses := raw.Session()

if msglog.IsBlockedMessageByID(cellnet.MessageToID(msg)) {
return
Expand All @@ -35,7 +35,7 @@ func (LogHooker) OnInboundEvent(raw cellnet.Event) {
func (LogHooker) OnOutboundEvent(raw cellnet.Event) {

msg := raw.Message()
ses := raw.BaseSession()
ses := raw.Session()

if msglog.IsBlockedMessageByID(cellnet.MessageToID(msg)) {
return
Expand Down
4 changes: 2 additions & 2 deletions proc/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type MessageProc struct {

var errNotHandled = errors.New("request not handled")

func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) {
func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) {

httpContext := ses.(HttpContext)
req := httpContext.Request()
Expand All @@ -42,7 +42,7 @@ func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err
return nil, errNotHandled
}

func (MessageProc) OnSendMessage(ses cellnet.BaseSession, raw interface{}) error {
func (MessageProc) OnSendMessage(ses cellnet.Session, raw interface{}) error {

httpContext := ses.(HttpContext)
resp := httpContext.Response()
Expand Down
4 changes: 2 additions & 2 deletions proc/kcp/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type kcpContext struct {
readSignal chan struct{}
exitTickSignal chan struct{}

ses cellnet.BaseSession
ses cellnet.Session

closed bool
}
Expand Down Expand Up @@ -44,7 +44,7 @@ func (self *kcpContext) tickLoop() {

}

func newContext(ses cellnet.BaseSession) *kcpContext {
func newContext(ses cellnet.Session) *kcpContext {

var self *kcpContext

Expand Down
10 changes: 5 additions & 5 deletions proc/kcp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

const kcpTag = "kcp"

func mustKCPContext(ses cellnet.BaseSession) (ctx *kcpContext) {
func mustKCPContext(ses cellnet.Session) (ctx *kcpContext) {
if ses.(cellnet.PropertySet).GetProperty(kcpTag, &ctx) {
return
} else {
Expand All @@ -20,7 +20,7 @@ func mustKCPContext(ses cellnet.BaseSession) (ctx *kcpContext) {
type MessageProc struct {
}

func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) {
func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) {

ctx := mustKCPContext(ses)

Expand All @@ -41,7 +41,7 @@ func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err
return
}

func (MessageProc) OnSendMessage(ses cellnet.BaseSession, msg interface{}) error {
func (MessageProc) OnSendMessage(ses cellnet.Session, msg interface{}) error {

return mustKCPContext(ses).sendMessage(msg)
}
Expand All @@ -56,9 +56,9 @@ func (self udpEventHooker) OnInboundEvent(ev cellnet.Event) {

switch ev.Message().(type) {
case *cellnet.SessionInit:
ev.BaseSession().(cellnet.PropertySet).SetProperty(kcpTag, newContext(ev.BaseSession()))
ev.Session().(cellnet.PropertySet).SetProperty(kcpTag, newContext(ev.Session()))
case *cellnet.SessionClosed:
mustKCPContext(ev.BaseSession()).Close()
mustKCPContext(ev.Session()).Close()
}
}

Expand Down
4 changes: 2 additions & 2 deletions proc/msglog/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type LogHooker struct {
func (LogHooker) OnInboundEvent(ev cellnet.Event) {

msg := ev.Message()
ses := ev.BaseSession().(cellnet.Session)
ses := ev.Session()

if IsBlockedMessageByID(cellnet.MessageToID(msg)) {
return
Expand Down Expand Up @@ -46,7 +46,7 @@ func (LogHooker) OnInboundEvent(ev cellnet.Event) {
func (LogHooker) OnOutboundEvent(ev cellnet.Event) {

msg := ev.Message()
ses := ev.BaseSession().(cellnet.Session)
ses := ev.Session()

if rawPkt, ok := msg.(comm.RawPacket); ok {
rawMsg, _, err := codec.DecodeMessage(rawPkt.MsgID, rawPkt.MsgData)
Expand Down
2 changes: 1 addition & 1 deletion proc/rpc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type RecvMsgEvent struct {
callid int64
}

func (self *RecvMsgEvent) BaseSession() cellnet.BaseSession {
func (self *RecvMsgEvent) Session() cellnet.Session {
return self.ses
}

Expand Down
12 changes: 6 additions & 6 deletions proc/rpc/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ func (self RPCHooker) OnInboundEvent(ev cellnet.Event) {
return
}

peerInfo := ev.BaseSession().Peer().(interface {
peerInfo := ev.Session().Peer().(interface {
Name() string
})

log.Debugf("#rpc recv(%s)@%d %s(%d) | %s",
peerInfo.Name(),
ev.BaseSession().(cellnet.Session).ID(),
ev.Session().ID(),
meta.TypeName(),
meta.ID,
cellnet.MessageToString(msg))

poster := ev.BaseSession().Peer().(peer.MessagePoster)
poster := ev.Session().Peer().(peer.MessagePoster)

switch ev.Message().(type) {
case *RemoteCallREQ: // 服务端收到客户端的请求

poster.PostEvent(&RecvMsgEvent{ev.BaseSession().(cellnet.Session), msg, rpcMsg.GetCallID()})
poster.PostEvent(&RecvMsgEvent{ev.Session(), msg, rpcMsg.GetCallID()})

case *RemoteCallACK: // 客户端收到服务器的回应
request := getRequest(rpcMsg.GetCallID())
Expand All @@ -68,13 +68,13 @@ func (self RPCHooker) OnOutboundEvent(ev cellnet.Event) {
return
}

peerInfo := ev.BaseSession().Peer().(interface {
peerInfo := ev.Session().Peer().(interface {
Name() string
})

log.Debugf("#rpc send(%s)@%d %s(%d) | %s",
peerInfo.Name(),
ev.BaseSession().(cellnet.Session).ID(),
ev.Session().ID(),
meta.TypeName(),
meta.ID,
cellnet.MessageToString(msg))
Expand Down
4 changes: 2 additions & 2 deletions proc/tcp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type MessageProc struct {
}

func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) {
func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) {

reader, ok := ses.Raw().(io.Reader)

Expand All @@ -23,7 +23,7 @@ func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err
return RecvLTVPacket(reader)
}

func (MessageProc) OnSendMessage(ses cellnet.BaseSession, msg interface{}) error {
func (MessageProc) OnSendMessage(ses cellnet.Session, msg interface{}) error {

writer, ok := ses.Raw().(io.Writer)

Expand Down
4 changes: 2 additions & 2 deletions proc/udp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
type MessageProc struct {
}

func (MessageProc) OnRecvMessage(ses cellnet.BaseSession) (msg interface{}, err error) {
func (MessageProc) OnRecvMessage(ses cellnet.Session) (msg interface{}, err error) {

data := ses.Raw().(udp.DataReader).ReadData()

return RecvLTVPacket(data)
}

func (MessageProc) OnSendMessage(ses cellnet.BaseSession, msg interface{}) error {
func (MessageProc) OnSendMessage(ses cellnet.Session, msg interface{}) error {

writer := ses.(udp.DataWriter)

Expand Down
2 changes: 1 addition & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewEventQueue() EventQueue {
}
}

func SessionQueuedCall(ses BaseSession, callback func()) {
func SessionQueuedCall(ses Session, callback func()) {
if ses == nil {
return
}
Expand Down
9 changes: 2 additions & 7 deletions session.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cellnet

// 基础会话
type BaseSession interface {
// 长连接
type Session interface {

// 获得原始的Socket连接
Raw() interface{}
Expand All @@ -11,11 +11,6 @@ type BaseSession interface {

// 发送消息,消息需要以指针格式传入
Send(msg interface{})
}

// 长连接
type Session interface {
BaseSession

// 断开
Close()
Expand Down
6 changes: 3 additions & 3 deletions tests/echo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func echo_StartServer(context *echoContext) {

fmt.Printf("server recv %+v\n", msg)

ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{
ev.Session().Send(&TestEchoACK{
Msg: msg.Msg,
Value: msg.Value,
})

case *cellnet.SessionClosed:
fmt.Println("session closed: ", ev.BaseSession().(cellnet.Session).ID())
fmt.Println("session closed: ", ev.Session().ID())
}

})
Expand All @@ -91,7 +91,7 @@ func echo_StartClient(echoContext *echoContext) {
switch msg := ev.Message().(type) {
case *cellnet.SessionConnected:
fmt.Println("client connected")
ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{
ev.Session().Send(&TestEchoACK{
Msg: "hello",
Value: 1234,
})
Expand Down
2 changes: 1 addition & 1 deletion tests/gracefulexit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func recreateConn_StartServer() {

fmt.Printf("server recv %+v\n", msg)

ev.BaseSession().(cellnet.Session).Send(&TestEchoACK{
ev.Session().Send(&TestEchoACK{
Msg: msg.Msg,
Value: msg.Value,
})
Expand Down
Loading

0 comments on commit a78e4b1

Please sign in to comment.