crawl_webpages.py
import os
import time
import urllib
import random
import validators
import pickle
import asyncio
import aiohttp
import signal
from multiprocessing import Pool, cpu_count, Process
from dotenv import load_dotenv
from rocksdict import Rdict
from pebble import ProcessPool
from concurrent.futures import TimeoutError
from utils.crawl_parse_utils import check_url_relevance, get_url_text_and_links
load_dotenv()
# #### Crawler design
#
# We now have a frontier of URLs. The next step is to crawl all linked webpages carefully while indexing only the relevant ones.
#
# **Design of the crawler**: The crawling process is _network bound_, i.e., the bottleneck is network latency and server rate limits. So despite the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), we can safely use multiple threads within the same Python process without any performance penalty. For small-scale crawling, we do not need multiple processes. This has some advantages:
#
# 1. We do not need complex synchronisation mechanisms between threads: the Python GIL ensures that individual operations on shared objects are safe (threads are _concurrent_, not _parallel_).
# 2. We can simply use a Python list as our shared data structure for the frontier! (List operations such as append and pop are thread-safe by [default](https://web.archive.org/web/20201108091210/http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm).)
#
# Enqueuing and dequeuing are done simply through list append and list pop operations, as the sketch below illustrates.
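# A minimal sketch (not part of the crawler below) of the shared-frontier idea described above:
# several worker threads append to and pop from one Python list without an explicit lock,
# because each individual append/pop is atomic under the GIL.
def _threaded_frontier_sketch(seed_urls, num_workers=4):
    """Illustrative only: a shared list frontier consumed by several threads."""
    import threading
    shared_frontier = list(seed_urls)

    def worker():
        while True:
            try:
                url = shared_frontier.pop()  # atomic dequeue
            except IndexError:
                break  # another thread emptied the frontier first
            # ... fetch `url` here, then enqueue any newly discovered links:
            # shared_frontier.append(new_url)  # atomic enqueue

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()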
MAX_DEPTH = 7 # Maximum depth to crawl.
TIME_BETWEEN_REQUESTS = 1.0 # Number of seconds to wait between requests to the same domain
EXPAND_FRONTIER = 0.5 # Probability of expanding the frontier
PARALLEL_REQUESTS = 4096 # Number of parallel requests to make
STOP_EVENT = asyncio.Event() # Flag to stop the crawl
RETRY_FAILED = False
# load the frontier URLs
with open('../data/crawling_data/frontier_urls.pkl', 'rb') as f:
frontier = pickle.load(f)
print(f"Loaded {len(frontier)} URLs from the frontier")
# each frontier entry has the url, the depth, and the domain of the URL
frontier = [(url, MAX_DEPTH, urllib.parse.urlparse(url).netloc) for url in frontier if validators.url(url)]
current_crawl_state = {
"frontier": frontier, # list of URLs to be crawled
"visited": set(), # list of URLs that have been crawled (we only store the URL, not the content)
"failed": set(), # list of URLs that have failed to be crawled.
"rejected": set(), # list of URLs that were rejected based on key word relevance
"last_saved": time.time(), # timestamp of the last save
"to_visit": set(), # list of URLs that are yet to be crawled
"all_discovered_urls": set(
[
item[0] for item in frontier \
if not any(item[0].endswith(x) for x in ['.jpg', '.jpeg', '.png', '.gif', '.mp4', '.avi', '.webm'])
]
    ) # set of all URLs discovered so far (seeded with the frontier, excluding media files)
}
del frontier
if os.path.exists('../data/crawling_data/crawl_state.pkl'):
with open('../data/crawling_data/crawl_state.pkl', 'rb') as f:
current_crawl_state = pickle.load(f)
if RETRY_FAILED:
print("Frontier size:", len(current_crawl_state['frontier']))
    current_crawl_state['frontier'].extend([(x, MAX_DEPTH, urllib.parse.urlparse(x).netloc) for x in current_crawl_state['failed'] if not any(x.endswith(y) for y in ['.jpg', '.jpeg', '.png', '.gif', '.mp4', '.avi', '.webm'])])
print("Frontier size:", len(current_crawl_state['frontier']))
current_crawl_state['failed'] = set()
print("Failed size:", len(current_crawl_state['failed']))
# #### Storing the crawl results
#
# We store the results of the crawl in a RocksDB instance, which is a simple key-value store. We use the `rocksdict` library, which provides a nice dict-like interface to the key-value store: it takes care of caching data in memory and flushing the results to the database as required.
# open the dictionary file
db = Rdict('../data/crawling_data/crawl_data')
db_titles = Rdict('../data/runtime_data/titles')
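# A minimal sketch (not used by the crawler) of how a stored page can be read back later.
# It assumes only the dict-like `in` / `[]` access that `rocksdict` exposes.
def _read_crawled_page(url):
    """Return the stored text for `url`, or None if it has not been crawled yet."""
    if url in db:
        return db[url]
    return None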
# Save the crawl_state file in a subprocess, so that pickling a large state does not block the crawl loop.
p = None
def write_pickle_file(obj, path):
print("Saving pickle using subprocess!")
with open(path, 'wb') as f:
pickle.dump(obj, f)
print("Completed saving the state!")
def save_state():
"""
Save the current crawl state to disk as a pickle file.
"""
global p
current_crawl_state["last_saved"] = time.time()
print("--------------------------------------------------")
print(f"Saved state at {time.time()}")
print(f"Visited {len(current_crawl_state['visited'])} URLs")
print(f"Frontier has {len(current_crawl_state['frontier'])} URLs")
print(f"Failed to crawl {len(current_crawl_state['failed'])} URLs")
print(f"Rejected {len(current_crawl_state['rejected'])} URLs")
print("--------------------------------------------------")
db.flush()
if p is None or not p.is_alive():
# Run this only if the previous subprocess has finished writing to disk.
        p = Process(target=write_pickle_file, args=(current_crawl_state, '../data/crawling_data/crawl_state.pkl'))
p.daemon = False
p.start()
# #### Crawling
#
# We use a multi-threaded crawler for the reasons discussed above. Most of the operations we use below are thread-safe except for the following:
#
# * The `pop` operation is itself atomic, but we _read_ the length of the list to pop a random URL. This read-then-pop sequence is not thread-safe, so we use a mutex lock to ensure only one thread pops at a time.
# * Similarly, individual `dict` operations are thread-safe, but we use another lock to ensure only one thread reads the dict at a time.
# * To ensure only one thread saves the state at a time, we use another lock.
#
# The crawling process is straightforward:
# 1. Pop a URL from the frontier
# 2. Check if enough time has passed since the previous request to the same domain (a sketch of this check follows this list).
# 3. If yes, retrieve the contents of the URL
# 4. Extract contents and links from the URL.
# 5. Append the links to the frontier, and save the contents of the URL.
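# A minimal sketch of the per-domain politeness check from step 2 above; the crawler below
# approximates it by capping the number of URLs per domain in each batch (see sample_frontier).
# `last_request_time` and `domain_ready` are illustrative helpers, not used elsewhere in this script.
last_request_time = {} # domain -> timestamp of the most recent request to that domain
def domain_ready(domain):
    """Return True (and record the request) if at least TIME_BETWEEN_REQUESTS seconds
    have passed since the last request to `domain`."""
    now = time.time()
    if now - last_request_time.get(domain, 0.0) >= TIME_BETWEEN_REQUESTS:
        last_request_time[domain] = now
        return True
    return False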
async def get_url_content(url, session):
"""fetch the content of the URL using aiohttp.
We use aiohttp to fetch the content of the URL asynchronously
Arguments
---------
    url : str
        the URL to fetch
    session : aiohttp.ClientSession
        the aiohttp session to use for the request
Returns
-------
    str, bytes, or None
        the content of the URL: bytes for PDFs, text otherwise, None if the fetch
        failed or the URL points to a media file, and the sentinel string
        'timeouterror' on a timeout
"""
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 Edg/123.0.2420.81",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/109.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14.4; rv:124.0) Gecko/20100101 Firefox/124.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/109.0.0.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux i686; rv:124.0) Gecko/20100101 Firefox/124.0",
]
headers = {"User-Agent": random.choice(user_agents)}
if any(url.endswith(x) for x in ['.jpg', '.jpeg', '.png', '.gif', '.mp4', '.avi', '.webm']):
return None
return_value = None
try:
async with session.get(url, timeout=30, headers=headers) as response:
if url.endswith('.pdf'):
return_value = await response.read()
else:
return_value = await response.text()
except (aiohttp.ClientError, UnicodeDecodeError, ValueError, LookupError) as e:
print(f"Failed to fetch {url}: {e}")
except asyncio.TimeoutError:
print(f"Failed to fetch {url}: Timeout")
return_value = 'timeouterror'
return return_value
def sample_frontier():
"""sample the frontier to get a random sample of URLs to crawl.
Returns
-------
list
list of URLs to crawl
list
list of depths of the URLs
"""
urls = []
depths = []
domains_frequency = {}
    sample_indices = random.sample(range(len(current_crawl_state['frontier'])), min(PARALLEL_REQUESTS * 10, len(current_crawl_state['frontier'])))
sample_indices.sort(key = lambda x: current_crawl_state['frontier'][x][1], reverse = True)
    # sample up to 10x the number of URLs we need and sort by remaining depth (descending) so that URLs closer to the root are prioritised.
for idx in sample_indices:
url, depth, domain = current_crawl_state['frontier'][idx]
if url in current_crawl_state['all_discovered_urls']:
continue
if domain not in domains_frequency:
domains_frequency[domain] = 0
if domains_frequency[domain] < 3:
urls.append(url)
depths.append(depth)
domains_frequency[domain] += 1
if len(urls) == PARALLEL_REQUESTS:
break
return urls, depths
async def crawl_webpages():
"""crawl the webpages in the frontier.
    Runs until the frontier is empty or the stop event is set.
"""
while len(current_crawl_state["frontier"]) > 0:
urls, depths = sample_frontier()
connector = aiohttp.TCPConnector(limit=None)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [get_url_content(url, session) for url in urls]
url_content_raw = await asyncio.gather(*tasks)
await connector.close()
url_contents = []
with ProcessPool() as pool:
future = pool.map(get_url_text_and_links, zip(urls, url_content_raw), timeout=10)
iterator = future.result()
while True:
try:
result = next(iterator)
url_contents.append(result)
except StopIteration:
break
except TimeoutError as error:
print("function took longer than %d seconds")
url_contents.append((None, None, None))
assert len(url_contents) == len(urls)
all_new_links = set()
url_depth_map = {}
for url, (url_text, url_links, url_title), depth in zip(urls, url_contents, depths):
current_crawl_state['all_discovered_urls'].add(url)
if url_title is not None:
db_titles[url] = url_title
if url_text is None:
if url_links is not None:
#language does not match.
current_crawl_state['rejected'].add(url)
else:
# failed to fetch the URL content
current_crawl_state["failed"].add(url)
continue
if url_text == 'timeouterror':
print(f"URL {url} timed out. Adding it to frontier to try again later.")
current_crawl_state["frontier"].append((url, depth, urllib.parse.urlparse(url).netloc))
continue
# save the text content to the dictionary
db[url] = url_text
# check if the URL is relevant
if not check_url_relevance(url_text):
current_crawl_state["rejected"].add(url)
continue
current_crawl_state["visited"].add(url)
if depth > 0:
all_new_links.update(url_links)
for l in url_links:
url_depth_map[l] = depth - 1
# now process all the new links - add them to the frontier or to_visit, after checking if they are already discovered
all_new_links = all_new_links - current_crawl_state['all_discovered_urls']
for link in all_new_links:
# check if it is a valid URL, and not an image or a video
            if validators.url(link) and not any(link.endswith(x) for x in ['.jpg', '.jpeg', '.png', '.gif', '.mp4', '.avi', '.webm']):
# add the link to the frontier with a probability to avoid the frontier becoming too large
if random.random() < EXPAND_FRONTIER:
current_crawl_state["frontier"].append((link, url_depth_map[link], urllib.parse.urlparse(link).netloc))
else:
current_crawl_state["to_visit"].add((link, url_depth_map[link], urllib.parse.urlparse(link).netloc))
save_state()
if STOP_EVENT.is_set():
break
def signal_handler(sig, frame):
print("KeyboardInterrupt received, shutting down...")
STOP_EVENT.set()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Setup signal handling
loop.add_signal_handler(signal.SIGINT, lambda: signal_handler(signal.SIGINT, None))
try:
loop.run_until_complete(crawl_webpages())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()