Skip to content

Commit bb4b68b

Browse files
author
liufei01
committed
Add chapter 7
1 parent 839e853 commit bb4b68b

9 files changed

+343
-0
lines changed

第七章/分发简单任务.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
在之前,我们已经建立好环境。下面测试一下环境,发送一个计算平方根的任务。
2+
定义任务模块tasks.py。在开始,导入必须的模块。
3+
4+
from math import sqrt
5+
from celery import Celery
6+
7+
然后,创建Celery实例,代表客户端应用:
8+
9+
app = Celery('tasks', broker='redis://192.168.25.21:6379/0')
10+
11+
在初始化时我们传入了模块的名称和broker的地址。
12+
然后,启动result backend,如下:
13+
14+
app.config.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6379/0'
15+
16+
@app.tack装饰器定义任务:
17+
18+
@app.task
19+
def square_root(value):
20+
return sqrt(value)
21+
22+
到此,我们完成了tasks.py模块的定义,我们需要初始化服务端的workers。我们创建了一个单独的目录叫做8397_07_broker。拷贝tasks.py模块到这个目录,运行如下命令:
23+
24+
$celery –A tasks worker –-loglevel=INFO
25+
26+
上述命令初始化了Clery Server,—A代表Celery应用。下图是初始化的部分截图
27+
28+
![](图片链接地址)
29+
30+
现在,Celery Server等待接收任务并且发送给workers。
31+
下一步就是在客户端创建应用调用tasks。
32+
33+
> 上述步骤不能忽略,因为下面会用在之前创建的东西。
34+
35+
在客户端机器,我们有celery_env虚拟环境,现在创建一个task_dispatcher.py模块很简单,如下步骤;
36+
1. 导入logging模块来显示程序执行信息,导入Celery模块:
37+
38+
import logging
39+
from celery import Celery
40+
41+
2. 下一步是创建Celery实例,和服务端一样:
42+
43+
#logger configuration...
44+
app = Celery('tasks',
45+
broker='redis://192.168.25.21:6379/0')
46+
app.conf.CELERY_RESULT_BACKEND =
47+
'redis://192.168.25.21:6397/0'
48+
49+
50+
由于我们在接下的内容中要复用这个模块来实现任务的调用,下面我们创建一个方法来封装sqrt_task(value)的发送,我们将创建manage_sqrt_task(value)方法:
51+
52+
def manage_sqrt_task(value):
53+
result = app.send_task('tasks.sqrt_task', args=(value,))
54+
logging.info(result.get())
55+
56+
从上述代码我们发现客户端应用不需要知道服务端的实现。通过Celery类中的send_task方法,我们传入module.task格式的字符串和以元组的方式传入参数就可以调用一个任务。最后,我们看一看log中的结果。
57+
__main__中,我们调用了manage_sqrt_task(value)方法:
58+
59+
if __name__ == '__main__':
60+
manage_sqrt_task(4)
61+
62+
下面的截图是执行task_dispatcher.py文件的结果:
63+
64+
![](图片链接地址)
65+
66+
在客户端,通过get()方法得到结果,这是通过send_task()返回的AsyncResult实例中的重要特征。结果如下图:
67+
68+
![](图片链接地址)
69+
70+

第七章/建立环境.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
### 建立环境
2+
3+
在本章,我们将使用两台linux机器。第一台主机名foshan,作为客户端,执行app Celery分发的任务。另一台机器主机名Phoenix,作为broker、result backend和任务队列。
4+
5+
### 配置客户端机器
6+
7+
首先配置客户端机器。在这台机器上,用pyvenv建立Python3.3的虚拟环境。用pyvenv的目的是隔离每个项目的开发环境。执行以下命令能够创建虚拟环境:
8+
9+
$pyvenv celery_env
10+
11+
上述命令在当前路径创建一个名为celery_env的文件夹,里面包含所有Python开发环境必须的结构。下图是该目录所包含的内容:
12+
13+
![](图片链接地址)
14+
15+
在创建了虚拟环境之后,我们就可以开始工作并安装需要使用的包。然而,首先我们得激活这个环境,执行以下命名:
16+
17+
$source bin/activate
18+
19+
当命令行提示符改变了,例如在左边出现celery_env,就说明激活完成。所有你安装的包都只在这个目录下有效,而不是在整个系统中有效。
20+
21+
> 用--system-site-packages标识可以创建能够访问系统site-packages的虚拟环境,但是不推荐使用。
22+
23+
现在,我们有一个虚拟环境,假设已经安装好了setuptools或者pip。下面为客户端安装必须的包,如下命令:
24+
25+
$pip install celery
26+
27+
下图是已经安装好的framework v3.1.9,将在本书中使用该版本。
28+
29+
![](图片链接地址)
30+
31+
现在我们要在Celery中安装支持的Redis,这样客户端就可以通过broker传输消息了。用如下命令:
32+
33+
$pip install celery[redis]
34+
35+
现在我们的客户端环境配置好了,在开始编码之前,我们必须配置好服务器端的环境。
36+
37+
38+
### 配置服务器
39+
40+
为了配置服务器,我们首先安装Redis,Redis将作为broker和result backend。使用如下命令:
41+
42+
$sudo apt-get install redis-server
43+
44+
启动Redis:
45+
46+
$redis-server
47+
48+
如果成功,会出现类似下图中的输出
49+
50+
![](图片链接地址)
51+

