Skip to content

Commit bf0a442

Browse files
committed
Merge pull request Voidly#5 from tanghaodong25/master
提交第五章
2 parents 8411c45 + 0515a3a commit bf0a442

8 files changed

+196
-2
lines changed

第五章/ReadMe.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
1-
##Using Multiprocessing and ProcessPoolExecutor
1+
##Using Multiprocessing and ProcessPoolExecutor
2+
上章中,我们学习了如何使用threading模块解决两个问题。通过本章的学习,我们将学习如何使用multiprocessing模块解决上章的两个问题,我们将使用和上章类似的接口实现。然而,我们会使用多进程机制。
3+
本章将覆盖如下一个知识点:
4+
* 理解进程的概念
5+
* 理解多进程通信
6+
* 使用多进程解决斐波那契数列多输入问题
7+
* 使用ProcessPoolExecutor模块设计网络爬虫
8+
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
下面我们将使用多进程解决多输入情况下的斐波那契数列问题,而不是之前我们使用的多线程的方法。
2+
multiprocessing_fibonacci.py程序使用multiprocessing模块,为了顺利执行,还导入了如下模块:
3+
```python
4+
import sys, time, random, re, requests
5+
import concurrent.futures
6+
from multiprocessing import, cpu_count, current_process, Manager
7+
```
8+
9+
上面一些导入模块在之前的章节中提及过,然而,下面中的一些模块我们需要特别注意:
10+
* cpu_count: 该方法允许获得机器cpu的数量
11+
* current_process: 该方法可以获得当前进程的信息,比如进程名称
12+
* Manager: 该类通过对象的方式允许功在多进程之间共享python对象
13+
14+
下面的代码中我们可以注意到第一个方法有点不同,它将产生15个1到20之间的整数,这些整数将被当作fibo_dict的key使用。
15+
接下来让我们一起来看producer_task方法,如下:
16+
```python
17+
def producer_task(q, fibo_dict):
18+
for i in range(5):
19+
value = random.randint(1, 20)
20+
fibo_dict[value] = None
21+
22+
print("Producer [%s] putting value [%d] into queue.." % (current_process().name, value))
23+
q.put(value)
24+
25+
```
26+
27+
下面将定义一个函数来计算fibo_dict中key对应的斐波那契数列值,和之前章节介绍计算斐波那契序列值不同的是,这里把fibo_dict当作参数传入不同的processes。
28+
29+
下面是consumer_task方法,如下:
30+
```python
31+
def consumer_task(q, fibo_dict):
32+
while not q.empty():
33+
value = q.get(True, 0.05)
34+
a, b = 0, 1
35+
for item in range(value):
36+
a, b = b, a+b
37+
fibo_dict[value] = a
38+
print("consumer [%s] getting value [%d] from queue..." % (current_process().name, value))
39+
```
40+
41+
更进一步,我们来看main函数中的代码,main函数中下面几个变量被定义:
42+
* data_queue: 该参数由multiprocessing.Queueu来创建,是进程安全的
43+
* number_of_cpus: 该参数由multiprocessing.cpu_count方法获得,获得机器cpu的个数
44+
* fibo_dict: 这个字典类型变量从Manager实例获得,保存多进程计算结果
45+
46+
然后,我们将创建producer进程,并传入data_queue队列,data_queue队列值由producer_task方法获得:
47+
```python
48+
producer = Process(target=producer_task, args=(data_queue, fibo_dict))
49+
producer.start()
50+
producer.join()
51+
```
52+
53+
我们可以注意到Process实例的初始化过程和我们之前的Thread实例初始化过程类似。初始化函数接收target参数作为进程中要执行的函数,和args参数作为target传入的函数的参数。接下来我们通过start方式开始进程,然后使用join方法,等待producer进程执行完毕。
54+
55+
下面一块代码中,我们将定义consumer_list队列,存入初始化过的consumer进程。使用list存储consumer对象的原因是在所有进程结束开始后调用join方法。循环中的每一个worker被调用后,下一个worker将等待上一个worker执行完毕后才开始执行,下面代码将描述这一过程:
56+
```python
57+
consumer_list = []
58+
cpu = cpu_count()
59+
print(cpu)
60+
for i in range(cpu):
61+
consumer = Process(target=consumer_task, args=(data_queue, fibo_dict))
62+
consumer.start()
63+
consumer_list.append(consumer)
64+
[consumer.join() for consumer in consumer_list]
65+
```
66+
最终我们将迭代输出fibo_dict中的结果,如下面截图所示:
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
正如concurrent.futures模块提供的ThreadPoolExecutor类方便了操控多线程程序,多进程同样有一个类叫ProcessPoolExecutor。ProcessPoolExecutor类同样由concurrent.futures包提供,我们将使用该类执行我们的网络爬虫程序。为了完成这个任务,我们创建了一个叫process_pool_executor_web_crawler.py的python模块。
2+
3+
代码要导入的包如之前章节中我们介绍的,如requests, Manager模块等等。对于任务的定义,我们延用上章线程程序中的代码,只是改动其中小部分代码,还有一点不一样,进程程序中,我们向任务函数传入参数,而不是使用全局变量。代码如下所是:
4+
group_urls_task函数定义如下:
5+
```python
6+
def group_urls_task(urls, result_dict, html_link_regex)
7+
```
8+
crawl_task函数定义如下:
9+
```python
10+
def crawl_task(url, html_link_regex)
11+
12+
```
13+
14+
现在我们将来看下面一小部分代码,做了细微的变化。在main函数中,我们获得Manager类的实例,该实例使得我们获得可以被多进程功效的queue和dict。我们使用Manager.Queue()方法获得queue实例来存储我们将要爬得url。使用Manager.dict()方法获取dict,来存储爬虫的结果。下面的代码将介绍上面的定义:
15+
```python
16+
if __name__ == '__main__':
17+
manager = Manager()
18+
urls = manager.Queue()
19+
urls.put("http://br.bing.com/")
20+
urls.put("https://github.com")
21+
result_dict = manager.dict()
22+
```
23+
24+
接着,我们将定义爬虫程序中将要用到的正则表达式和介绍如何获取机器的cpu个数,程序如下:
25+
```python
26+
html_link_regex = \
27+
re.compile('<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
28+
number_of_cpus = cpu_count()
29+
```
30+
31+
最后一块代码中,我们会注意到API中的一致性模块concurrent.futures。下面的代码正是我们上章使用ThreadPoolExecutor模块时使用到的。我们可以把ThreadPoolExecutor变为ProcessPoolExecutor,并不会影响到CPU绑定的GIL问题。注意下面的程序,创建ProcessPoolExecutor时会根据机器cpu数限定进程的数目。第一个exucutor是为了手机将被爬的URL,把这些URLs保存在一个字典中,key为url而value为None。第二个executor执行爬虫程序。
32+
首先是第一个executor:
33+
```python
34+
with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as group_link_processes:
35+
for i in range(urls.qsize()):
36+
group_link_processes.submit(group_urls_task, urls, result_dict, html_link_regex)
37+
```
38+
39+
第二个executor程序如下:
40+
```python
41+
with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as crawler_link_processes:
42+
future_tasks = {crawler_link_processes.submit(crawl_task, url, html_link_regex): url for url in result_dict.keys()}
43+
for future in concurrent.futures.as_completed(future_tasks):
44+
result_dict[future.result()[0]] = future.result()[1]
45+
```
46+
47+
程序运行结果如下图:

