forked from zammad/zammad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebsocket_server.rb
227 lines (184 loc) · 5.94 KB
/
websocket_server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class WebsocketServer
cattr_reader :clients, :options
# Starts the EventMachine reactor, opens the websocket listener and schedules
# the housekeeping timers.
#
# options - Hash; the keys read here are :b (host), :p (port), :s (secure)
#           and :tls_options, all handed to EventMachine::WebSocket.start.
#           The hash is also kept in @options for the other class methods.
def self.run(options)
  @options = options
  @clients = {}

  Rails.configuration.interface = 'websocket'

  EventMachine.run do
    EventMachine::WebSocket.start(
      host:        @options[:b],
      port:        @options[:p],
      secure:      @options[:s],
      tls_options: @options[:tls_options]
    ) do |ws|
      # register / unregister the connection and dispatch incoming frames
      ws.onopen    { |handshake| WebsocketServer.onopen(ws, handshake) }
      ws.onclose   { WebsocketServer.onclose(ws) }
      ws.onmessage { |msg| WebsocketServer.onmessage(ws, msg) }
    end

    # one early idle check shortly after startup ...
    EventMachine.add_timer(0.5) { WebsocketServer.check_unused_connections }

    # ... then repeat it every 2 minutes; the idle threshold itself is
    # defined inside check_unused_connections
    EventMachine.add_periodic_timer(120) { WebsocketServer.check_unused_connections }

    # periodic status output
    EventMachine.add_periodic_timer(20) { WebsocketServer.log_status }

    # deliver queued session data to the connected clients
    EventMachine.add_periodic_timer(0.4) { WebsocketServer.send_to_client }
  end
end
# Registers a freshly opened websocket connection.
#
# websocket - connection object; its object_id (as String) is the client id
# handshake - object responding to #headers
#
# Always creates a session of type 'websocket' for the client id. Returns the
# new @clients entry, or nil when the client id was already registered.
def self.onopen(websocket, handshake)
  request_headers = handshake.headers
  forwarded_ip    = get_remote_ip(request_headers)
  client_id       = websocket.object_id.to_s

  log 'notice', 'Client connected.', client_id

  Sessions.create(client_id, {}, { type: 'websocket' })

  # keep an existing entry untouched
  return if @clients.key?(client_id)

  entry = {
    websocket:   websocket,
    last_ping:   Time.now.utc.to_i,
    error_count: 0,
    headers:     request_headers,
    remote_ip:   forwarded_ip,
  }
  @clients[client_id] = entry
end
# Unregisters a closed websocket connection.
#
# websocket - connection object; its object_id (as String) is the client id
#
# Drops the client from @clients (if present) and destroys its session.
# Returns whatever Sessions.destroy returns.
def self.onclose(websocket)
  client_id = websocket.object_id.to_s
  log 'notice', 'Client disconnected.', client_id

  # Hash#delete is a no-op for unknown keys, so no membership check is needed
  @clients.delete(client_id)

  Sessions.destroy(client_id)
end
# Handles one incoming websocket frame.
#
# websocket - connection object; its object_id (as String) is the client id
# msg       - raw frame text, expected to be a JSON object
#
# Flow: parse the JSON, ignore frames of unregistered connections, refresh
# the client's activity markers, optionally spool the message, then run the
# event named in data['event'] and send back its result (if any).
#
# Returns the result of websocket_send when a reply was sent, otherwise nil.
def self.onmessage(websocket, msg)
  client_id = websocket.object_id.to_s
  log 'debug', "received: #{msg} ", client_id

  begin
    data = JSON.parse(msg)
  rescue => e
    log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id
    return
  end

  # ignore frames of connections which are not registered
  client = @clients[client_id]
  return if !client

  # any parseable frame counts as activity
  Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations
  client[:last_ping] = Time.now.utc.to_i

  # Valid JSON is not necessarily an object: `[]`, `null` or `1` parse fine,
  # but indexing them with a string key raises (Array -> TypeError,
  # nil -> NoMethodError). Reject them like any other unknown message.
  # NOTE(review): an exception escaping this callback presumably propagates
  # into the websocket library / reactor - confirm.
  if !data.is_a?(Hash)
    log 'error', "unknown message '#{data.inspect}'", client_id
    return
  end

  # spool messages for new connects
  Sessions.spool_create(data) if data['spool']

  if !data['event']
    log 'error', "unknown message '#{data.inspect}'", client_id
    return
  end

  log 'debug', "execute event '#{data['event']}'", client_id
  message = Sessions::Event.run(
    event:     data['event'],
    payload:   data,
    session:   client[:session],
    remote_ip: client[:remote_ip],
    client_id: client_id,
    clients:   @clients,
    options:   @options,
  )
  websocket_send(client_id, message) if message
