Skip to content

Commit 788246b

Browse files
author
wangchao
committed
更新
1 parent 96631ed commit 788246b

14 files changed

+367
-231
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# 分发简单任务
2+
3+
在之前,我们已经建立好环境。下面测试一下环境,发送一个计算平方根的任务。
4+
5+
定义任务模块`tasks.py`。在开始,导入必须的模块。
6+
7+
```python
8+
from math import sqrt
9+
from celery import Celery
10+
```
11+
12+
然后,创建`Celery`实例,代表客户端应用:
13+
14+
```python
15+
app = Celery('tasks', broker='redis://192.168.25.21:6379/0')
16+
```
17+
18+
在初始化时我们传入了模块的名称和`broker`的地址。
19+
20+
然后,启动`result backend`,如下:
21+
22+
```python
23+
app.config.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6379/0'
24+
25+
# 较新的版本(v5.2.7)直接填充在celery app的初始化参数中.
26+
app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/0')
27+
```
28+
29+
`@app.tack`装饰器定义任务:
30+
31+
```python
32+
@app.task
33+
def sqrt_task(value):
34+
return sqrt(value)
35+
```
36+
37+
到此,我们完成了`tasks.py`模块的定义,我们需要初始化服务端的`workers`。我们创建了一个单独的目录叫做`8397_07_broker`。拷贝`tasks.py`模块到这个目录,运行如下命令:
38+
39+
```shell
40+
$celery –A tasks worker –-loglevel=INFO
41+
```
42+
43+
上述命令初始化了**Clery Server**`—A`代表`Celery`应用。下图是初始化的部分截图
44+
45+
```shell
46+
$# celery -A tasks worker --loglevel=INFO
47+
/opt/celery_env/lib/python3.9/site-packages/celery/platforms.py:840: SecurityWarning: You're running the worker with superuser privileges: this is
48+
absolutely not recommended!
49+
50+
Please specify a different user using the --uid option.
51+
52+
User information: uid=0 euid=0 gid=0 egid=0
53+
54+
warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
55+
56+
-------------- [email protected] v5.2.7 (dawn-chorus)
57+
--- ***** -----
58+
-- ******* ---- Linux-3.10.0-957.el7.x86_64-x86_64-with-glibc2.17 2023-03-06 16:12:10
59+
- *** --- * ---
60+
- ** ---------- [config]
61+
- ** ---------- .> app: tasks:0x7fe5cbea9b80
62+
- ** ---------- .> transport: redis://localhost:6379/0
63+
- ** ---------- .> results: redis://localhost/0
64+
- *** --- * --- .> concurrency: 2 (prefork)
65+
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
66+
--- ***** -----
67+
-------------- [queues]
68+
.> celery exchange=celery(direct) key=celery
69+
70+
71+
[tasks]
72+
. tasks.square_root
73+
74+
[2023-03-06 16:12:10,866: INFO/MainProcess] Connected to redis://localhost:6379/0
75+
[2023-03-06 16:12:10,871: INFO/MainProcess] mingle: searching for neighbors
76+
[2023-03-06 16:12:11,897: INFO/MainProcess] mingle: all alone
77+
[2023-03-06 16:12:11,929: INFO/MainProcess] [email protected] ready.
78+
```
79+
80+
现在,**Celery Server**等待接收任务并且发送给`workers`。
81+
82+
下一步就是在客户端创建应用调用`tasks`。
83+
84+
!!! info ""
85+
86+
上述步骤不能忽略,因为下面会用在之前创建的东西。
87+
88+
在客户端机器,我们有**celery_env**虚拟环境,现在创建一个`task_dispatcher.py`模块很简单,如下步骤;
89+
90+
1. 导入logging模块来显示程序执行信息,导入Celery模块:
91+
92+
```python
93+
import logging
94+
from celery import Celery
95+
```
96+
97+
2. 下一步是创建Celery实例,和服务端一样:
98+
99+
```python
100+
#logger configuration...
101+
app = Celery('tasks', broker='redis://192.168.25.21:6379/0')
102+
app.conf.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6397/0'
103+
```
104+
105+
由于我们在接下的内容中要复用这个模块来实现任务的调用,下面我们创建一个方法来封装`sqrt_task(value)`的发送,我们将创建`manage_sqrt_task(value)`方法:
106+
107+
```python
108+
def manage_sqrt_task(value):
109+
result = app.send_task('tasks.sqrt_task', args=(value,))
110+
logging.info(result.get())
111+
```
112+
113+
从上述代码我们发现客户端应用不需要知道服务端的实现。通过**Celery**类中的`send_task`方法,我们传入`module.task`格式的字符串和以元组的方式传入参数就可以调用一个任务。最后,我们看一看`log`中的结果。
114+
在`__main__`中,我们调用了`manage_sqrt_task(value)`方法:
115+
116+
```python
117+
if __name__ == '__main__':
118+
manage_sqrt_task(4)
119+
```
120+
121+
下面的截图是执行`task_dispatcher.py`文件的结果:
122+
123+
```shell
124+
[2023-03-06 16:18:45,481: INFO/MainProcess] Task tasks.sqrt_task[3ecab729-f1cb-4f29-bb47-b713b2e563ed] received
125+
[2023-03-06 16:18:45,500: INFO/ForkPoolWorker-2] Task tasks.sqrt_task[3ecab729-f1cb-4f29-bb47-b713b2e563ed] succeeded in 0.015412827953696251s: 2.0
126+
```
127+
128+
在客户端,通过`get()`方法得到结果,这是通过`send_task()`返回的`AsyncResult`实例中的重要特征。结果如下图:
129+
130+
```shell
131+
$# python task_dispatcher.py
132+
2023-03-06 16:26:05,841 - 2.0
133+
```
134+
135+
## 完整案例
136+
137+
`tasks.py`
138+
139+
```python
140+
from math import sqrt
141+
from celery import Celery
142+
143+
app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/0')
144+
145+
146+
@app.task
147+
def sqrt_task(value):
148+
return sqrt(value)
149+
```
150+
151+
`task_dispatcher.py`
152+
153+
```python
154+
import logging
155+
from celery import Celery
156+
157+
logger = logging.getLogger()
158+
logger.setLevel(logging.DEBUG)
159+
formatter = logging.Formatter('%(asctime)s - %(message)s')
160+
161+
ch = logging.StreamHandler()
162+
ch.setLevel(logging.DEBUG)
163+
ch.setFormatter(formatter)
164+
logger.addHandler(ch)
165+
166+
app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/0')
167+
168+
def manage_sqrt_task(value):
169+
result = app.send_task('tasks.sqrt_task', args=(value,))
170+
logger.info(result.get())
171+
172+
173+
if __name__ == '__main__':
174+
print(manage_sqrt_task(4))
175+
```

