diff --git a/.gitignore b/.gitignore
index 56841532..6fd32b92 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,7 @@
 *.py[cod]
+*.swp
+
+.ropeproject
 
 # C extensions
 *.so
@@ -33,4 +36,4 @@ nosetests.xml
 # Mr Developer
 .mr.developer.cfg
 .project
-.pydevproject
\ No newline at end of file
+.pydevproject
diff --git a/README.rst b/README.rst
index 6ac5dd3d..9874fa00 100644
--- a/README.rst
+++ b/README.rst
@@ -59,6 +59,12 @@ Enable the components in your `settings.py`:
 
     # Schedule requests using a stack (LIFO).
     SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'
 
+    # Max idle time to prevent the spider from being closed during distributed crawling.
+    # This only works if the queue class is SpiderQueue or SpiderStack,
+    # and may also block for the same amount of time when the spider first starts (because the queue is empty).
+    SCHEDULER_IDLE_BEFORE_CLOSE = 10
+
+    # Store scraped items in Redis for post-processing.
     ITEM_PIPELINES = [
         'scrapy_redis.pipelines.RedisPipeline',
diff --git a/scrapy_redis/queue.py b/scrapy_redis/queue.py
index 583c090f..a95dc4f8 100644
--- a/scrapy_redis/queue.py
+++ b/scrapy_redis/queue.py
@@ -37,7 +37,7 @@ def push(self, request):
         """Push a request"""
         raise NotImplementedError
 
-    def pop(self):
+    def pop(self, timeout=0):
         """Pop a request"""
         raise NotImplementedError
 
@@ -57,9 +57,14 @@ def push(self, request):
         """Push a request"""
         self.server.lpush(self.key, self._encode_request(request))
 
-    def pop(self):
+    def pop(self, timeout=0):
         """Pop a request"""
-        data = self.server.rpop(self.key)
+        if timeout > 0:
+            data = self.server.brpop(self.key, timeout)
+            if isinstance(data, tuple):
+                data = data[1]
+        else:
+            data = self.server.rpop(self.key)
         if data:
             return self._decode_request(data)
 
@@ -77,8 +82,11 @@ def push(self, request):
         pairs = {data: -request.priority}
         self.server.zadd(self.key, **pairs)
 
-    def pop(self):
-        """Pop a request"""
+    def pop(self, timeout=0):
+        """
+        Pop a request.
+        The timeout argument is not supported by this queue class.
+        """
         # use atomic range/remove using multi/exec
         pipe = self.server.pipeline()
         pipe.multi()
@@ -99,9 +107,15 @@ def push(self, request):
         """Push a request"""
         self.server.lpush(self.key, self._encode_request(request))
 
-    def pop(self):
+    def pop(self, timeout=0):
         """Pop a request"""
-        data = self.server.lpop(self.key)
+        if timeout > 0:
+            data = self.server.blpop(self.key, timeout)
+            if isinstance(data, tuple):
+                data = data[1]
+        else:
+            data = self.server.lpop(self.key)
+
         if data:
             return self._decode_request(data)
 
diff --git a/scrapy_redis/scheduler.py b/scrapy_redis/scheduler.py
index 0ffcf5d6..183c8885 100644
--- a/scrapy_redis/scheduler.py
+++ b/scrapy_redis/scheduler.py
@@ -11,12 +11,13 @@
 QUEUE_KEY = '%(spider)s:requests'
 QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
 DUPEFILTER_KEY = '%(spider)s:dupefilter'
+IDLE_BEFORE_CLOSE = 0
 
 
 class Scheduler(object):
     """Redis-based scheduler"""
 
-    def __init__(self, server, persist, queue_key, queue_cls, dupefilter_key):
+    def __init__(self, server, persist, queue_key, queue_cls, dupefilter_key, idle_before_close):
         """Initialize scheduler.
 
         Parameters
@@ -26,12 +27,14 @@ def __init__(self, server, persist, queue_key, queue_cls, dupefilter_key):
         queue_key : str
         queue_cls : queue class
         dupefilter_key : str
+        idle_before_close : int
         """
         self.server = server
         self.persist = persist
         self.queue_key = queue_key
         self.queue_cls = queue_cls
         self.dupefilter_key = dupefilter_key
+        self.idle_before_close = idle_before_close
         self.stats = None
 
     def __len__(self):
@@ -45,8 +48,9 @@ def from_settings(cls, settings):
         queue_key = settings.get('SCHEDULER_QUEUE_KEY', QUEUE_KEY)
         queue_cls = load_object(settings.get('SCHEDULER_QUEUE_CLASS', QUEUE_CLASS))
         dupefilter_key = settings.get('DUPEFILTER_KEY', DUPEFILTER_KEY)
+        idle_before_close = settings.get('SCHEDULER_IDLE_BEFORE_CLOSE', IDLE_BEFORE_CLOSE)
         server = redis.Redis(host, port)
-        return cls(server, persist, queue_key, queue_cls, dupefilter_key)
+        return cls(server, persist, queue_key, queue_cls, dupefilter_key, idle_before_close)
 
     @classmethod
     def from_crawler(cls, crawler):
@@ -59,6 +63,8 @@ def open(self, spider):
         self.spider = spider
         self.queue = self.queue_cls(self.server, spider, self.queue_key)
         self.df = RFPDupeFilter(self.server, self.dupefilter_key % {'spider': spider.name})
+        if self.idle_before_close < 0:
+            self.idle_before_close = 0
         # notice if there are requests already in the queue to resume the crawl
         if len(self.queue):
             spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
@@ -76,7 +82,8 @@ def enqueue_request(self, request):
         self.queue.push(request)
 
     def next_request(self):
-        request = self.queue.pop()
+        block_pop_timeout = self.idle_before_close
+        request = self.queue.pop(block_pop_timeout)
         if request and self.stats:
             self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
         return request
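For reviewers: the reason the new pop() implementations unpack data[1] is that redis-py's blocking brpop/blpop return a (key, value) tuple on success and None once the timeout expires, while the non-blocking rpop/lpop return the raw value directly. A minimal standalone sketch of that behavior, assuming a local Redis instance and a throwaway key name that are not part of this patch:

    import redis

    server = redis.Redis()        # assumes a Redis server on localhost:6379
    key = 'demo:requests'         # hypothetical key, not used by scrapy-redis

    server.lpush(key, 'encoded-request')

    # Non-blocking pop: returns the raw value (or None) immediately.
    print(server.rpop(key))       # b'encoded-request'

    # Blocking pop: waits up to 2 seconds; on success it returns (key, value),
    # on timeout it returns None -- hence the isinstance/tuple unpack in the patch.
    data = server.brpop(key, 2)
    if isinstance(data, tuple):
        data = data[1]
    print(data)                   # None here, because the list is already empty

When the timeout expires, pop() therefore returns nothing, the scheduler reports an empty queue, and Scrapy's idle handling can close the spider, which is the SCHEDULER_IDLE_BEFORE_CLOSE behavior described in the README change.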
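The priority queue keeps a non-blocking pop() because it dequeues from a sorted set with an atomic range-and-remove inside MULTI/EXEC, and the sorted-set commands used here have no blocking counterpart, so the new timeout argument is accepted but ignored there. A hedged sketch of that pattern, again assuming a local Redis instance and a hypothetical key; note it uses the zadd(name, mapping) signature of newer redis-py releases, whereas the patched code targets the older keyword-argument form:

    import redis

    server = redis.Redis()             # assumes a Redis server on localhost:6379
    key = 'demo:priority-requests'     # hypothetical key

    # Lower score = higher priority, mirroring the -request.priority trick.
    server.zadd(key, {'encoded-request': -10})

    # Atomic "pop": read the best-ranked member and remove it in one
    # transaction so two workers cannot dequeue the same request.
    pipe = server.pipeline()
    pipe.multi()
    pipe.zrange(key, 0, 0)
    pipe.zremrangebyrank(key, 0, 0)
    results, count = pipe.execute()
    if results:
        print(results[0])              # b'encoded-request'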