spiders.py
import json
import time
from collections.abc import Iterable
from scrapy import FormRequest, signals
from scrapy import version_info as scrapy_version
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import CrawlSpider, Spider
from scrapy_redis.utils import TextColor
from . import connection, defaults
from .utils import bytes_to_str, is_dict
class RedisMixin:
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None
# Redis client placeholder.
server = None
# Idle start time
spider_idle_start_time = int(time.time())
max_idle_time = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, "crawler", None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
if self.redis_key is None:
self.redis_key = settings.get(
"REDIS_START_URLS_KEY",
defaults.START_URLS_KEY,
)
self.redis_key = self.redis_key % {"name": self.name}
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
if self.redis_batch_size is None:
self.redis_batch_size = settings.getint(
"CONCURRENT_REQUESTS", defaults.REDIS_CONCURRENT_REQUESTS
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
if self.redis_encoding is None:
self.redis_encoding = settings.get(
"REDIS_ENCODING", defaults.REDIS_ENCODING
)
self.logger.info(
"Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s)",
self.__dict__,
)
self.server = connection.from_settings(crawler.settings)
if settings.getbool("REDIS_START_URLS_AS_SET", defaults.START_URLS_AS_SET):
self.fetch_data = self.server.spop
self.count_size = self.server.scard
elif settings.getbool("REDIS_START_URLS_AS_ZSET", defaults.START_URLS_AS_ZSET):
self.fetch_data = self.pop_priority_queue
self.count_size = self.server.zcard
else:
self.fetch_data = self.pop_list_queue
self.count_size = self.server.llen
if self.max_idle_time is None:
self.max_idle_time = settings.get(
"MAX_IDLE_TIME_BEFORE_CLOSE", defaults.MAX_IDLE_TIME
)
try:
self.max_idle_time = int(self.max_idle_time)
except (TypeError, ValueError):
raise ValueError("max_idle_time must be an integer")
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
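    # Illustrative only: the settings read above, as they might appear in a
    # project's settings.py. The values shown are the library defaults or
    # placeholders, not recommendations:
    #
    #   REDIS_START_URLS_KEY = "%(name)s:start_urls"
    #   REDIS_START_URLS_AS_SET = False
    #   REDIS_START_URLS_AS_ZSET = False
    #   REDIS_ENCODING = "utf-8"
    #   MAX_IDLE_TIME_BEFORE_CLOSE = 0
    #   CONCURRENT_REQUESTS = 16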
def pop_list_queue(self, redis_key, batch_size):
with self.server.pipeline() as pipe:
pipe.lrange(redis_key, 0, batch_size - 1)
pipe.ltrim(redis_key, batch_size, -1)
datas, _ = pipe.execute()
return datas
def pop_priority_queue(self, redis_key, batch_size):
with self.server.pipeline() as pipe:
pipe.zrevrange(redis_key, 0, batch_size - 1)
pipe.zremrangebyrank(redis_key, -batch_size, -1)
datas, _ = pipe.execute()
return datas
def next_requests(self):
"""Returns a request to be scheduled or none."""
# XXX: Do we need to use a timeout here?
found = 0
datas = self.fetch_data(self.redis_key, self.redis_batch_size)
for data in datas:
reqs = self.make_request_from_data(data)
if isinstance(reqs, Iterable):
for req in reqs:
yield req
# XXX: should be here?
found += 1
self.logger.info(f"start req url:{req.url}")
elif reqs:
yield reqs
found += 1
else:
self.logger.debug(f"Request not made from data: {data}")
if found:
self.logger.debug(f"Read {found} requests from '{self.redis_key}'")
def make_request_from_data(self, data):
"""Returns a `Request` instance for data coming from Redis.
Overriding this function to support the `json` requested `data` that contains
`url` ,`meta` and other optional parameters. `meta` is a nested json which contains sub-data.
Along with:
After accessing the data, sending the FormRequest with `url`, `meta` and addition `formdata`, `method`
For example:
.. code:: json
{
"url": "https://example.com",
"meta": {
"job-id":"123xsd",
"start-date":"dd/mm/yy",
},
"url_cookie_key":"fertxsas",
"method":"POST",
}
If `url` is empty, return `[]`. So you should verify the `url` in the data.
If `method` is empty, the request object will set method to 'GET', optional.
If `meta` is empty, the request object will set `meta` to an empty dictionary, optional.
This json supported data can be accessed from 'scrapy.spider' through response.
'request.url', 'request.meta', 'request.cookies', 'request.method'
Parameters
----------
data : bytes
Message from redis.
"""
formatted_data = bytes_to_str(data, self.redis_encoding)
if is_dict(formatted_data):
parameter = json.loads(formatted_data)
else:
self.logger.warning(
f"{TextColor.WARNING}WARNING: String request is deprecated, please use JSON data format. "
f"Detail information, please check https://github.com/rmax/scrapy-redis#features{TextColor.ENDC}"
)
return FormRequest(formatted_data, dont_filter=True)
if parameter.get("url", None) is None:
self.logger.warning(
f"{TextColor.WARNING}The data from Redis has no url key in push data{TextColor.ENDC}"
)
return []
url = parameter.pop("url")
method = parameter.pop("method").upper() if "method" in parameter else "GET"
metadata = parameter.pop("meta") if "meta" in parameter else {}
return FormRequest(
url, dont_filter=True, method=method, formdata=parameter, meta=metadata
)
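    # A minimal sketch (not part of this class) of how a producer could push a
    # JSON payload in the format documented above. The key name
    # "myspider:start_urls" and the default redis.Redis() connection are
    # assumptions for illustration; use SADD/ZADD instead of LPUSH when the
    # start URLs are stored as a set or sorted set:
    #
    #   import json
    #   import redis
    #
    #   r = redis.Redis()
    #   payload = {
    #       "url": "https://example.com",
    #       "meta": {"job-id": "123xsd"},
    #       "method": "POST",
    #   }
    #   r.lpush("myspider:start_urls", json.dumps(payload))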
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
# see https://github.com/scrapy/scrapy/issues/5994
if scrapy_version >= (2, 6):
self.crawler.engine.crawl(req)
else:
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""
Schedules a request if available, otherwise waits.
or close spider when waiting seconds > MAX_IDLE_TIME_BEFORE_CLOSE.
MAX_IDLE_TIME_BEFORE_CLOSE will not affect SCHEDULER_IDLE_BEFORE_CLOSE.
"""
if self.server is not None and self.count_size(self.redis_key) > 0:
self.spider_idle_start_time = int(time.time())
self.schedule_next_requests()
idle_time = int(time.time()) - self.spider_idle_start_time
if self.max_idle_time != 0 and idle_time >= self.max_idle_time:
return
raise DontCloseSpider
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: False)
Use SET operations to retrieve messages from the redis queue. If False,
the messages are retrieve using the LPOP command.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
obj = super().from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
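# A minimal usage sketch, illustrative only: a RedisSpider subclass fed from
# the key "myspider:start_urls". The spider name, key and parse logic are
# placeholders; the SCHEDULER/DUPEFILTER_CLASS/REDIS_URL settings follow the
# scrapy-redis README and should be adapted to your project.
#
#   from scrapy_redis.spiders import RedisSpider
#
#   class MySpider(RedisSpider):
#       name = "myspider"
#       redis_key = "myspider:start_urls"
#       max_idle_time = 30  # close the spider after 30 idle seconds (0 = never)
#
#       def parse(self, response):
#           yield {"url": response.url, "title": response.css("title::text").get()}
#
#   # settings.py
#   SCHEDULER = "scrapy_redis.scheduler.Scheduler"
#   DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
#   REDIS_URL = "redis://localhost:6379"
#
#   # Feed it, e.g.:  redis-cli lpush myspider:start_urls https://example.com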
class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: True)
Use SET operations to retrieve messages from the redis queue.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
obj = super().from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
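# A RedisCrawlSpider sketch, illustrative only: link-extraction rules combined
# with a redis-fed start queue. The key "mycrawler:start_urls", the allow
# pattern and the callback are placeholders. With REDIS_START_URLS_AS_SET
# enabled, push start URLs with SADD instead of LPUSH, e.g.
# `redis-cli sadd mycrawler:start_urls https://example.com`.
#
#   from scrapy.linkextractors import LinkExtractor
#   from scrapy.spiders import Rule
#   from scrapy_redis.spiders import RedisCrawlSpider
#
#   class MyCrawler(RedisCrawlSpider):
#       name = "mycrawler"
#       redis_key = "mycrawler:start_urls"
#       rules = (
#           Rule(LinkExtractor(allow=r"/articles/"), callback="parse_page", follow=True),
#       )
#
#       def parse_page(self, response):
#           yield {"url": response.url}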