forked from Jrohy/trojan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebsocket.go
146 lines (133 loc) · 2.78 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package util
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"net/http"
"sync"
)
// wsUpgrader holds the settings used to upgrade an HTTP request to the
// websocket protocol.
var wsUpgrader = websocket.Upgrader{
// CheckOrigin always returns true, so requests are accepted regardless of
// their origin (all cross-origin requests are allowed).
// NOTE(review): this turns off any Origin validation for the upgrade —
// confirm that requests are authenticated some other way.
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// WsMessage is a single websocket message: a message type together with its
// payload.
type WsMessage struct {
// MessageType is the message type value as returned by the socket's
// ReadMessage and as passed to its WriteMessage.
MessageType int
// Data is the raw message payload.
Data []byte
}
// WsConnection wraps a websocket connection together with its inbound and
// outbound message queues and its close signalling.
type WsConnection struct {
wsSocket *websocket.Conn // underlying websocket connection
inChan chan *WsMessage // queue of messages read from the socket
outChan chan *WsMessage // queue of messages waiting to be sent
mutex sync.Mutex // guards isClosed so that CloseChan is not closed twice
isClosed bool // set by WsClose once CloseChan has been closed
CloseChan chan byte // close notification: closed by WsClose
}
// wsReadLoop is the reader goroutine. It reads messages from the underlying
// socket and queues them on inChan, then calls WsClose once the loop ends.
//
// The loop ends when reading from the socket fails, when CloseChan is observed
// closed while a message is waiting to be queued, or right after a message
// whose payload is "exit" has been queued.
func (wsConn *WsConnection) wsReadLoop() {
reading:
	for {
		// Block until the next message arrives from the peer.
		kind, payload, readErr := wsConn.wsSocket.ReadMessage()
		if readErr != nil {
			fmt.Println("Read error: " + readErr.Error())
			break reading
		}

		incoming := &WsMessage{MessageType: kind, Data: payload}

		// Hand the message to the inbound queue, giving up if the connection
		// is closed while waiting for room.
		select {
		case wsConn.inChan <- incoming:
			// "exit" is still delivered to the consumer before shutting down.
			if string(payload) == "exit" {
				break reading
			}
		case <-wsConn.CloseChan:
			break reading
		}
	}

	wsConn.WsClose()
}
// wsWriteLoop is the writer goroutine. It takes queued messages from outChan
// and writes them to the underlying socket. The loop ends when a write fails
// or when CloseChan is observed closed; WsClose is then called.
func (wsConn *WsConnection) wsWriteLoop() {
writing:
	for {
		select {
		case outgoing := <-wsConn.outChan:
			// Forward the queued message to the peer; a failed write ends
			// the loop.
			writeErr := wsConn.wsSocket.WriteMessage(outgoing.MessageType, outgoing.Data)
			if writeErr != nil {
				fmt.Println(writeErr)
				break writing
			}
		case <-wsConn.CloseChan:
			break writing
		}
	}

	wsConn.WsClose()
}
// InitWebsocket upgrades the HTTP request to a websocket connection using
// wsUpgrader and wraps the result in a WsConnection.
//
// On success the returned connection has its reader and writer goroutines
// already started. If the upgrade fails, the error from Upgrade is returned
// and wsConn is nil.
func InitWebsocket(resp http.ResponseWriter, req *http.Request) (wsConn *WsConnection, err error) {
	// Answer the client and switch the connection over to websocket.
	socket, upgradeErr := wsUpgrader.Upgrade(resp, req, nil)
	if upgradeErr != nil {
		return nil, upgradeErr
	}

	conn := &WsConnection{
		wsSocket:  socket,
		inChan:    make(chan *WsMessage, 1000),
		outChan:   make(chan *WsMessage, 1000),
		CloseChan: make(chan byte),
	}

	// One goroutine per direction.
	go conn.wsReadLoop()
	go conn.wsWriteLoop()

	return conn, nil
}
// WsWrite queues a message to be sent over the websocket.
//
// messageType and data are wrapped in a WsMessage and placed on outChan. The
// call blocks while the queue is full and returns once the message has been
// queued or the connection has been closed.
//
// It returns a "websocket closed" error when CloseChan is closed. The closed
// state is checked before trying to enqueue: select chooses at random among
// ready cases, so without this check a write on an already-closed connection
// could be queued and reported as success.
func (wsConn *WsConnection) WsWrite(messageType int, data []byte) (err error) {
	// Refuse immediately if the connection is already closed.
	select {
	case <-wsConn.CloseChan:
		return errors.New("websocket closed")
	default:
	}

	select {
	case wsConn.outChan <- &WsMessage{MessageType: messageType, Data: data}:
		return nil
	case <-wsConn.CloseChan:
		// Closed while waiting for room in the queue.
		return errors.New("websocket closed")
	}
}
// WsRead returns the next message queued on inChan.
//
// The call blocks until a message is available or CloseChan is closed. When
// the close notification is taken, it returns a nil message and a
// "websocket closed" error.
func (wsConn *WsConnection) WsRead() (msg *WsMessage, err error) {
	select {
	case received := <-wsConn.inChan:
		return received, nil
	case <-wsConn.CloseChan:
		return nil, errors.New("websocket closed")
	}
}
// WsClose shuts the connection down: it closes the underlying socket and
// closes CloseChan as the close notification.
//
// It may be called more than once. The socket's Close is invoked on every
// call (its result is ignored), while the mutex-guarded isClosed flag makes
// sure CloseChan is closed only on the first call.
func (wsConn *WsConnection) WsClose() {
	wsConn.wsSocket.Close()

	wsConn.mutex.Lock()
	defer wsConn.mutex.Unlock()

	if wsConn.isClosed {
		return
	}
	wsConn.isClosed = true
	close(wsConn.CloseChan)
}