forked from davyxu/cellnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsyncrecv.go
66 lines (47 loc) · 1.23 KB
/
syncrecv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package proc
import (
"github.com/davyxu/cellnet"
"reflect"
"sync"
)
// 同步接收消息器, 可选件,可作为流程测试辅助工具
type SyncReceiver struct {
evChan chan cellnet.Event
callback func(ev cellnet.Event)
}
// 将处理回调返回给BindProcessorHandler用于注册
func (self *SyncReceiver) EventCallback() cellnet.EventCallback {
return self.callback
}
// 持续阻塞,直到某个消息到达后,使用回调返回消息
func (self *SyncReceiver) Recv(callback cellnet.EventCallback) *SyncReceiver {
callback(<-self.evChan)
return self
}
// 持续阻塞,直到某个消息到达后,返回消息
func (self *SyncReceiver) WaitMessage(msgName string) (msg interface{}) {
var wg sync.WaitGroup
meta := cellnet.MessageMetaByFullName(msgName)
if meta == nil {
panic("unknown message name:" + msgName)
}
wg.Add(1)
self.Recv(func(ev cellnet.Event) {
inMeta := cellnet.MessageMetaByType(reflect.TypeOf(ev.Message()))
if inMeta == meta {
msg = ev.Message()
wg.Done()
}
})
wg.Wait()
return
}
func NewSyncReceiver(p cellnet.Peer) *SyncReceiver {
self := &SyncReceiver{
evChan: make(chan cellnet.Event),
}
self.callback = func(ev cellnet.Event) {
self.evChan <- ev
}
return self
}