docs/chapter7/index.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
# 使用Celery分发任务
22

3-
在上一章中,我们了解并使用了 并行 Python。 我们看到了案例研究的实施,包括斐波那契数列项和使用并行 Python 模块的 Web 爬虫。 我们学习了如何使用管道在进程之间建立通信,以及如何在网络中的不同机器之间分配进程。 在本章中,我们将研究如何使用 `Celery` 框架在网络中的不同机器之间分配任务
3+
在上一章中,我们了解并使用了**并行 Python**(parallel Python)。 我们看到了案例研究的实施,包括斐波那契数列项和使用并行 Python 模块的 Web 爬虫。 我们学习了如何使用管道在进程之间建立通信,以及如何在网络中的不同机器之间分配进程。 在本章中,我们将研究如何使用 Celery 框架在网络中的不同机器之间分发任务
44

55
在本章中,我们将讨论以下主题:
66

77
- 理解 Celery
88
- 理解 Celery 的架构
9-
- 设置环境
9+
- 搭建环境
1010
- 分派一个简单的任务
1111
- 使用 Celery 获取斐波那契数列项
1212
- 使用 Celery 制作分布式网络爬虫
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# 建立环境
2+
3+
在本节中,我们将在 `Linux` 中设置两台机器。 第一个,主机名 `foshan`,将执行客户端角色,应用程序 `Celery` 将在其中调度要执行的任务。 另一台主机名为 `Phoenix` 的机器将执行**代理**(broker)、**结果后端**(result backend)和worker使用的队列的角色。
4+
5+
## 配置客户端机器
6+
7+
让我们开始设置客户端机器。 在这台机器上,我们将使用 `pyvenv` 工具设置一个 `Python 3.3` 的虚拟环境。 `pyvenv` 的目标是不使用额外的模块污染操作系统中存在的 `Python`,而是将每个项目所需的开发环境分开。 我们将执行以下命令来创建我们的虚拟环境:
8+
9+
```shell
10+
$pyvenv celery_env
11+
```
12+
13+
上述命令在当前路径创建一个名为`celery_env`的文件夹,里面包含所有Python开发环境必须的结构。下图是该目录所包含的内容:
14+
15+
```shell
16+
# 这里使用的最新的python venv模块
17+
$# ./Python-3.9.14/python -m venv celery_env
18+
$# ls celery_env/
19+
bin include lib lib64 pyvenv.cfg
20+
```
21+
22+
在创建了虚拟环境之后,我们就可以开始工作并安装需要使用的包。然而,首先我们得激活这个环境,执行以下命名:
23+
24+
```shell
25+
$# source celery_env/bin/activate
26+
```
27+
28+
当命令行提示符改变了,例如在左边出现`celery_env`,就说明激活完成。所有你安装的包都只在这个目录下有效,而不是在整个系统中有效。
29+
30+
```shell
31+
(celery_env) $# ls celery_env/
32+
bin include lib lib64 pyvenv.cfg
33+
```
34+
35+
!!! info ""
36+
37+
用`--system-site-packages`标识可以创建能够访问系统`site-packages`的虚拟环境,但是不推荐使用。
38+
39+
现在,我们有一个虚拟环境,假设已经安装好了`setuptools`或者`pip`。下面为客户端安装必须的包,如下命令:
40+
41+
```shell
42+
$pip install celery
43+
```
44+
45+
下图是已经安装好的framework v3.1.9,将在本书中使用该版本。
46+
47+
```shell
48+
# 由于当前(2023)python2已不再支持,顾这里安装的最新版本v5.2.7
49+
(celery_env) $# python
50+
Python 3.9.14 (main, Sep 19 2022, 12:04:09)
51+
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
52+
Type "help", "copyright", "credits" or "license" for more information.
53+
>>> import celery
54+
>>> celery.VERSION
55+
version_info_t(major=5, minor=2, micro=7, releaselevel='', serial='')
56+
>>>
57+
```
58+
59+
现在我们要在**Celery**中安装支持的**Redis**,这样客户端就可以通过`broker`传输消息了。用如下命令:
60+
61+
```shell
62+
$pip install celery[redis]
63+
```
64+
65+
现在我们的客户端环境配置好了,在开始编码之前,我们必须配置好服务器端的环境。
66+
67+
## 配置服务器
68+
69+
为了配置服务器,我们首先安装**Redis****Redis**将作为`broker``result backend`。使用如下命令:
70+
71+
```shell
72+
$sudo apt-get install redis-server
73+
```
74+
75+
启动Redis:
76+
77+
```shell
78+
$redis-server
79+
```
80+
81+
如果成功,会出现类似下图中的输出
82+
83+
```log
84+
2905:C 06 Mar 15:53:46.571 * supervised by systemd, will signal readiness
85+
_._
86+
_.-``__ ''-._
87+
_.-`` `. `_. ''-._ Redis 3.2.12 (00000000/0) 64 bit
88+
.-`` .-```. ```\/ _.,_ ''-._
89+
( ' , .-` | `, ) Running in standalone mode
90+
|`-._`-...-` __...-.``-._|'` _.-'| Port: 6379
91+
| `-._ `._ / _.-' | PID: 2905
92+
`-._ `-._ `-./ _.-' _.-'
93+
|`-._`-._ `-.__.-' _.-'_.-'|
94+
| `-._`-._ _.-'_.-' | http://redis.io
95+
`-._ `-._`-.__.-'_.-' _.-'
96+
|`-._`-._ `-.__.-' _.-'_.-'|
97+
| `-._`-._ _.-'_.-' |
98+
`-._ `-._`-.__.-'_.-' _.-'
99+
`-._ `-.__.-' _.-'
100+
`-._ _.-'
101+
`-.__.-'
102+
103+
2905:M 06 Mar 15:53:46.574 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
104+
2905:M 06 Mar 15:53:46.574 # Server started, Redis version 3.2.12
105+
2905:M 06 Mar 15:53:46.574 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
106+
2905:M 06 Mar 15:53:46.574 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
107+
2905:M 06 Mar 15:53:46.574 * The server is now ready to accept connections on port 6379
108+
```

