forked from VNOI-Admin/OJ
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_poster_amqp.py
55 lines (41 loc) · 1.3 KB
/
event_poster_amqp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import json
import threading
from time import time
import pika
from django.conf import settings
from pika.exceptions import AMQPError
# Names exported by ``from <this module> import *``: the poster class plus the
# module-level ``post`` and ``last`` helpers.
__all__ = ['EventPoster', 'post', 'last']
class EventPoster(object):
    """Publish event messages to an AMQP exchange.

    Each instance owns one blocking pika connection and one channel opened
    on it. The broker URL is read from ``settings.EVENT_DAEMON_AMQP`` and the
    exchange name from ``settings.EVENT_DAEMON_AMQP_EXCHANGE``.
    """

    # A failed publish is retried (after reconnecting) as long as the
    # ``tries`` counter has not exceeded this value.
    _MAX_TRIES = 10

    def __init__(self):
        self._conn = None
        self._chan = None
        # Read the exchange first so a missing setting fails before a
        # connection has been opened.
        self._exchange = settings.EVENT_DAEMON_AMQP_EXCHANGE
        self._connect()

    def _connect(self):
        """Open a new connection and channel, closing any previous connection.

        Raises:
            AMQPError: if the new connection or channel cannot be opened.
        """
        previous = getattr(self, '_conn', None)
        if previous is not None:
            self._conn = None
            try:
                previous.close()
            except Exception:
                # Best effort: the old connection is usually already broken
                # (that is why we are reconnecting), so a failure to close
                # it must not prevent opening the replacement.
                pass
        self._conn = pika.BlockingConnection(pika.URLParameters(settings.EVENT_DAEMON_AMQP))
        self._chan = self._conn.channel()

    def post(self, channel, message, tries=0):
        """Publish ``message`` on ``channel`` and return the generated event id.

        The payload sent to the exchange (with an empty routing key) is a
        JSON object with the keys ``id``, ``channel`` and ``message``. The id
        is the current time in integer microseconds, regenerated on every
        attempt.

        Args:
            channel: Event channel name placed in the payload.
            message: JSON-serializable message placed in the payload.
            tries: Number of attempts already made; callers normally leave
                this at its default.

        Returns:
            The integer id of the published event.

        Raises:
            AMQPError: if publishing still fails once ``tries`` exceeds the
                retry limit, or if reconnecting fails.
        """
        while True:
            try:
                event_id = int(time() * 1000000)
                payload = json.dumps({'id': event_id, 'channel': channel, 'message': message})
                self._chan.basic_publish(self._exchange, '', payload)
                return event_id
            except AMQPError:
                if tries > self._MAX_TRIES:
                    raise
            # The publish failed: replace the connection and try again. A
            # failure to reconnect propagates to the caller immediately.
            self._connect()
            tries += 1
# Per-thread storage: each thread that posts events gets its own poster.
_local = threading.local()


def _get_poster():
    """Return the calling thread's ``EventPoster``, creating it on first use."""
    try:
        return _local.poster
    except AttributeError:
        # Nothing cached for this thread yet.
        created = EventPoster()
        _local.poster = created
        return _local.poster
def post(channel, message):
    """Publish ``message`` on ``channel`` using the current thread's poster.

    Returns the id reported by the poster. If an ``AMQPError`` escapes, the
    thread's cached poster (if any) is discarded and ``0`` is returned
    instead of raising.
    """
    try:
        event_id = _get_poster().post(channel, message)
    except AMQPError:
        # Forget the cached poster; a missing entry is fine.
        _local.__dict__.pop('poster', None)
        return 0
    return event_id
def last():
    """Return the current time as a whole number of microseconds."""
    seconds_now = time()
    microseconds = seconds_now * 10 ** 6
    return int(microseconds)