Skip to content

Commit 9df1ebc

Browse files
committed
patch-chapter4
1 parent 94d6caf commit 9df1ebc

7 files changed

+162
-24
lines changed

第四章/ReadMe.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,9 @@
1-
##Using the threading and concurrent.futures Modules
1+
#使用threading和concurrent.futures模块
2+
3+
在上面的章节里,我们列举了一些能够被并行化解决的潜在问题. 在本章中,我们会分析如何使用Python中的threading模块来解决这些问题.
4+
5+
本章包含如下议题:
6+
- 定义什么是线程
7+
- 选择使用threading库还是_thread库
8+
- 使用threading模块来为多个输入同时计算Fibonacci序列
9+
- 使用concurrent.futures模块爬取web信息
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
##使用concurrent.futures模块爬取web信息
2+
3+
下面的章节会实现一个并行的web爬虫.
4+
5+
在实现时,我们会应道concurrent.futures模块中一个很有意思的类,叫做ThreadPoolExecutor 在上一章节的例子中,我们分析了parallel\_fibonacci.py是如何实现并发的,它只是以最原始的方式来使用进程,在某一特定的时候需要我们手工来创建和初始化一个个线程. 然而在大型程序中还想这样手工管理线程就太困难了. 在开发大型程序时,我们常常要用到线程池机制. 线程池是一种用于在进程中管理预先创建的多个线程的一种数据结构. 使用线程池的目的是为了复用线程,这样就可以避免不断的创建线程所照成的资源浪费.
6+
7+
基本上和前一章节一样, 我们将会设计一个算法,该算法分阶段地执行一些任务,并且这些任务也会相互影响. 下面,让我们分析一下这个并发网络爬虫的代码
8+
9+
在导入必要的模块,并设置好日志文件后,我们使用内置模块re来创建一个正则表达式(re模块的完整文档可以在http://docs.python.org/3/howto/regex.html中找到). 我们会使用该正则表达式来过滤爬取阶段返回的连接集合. 相关代码如下所示:
10+
11+
```python
12+
html_link_regex = \
13+
re.compile('<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
14+
```
15+
16+
接下来我们创建一个同步队列来模拟输入数据. 然后我们创建一个名为result\_dict的字典实例. In this, we will correlate the URLs and their respective links as a list structure. 相关代码如下:
17+
18+
```python
19+
urls = queue.Queue()
20+
urls.put('http://www.google.com')
21+
urls.put('http://br.bing.com/')
22+
urls.put('https://duckduckgo.com/')
23+
urls.put('https://github.com/')
24+
urls.put('http://br.search.yahoo.com/')
25+
result_dict = {}
26+
```
27+
28+
再接下来我们定义一个名为group\_urls\_task的函数,该函数用于从同步队列中抽取出URL并存入result\_dict的key值中. 另一个应该留意的细节是,我们调用Queue的get方法是,带了两个参数,第一个参数为True表示阻塞其他线程访问这个同步队列,第二个参数是0.05表示阻塞的超时事件,这样就防止出现由于同步队列中没有元素而等待太长事件的情况出现. 毕竟,在某些情况下,你不会想化太多的时间来等待新元素的到来. 相关代码如下:
29+
30+
```python
31+
def group_urls_task(urls):
32+
try:
33+
url = urls.get(True, 0.05)
34+
result_dict[url] = None
35+
logger.info("[%s] putting url [%s] in dictionary..." % (
36+
threading.current_thread().name, url))
37+
except queue.Empty:
38+
logging.error('Nothing to be done, queue is empty')
39+
```
40+
41+
现在我们需要有一个在爬行阶段执行的任务,该任务将每个url作为参数传递给一个名为crawl\_task的函数. 当将URL所指页面中的所有连接都保存下里之后,爬行阶段就算是完成了. 爬行过程中会返回一个元组,且该元组的第一个元素就是传递給crawl\_task函数的URL参数. 在这个步骤中,会从URL所指页面中抽取出一个连接的列表. 获取URL所指网页的内容需要用到request模块(关于request模块的官方文档请参见https://pypi.python.org/pypi/requests )
42+
43+
```python
44+
def crawl_task(url):
45+
links = []
46+
try:
47+
request_data = requests.get(url)
48+
logger.info("[%s] crawling url [%s] ..." % (
49+
threading.current_thread().name, url))
50+
links = html_link_regex.findall(request_data.text)
51+
except:
52+
logger.error(sys.exc_info()[0])
53+
raise
54+
finally:
55+
return (url, links)
56+
```
57+
58+
进一步分析代码,我们会发现创建了一个concurrent.futures模块中定义的ThreadPoolExecutor对象(关于ThreadPoolExecutor对象的详细信息,请参见 http://docs.python.org/3.3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)在这个ThreadPoolExecutor对象的构造函数中有一个名为max\_workers的参数,该参数决定了该executor所包含的线程池中的线程数. Within the stage of removal of the URLs from the synchronized queue and insertion of keys into result\_dict, the choice was between using three worker threads.(*这一段不知道怎么翻译*) 该数量可以根据问题的大小而改变. 定义完ThreadPoolExecutor之后,我们还使用with语句来保证结束的清理动作会被执行. 这些清理动作会在超出with语句的作用域时被执行. 在ThreadPoolExecutor对象的作用域内,我们遍历同步队列并且通过ThreadPoolExecutor对象的submit方法来将同步队列作为包含URL的队列引用传递給group\_urls\_task函数. 总之,submit方法接受一个要执行的回调函数及其参数并返回一个Future对象,该Future对象会在未来的某个时候执行该回调函数. 就我们的例子中,该回调函数就是group\_urls\_task,而参数就是同步队列的引用. 然后线程池中的线程就会并行且异步地执行Future对象中预定的函数. 相关代码如下:
59+
60+
```python
61+
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as\
62+
group_link_threads:
63+
for i in range(urls.qsize()):
64+
group_link_threads.submit(group_urls_task, urls)
65+
```
66+
67+
随后,我们还要再创建一个ThreadPoolExecutor对象, 不过这一次我们使用上一阶段中group\_urls\_task所产生的key值作为参数来执行爬行的动作. 这一次我们所使用的代码有些不同.
68+
```python
69+
future_tasks = {crawler_link_threads.submit(crawl_task, url): url
70+
for url in result_dict.keys()}
71+
```
72+
73+
我们映射了一个名为future\_tasks的临时字典对象. 该字段对象包含了submit方法所创建的Future对象,且创建这些Future对象时所使用的参数是result\_dict中的每个URL. 也就是说,根据result\_dict中的每个key,我们创建了future\_tasks中的每个任务. 映射完这个字典对象后,我们还需要搜集这些Future对象执行的结果. 搜集执行结果的方法是,使用concurrent.futures.as\_completed(fs,timeout=None)函数来循环遍历执行futre\_tasks中的各个对象, concurrent.futures.as\_completed(fs, timeout=None)方法会返回一个Future对象的迭代器. 这样我们可以遍历得到这些Future对象的执行结果. 在ThreadPoolExecutor的最后,我们在每个爬行线程中都调用了Future对象的result()方法. 在我们这个例子中,该方法返回结果元组. 这样我们最终得到的future\_tasks结果如下所示.
74+
75+
又一次,我们可以发现每个线程池中的线程执行是乱序的,但这不重要,重要的是,result\_dict中输出的内容就是最终结果.

第四章/使用threading模块解决斐波那契序列多输入问题.md

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
input_list = [3, 10, 5, 7]
2727
```
2828

29-
接下来的一行代码,我们从threading模块中定义了一个Condition对象,该对象根据一定的条件同步线程资源
29+
接下来的一行代码,我们从threading模块中定义了一个Condition对象,该对象根据一定的条件同步各线程存取资源的操作
3030

3131
```python
3232

@@ -35,49 +35,49 @@
3535

3636
使用Condition对象用于控制线程的创建队列。
3737

38-
下一块代码定义了一个很多线程都需要调用的方法,我们把它命名为fibonacci\_task。fibonacci\_task方法接收condition对象作为线程获取shared\_queue中值的协议。方法中,我们只用了with语句(关于更多with语句的用法,请参考[http://docs.python.org/3/reference/compound_stmts.html#with](http://docs.python.org/3/reference/compound_stmts.html#with))简化控制内容。如果没有with语句,我们则需要显式的使用锁,并且最后释放锁。有了with操作,代码隐式的在代码最开始获得锁,并在代码最后释放锁。fibonacci方法中接下来是逻辑处理相关代码,告诉当前线程,当shared\_queue为空时,等待。wait()方法是condition中的主要方法之一。线程将一直等待,直到被通知shared\_queue可以被使用。一旦满足shared\_queue可以被使用的条件,当前线程将接收shared\_queue中的值作为输入计算斐波那契序列的值,最后把输入和输出作为key和value存入fibo\_dict字典。最后,我们调用task_done()方法,通知某一个任务已经被分离并执行。代码如下:
38+
下一块代码定义了一个很多线程都需要调用的方法,我们把它命名为fibonacci\_task。fibonacci\_task方法接收condition对象作为线程获取shared\_queue中值的协议。方法中,我们使用了with语句(关于更多with语句的用法,请参考[http://docs.python.org/3/reference/compound_stmts.html#with](http://docs.python.org/3/reference/compound_stmts.html#with))简化控制内容。如果没有with语句,我们则需要显式的使用锁,并且最后释放锁。有了with操作,代码隐式的在代码最开始获得锁,并在代码最后释放锁。fibonacci方法中接下来的是逻辑处理相关代码,告诉当前线程,当shared\_queue为空时,等待。wait()方法是condition中的主要方法之一。线程将一直等待,直到被通知shared\_queue可以被使用。一旦满足shared\_queue可以被使用的条件,当前线程将接收shared\_queue中的值作为输入计算斐波那契序列的值,最后把输入和输出作为key和value存入fibo\_dict字典。最后,我们调用task_done()方法,通知某一个任务已经被分离并执行。代码如下:
3939

4040
```python
4141

4242
def fibonacci_task(condition):
4343
with condition:
44-
while shared_queue.empty():
45-
logger.info("[%s] - waiting for elements in queue.."
46-
% threading.current_thread().name)
47-
condition.wait()
44+
while shared_queue.empty():
45+
logger.info("[%s] - waiting for elements in queue.."
46+
% threading.current_thread().name)
47+
condition.wait()
4848
else:
49-
value = shared_queue.get()
50-
a, b = 0, 1
51-
for item in range(value):
52-
a, b = b, a + b
53-
fibo_dict[value] = a
49+
value = shared_queue.get()
50+
a, b = 0, 1
51+
for item in range(value):
52+
a, b = b, a + b
53+
fibo_dict[item] = a # 这里书中是fibo_dict[value] = a,但是觉得重复赋值没有意义
5454
shared_queue.task_done()
5555
logger.debug("[%s] fibonacci of key [%d] with
56-
result [%d]" %
57-
(threading.current_thread().name, value,
58-
fibo_dict[value]))
56+
result [%d]" %
57+
(threading.current_thread().name, value,
58+
fibo_dict[value]))
5959
```
6060

61-
我们定义的第二个函数是queue_task,该函数被负责计算shared\_queue的值的线程所调用。我们看到condition对象作为获得shared\_queue的协议。input_list中的每一个值都将被插入到shared_queue中去。当所有的值都被插入到shared\_queue中后,告知负责计算斐波那契序列的方法shared\_queue已经可以使用。
61+
我们定义的第二个函数是queue\_task,该函数被负责计算shared\_queue的值的线程所调用。我们看到condition对象作为获得shared\_queue的协议。input\_list中的每一个值都将被插入到shared\_queue中去。当所有的值都被插入到shared\_queue中后,告知负责计算斐波那契序列的方法shared\_queue已经可以使用。
6262

6363
```python
6464

6565
def queue_task(condition):
6666
logging.debug('Starting queue_task...')
6767
with condition:
68-
for item in input_list:
69-
shared_queue.put(item)
70-
logging.debug("Notifying fibonacci_task threads
71-
that the queue is ready to consume..")
72-
condition.notifyAll()
68+
for item in input_list:
69+
shared_queue.put(item)
70+
logging.debug("Notifying fibonacci_task threads
71+
that the queue is ready to consume..")
72+
condition.notifyAll()
7373
```
7474

7575
接下来我们将创建四个线程等待shared\_queue可以被使用条件。线程将执行target参数作为回调函数,代码如下:
7676

7777
```python
7878

7979
threads = [threading.Thread(
80-
daemon=True, target=fibonacci_task,
80+
daemon=True, target=fibonacci_task,
8181
args=(queue_condition,)) for i in range(4)]
8282
```
8383

@@ -97,7 +97,7 @@
9797
prod.start()
9898
```
9999

100-
最后,我们调用join()方法,调用计算斐波那契序列的所有线程,使用join()方法的目的是,让主线程等待子线程的调用,直到所有子线程执行完毕之后才结束子线程。
100+
最后,我们多计算斐波那契序列的所有线程都调用join()方法,使用join()方法的目的是,让主线程等待子线程的调用,直到所有子线程执行完毕之后才结束子线程。
101101

102102
```python
103103

@@ -106,6 +106,6 @@
106106

107107
程序的执行结果如下:
108108

109-
注意到,第一个fibonacci_task线程被创建和初始化后,它们进入等待状态。同时,queue_task线程被创建并且生成shared\_queue队列。最后,queue\_task方法告知fibonacci_task线程可以执行它们的任务。
109+
注意到,第一个fibonacci\_task线程被创建和初始化后,它们进入等待状态。同时,queue\_task线程被创建并且生成shared\_queue队列。最后,queue\_task方法告知fibonacci_task线程可以执行它们的任务。
110110

111111
注意到,程序每次执行的过程都不一样,这也是多线程的特性之一。

第四章/使用网络爬虫实现python并发模块.md

Whitespace-only changes.

第四章/定义threading模块.md

Whitespace-only changes.

第四章/定义什么是线程.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
##什么是线程
2+
线程表示的是进程中不同的执行流程. 让我们把一个程序相信成是一个蜂箱,该程序的一个动作是将划分收集到这个蜂箱内. 这些搜集动作是由许多个工蜂同时进行的. 这个例子中,工蜂扮演的就是线程的角色, 它们工作在进程内部并且共享资源来完成它们的任务.
3+
4+
同一个进程中的线程共享内存空间. 因此开发者的任务就是控制和访问这些内存区域.
5+
6+
###使用线程的优势和劣势
7+
是否使用线程需要权衡利弊,这依赖于用于实现解决方案的编程语言和操作系统.
8+
9+
使用线程的优势如下所示:
10+
- 同一进程的不同线程之间交流的速度,数据单元(data location)的速度和共享信息的速度都很快
11+
- 创建线程的花费要远远少于创建进程的花费,这时因为它无需拷贝主进程上下文环境中的那些信息
12+
- 通过处理器的缓存机制可以优化内存存取,这样就能够充分利用数据局部性(data locality)的优势
13+
14+
使用线程的劣势如下:
15+
- 数据共享可以加速通讯. 然而开发新手使用线程也很容引进难以解决的错误
16+
- 数据共享限制了解决方案的灵活性. 若想将其迁移到分布式架构上,会是件很头疼的事情. 总的来说,它限制了算法的可扩展性
17+
18+
[就Python编程语言来说, 计算密集型的线程会由于GIL的存在而影响程序的性能]
19+
20+
###理解不同类型的线程
21+
存在两种类型的线程:内核线程与用户线程. 其中,内核线程是指由操作系统创建和管理的线程. 内核线程的上下文切换,调度和销毁都由当前操作系统的内核来管理. 而对于用户线程,这些东西都由开发者来控制.
22+
23+
每种线程都有其优势:
24+
25+
内核线程的优势如下:
26+
- 一个内核线程其实就是一个进程. 因此即使一个内核线程被阻塞了,其他的内核线程也照样运行
27+
- 不同的内核线程可以运行在不同的CPU上
28+
29+
内核线程的劣势如下:
30+
- 创建线程和线程间同步的消耗太大
31+
- 实现依赖于平台
32+
33+
用户线程的优势如下:
34+
- 用户线程的创建和线程间同步的开销较少
35+
- 用户线程是平台无关的
36+
37+
用户线程的劣势如下:
38+
- 同一进程中的所有用户线程都对应一个内核线程. 因此,若该内核线程被阻塞,则所有相应的用户线程都会被阻塞
39+
- 不同用户线程无法运行在不同CPU上
40+
41+
###定义线程的状态
42+
在线程的生命周期中,有5中状态:
43+
- 新建: 该过程的主要动作就是创建一个新线程, 创建完新线程后,该线程被发送到待执行的线程队列中
44+
- 运行: 该状态下,线程获取到并消耗CPU资源
45+
- 就绪: 该状态下,线程在待执行的线程队列中排队,等待被执行
46+
- 阻塞: 该状态下,线程由于等待某个事件(例如I/O操作)的出现而被阻塞. 这时线程并不使用CPU
47+
- 死亡: 该状态下,线程释放执行时使用的资源并结束整个线程的生命周期
48+
49+
###是使用threading模块还是_thread模块
50+
Python提供了两个模块来实现基于系统的线程:_thread模块(该模块提供了使用线程相关的较低层的API; 它的文档可以在http://docs.python.org/3.3/library/_thread.html 找到)和threading模块(该模块提供了使用线程相关的较高层的API; 它的文档可以在 http://docs.python.org/3.3/library/threading.html 中找到). threading模块提供的接口要比_thread模块的结构更友好一些. 至于具体选择哪个模块取决于开发者, 若开发者觉得在低层操作线程,实现自己的线程池,处理所及其其他原始特性(features)更随手一些的话,他/她会偏向使用_thread,否则threading会是更明智的选择

第四章/总结.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
##总结
2+
在本章,我们聚焦于使用线程的理论方法. 通过使用threading模块和concurrent.futures模块,我们实现了上一章所展示的案例,并通过这种方式展示这些模块的机制和灵活性
3+
4+
下一章我们会使用multiprocessing和ProcessPoolExecutor这两个模块来再依次解决这两个问题.
5+

0 commit comments

Comments
 (0)