Celery:小試牛刀

Celery是如何工作的?

Celery 由於 其分佈式體系結構,在某種程度上可能難以理解。下圖是典型Django-Celery設置的高級示意圖(FROM O’REILLY):

當請求到達時,您可以在處理它時調用Celery任務。調用任務的函數會立即返回,而不會阻塞當前進程。實際上,任務尚未完成執行,但是任務消息已進入任務隊列(或許多可能的任務隊列之一)。

workers 是獨立的進程,用於監視任務隊列中是否有新任務並實際執行它們,他們拿起任務消息、處理任務、存儲結果。

一、安裝一個broker

Celery需要一個發送和接收消息的解決方案,即一個消息代理(message broker)服務,常用的broker包括:

RabbitMQ功能齊全,穩定,耐用且易於安裝,是生產環境的絕佳選擇。

Ubuntu安裝:

$ sudo apt-get install rabbitmq-server

Docker安裝:

$ docker run -d -p 5672:5672 rabbitmq

Redis

Redis也具有完整的功能,但是在突然終止或電源故障的情況下更容易丟失數據。

Ubuntu安裝:

$ sudo apt install redis-server

Docker安裝:

$ docker run -d -p 6379:6379 redis

二、安裝Celery

$ pip install celery

三、編寫Celery任務代碼

首先導入Celery,創建一個Celery對象,這個對象將作為一個操作 Celery 的入口,如創建任務,管理workers等。

以下示例會把所有東西都寫在一個模塊中,但是對於大型項目,您需要創建一個專用模塊

# tasks.py
import time
from celery import Celery

app = Celery('tasks', ,broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    print('--------start---------')
    for i in range(5):
        print(f'第{i}秒')
    print('--------over----------')
    return x + y

第一個參數是當前模塊的名稱,這是唯一的必需參數。

第二個參數指定要使用的消息代理的URL。這裡使用RabbitMQ(也是默認選項)。

若使用Redis:

app = Celery('tasks', broker='redis://localhost:6379/0')

四、啟動 worker 進程

$ celery -A tasks worker --loglevel=INFO

在生產環境中,需要在後台將工作程序作為守護程序運行。為此,需要使用 平台提供的工具 或 類似supervisord的工具

五、調用任務

調用任務需要導入帶有celery示例的模塊,這裡沒有重新創建一個模塊導入,而是使用命令行模式。要調用我們定義的任務,可以使用delay()(詳情參閱 調用任務):

>>> from tasks import add
>>> add.delay(4, 4)

調用任務將返回一個AsyncResult實例,這可用於檢查任務的狀態,等待任務完成或獲取其返回值(或者如果任務失敗,則獲取異常和回溯)

默認情況下執行任務不返回結果。為了執行遠程過程調用或跟蹤數據庫中的任務結果,需要配置result backend

六、獲取運行結果

如果要跟蹤任務的狀態,Celery需要將狀態存儲或發送到某個地方。有多個result backend可供選擇:SQLAlchemy / Django ORM, MongoDBMemcachedRedisRPCRabbitMQ / AMQP)等。

下面使用 RPC 作為result backend,該後端將狀態作為瞬態消息發送回去。使用backend參數配置Celery對象的result backend

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果使用 Redis 作為result backend,但仍然使用 RabbitMQ 作為 broker(流行的組合):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

更多result backend配置參閱「result backend

我們再次調用該任務:

>>> result = add.delay(4, 4)
>>> result.ready() # 檢查是否完成任務,返回布爾值

詳情見有關celery.result對象的完整參考

七、配置Celery

對於大多數使用情況,默認配置就夠了,但是可以配置更多選項使Celery根據需要工作。詳細配置見「配置和默認值」

可以直接在應用程序上設置配置,也可以使用專用的配置模塊設置配置。例如配置用於序列化任務負載的默認序列化器:

# 配置一個設置:
app.conf.task_serializer = 'json'

# 一次配置許多設置,則可以使用update
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

對於較大的項目,建議使用專用的配置模塊。

app.config_from_object('celeryconfig')

celeryconfig.py必須可用於從當前目錄或Python路徑中加載

celeryconfig.py

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

參考