forked from davyxu/cellnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreq_type.go
129 lines (94 loc) · 2.96 KB
/
req_type.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package rpc
import (
"github.com/davyxu/cellnet"
"reflect"
"sync"
"time"
)
func CallType(sesOrPeer interface{}, reqMsg interface{}, timeout time.Duration, userCallback interface{}) {
callType(sesOrPeer, false, reqMsg, timeout, userCallback)
}
func CallSyncType(sesOrPeer interface{}, reqMsg interface{}, timeout time.Duration, userCallback interface{}) {
callType(sesOrPeer, true, reqMsg, timeout, userCallback)
}
// 异步RPC请求,按消息类型,一般用于客户端请求
// ud: peer/session, reqMsg:请求用的消息, userCallback: 返回消息类型回调 func( ackMsg *ackMsgType, error )
func callType(sesOrPeer interface{}, sync bool, reqMsg interface{}, timeout time.Duration, userCallback interface{}) {
// 获取回调第一个参数
funcType := reflect.TypeOf(userCallback)
if funcType.Kind() != reflect.Func {
panic("type rpc callback require 'func'")
}
if funcType.NumIn() != 2 {
panic("callback func param format like 'func(ack *YouMsgACK)'")
}
ackType := funcType.In(0)
if ackType.Kind() != reflect.Ptr {
panic("callback func param format like 'func(ack *YouMsgACK)'")
}
ackType = ackType.Elem()
callFunc := func(rawACK interface{}, err error) {
vCall := reflect.ValueOf(userCallback)
if rawACK == nil {
rawACK = reflect.New(ackType).Interface()
}
var errV reflect.Value
if err == nil {
errV = nilError
} else {
errV = reflect.ValueOf(err)
}
vCall.Call([]reflect.Value{reflect.ValueOf(rawACK), errV})
}
ses, err := getPeerSession(sesOrPeer)
if err != nil {
callFunc(nil, err)
return
}
createTypeRequest(sync, ackType, timeout, func() {
ses.Send(reqMsg)
}, callFunc)
}
var (
nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
callByType sync.Map // map[reflect.Type]func(interface{})
)
func createTypeRequest(sync bool, ackType reflect.Type, timeout time.Duration, onSend func(), onRecv func(rawACK interface{}, err error)) {
if sync {
feedBack := make(chan interface{})
callByType.Store(ackType, feedBack)
defer callByType.Delete(ackType)
onSend()
select {
case ack := <-feedBack:
onRecv(ack, nil)
case <-time.After(timeout):
onRecv(nil, ErrTimeout)
}
} else {
callByType.Store(ackType, func(rawACK interface{}, err error) {
onRecv(rawACK, err)
callByType.Delete(ackType)
})
onSend()
// 丢弃超时的类型,避免重复请求时,将第二次请求的消息删除
}
}
type TypeRPCHooker struct {
}
func (TypeRPCHooker) OnInboundEvent(inputEvent cellnet.Event) (outputEvent cellnet.Event) {
incomingMsgType := reflect.TypeOf(inputEvent.Message()).Elem()
if rawFeedback, ok := callByType.Load(incomingMsgType); ok {
switch feedBack := rawFeedback.(type) {
case func(rawACK interface{}, err error):
feedBack(inputEvent.Message(), nil)
case chan interface{}:
feedBack <- inputEvent.Message()
}
return inputEvent
}
return inputEvent
}
func (TypeRPCHooker) OnOutboundEvent(inputEvent cellnet.Event) (outputEvent cellnet.Event) {
return inputEvent
}