1、Celery的概念
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
异步任务:将耗时的操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等
定时任务: 每天定时执行爬虫爬取指定内容
还可以使用celery实现简单的分布式爬虫系统等等
Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,
2、Celery的特点
- 简单,易于使用和维护,有丰富的文档。
- 高效,支持多线程、多进程、协程模式运行,单个celery进程每分钟可以处理数百万个任务。
- 灵活,celery中几乎每个部分都可以自定义扩展。
3、Celery的作用
应用解耦,异步处理,流量削锋,消息通讯。
celery通过消息(任务)进行通信,
celery通常使用一个叫Broker(中间人/消息中间件/消息队列/任务队列)来协助clients(任务的发出者/客户端)和worker(任务的处理者/工作进程)进行通信的.
clients发出消息到任务队列中,broker将任务队列中的信息派发给worker来处理。
client ---> 消息 --> Broker(消息队列) -----> 消息 ---> worker(celery运行起来的工作进程)
消息队列(Message Queue),也叫消息队列中间件,简称消息中间件,它是一个独立运行的程序,表示在消息的传输过程中临时保存消息的容器。
所谓的消息,是指代在两台计算机或2个应用程序之间传送的数据。消息可以非常简单,例如文本字符串或者数字,也可以是更复杂的json数据或hash数据等。
所谓的队列,是一种先进先出、后进呼后出的数据结构,python中的list数据类型就可以很方便地用来实现队列结构。
目前开发中,使用较多的消息队列有RabbitMQ,Kafka,RocketMQ,MetaMQ,ZeroMQ,ActiveMQ等,当然,像redis、mysql、MongoDB,也可以充当消息中间件,但是相对而言,没有上面那么专业和性能稳定。
并发任务10k以下的,直接使用redis
并发任务10k以上,1000k以下的,直接使用RabbitMQ
并发任务1000k以上的,直接使用RocketMQ
4、Celery的运行架构
Celery的运行架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
一个celery系统可以包含很多的worker和broker
Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括Redis,RabbitMQ,RocketMQ等
5、Celery执行流程图
6、Celery安装
pip install -U celery -i https://pypi.tuna.tsinghua.edu.cn/simple
注意:
Celery不建议在windows系统下使用,Celery在4.0版本以后不再支持windows系统,所以如果要在windows下使用只能安装4.0以前的版本,而且即便是4.0之前的版本,在windows系统下也是不能单独使用的,需要安装gevent、geventlet或eventlet协程模块
7、基本使用
使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例对象,我们一般叫做celery应用对象,或者更简单直接叫做一个app。app应用对象是我们使用celery所有功能的入口,比如启动celery、创建任务,管理任务,执行任务等.
celery框架有2种使用方式
- 一种是单独一个项目目录,
- 另一种就是Celery集成到web项目框架中。
8、Celery 使用示例
8.1 Celery作为一个单独项目运行
例如,mycelery代码目录直接放在项目根目录下即可,路径如下:
服务端项目根目录/
└── mycelery/
├── settings.py # 配置文件
├── __init__.py
├── main.py # 入口程序
└── sms/ # 异步任务目录,这里拿发送短信来举例,一个类型的任务就一个目录
└── tasks.py # 任务的文件,文件名必须是tasks.py!!!每一个任务就是一个被装饰的函数,写在任务文件中
main.py,代码:
from celery import Celery
# 实例化celery应用,参数一般为项目应用名
app = Celery("blog")
# 通过app实例对象加载配置文件
app.config_from_object("mycelery.settings")
# 注册任务, 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2",....])
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])
# 启动Celery的终端命令
# 强烈建议切换目录到项目的根目录下启动celery!!
# celery -A mycelery.main worker --loglevel=info
配置文件settings.py,代码:
# 任务队列的链接地址
broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址
result_backend = 'redis://127.0.0.1:6379/15'
关于配置信息的官方文档:https://docs.celeryproject.org/en/master/userguide/configuration.html
创建任务文件sms/tasks.py,任务文件名必须固定为"tasks.py",并创建任务,代码:
from ..main import app
from utils.tencent_sms import send_message
"""
# 如果不添加name参数,启动celery后的任务名称就会很长
[tasks]
. mycelery.sms.tasks.send_sms1
. mycelery.sms.tasks.send_sms3
. send_sms2
. send_sms4
"""
@app.task(name="send_sms1")
def send_sms1():
"""没有任务参数,没有返回结果的异步任务"""
print('任务:send_sms1执行了。。。')
# s2 = send.sms