end
# Extracts the client address from the handshake headers.
#
# headers - Hash-like object (or nil) holding the request headers
#
# Returns the value of the 'X-Forwarded-For' header, or nil when there are no
# headers or the header is missing.
def self.get_remote_ip(headers)
  return unless headers

  headers['X-Forwarded-For'] || nil
end
# Serializes data and sends it to one connected client.
#
# client_id - String key into @clients
# data      - payload; an Array is sent as its JSON form, anything else is
#             wrapped so the client always receives a JSON list
#
# Returns the result of the websocket's #send, or nil when the client id is
# not registered (an error is logged in that case).
def self.websocket_send(client_id, data)
  payload = data.to_json
  payload = "[#{payload}]" if data.class != Array

  log 'debug', "send #{payload}", client_id

  recipient = @clients[client_id]
  if recipient
    recipient[:websocket].send(payload)
  else
    log 'error', "no such @clients for #{client_id}", client_id
    nil
  end
end
# Closes websocket connections that have been idle for more than 4 minutes
# and destroys idle (ajax long polling) sessions.
#
# A websocket client is idle when last_ping + idle_time_in_sec lies before
# the current time. Idle clients are flagged with :disconnect (so
# send_to_client skips them), asked to close and removed from @clients.
#
# Returns the collection returned by Sessions.destroy_idle_sessions.
def self.check_unused_connections
  log 'notice', 'check unused idle connections...'

  idle_time_in_sec = 4 * 60
  now              = Time.now.utc.to_i

  # pick the idle clients first, so @clients is not modified while it is
  # being iterated
  idle_clients = @clients.select do |_client_id, client|
    ( client[:last_ping].to_i + idle_time_in_sec ) < now
  end

  # close unused web socket sessions
  idle_clients.each do |client_id, client|
    log 'notice', 'closing idle websocket connection', client_id

    # remember to not use this connection anymore
    client[:disconnect] = true

    # try to close regular
    client[:websocket].close_websocket

    # delete session from client list; no sleeping in between - this method
    # is invoked from the EventMachine timers set up in run, so a sleep here
    # would stall every other connection for its whole duration
    @clients.delete(client_id)
  end

  # close unused ajax long polling sessions
  clients = Sessions.destroy_idle_sessions(idle_time_in_sec)
  clients.each do |client_id|
    log 'notice', 'closing idle long polling connection', client_id
  end
end
# Delivers queued session data to every connected websocket client.
#
# Clients flagged with :disconnect are skipped. Errors while fetching or
# sending are logged and counted per client; once a client's error_count
# exceeds 20 it is removed from @clients.
#
# Returns nil when there are no clients, otherwise @clients.
def self.send_to_client
  return if @clients.empty?

  @clients.each do |client_id, client|
    next if client[:disconnect]

    log 'debug', 'checking for data...', client_id
    begin
      pending = Sessions.queue(client_id)
      if !pending.blank?
        log 'notice', 'send data to client', client_id
        websocket_send(client_id, pending)
      end
    rescue => error
      log 'error', "problem:#{error.inspect}", client_id

      # give up on clients which keep failing
      client[:error_count] += 1
      @clients.delete(client_id) if client[:error_count] > 20
    end
  end
end
# Logs how many websocket clients and how many other (ajax) sessions exist,
# followed by one 'working...' line per client.
#
# Websocket clients come from @clients; ajax clients are the entries of
# Sessions.list whose meta type is not 'websocket'.
#
# Returns the list returned by Sessions.list.
def self.log_status
  # websocket
  log 'notice', "Status: websocket clients: #{@clients.size}"
  @clients.each_key { |client_id| log 'notice', 'working...', client_id }

  # ajax
  client_list  = Sessions.list
  ajax_clients = client_list.reject { |_client_id, client| client[:meta][:type] == 'websocket' }

  log 'notice', "Status: ajax clients: #{ajax_clients.size}"
  ajax_clients.each_key { |client_id| log 'notice', 'working...', client_id }

  client_list
end
# Prints one log line to stdout.
#
# level     - String; 'debug' lines are only printed when @options[:v] is set
# data      - message text
# client_id - id shown in the line, '-' when the message is not client bound
#
# The line consists of the UTC timestamp, the client id and the message; the
# level itself is not part of the output. Returns nil.
def self.log(level, data, client_id = '-')
  verbose = @options[:v]
  return if !verbose && level == 'debug'

  timestamp = Time.now.utc.iso8601
  puts "#{timestamp}:client(#{client_id}) #{data}" # rubocop:disable Rails/Output
end
end