docs/chapter7/understanding_celery.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# 理解Celery
2+
3+
**Celery** 是一个框架,该框架提供机制来简化构建分布式系统的过程。 Celery 框架通过在作为网络互连的机器或本地网络之间交换消息来使用**工作单元**(tasks)分布的概念。 任务是 **Celery** 中的关键概念; 我们必须分发的任何类型的工作都必须事先封装在任务中。
4+
5+
## 为什么使用Celery
6+
7+
它以透明的方式在分布在 Internet 上的工作人员或本地工作人员之间分配任务
8+
9+
Celery有如下优点:
10+
11+
- 它以透明的方式在网络上分布的worker或本地网络之间分配任务
12+
- 它通过设置(进程、线程、Gevent、Eventlet)以一种简单的方式改变了worker的并发性
13+
- 支持同步、异步、周期、定时任务
14+
- 它会在出现错误时重新执行任务
15+
16+
!!! info ""
17+
18+
很多开发者都认为**同步任务**(synchronous tasks)和**实时任务**(real-time tasks)是一样的,实际上它们是完全不同的。对于**实时任务**,它有一个时间窗口,任务执行必须在`Deadline`之前完成。如果经过分析,任务在时间窗口内完成不了,那么它将被终止或者暂停直到下次能够完成,而**同步任务**是当任务执行完后才返回结果。
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# 理解Celery架构
2+
3+
Celery架构基于**可插拔组件**(pluggable components)和根据选择的**消息传输**(代理)(message transport(broker))协议实现的消息交换机制。下图说明了这一点:
4+
5+
![1](../imgs/7-01.png)
6+
7+
现在,让我们详细的介绍Celery的每个组件。
8+
9+
## 处理任务
10+
11+
在上图中的*client*组件,有创建和分派任务到brokers的方法。
12+
13+
分析如下示例代码来演示通过使用`@app.task`装饰器来定义一个任务,它可以被一个**Celery**应用的实例访问,下面代码展示了一个简单的`Hello World app`
14+
15+
```python
16+
@app.task
17+
def hello_world():
18+
return "Hello I'm a celery task"
19+
```
20+
21+
!!! info ""
22+
23+
任何可执行的方法或对象都可以成为任务 (Any callable can be a task.)
24+
25+
正如我们前面提到的,有几种类型的任务:`同步``异步``定期``计划`。 当我们执行任务调用时,它会返回一个 `AsyncResult` 类型的实例。 `AsyncResult` 对象是一个对象,它允许检查任务状态、它的结束,并且很明显,它在存在时返回。 但是,要使用此机制,另一个组件(结果后端)必须处于活动状态。 这将在本章中进一步解释。 要分派任务,我们应该使用任务的以下一些方法:
26+
27+
- `delay(arg, kwarg=value)` : 这是调用 `apply_async` 方法的快捷方式。
28+
- `apply_async((arg,), {'kwarg': value})` : 这允许为任务的执行设置一系列有趣的参数。 其中一些如下:
29+
- `countdown` : 默认任务是立即执行,该参数设置经过`countdown`秒之后执行。
30+
- `expires` : 代表经过多长时间终止。
31+
- `retry` : 此参数决定在连接或发送任务失败的情况下,是否必须重新发送。
32+
- `queue` : 该任务所处的任务队列。
33+
- `serializer` : 这表示磁盘中任务序列化的数据格式,一些示例包括 json、yaml 等。
34+
- `link` : 如果发送的任务成功执行,这将链接一个或多个要执行的任务。
35+
- `link_error` : 这将在任务执行失败的情况下链接一个或多个要执行的任务。
36+
- `apply((arg,), {'kwarg': value})` : 这会以同步方式在本地进程中执行任务,从而阻塞直到结果准备就绪为止。
37+
38+
!!! info ""
39+
40+
Celery 还提供了伴随任务状态的机制,这对于跟踪和映射处理的真实状态非常有用。 有关内置任务状态的更多信息,请访问<http://celery.readthedocs.org/en/latest/reference/celery.states.html>{target="_blank"}
41+
42+
## 理解消息转发(broker)
43+
44+
`broker`绝对是 **Celery** 中的关键组成部分。 通过它,我们可以发送和接收消息并与`worker`沟通。 **Celery** 支持大量的代理。 然而,对于其中一些,并不是所有的 `Celery` 机制都得到了实现。 就功能而言最完整的是 `RabbitMQ``Redis`。 在本书中,我们将使用 `Redis` 作为`broker`和结果后端。 `broker`的功能是在发送任务的客户端应用程序和执行任务的工作线程之间提供一种通信方式。 这是通过使用任务队列完成的。 我们可以有几台带有代理的网络机器等待接收消息以供`workers`使用。
45+
46+
## 理解workers
47+
48+
`Workers`负责执行接收到的任务。**Celery**提供了一系列的机制,我们可以选择最合适的方式来控制`workers`的行为。这些机制如下:
49+
50+
- **并发模式**(Concurrency mode):例如**进程****线程****协程**(Eventlet)和**Gevent**
51+
- **远程控制**(Remote control):使用这种机制,可以通过高优先级队列发送消息到某个特定的`worker`来改变行为,包括在**运行时**(runtime)。
52+
- **撤销任务**(Revoking tasks):使用这种机制,我们可以指示一个或多个`worker`忽略一个或多个任务的执行。
53+
54+
如果需要,可以在运行时设置甚至更改更多功能。 比如`worker`在一段时间内执行的任务数,`worker`从哪个`queue`中消耗的时间最多等等。 有关`worker`的更多信息,请访问<http://docs.celeryproject.org/en/latest/userguide/workers.html#remote-control>{target="_blank"}
55+
56+
## 理解result backends
57+
58+
**结果后端**(result backend)组件的作用是存储返回给客户端应用程序的任务的状态和结果。 从 `Celery` 支持的结果后端,比较出彩的有 `RabbitMQ``Redis``MongoDB``Memcached` 等。 前面列出的每个**结果后端**(result backend)都有优点和缺点。 有关详细信息,请参阅 <http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-result-backends>{target="_blank"}。
59+
60+
现在,我们对 `Celery` 架构及其组件有了一个大致的了解。 因此,让我们建立一个开发环境来实现一些例子。

0 commit comments

Comments
 (0)