第五章/实现多进程间通信.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
python中的multiprocessing模块支持两种方式在进程间通信,都是基于消息传递机制。之前我们介绍过,消息传递机制缺乏同步机制,在交换的进程中不可复制数据。
2+
##使用multiprocessing.Pipe模块
3+
4+
pipe管道在两个端点间搭建一种通信机制,通过在进程间建立通道使得进程间可以相互通信。
5+
为了更好的说明multiprocessing.Pipe对象的使用方法,我们将介绍一个python程序,包含A,B两个进程。进程A发送1到10之间的一个数给进程B,进程B将打印该数,接下来让我们一步步介绍这个程序。
6+
我们首先导入一些我们程序中需要的包,如下:
7+
```python
8+
import os, random
9+
from multiprocessing import Process, Pipe
10+
```
11+
12+
通过os模块的os.getpid()方法使得我们获得进程的PID。os.getpid()将以一种透明的方式返回程序的PID,在我们的程序中,它分别返回producer_task进程和consumer_task进程的PID。
13+
下面我们将定义producer_task方法,该方法返回1到10之间的一个随机数。producer_task方法的关键是调用conn.send方法,conn以参数的形式在主函数中传入producer_task方法。producer_task方法如下:
14+
```python
15+
def producer_task(conn):
16+
value = random.randint(1, 10)
17+
conn.send(value)
18+
print('Value [%d] send by PID [%d]' % (value, os.getpid()))
19+
conn.close()
20+
```
21+
consumer进程将要执行的任务也很简单,它唯一的任务就是接收A进程传递过程的参数,接收本进程的PID,最终打印出来。consumer进程的中传入的consumer_task方法如下:
22+
```python
23+
def consumer_task(conn)
24+
print('Value [%d] received by PID [%d]' % (conn.recv(), os.getpid()))
25+
```
26+
27+
最后一块将介绍如何调用Pipe()方法创建两个连接对象分别用于producer进程和consumer进程,然后通过参数形式各自传递到consumer_task方法和producer_task方法中去,主函数具体如下所是:
28+
```python
29+
if __name__ == '__main__':
30+
producer_conn, consumer_conn = Pipe()
31+
consumer = Process(target=consumer_task,args=(consumer_conn,))
32+
producer = Process(target=producer_task,args=(producer_conn,))
33+
34+
consumer.start()
35+
producer.start()
36+
37+
consumer.join()
38+
producer.join()
39+
```
40+
定义好进程之后,我们便可以调用进程对象的start方法开始执行进程,join方法用于分别等待producer进程和consumer进程执行完毕。下面的截图中我们将看到程序的输出:
41+
42+
##理解multiprocessing.Queue模块
43+
之前小节中我们分析了如何在进程间创建通信通道来传递消息,现在我们将分析如何更有效的传递消息,这里我们使用mutilprocessing模块下的Queue对象。multoprocessing.Queue对象方法和queue.Queue对象方法类似。然后内在实现却不尽相同,比如multiprocess模块使用了内部线程feeder,把缓冲区中的数据传入目标线程相关连接的管道中。管道和队列机制均使用了消息传递机制,节省了使用同步机制带来的开销。

