
Commit

add SCHEDULER_IDLE_BEFORE_CLOSE to prevent the spider from being closed when distributed crawling
MOON-CLJ committed Apr 23, 2013
1 parent 01ee168 commit 69938bc
Showing 4 changed files with 41 additions and 11 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,4 +1,7 @@
 *.py[cod]
+*.swp
+
+.ropeproject
 
 # C extensions
 *.so
@@ -33,4 +36,4 @@ nosetests.xml
 # Mr Developer
 .mr.developer.cfg
 .project
-.pydevproject
+.pydevproject
6 changes: 6 additions & 0 deletions README.rst
@@ -59,6 +59,12 @@ Enable the components in your `settings.py`:
     # Schedule requests using a stack (LIFO).
     SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'
 
+    # Max idle time to prevent the spider from being closed when distributed crawling.
+    # This only works if the queue class is SpiderQueue or SpiderStack,
+    # and may also block for the same time when the spider first starts (because the queue is empty).
+    SCHEDULER_IDLE_BEFORE_CLOSE = 10
+
     # store scraped item in redis for post-processing
     ITEM_PIPELINES = [
         'scrapy_redis.pipelines.RedisPipeline',
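For context, here is how the new option might sit in a complete settings.py alongside the other scrapy_redis settings. A minimal sketch: the SCHEDULER, SCHEDULER_PERSIST, and pipeline values follow the README's earlier sections, and the concrete numbers are illustrative.

# settings.py -- a minimal sketch, not a canonical configuration.
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
SCHEDULER_PERSIST = True
# FIFO queue; the idle timeout below is honored only by SpiderQueue and SpiderStack.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue'
# Wait up to 10 seconds on an empty queue before the spider is allowed to close.
SCHEDULER_IDLE_BEFORE_CLOSE = 10
# Store scraped items in redis for post-processing.
ITEM_PIPELINES = [
    'scrapy_redis.pipelines.RedisPipeline',
]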
28 changes: 21 additions & 7 deletions scrapy_redis/queue.py
@@ -37,7 +37,7 @@ def push(self, request):
         """Push a request"""
         raise NotImplementedError
 
-    def pop(self):
+    def pop(self, timeout=0):
         """Pop a request"""
         raise NotImplementedError
 
@@ -57,9 +57,14 @@ def push(self, request):
         """Push a request"""
         self.server.lpush(self.key, self._encode_request(request))
 
-    def pop(self):
+    def pop(self, timeout=0):
         """Pop a request"""
-        data = self.server.rpop(self.key)
+        if timeout > 0:
+            data = self.server.brpop(self.key, timeout)
+            if isinstance(data, tuple):
+                data = data[1]
+        else:
+            data = self.server.rpop(self.key)
         if data:
             return self._decode_request(data)
 
@@ -77,8 +82,11 @@ def push(self, request):
         pairs = {data: -request.priority}
         self.server.zadd(self.key, **pairs)
 
-    def pop(self):
-        """Pop a request"""
+    def pop(self, timeout=0):
+        """
+        Pop a request
+        timeout is not supported in this queue class
+        """
         # use atomic range/remove using multi/exec
         pipe = self.server.pipeline()
         pipe.multi()
@@ -99,9 +107,15 @@ def push(self, request):
         """Push a request"""
         self.server.lpush(self.key, self._encode_request(request))
 
-    def pop(self):
+    def pop(self, timeout=0):
         """Pop a request"""
-        data = self.server.lpop(self.key)
+        if timeout > 0:
+            data = self.server.blpop(self.key, timeout)
+            if isinstance(data, tuple):
+                data = data[1]
+        else:
+            data = self.server.lpop(self.key)
+
         if data:
             return self._decode_request(data)
 
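The isinstance(data, tuple) checks above exist because redis-py's blocking and non-blocking pops return different shapes. A standalone sketch of those semantics, assuming a Redis server on localhost and an illustrative key name:

import redis

server = redis.Redis()
server.lpush('demo:requests', 'payload')

# rpop/lpop return the raw value, or None when the list is empty.
print(server.rpop('demo:requests'))              # b'payload'

# brpop/blpop wait up to `timeout` seconds and return a (key, value)
# tuple, or None on timeout -- hence the tuple check and data[1] above.
server.lpush('demo:requests', 'payload')
print(server.brpop('demo:requests', timeout=1))  # (b'demo:requests', b'payload')
print(server.brpop('demo:requests', timeout=1))  # None, after ~1s of blocking

Blocking on the server side also means an idle worker waits without polling Redis in a tight loop.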
13 changes: 10 additions & 3 deletions scrapy_redis/scheduler.py
@@ -11,12 +11,13 @@
 QUEUE_KEY = '%(spider)s:requests'
 QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
 DUPEFILTER_KEY = '%(spider)s:dupefilter'
+IDLE_BEFORE_CLOSE = 0
 
 
 class Scheduler(object):
     """Redis-based scheduler"""
 
-    def __init__(self, server, persist, queue_key, queue_cls, dupefilter_key):
+    def __init__(self, server, persist, queue_key, queue_cls, dupefilter_key, idle_before_close):
         """Initialize scheduler.
 
         Parameters
@@ -26,12 +27,14 @@ def __init__(self, server, persist, queue_key, queue_cls, dupefilter_key):
         queue_key : str
         queue_cls : queue class
         dupefilter_key : str
+        idle_before_close : int
         """
         self.server = server
         self.persist = persist
         self.queue_key = queue_key
         self.queue_cls = queue_cls
         self.dupefilter_key = dupefilter_key
+        self.idle_before_close = idle_before_close
         self.stats = None
 
     def __len__(self):
@@ -45,8 +48,9 @@ def from_settings(cls, settings):
         queue_key = settings.get('SCHEDULER_QUEUE_KEY', QUEUE_KEY)
         queue_cls = load_object(settings.get('SCHEDULER_QUEUE_CLASS', QUEUE_CLASS))
         dupefilter_key = settings.get('DUPEFILTER_KEY', DUPEFILTER_KEY)
+        idle_before_close = settings.get('SCHEDULER_IDLE_BEFORE_CLOSE', IDLE_BEFORE_CLOSE)
         server = redis.Redis(host, port)
-        return cls(server, persist, queue_key, queue_cls, dupefilter_key)
+        return cls(server, persist, queue_key, queue_cls, dupefilter_key, idle_before_close)
 
     @classmethod
     def from_crawler(cls, crawler):
@@ -59,6 +63,8 @@ def open(self, spider):
         self.spider = spider
         self.queue = self.queue_cls(self.server, spider, self.queue_key)
         self.df = RFPDupeFilter(self.server, self.dupefilter_key % {'spider': spider.name})
+        if self.idle_before_close < 0:
+            self.idle_before_close = 0
         # notice if there are requests already in the queue to resume the crawl
         if len(self.queue):
             spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
@@ -76,7 +82,8 @@ def enqueue_request(self, request):
             self.queue.push(request)
 
     def next_request(self):
-        request = self.queue.pop()
+        block_pop_timeout = self.idle_before_close
+        request = self.queue.pop(block_pop_timeout)
         if request and self.stats:
             self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
         return request
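Passing idle_before_close as the pop timeout is what keeps a distributed spider alive: while the queue is empty, next_request() blocks inside Redis for up to that many seconds instead of returning None immediately, so the engine never sees an idle scheduler while other workers may still be pushing requests. A toy illustration of that timing, using plain Python in place of Scrapy and Redis:

import time

class FakeBlockingQueue:
    """Stand-in for SpiderQueue; the real wait happens server-side in
    Redis via brpop/blpop rather than by polling like this."""

    def __init__(self):
        self.items = []

    def pop(self, timeout=0):
        deadline = time.time() + timeout
        while True:
            if self.items:
                return self.items.pop(0)
            if time.time() >= deadline:
                return None  # only now would the scheduler report "empty"
            time.sleep(0.05)

queue = FakeBlockingQueue()
start = time.time()
assert queue.pop(timeout=1) is None          # blocks ~1s before giving up
print('reported empty after %.1fs' % (time.time() - start))

This is also why the README warns that the same delay applies on first start: the very first pop finds an empty queue and waits out the full timeout.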