第七章/总结.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
在本章,我们论述了Celery的分布式任务队列,并且对Celery的架构、组件、基本环境搭建、运行简单应用有了了解。如果单独讲Celery可以写一本书,通过本章需要你对Celery有基本的认识。
2+
在下一章,我们将学习asyncio模块和怎样以异步的方式执行程序。下一章也会涉及到协程(coroutines)和asyncio中协程的应用。
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
计算斐波那契数列的任务已经实现并且运行。所有任务都被发送到默认的Celery队列。然而,有方法路由任务到不同的队列。让我们重构服务器端的架构来实现路由任务。我们将根据任务类型定义队列。
2+
在服务器端启动Celery server时,我们将建立三种不同的队列,它们将被workers消费。斐波那契数列任务有fibo_queue,平方根任务有sqrt_queue,网络爬虫任务有 webcrawler_queue。然而,划分队列有什么好处呢。以下列举一些:
3+
4+
- 相同类型的任务划分为一组,使得监控更加简单
5+
- 定义worker从一个队列取任务,性能更佳
6+
- 可在性能更好的机器上建立任务更重的队列
7+
8+
为了再server上建立队列,我们只需用如下命令初始化Celery:
9+
10+
$celery –A tasks –Q sqrt_queue,fibo_queue,webcrawler_queue worker --loglevel=info
11+
12+
下图是在服务端截图:
13+
14+
![](图片链接地址)
15+
16+
在进行下一个例子之前,我们路由现有的任务到队列中。在服务端,在task_dispatcher.py模块中,我们将修改send_task调用来将任务发送到不同的队列。修改如下:
17+
18+
app.send_task('tasks.sqrt_task', args=(value,),
19+
queue='sqrt_queue', routing_key='sqrt_queue')
20+
21+
然后,修改fibo_task调用,如下:
22+
23+
app.send_task('tasks.fibo_task', args=(x,), queue='fibo_queue',
24+
routing_key='fibo_queue')
25+
26+
> 如果有兴趣监控队列、统计任务数量或者其他,请参考Celery文档 http://celery.readthedocs.org/en/latest/userguide/monitoring.html。
27+
> 在任何情况用Redis,redis-cli都可以作为一个工具。队列、任务、workders都可以被监控,详见 http://celery.readthedocs.org/en/latest/userguide/monitoring.html#workers.
28+

第七章/理解Celery.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
Celery是一个框架,该框架提供机制来简化构建分布式系统的过程。Celery框架通过在同一个网络内的机器之间交换信息来分发任务。任务(task)在Celery中是一个关键的概念,任何形式的job在分发之前都必须封装成任务。
2+
3+
## 为什么使用Celery
4+
5+
Celery有如下优点:
6+
- 分发任务是透明的
7+
- 在并发worker启动时,对其改变很小
8+
- 支持同步、异步、周期和计划任务
9+
- 如果出现了错误,会重新执行任务
10+
11+
> 很多开发者都认为同步任务和实时任务是一样的,实际上它们是完全不同的。对于实时任务,它有一个时间窗口,任务执行必须在Deadline之前完成。如果经过分析,任务在时间窗口内完成不了,那么它将被终止或者暂停直到下次能够完成,而同步任务是当任务执行完后才返回结果。