第五章/总结.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
本章中我们介绍了多进程的概念,并使用多进程解决了两个小问题,分别是并行计算斐波那契数列值和设计网络爬虫。
2+
下一章节我们将使用parallel Python模块执行多进程任务,parallel模块并不是python的内部模块。我们还将学习进程间通信相关的知识,使用pipes在进程间通信。

第五章/理解进程的定义.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
线程是操作系统中程序执行和资源调度的基本单位。程序的执行由进程来管理,所涉及到的资源包括数据区,子进程,私有栈以及和其他进程间通信。
2+
3+
##理解进程模型
4+
进程中定义了相关的信息和资源来确保对进程的操纵和控制,操作系统有一个叫PCB的结构来专门来存储这些信息和资源。例如,PCB结构保存如下的信息:
5+
1. Process ID: 这是一个无符号整型数据,标识操作系统中的唯一进程。
6+
2. 程序计数器: 对应下一条要执行的程序指令地址。
7+
3. I/O信息: 包含一组打开的文件和进程相关的设备。
8+
4. 内存分配: 该区域保存进程已经使用内存空间、为该进程预留的内存空间和页表信息。
9+
5. CPU调度: 该区域保存进程优先级信息(and points to the staggering queues)。
10+
6. 优先级: 定义进程获取CPU资源的优先级。
11+
7. 当前状态: 表述该进程是准备状态、等待状态还是运行状态。
12+
8. CPU申请: 保存栈指针和其他信息。
13+
14+
##定义进程状态
15+
进程整个生命周期具有三种状态,分别如下:
16+
* 运行状态: 进程正占用cpu资源。
17+
* 准备状态: 处于进程队列中的进程已经准备好获取cpu资源。
18+
* 等待状态: 进程正在等待执行中的任务所需的I/O操作。

第六章/ReadMe.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
1-
##Utilizing Parallel Python
1+
##Utilizing Parallel Python
2+
之前的章节,我们学习了怎么使用multiprocessing模块和ProcessPoolExecutor模块解决两个问题这章将介绍管道和如何使用Parallel Python(PP)利用多进程执行并行任务。
3+
本章会覆盖下面几个知识点:
4+
* 理解进程间通信
5+
* 学习Parallel Python(PP)
6+
* 在SMP架构上使用PP计算斐波那契序列

第六章/理解进程间通信.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
IPC是一种允许多进程间通信的一种机制。
2+
有许多IPC的实现方式,他们依赖于不同架构的运行环境。运行在同一台机器上的进程可以由多种进程间通信方式,比如共享内存、消息队列和管道。如果多进程处于物理上分布式集群上上,我们可以使用RPC的方式。
3+
第五章中我们介绍了Multiprocessing和ProcessPoolExecutor模块,学习了常规的管道的用法。我们学习了共有一个父进程的各个子进程间通信方式,但是有时候我们需要在不相关的进程间传递信息(子进程不共有同一个父进程),我们会想,通过不相关进程的地址空间,是否能够在它们之间建立通信。虽然一个进程不能获取另外一个进程的地址空间,但是我们需要使用一种新的机制——命名管道。
4+
###探索命名管道
5+
对于POSIX系统,例如linux,我们可以把所有归结为文件,我们每操作一个文件,我们可以找到一个文件与之对应,我们还可以找到另外一个文件用于描述这个任务,该文件使得我们可以操控任务对应的文件。
6+
命名管道通过使用文件描述符(该文件描述符对应于要执行的任务,比如FIFO方式读写文件任务)的方式实现进程间IPC通信。在对信息的管理上命名管道不同于常规管道,

0 commit comments

Comments
 (0)