Skip to content

Commit

Permalink
use high level interface kumbu to connect to multipul queues
Browse files Browse the repository at this point in the history
  • Loading branch information
binux committed May 22, 2015
1 parent b55607d commit 708b004
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 30 deletions.
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ pyspider [![Build Status]][Travis CI] [![Coverage Status]][Coverage] [![Try]][De

A Powerful Spider(Web Crawler) System in Python. **[TRY IT NOW!][Demo]**

- Write script in python with powerful API
- Python 2&3
- Write script in Python
- Powerful WebUI with script editor, task monitor, project manager and result viewer
- Javascript pages supported!
- MySQL, MongoDB, SQLite, PostgreSQL as database backend
- Task priority, retry, periodical, recrawl by age and more
- Distributed architecture
- [MySQL](https://www.mysql.com/), [MongoDB](https://www.mongodb.org/), [Redis](http://redis.io/), [SQLite](https://www.sqlite.org/), [PostgreSQL](http://www.postgresql.org/) with [SQLAlchemy](http://www.sqlalchemy.org/) as database backend
- [RabbitMQ](http://www.rabbitmq.com/), [Beanstalk](http://kr.github.com/beanstalkd/), [Redis](http://redis.io/) and [Kombu](http://kombu.readthedocs.org/) as message queue
- Task priority, retry, periodical, recrawl by age, etc...
- Distributed architecture, Crawl Javascript pages, Python 2&3, etc...

Documentation: [http://docs.pyspider.org/](http://docs.pyspider.org/)
Tutorial: [http://docs.pyspider.org/en/latest/tutorial/](http://docs.pyspider.org/en/latest/tutorial/)
Expand Down
21 changes: 12 additions & 9 deletions docs/Command-Line.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,18 @@ type:
#### --message-queue

```
rabbitmq:
amqp://username:password@host:5672/%2F
Refer: https://www.rabbitmq.com/uri-spec.html
beanstalk:
beanstalk://host:11300/
redis:
redis://host:6379/db
builtin:
None
rabbitmq:
amqp://username:password@host:5672/%2F
see https://www.rabbitmq.com/uri-spec.html
beanstalk:
beanstalk://host:11300/
redis:
redis://host:6379/db
kombu:
kombu+transport://userid:password@hostname:port/virtual_host
see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
builtin:
None
```

#### --phantomjs-proxy
Expand Down
16 changes: 9 additions & 7 deletions docs/Deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ type:
You can use connection URL to specify the message queue:

```
rabbitmq:
amqp://username:password@host:5672/%2F
Refer: https://www.rabbitmq.com/uri-spec.html
beanstalk:
beanstalk://host:11300/
redis:
redis://host:6379/db
rabbitmq:
amqp://username:password@host:5672/%2F
Refer: https://www.rabbitmq.com/uri-spec.html
beanstalk:
beanstalk://host:11300/
redis:
redis://host:6379/db
builtin:
None
```

running
Expand Down
11 changes: 5 additions & 6 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ pyspider [![Build Status][Build Status]][Travis CI] [![Coverage Status][Coverage

A Powerful Spider(Web Crawler) System in Python. **[TRY IT NOW!][Demo]**

- Write script in python with powerful API
- Python 2&3
- Write script in Python
- Powerful WebUI with script editor, task monitor, project manager and result viewer
- Javascript pages supported!
- MySQL, MongoDB, SQLite, PostgreSQL as database backend
- Task priority, retry, periodical, recrawl by age and more
- Distributed architecture
- [MySQL](https://www.mysql.com/), [MongoDB](https://www.mongodb.org/), [Redis](http://redis.io/), [SQLite](https://www.sqlite.org/), [PostgreSQL](http://www.postgresql.org/) with [SQLAlchemy](http://www.sqlalchemy.org/) as database backend
- [RabbitMQ](http://www.rabbitmq.com/), [Beanstalk](http://kr.github.com/beanstalkd/), [Redis](http://redis.io/) and [Kombu](http://kombu.readthedocs.org/) as message queue
- Task priority, retry, periodical, recrawl by age, ...
- Distributed architecture, Crawl Javascript pages, Python 2&3, ...


Sample Code
Expand Down
10 changes: 9 additions & 1 deletion pyspider/message_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ def connect_message_queue(name, url=None, maxsize=0):
rabbitmq:
amqp://username:password@host:5672/%2F
Refer: https://www.rabbitmq.com/uri-spec.html
see https://www.rabbitmq.com/uri-spec.html
beanstalk:
beanstalk://host:11300/
redis:
redis://host:6379/db
kombu:
kombu+transport://userid:password@hostname:port/virtual_host
see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
builtin:
None
"""
Expand All @@ -49,5 +52,10 @@ def connect_message_queue(name, url=None, maxsize=0):
db = 0

return Queue(name, parsed.hostname, parsed.port, db=db, maxsize=maxsize)
else:
if url.startswith('kombu+'):
url = url[len('kombu+'):]
from .kombu_queue import Queue
return Queue(name, url, maxsize=maxsize)

raise Exception('unknow connection url: %s', url)
117 changes: 117 additions & 0 deletions pyspider/message_queue/kombu_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2015-05-22 20:54:01

import time
import umsgpack
from kombu import Connection, enable_insecure_serializers
from kombu.serialization import register
from kombu.exceptions import ChannelError
from six.moves import queue as BaseQueue


register('umsgpack', umsgpack.packb, umsgpack.unpackb, 'application/x-msgpack')
enable_insecure_serializers(['umsgpack'])


class KombuQueue(object):
"""
kombu is a high-level interface for multiple message queue backends.
KombuQueue is built on top of kombu API.
"""

Empty = BaseQueue.Empty
Full = BaseQueue.Full
max_timeout = 0.3

def __init__(self, name, url="amqp://", maxsize=0, lazy_limit=True):
"""
Constructor for KombuQueue
url: http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
maxsize: an integer that sets the upperbound limit on the number of
items that can be placed in the queue.
"""
self.name = name
self.conn = Connection(url)
self.queue = self.conn.SimpleQueue(self.name, no_ack=True, serializer='umsgpack')

self.maxsize = maxsize
self.lazy_limit = lazy_limit
if self.lazy_limit and self.maxsize:
self.qsize_diff_limit = int(self.maxsize * 0.1)
else:
self.qsize_diff_limit = 0
self.qsize_diff = 0

def qsize(self):
try:
return self.queue.qsize()
except ChannelError:
return 0

def empty(self):
if self.qsize() == 0:
return True
else:
return False

def full(self):
if self.maxsize and self.qsize() >= self.maxsize:
return True
else:
return False

def put(self, obj, block=True, timeout=None):
if not block:
return self.put_nowait()

start_time = time.time()
while True:
try:
return self.put_nowait(obj)
except BaseQueue.Full:
if timeout:
lasted = time.time() - start_time
if timeout > lasted:
time.sleep(min(self.max_timeout, timeout - lasted))
else:
raise
else:
time.sleep(self.max_timeout)

def put_nowait(self, obj):
if self.lazy_limit and self.qsize_diff < self.qsize_diff_limit:
pass
elif self.full():
raise BaseQueue.Full
else:
self.qsize_diff = 0
return self.queue.put(obj)

def get(self, block=True, timeout=None):
try:
ret = self.queue.get(block, timeout)
return ret.payload
except self.queue.Empty:
raise BaseQueue.Empty

def get_nowait(self):
try:
ret = self.queue.get_nowait()
return ret.payload
except self.queue.Empty:
raise BaseQueue.Empty

def delete(self):
self.queue.queue.delete()

def __del__(self):
self.queue.close()


Queue = KombuQueue
1 change: 0 additions & 1 deletion pyspider/message_queue/redis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def full(self):

def put_nowait(self, obj):
if self.lazy_limit and self.last_qsize < self.maxsize:
print(self.name, self.last_qsize)
pass
elif self.full():
raise self.Full
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ SQLAlchemy>=0.9.7
six
amqp>=1.3.0
redis
kombu
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
'unittest2>=0.5.1',
'SQLAlchemy>=0.9.7',
'redis',
'kombu',
],
},

Expand Down
48 changes: 48 additions & 0 deletions tests/test_message_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,51 @@ def tearDownClass(self):
self.q2.get()
while not self.q3.empty():
self.q3.get()

class TestKombuQueue(TestMessageQueue, unittest.TestCase):
kombu_url = 'kombu+memory://'

@classmethod
def setUpClass(self):
from pyspider.message_queue import connect_message_queue
with utils.timeout(3):
self.q1 = connect_message_queue('test_queue', self.kombu_url, maxsize=5)
self.q2 = connect_message_queue('test_queue', self.kombu_url, maxsize=5)
self.q3 = connect_message_queue('test_queue_for_threading_test', self.kombu_url)
while not self.q1.empty():
self.q1.get()
while not self.q2.empty():
self.q2.get()
while not self.q3.empty():
self.q3.get()

@classmethod
def tearDownClass(self):
while not self.q1.empty():
self.q1.get()
self.q1.delete()
while not self.q2.empty():
self.q2.get()
self.q2.delete()
while not self.q3.empty():
self.q3.get()
self.q3.delete()

@unittest.skip('test cannot pass, get is buffered')
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ'), 'no rabbitmq server for test.')
class TestKombuAmpqQueue(TestKombuQueue):
kombu_url = 'kombu+amqp://'

@unittest.skip('test cannot pass, put is buffered')
@unittest.skipIf(os.environ.get('IGNORE_REDIS'), 'no redis server for test.')
class TestKombuRedisQueue(TestKombuQueue):
kombu_url = 'kombu+redis://'

@unittest.skip('test cannot pass, get is buffered')
@unittest.skipIf(os.environ.get('IGNORE_BEANSTALK'), 'no beanstalk server for test.')
class TestKombuBeanstalkQueue(TestKombuQueue):
kombu_url = 'kombu+beanstalk://'

@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no rabbitmq server for test.')
class TestKombuMongoDBQueue(TestKombuQueue):
kombu_url = 'kombu+mongodb://'

0 comments on commit 708b004

Please sign in to comment.