第七章/理解Celery架构.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
Celery架构基于可插拔组件和用*message transport(broker)*实现的消息交换机制。具体的如下图所示:
2+
![](图片链接地址)
3+
4+
现在,让我们详细的介绍Celery的每个组件。
5+
6+
### 处理任务
7+
8+
在上图中的*client*组件,有创建和分派任务到brokers的方法。
9+
分析如下示例代码来演示通过使用@app.task装饰器来定义一个任务,它可以被一个Celery应用的实例访问,下面代码展示了一个简单的`Hello World app`
10+
11+
@app.task
12+
def hello_world():
13+
return "Hello I'm a celery task"
14+
15+
16+
> 任何可以被调用的方法或对象都可以成为任务
17+
18+
正如我们之前所说,有各种类型的任务:同步、异步、周期和计划任务。当我们调用任务,它返回类型AsyncResult的实例。AsyncResult对象中可以查看任务状态,当任务结束后可以查看返回结果。然而,为了利用这个机制,另一个叫做*result backend*的组件必须启动,这将在本章的后面讲解。为了分派任务,我们可以用下面这些方法:
19+
- delay(arg, kwarg=value) : 它会调用apply_aync方法。
20+
- apply_async((arg,), {'kwarg': value}) : 该方法可以为任务设置很多参数,一些参数如下。
21+
- countdown : 默认任务是立即执行,该参数设置经过countdown秒之后执行。
22+
- expires : 代表经过多长时间终止。
23+
- retry : 如果连接或者发送任务失败,该参数可是重试。
24+
- queue : 任务队列。
25+
- serializer : 数据格式,其他还有json、yaml等等
26+
- link : 连接一个或多个即将执行的任务。
27+
- link_error : 连接一个或多个执行失败的任务。
28+
- apply((arg,), {'kwarg': value}) : 在本地进程以同步的方式执行任务,因而阻塞直到结果就绪。
29+
30+
> Celery 提供了查看任务状态的机制,这在跟踪进程的真实状态非常有用。更多的关于内建任务状态的资料请查看http://celery.readthedocs.org/en/latest/reference/celery.states.html
31+
32+
### 发现消息传输(broker)
33+
34+
broker是Celery中的核心组件,通过broker可以发送和接受消息,来完成和workers的通信。Celery支持大量的brokers。然而,对于某些broker,不是所有的Celery机制都实现了。实现功能最全的是**RabbitMQ****Redis**。在本书中,我们将采用Redis作为broker。broker提供在不同客户端应用之间通信的方法,客户端应用发送任务,workers执行任务。可以有多台带有broker的机器等待接收消息,然后发送消息给workers。
35+
36+
37+
### 理解workers
38+
39+
Workers负责执行接收到的任务。Celery提供了一系列的机制,我们可以选择最合适的方式来控制workers的行为。这些机制如下:
40+
- 并发模式:例如进程、线程、协程(Eventlet)和Gevent
41+
- 远程控制:利用该机制,可以通过高优先级队列发送消息到某个特定的worker来改变行为,包括runtime。
42+
- 撤销任务:利用该机制,可以指挥一个或多个workers来忽略一个或多个任务。
43+
44+
更多的特性可以在运行时设定或者改变。比如,worker在某一段时间执行的任务数,worker在哪个队列消费等等。软玉worker更多的信息可以参考 http://docs.celeryproject.org/en/latest/userguide/workers.html#remote-control
45+
46+
### 理解result backends
47+
48+
Result backend组件存储任务的状态和任务返回给客户端应用的结果。Celery支持的result backend之中,比较出彩的有 RabbitMQ, Redis, MongoDB, Memcached。每个result backend都有各自的优缺点,详见 http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-result-backends
49+
50+
现在,我们对Celery架构有了一个大概的认识。下面我们建立一个开发环境来实现一些例子。

第七章/用Celery来分发任务.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
在之前的章节,我们学习了Python并行编程。我们用Python并行模块实现了一些实例,包括斐波那契数列和网络爬虫。我们知道了怎样用管道进行进程间通信和如何在同一个网络的不同机器上分发进程。在本章节,我们将学习如何利用Celery框架在同一个网络的不同机器之间分发任务。
2+
在本章,我们将覆盖一下几个主题:
3+
- 理解Celery
4+
- 理解Celery架构
5+
- 建立环境
6+
- 分发简单任务
7+
- 用Celery来获得斐波那契数列的项
8+
- 用Celery来构建一个分布式网络爬虫系统
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
现在我们将用Celery构建网络爬虫。我们已经有了webcrawler_queue,负责hcrawler任务。然而,在服务器端,我们将在tasks.py模块创建crawl_task任务。
2+
首先,导入re(正则表达式)和requests(HTTP lib)模块,代码如下:
3+
4+
import re
5+
import requests
6+
7+
然后,定义正则表达式,和之前的章节一样;
8+
9+
hTML_link_regex = re.compile(
10+
'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
11+
12+
然后,替换crawl_task方法,添加@app.task装饰器,修改返回信息,如下:
13+
14+
@app.task
15+
def crawl_task(url):
16+
request_data = requests.get(url)
17+
links = html_link_regex.findall(request_data.text)
18+
message = "The task %s found the following links %s.."\
19+
Return message
20+
21+
links列表不一定要和下图匹配:
22+
23+
![](图片链接地址)
24+
25+
在客户端task_dipatcher.py模块实现crawl_task调用。
26+
首先,我们需要列出数据的输入url_list。代码如下:
27+
28+
29+
url_list = ['http://www.google.com',
30+
'http://br.bing.com',
31+
'http://duckduckgo.com',
32+
'http://github.com',
33+
'http://br.search.yahoo.com']
34+
35+
创建manage_crawl_task方法。
36+
37+
def manage_crawl_task(url_list):
38+
async_result_dict = {url: app.send_task('tasks.crawl_task',
39+
args=(url,), queue='webcrawler_queue',
40+
routing_key='webcrawler_queue') for url in url_list}
41+
for key, value in async_result_dict.items():
42+
if value.ready():
43+
logger.info("%s -> %s" % (key, value.get()))
44+
else:
45+
logger.info("The task [%s] is not ready" %
46+
value.task_id)
47+
48+
和之前创建的manage_fibo_task方法一样,async_result_dict字典包含当前URL和AsyncResult结果。然后我们检查任务的状态获取任务结果。
49+
50+
现在我们在__main__中调用该方法:
51+
52+
if __main__ == '__main__':
53+
#manage_sqrt_task(4)
54+
#manage_fibo_task(input_list)
55+
manage_crawl_task(url_list)
56+
57+
运行task_dispatcher.py代码,在服务器端有如下输出:
58+
59+
![](图片链接地址)
60+
61+
最后,客户端的输出如下:
62+
63+
![](图片链接地址)
64+
65+
Celery是一个强大的工具,在本章我们只是用到了基本的东西。更多的内容建议自己在真实的项目中动手去尝试。
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
让我们再一次计算多个输入的斐波那契数列的项,每个都用分布式的方法。现在的方法和之前的方法改变很小。
2+
我们将终止Celery的执行(Ctrl+C)并且在tasks.py模块(之前创建过)加入fibo_task任务。
3+
4+
@app.task
5+
def fibo_task(value):
6+
a, b = 0,1
7+
for item in range(value):
8+
a, b = b, a + b
9+
message = "The Fibonacci calculated with task id %s" \
10+
" was %d" % (fibo_task.request.id, a)
11+
Return (value, message)
12+
13+
通过\<task.reaquest.id\>得到任务的ID,请求对象是task的对象,task对象提供了task执行的上下文。通过上下文可以得到task的ID等信息。
14+
15+
在tasks.py模块加入了新的任务之后,再一次初始化Celery,结果如下图:
16+
17+
![](图片链接地址)
18+
19+
现在我们把fibo_task任务装载到Celery server,我们将在客户端实现对该任务的调用。
20+
21+
在task_dispatcher.py模块,我们会申明input_list,如下:
22+
23+
input_list = [4, 3, 8, 6, 10]
24+
25+
和前面的做法一样,定义manage_fibo_task方法:
26+
27+
def manage_fibo_task(value_list):
28+
async_result_dict = {x: app.send_task('tasks.fibo_task',
29+
args=(x,)) for x in value_list}
30+
for key, value in async_result_dict.items():
31+
logger.info("Value [%d] -> %s" % (key, value.get()[1]))
32+
33+
在manage_fibo_task方法中,创建了一个叫做async_result_dict的字典,key是传入的要计算的值,value是send_task方法返回的AyncResult对象。通过这个方法,我们可以查看任务的结果和状态。
34+
最后,遍历字典得到输入值和输出结果。AsyncResult的get方法能够得到结果。
35+
get()方法会阻塞进程。一个好的方法是调用ready()方法来检查结果是否返回了。
36+
上述循环可以修改为如下;
37+
38+
for key, value in async_result_dict.items():
39+
if value.ready():
40+
logger.info("Value [%d] -> %s" % (key, value.get()[1]))
41+
else:
42+
logger.info("Task [%s] is not ready" % value.task_id)
43+
44+
不同的任务会有不同的延迟时间,为了防止无限等待,可以用get(timeout=x)方法设置超时。
45+
最后,添加manage_fibo_task的调用,传入input_list。代码如下:
46+
47+
if __name__ == '__main__':
48+
#manage_sqrt_task(4)
49+
manage_fibo_task(input_list)
50+
51+
当我们执行task_dispatcher.py后,输入如下:
52+
53+
![](图片链接地址)
54+
55+
在客户端有如下输出:
56+
57+
![](图片链接地址)
58+

0 commit comments

Comments
 (0)