Celery

Celery

官网

Celery 官网://www.celeryproject.org/

Celery 官方文档英文版://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版://docs.jinkan.org/docs/celery/

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

# 官网解释
"""
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
"""

Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

1

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

Celery的安装配置

安装:pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

注意如果是windows平台还需要安装:pip install eventlet

两种celery任务结构:提倡用包管理,结构更清晰

方式一:简单使用

image-20220502204920533

# 第一步:定义一个py文件(名字随意,celery_task) 
"""celery_task.py"""
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'   # 结果存储
broker = 'redis://127.0.0.1:6379/2'    # 消息中间件
app = Celery(__name__,broker=broker,backend=backend)  # __name__区分__main__

# 被它修饰,就变成了celery的任务
@app.task
def add(a,b):
    return a+b

# 第二步:提交任务(新建一个py文件:submit_task)
"""submit_task.py"""
from celery_task import add
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)
print(res)  # 2ddb35df-25f2-4f7c-8405-0bd7b1fa5645


# 第三步:任务执行单元执行,使用命令启动worker   
格式:celery -A 文件名  worker  -l 日志输出级别   (win平台+-P eventlet)
celery -A celery_task worker -l info -P eventlet
'''
celery_task:py文件的名字
-l info:日志输出级别是info 
-P eventlet  在win平台需要下载,pip  install  eventlet
'''
#如果队列里有任务,就会执行,如果没有任务,worker就等在这                                    
                                    
# 第四步:查询结果是否执行完成  get_result.py   
"""get_result.py"""
from celery_task import app

from celery.result import AsyncResult

id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful():
        result = asy.get()
        print(result)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

方法二:包管理结构(推荐)

随便定义包名,但是包内必须要有celery.py

1 包结构
	celery_task
        __init__.py
        celery.py
        course_task.py
        home_task.py
        user_task.py

步骤

  • 创建包,包下写celery.py文件,文件内写celery任务

    from celery import Celery
    backend = 'redis://127.0.0.1:6379/1'
    broker = 'redis://127.0.0.1:6379/2'
    app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
    # include内写app管理的任务
    
  • 任意位置提交任务

    from celery_task.add_task import add
    from celery_task.celery import app
    res=add.delay(100,200)
    print(res)
    # 提交任务delay在任意位置提交就可以,只需将celery任务导过来即可
    
  • 启动worker包管理只需去包所在的根路径启动就可以了,不需要切换路径到包内去启动worker,因为包下有celery.py了

    scripts> celery -A celery_task worker -l info -P eventlet
    
  • 查看结果

    from celery_task import app
    
    from celery.result import AsyncResult
    
    id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
    if __name__ == '__main__':
        asy = AsyncResult(id=id, app=app)
        if asy.successful():
            result = asy.get()
            print(result)
        elif asy.failed():
            print('任务失败')
        elif asy.status == 'PENDING':
            print('任务等待中被执行')
        elif asy.status == 'RETRY':
            print('任务异常后正在重试')
        elif asy.status == 'STARTED':
            print('任务已经开始被执行')
    

【待续】

Tags: