forked from miguelgrinberg/python-socketio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
641 lines (561 loc) · 27.1 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
import itertools
import logging
import random
import signal
import threading
import engineio
import six
from . import exceptions
from . import namespace
from . import packet
default_logger = logging.getLogger('socketio.client')
reconnecting_clients = []
def signal_handler(sig, frame): # pragma: no cover
"""SIGINT handler.
Notify any clients that are in a reconnect loop to abort. Other
disconnection tasks are handled at the engine.io level.
"""
for client in reconnecting_clients[:]:
client._reconnect_abort.set()
return original_signal_handler(sig, frame)
original_signal_handler = None
class Client(object):
"""A Socket.IO client.
This class implements a fully compliant Socket.IO web client with support
for websocket and long-polling transports.
:param reconnection: ``True`` if the client should automatically attempt to
reconnect to the server after an interruption, or
``False`` to not reconnect. The default is ``True``.
:param reconnection_attempts: How many reconnection attempts to issue
before giving up, or 0 for infinity attempts.
The default is 0.
:param reconnection_delay: How long to wait in seconds before the first
reconnection attempt. Each successive attempt
doubles this delay.
:param reconnection_delay_max: The maximum delay between reconnection
attempts.
:param randomization_factor: Randomization amount for each delay between
reconnection attempts. The default is 0.5,
which means that each delay is randomly
adjusted by +/- 50%.
:param logger: To enable logging set to ``True`` or pass a logger object to
use. To disable logging set to ``False``. The default is
``False``. Note that fatal errors are logged even when
``logger`` is ``False``.
:param binary: ``True`` to support binary payloads, ``False`` to treat all
payloads as text. On Python 2, if this is set to ``True``,
``unicode`` values are treated as text, and ``str`` and
``bytes`` values are treated as binary. This option has no
effect on Python 3, where text and binary payloads are
always automatically discovered.
:param json: An alternative json module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
The Engine.IO configuration supports the following settings:
:param request_timeout: A timeout in seconds for requests. The default is
5 seconds.
:param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to
skip SSL certificate verification, allowing
connections to servers with self signed certificates.
The default is ``True``.
:param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
a logger object to use. To disable logging set to
``False``. The default is ``False``. Note that
fatal errors are logged even when
``engineio_logger`` is ``False``.
"""
def __init__(self, reconnection=True, reconnection_attempts=0,
reconnection_delay=1, reconnection_delay_max=5,
randomization_factor=0.5, logger=False, binary=False,
json=None, **kwargs):
global original_signal_handler
if original_signal_handler is None and \
threading.current_thread() == threading.main_thread():
original_signal_handler = signal.signal(signal.SIGINT,
signal_handler)
self.reconnection = reconnection
self.reconnection_attempts = reconnection_attempts
self.reconnection_delay = reconnection_delay
self.reconnection_delay_max = reconnection_delay_max
self.randomization_factor = randomization_factor
self.binary = binary
engineio_options = kwargs
engineio_logger = engineio_options.pop('engineio_logger', None)
if engineio_logger is not None:
engineio_options['logger'] = engineio_logger
if json is not None:
packet.Packet.json = json
engineio_options['json'] = json
self.eio = self._engineio_client_class()(**engineio_options)
self.eio.on('connect', self._handle_eio_connect)
self.eio.on('message', self._handle_eio_message)
self.eio.on('disconnect', self._handle_eio_disconnect)
if not isinstance(logger, bool):
self.logger = logger
else:
self.logger = default_logger
if not logging.root.handlers and \
self.logger.level == logging.NOTSET:
if logger:
self.logger.setLevel(logging.INFO)
else:
self.logger.setLevel(logging.ERROR)
self.logger.addHandler(logging.StreamHandler())
self.connection_url = None
self.connection_headers = None
self.connection_transports = None
self.connection_namespaces = None
self.socketio_path = None
self.sid = None
self.connected = False
self.namespaces = []
self.handlers = {}
self.namespace_handlers = {}
self.callbacks = {}
self._binary_packet = None
self._reconnect_task = None
self._reconnect_abort = self.eio.create_event()
def is_asyncio_based(self):
return False
def on(self, event, handler=None, namespace=None):
"""Register an event handler.
:param event: The event name. It can be any string. The event names
``'connect'``, ``'message'`` and ``'disconnect'`` are
reserved and should not be used.
:param handler: The function that should be invoked to handle the
event. When this parameter is not given, the method
acts as a decorator for the handler function.
:param namespace: The Socket.IO namespace for the event. If this
argument is omitted the handler is associated with
the default namespace.
Example usage::
# as a decorator:
@sio.on('connect')
def connect_handler():
print('Connected!')
# as a method:
def message_handler(msg):
print('Received message: ', msg)
sio.send( 'response')
sio.on('message', message_handler)
The ``'connect'`` event handler receives no arguments. The
``'message'`` handler and handlers for custom event names receive the
message payload as only argument. Any values returned from a message
handler will be passed to the client's acknowledgement callback
function if it exists. The ``'disconnect'`` handler does not take
arguments.
"""
namespace = namespace or '/'
def set_handler(handler):
if namespace not in self.handlers:
self.handlers[namespace] = {}
self.handlers[namespace][event] = handler
return handler
if handler is None:
return set_handler
set_handler(handler)
def event(self, *args, **kwargs):
"""Decorator to register an event handler.
This is a simplified version of the ``on()`` method that takes the
event name from the decorated function.
Example usage::
@sio.event
def my_event(data):
print('Received data: ', data)
The above example is equivalent to::
@sio.on('my_event')
def my_event(data):
print('Received data: ', data)
A custom namespace can be given as an argument to the decorator::
@sio.event(namespace='/test')
def my_event(data):
print('Received data: ', data)
"""
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# the decorator was invoked without arguments
# args[0] is the decorated function
return self.on(args[0].__name__)(args[0])
else:
# the decorator was invoked with arguments
def set_handler(handler):
return self.on(handler.__name__, *args, **kwargs)(handler)
return set_handler
def register_namespace(self, namespace_handler):
"""Register a namespace handler object.
:param namespace_handler: An instance of a :class:`Namespace`
subclass that handles all the event traffic
for a namespace.
"""
if not isinstance(namespace_handler, namespace.ClientNamespace):
raise ValueError('Not a namespace instance')
if self.is_asyncio_based() != namespace_handler.is_asyncio_based():
raise ValueError('Not a valid namespace class for this client')
namespace_handler._set_client(self)
self.namespace_handlers[namespace_handler.namespace] = \
namespace_handler
def connect(self, url, headers={}, transports=None,
namespaces=None, socketio_path='socket.io'):
"""Connect to a Socket.IO server.
:param url: The URL of the Socket.IO server. It can include custom
query string parameters if required by the server.
:param headers: A dictionary with custom headers to send with the
connection request.
:param transports: The list of allowed transports. Valid transports
are ``'polling'`` and ``'websocket'``. If not
given, the polling transport is connected first,
then an upgrade to websocket is attempted.
:param namespaces: The list of custom namespaces to connect, in
addition to the default namespace. If not given,
the namespace list is obtained from the registered
event handlers.
:param socketio_path: The endpoint where the Socket.IO server is
installed. The default value is appropriate for
most cases.
Example usage::
sio = socketio.Client()
sio.connect('http://localhost:5000')
"""
self.connection_url = url
self.connection_headers = headers
self.connection_transports = transports
self.connection_namespaces = namespaces
self.socketio_path = socketio_path
if namespaces is None:
namespaces = set(self.handlers.keys()).union(
set(self.namespace_handlers.keys()))
elif isinstance(namespaces, six.string_types):
namespaces = [namespaces]
self.connection_namespaces = namespaces
self.namespaces = [n for n in namespaces if n != '/']
try:
self.eio.connect(url, headers=headers, transports=transports,
engineio_path=socketio_path)
except engineio.exceptions.ConnectionError as exc:
six.raise_from(exceptions.ConnectionError(exc.args[0]), None)
self.connected = True
def wait(self):
"""Wait until the connection with the server ends.
Client applications can use this function to block the main thread
during the life of the connection.
"""
while True:
self.eio.wait()
self.sleep(1) # give the reconnect task time to start up
if not self._reconnect_task:
break
self._reconnect_task.join()
if self.eio.state != 'connected':
break
def emit(self, event, data=None, namespace=None, callback=None):
"""Emit a custom event to one or more connected clients.
:param event: The event name. It can be any string. The event names
``'connect'``, ``'message'`` and ``'disconnect'`` are
reserved and should not be used.
:param data: The data to send to the client or clients. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. If a
``list`` or ``dict``, the data will be serialized as JSON.
:param namespace: The Socket.IO namespace for the event. If this
argument is omitted the event is emitted to the
default namespace.
:param callback: If given, this function will be called to acknowledge
the the client has received the message. The arguments
that will be passed to the function are those provided
by the client. Callback functions can only be used
when addressing an individual client.
Note: this method is not thread safe. If multiple threads are emitting
at the same time on the same client connection, messages composed of
multiple packets may end up being sent in an incorrect sequence. Use
standard concurrency solutions (such as a Lock object) to prevent this
situation.
"""
namespace = namespace or '/'
if namespace != '/' and namespace not in self.namespaces:
raise exceptions.BadNamespaceError(
namespace + ' is not a connected namespace.')
self.logger.info('Emitting event "%s" [%s]', event, namespace)
if callback is not None:
id = self._generate_ack_id(namespace, callback)
else:
id = None
if six.PY2 and not self.binary:
binary = False # pragma: nocover
else:
binary = None
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
self._send_packet(packet.Packet(packet.EVENT, namespace=namespace,
data=[event] + data, id=id,
binary=binary))
def send(self, data, namespace=None, callback=None):
"""Send a message to one or more connected clients.
This function emits an event with the name ``'message'``. Use
:func:`emit` to issue custom event names.
:param data: The data to send to the client or clients. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. If a
``list`` or ``dict``, the data will be serialized as JSON.
:param namespace: The Socket.IO namespace for the event. If this
argument is omitted the event is emitted to the
default namespace.
:param callback: If given, this function will be called to acknowledge
the the client has received the message. The arguments
that will be passed to the function are those provided
by the client. Callback functions can only be used
when addressing an individual client.
"""
self.emit('message', data=data, namespace=namespace,
callback=callback)
def call(self, event, data=None, namespace=None, timeout=60):
"""Emit a custom event to a client and wait for the response.
:param event: The event name. It can be any string. The event names
``'connect'``, ``'message'`` and ``'disconnect'`` are
reserved and should not be used.
:param data: The data to send to the client or clients. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. If a
``list`` or ``dict``, the data will be serialized as JSON.
:param namespace: The Socket.IO namespace for the event. If this
argument is omitted the event is emitted to the
default namespace.
:param timeout: The waiting timeout. If the timeout is reached before
the client acknowledges the event, then a
``TimeoutError`` exception is raised.
Note: this method is not thread safe. If multiple threads are emitting
at the same time on the same client connection, messages composed of
multiple packets may end up being sent in an incorrect sequence. Use
standard concurrency solutions (such as a Lock object) to prevent this
situation.
"""
callback_event = self.eio.create_event()
callback_args = []
def event_callback(*args):
callback_args.append(args)
callback_event.set()
self.emit(event, data=data, namespace=namespace,
callback=event_callback)
if not callback_event.wait(timeout=timeout):
raise exceptions.TimeoutError()
return callback_args[0] if len(callback_args[0]) > 1 \
else callback_args[0][0] if len(callback_args[0]) == 1 \
else None
def disconnect(self):
"""Disconnect from the server."""
# here we just request the disconnection
# later in _handle_eio_disconnect we invoke the disconnect handler
for n in self.namespaces:
self._send_packet(packet.Packet(packet.DISCONNECT, namespace=n))
self._send_packet(packet.Packet(
packet.DISCONNECT, namespace='/'))
self.connected = False
self.eio.disconnect(abort=True)
def transport(self):
"""Return the name of the transport used by the client.
The two possible values returned by this function are ``'polling'``
and ``'websocket'``.
"""
return self.eio.transport()
def start_background_task(self, target, *args, **kwargs):
"""Start a background task using the appropriate async model.
This is a utility function that applications can use to start a
background task using the method that is compatible with the
selected async mode.
:param target: the target function to execute.
:param args: arguments to pass to the function.
:param kwargs: keyword arguments to pass to the function.
This function returns an object compatible with the `Thread` class in
the Python standard library. The `start()` method on this object is
already called by this function.
"""
return self.eio.start_background_task(target, *args, **kwargs)
def sleep(self, seconds=0):
"""Sleep for the requested amount of time using the appropriate async
model.
This is a utility function that applications can use to put a task to
sleep without having to worry about using the correct call for the
selected async mode.
"""
return self.eio.sleep(seconds)
def _send_packet(self, pkt):
"""Send a Socket.IO packet to the server."""
encoded_packet = pkt.encode()
if isinstance(encoded_packet, list):
binary = False
for ep in encoded_packet:
self.eio.send(ep, binary=binary)
binary = True
else:
self.eio.send(encoded_packet, binary=False)
def _generate_ack_id(self, namespace, callback):
"""Generate a unique identifier for an ACK packet."""
namespace = namespace or '/'
if namespace not in self.callbacks:
self.callbacks[namespace] = {0: itertools.count(1)}
id = six.next(self.callbacks[namespace][0])
self.callbacks[namespace][id] = callback
return id
def _handle_connect(self, namespace):
namespace = namespace or '/'
self.logger.info('Namespace {} is connected'.format(namespace))
self._trigger_event('connect', namespace=namespace)
if namespace == '/':
for n in self.namespaces:
self._send_packet(packet.Packet(packet.CONNECT, namespace=n))
elif namespace not in self.namespaces:
self.namespaces.append(namespace)
def _handle_disconnect(self, namespace):
if not self.connected:
return
namespace = namespace or '/'
if namespace == '/':
for n in self.namespaces:
self._trigger_event('disconnect', namespace=n)
self.namespaces = []
self._trigger_event('disconnect', namespace=namespace)
if namespace in self.namespaces:
self.namespaces.remove(namespace)
if namespace == '/':
self.connected = False
def _handle_event(self, namespace, id, data):
namespace = namespace or '/'
self.logger.info('Received event "%s" [%s]', data[0], namespace)
r = self._trigger_event(data[0], namespace, *data[1:])
if id is not None:
# send ACK packet with the response returned by the handler
# tuples are expanded as multiple arguments
if r is None:
data = []
elif isinstance(r, tuple):
data = list(r)
else:
data = [r]
if six.PY2 and not self.binary:
binary = False # pragma: nocover
else:
binary = None
self._send_packet(packet.Packet(packet.ACK, namespace=namespace,
id=id, data=data, binary=binary))
def _handle_ack(self, namespace, id, data):
namespace = namespace or '/'
self.logger.info('Received ack [%s]', namespace)
callback = None
try:
callback = self.callbacks[namespace][id]
except KeyError:
# if we get an unknown callback we just ignore it
self.logger.warning('Unknown callback received, ignoring.')
else:
del self.callbacks[namespace][id]
if callback is not None:
callback(*data)
def _handle_error(self, namespace, data):
namespace = namespace or '/'
self.logger.info('Connection to namespace {} was rejected'.format(
namespace))
if data is None:
data = tuple()
elif not isinstance(data, (tuple, list)):
data = (data,)
self._trigger_event('connect_error', namespace, *data)
if namespace in self.namespaces:
self.namespaces.remove(namespace)
if namespace == '/':
self.namespaces = []
self.connected = False
def _trigger_event(self, event, namespace, *args):
"""Invoke an application event handler."""
# first see if we have an explicit handler for the event
if namespace in self.handlers and event in self.handlers[namespace]:
return self.handlers[namespace][event](*args)
# or else, forward the event to a namespace handler if one exists
elif namespace in self.namespace_handlers:
return self.namespace_handlers[namespace].trigger_event(
event, *args)
def _handle_reconnect(self):
self._reconnect_abort.clear()
reconnecting_clients.append(self)
attempt_count = 0
current_delay = self.reconnection_delay
while True:
delay = current_delay
current_delay *= 2
if delay > self.reconnection_delay_max:
delay = self.reconnection_delay_max
delay += self.randomization_factor * (2 * random.random() - 1)
self.logger.info(
'Connection failed, new attempt in {:.02f} seconds'.format(
delay))
if self._reconnect_abort.wait(delay):
self.logger.info('Reconnect task aborted')
break
attempt_count += 1
try:
self.connect(self.connection_url,
headers=self.connection_headers,
transports=self.connection_transports,
namespaces=self.connection_namespaces,
socketio_path=self.socketio_path)
except (exceptions.ConnectionError, ValueError):
pass
else:
self.logger.info('Reconnection successful')
self._reconnect_task = None
break
if self.reconnection_attempts and \
attempt_count >= self.reconnection_attempts:
self.logger.info(
'Maximum reconnection attempts reached, giving up')
break
reconnecting_clients.remove(self)
def _handle_eio_connect(self):
"""Handle the Engine.IO connection event."""
self.logger.info('Engine.IO connection established')
self.sid = self.eio.sid
def _handle_eio_message(self, data):
"""Dispatch Engine.IO messages."""
if self._binary_packet:
pkt = self._binary_packet
if pkt.add_attachment(data):
self._binary_packet = None
if pkt.packet_type == packet.BINARY_EVENT:
self._handle_event(pkt.namespace, pkt.id, pkt.data)
else:
self._handle_ack(pkt.namespace, pkt.id, pkt.data)
else:
pkt = packet.Packet(encoded_packet=data)
if pkt.packet_type == packet.CONNECT:
self._handle_connect(pkt.namespace)
elif pkt.packet_type == packet.DISCONNECT:
self._handle_disconnect(pkt.namespace)
elif pkt.packet_type == packet.EVENT:
self._handle_event(pkt.namespace, pkt.id, pkt.data)
elif pkt.packet_type == packet.ACK:
self._handle_ack(pkt.namespace, pkt.id, pkt.data)
elif pkt.packet_type == packet.BINARY_EVENT or \
pkt.packet_type == packet.BINARY_ACK:
self._binary_packet = pkt
elif pkt.packet_type == packet.ERROR:
self._handle_error(pkt.namespace, pkt.data)
else:
raise ValueError('Unknown packet type.')
def _handle_eio_disconnect(self):
"""Handle the Engine.IO disconnection event."""
self.logger.info('Engine.IO connection dropped')
if self.connected:
for n in self.namespaces:
self._trigger_event('disconnect', namespace=n)
self._trigger_event('disconnect', namespace='/')
self.namespaces = []
self.connected = False
self.callbacks = {}
self._binary_packet = None
self.sid = None
if self.eio.state == 'connected' and self.reconnection:
self._reconnect_task = self.start_background_task(
self._handle_reconnect)
def _engineio_client_class(self):
return engineio.Client