forked from davyxu/cellnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
137 lines (96 loc) · 2.24 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
dispatcher包提供消息队列, 消息注册+派发
封装消息解包, 打包的过程
*/
package cellnet
import (
"time"
)
type EventQueue interface {
// 注册事件回调
RegisterCallback(id int, f func(interface{}))
// 设置事件截获钩子, 在CallData中调用钩子
InjectData(func(interface{}) bool)
// 投递事件, 通过队列到达消费者端
PostData(data interface{})
// 直接调用消费者端的handler
CallData(data interface{})
// 延时投递
DelayPostData(dur time.Duration, callback func())
}
type evQueue struct {
// 保证注册发生在初始化, 读取发生在之后可以不用锁
contextMap map[int][]func(interface{})
queue chan interface{}
inject func(interface{}) bool
}
// 注册事件回调
func (self *evQueue) RegisterCallback(id int, f func(interface{})) {
// 事件
em, ok := self.contextMap[id]
// 新建
if !ok {
em = make([]func(interface{}), 0)
}
em = append(em, f)
self.contextMap[id] = em
}
// 注入回调, 返回false时表示不再投递
func (self *evQueue) InjectData(f func(interface{}) bool) {
self.inject = f
}
func (self *evQueue) Exists(id int) bool {
_, ok := self.contextMap[id]
return ok
}
// 派发到队列
func (self *evQueue) PostData(data interface{}) {
self.queue <- data
}
func (self *evQueue) DelayPostData(dur time.Duration, callback func()) {
go func() {
time.AfterFunc(dur, func() {
self.queue <- callback
})
}()
}
func (self *evQueue) Count() int {
return len(self.contextMap)
}
func (self *evQueue) CountByID(id int) int {
if v, ok := self.contextMap[id]; ok {
return len(v)
}
return 0
}
type contentIndexer interface {
ContextID() int
}
// 通过数据接口调用
func (self *evQueue) CallData(data interface{}) {
// 先处理注入
if self.inject != nil && !self.inject(data) {
return
}
switch d := data.(type) {
// ID索引的消息
case contentIndexer:
if carr, ok := self.contextMap[d.ContextID()]; ok {
// 遍历所有的回调
for _, c := range carr {
c(data)
}
}
// 直接回调
case func():
d()
}
}
const queueLength = 10
func newEventQueue() *evQueue {
self := &evQueue{
contextMap: make(map[int][]func(interface{})),
queue: make(chan interface{}, queueLength),
}
return self
}