Celery:進一步探索

一、創建Celery專用模組

對於大型項目,一般需要創建一個專用模組,便於管理。

1.1 模組結構

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])

app.conf.update(
    result_expires=3600,
)

# include參數是 worker 進程啟動時要導入的模組的列表。需要在此處添加我們的任務模組,以便 workers 能夠找到我們的任務。

proj/tasks.py

from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

1.2 啟動 worker 進程

$ celery -A proj worker -l INFO

  • Concurrency:是用於同時處理任務的最大工作進程數,默認是CPU核心數,詳情見celery worker -c。Celery還支援使用Eventlet,Gevent並在單個執行緒中運行(請參閱Concurrency
  • task events:是否發送Celery的監控資訊,用於如Flower之類的實時監控程式,詳見Monitoring and Management guide.
  • queues:worker進程可以拿取任務的隊列集合,可以規定worker一次從多個隊列中拿取任務,訂製更高級的生產者消費者模型時參見Routing GuideWorkers Guide.

1.3 停止 worker 進程

當worker進程已經在前台運行時,使用 Control-c 即可停止運行。

1.4 後台運行 worker 進程

在生產環境,一般將Celery worker在後台運行,且使用celery multi命令在後台啟動一個或多個worker進程。

啟動:

$ celery multi start w1 -A proj -l INFO

重啟:

$ celery  multi restart w1 -A proj -l INFO

停止:

# 停止,但不會等待工作程式關閉
$ celery multi stop w1 -A proj -l INFO

# 停止,並確保在退出之前已完成所有當前正在執行的任務
$ celery multi stopwait w1 -A proj -l INFO

詳情見 daemonization tutorial.

二、調用 Celery 任務

可以使用delay()函數調用任務:

>>> from proj.tasks import add
>>> add.delay(2, 2)

delay()方法實際上是apply_async()方法的快捷方式:

>>> add.apply_async((2, 2))

使用apply_async()方法可以指定選項,如運行時間,應發送到的隊列等:

>>> add.apply_async((2, 2), queue='lopri', countdown=10)
# 任務將被發送到名為的隊列中lopri,並且任務將最早在消息發送後10秒鐘執行

若直接應用任務將在當前進程中執行任務,不會發送任何消息:

>>> add(2, 2)
4

使用上述三種方法組成了 Celery 任務調用的API介面, Calling User Guide 中有更詳盡的描述。

三、查看任務狀態

每個任務調用都將被賦予一個唯一的標識符(UUID)即任務ID:

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

result.get()出錯時默認情況下會拋出異常,傳遞propagate參數可以不拋出異常,而是返回一個異常對象

>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")

查看任務執行結果:

>>> res.failed()
True

>>> res.successful()
False

>>> res.state
'FAILURE'
  • 關於查看任務狀態的詳細設置: States
  • 關於執行任務的詳細設置: Calling Guide

四、設計任務工作流

4.1 函數簽名

是有我們可能希望將一個 「任務調用」 傳遞給另一個進程,或者作為另一個函數的參數,於是Celery為此使用了一種稱為signature的函數。它包裝單個「 任務調用」 的參數和執行選項,這個 signature 可以傳遞給函數,甚至可以序列化並通過網路發送。

創建一個任務簽名:

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

# 快捷方式:
>>> add.s(2, 2)
tasks.add(2, 2)

一個簽名也是一個任務,也可以調用delayapply_async方法執行,區別在於簽名可能已經指定了參數簽名,完整的簽名可以直接執行:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

也可以創建不完整的簽名,在在調用簽名時補全其他參數(重複的參數會被新參數替代):

>>> s2 = add.s(2)
>>> res = s2.delay(8)
>>> res.get()
10

所以,創建一個函數簽名到底用來幹啥呢?下面要說的 canvas 原語會用到。

4.2 原語

所謂原語,一般是指由若干條指令組成的程式段,用來實現某個特定功能,在執行過程中不可被中斷。我們執行非同步任務時,也可能會遇到這樣的業務場景,即一組任務要麼全部成功,要麼全部失敗。 canvas 原語就是用來定義這一組任務的執行,通過多種方式組合它們以構成複雜的工作流程。

canvas 原語包括以下六種:

Groups

一個 group 會並行調用任務列表,返回一個特殊的結果實例,該實例讓我們可以將結果作為一個組進行檢查,並按順序檢索返回值。

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

使用不完整簽名函數的group:

>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

鏈式調用,前者的結果作為後者的一個輸入:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

partial chain:

>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

可以直接省略chain關鍵字:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

chords 用於調用一個 group 的結果:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

一個連接到 group 的 chain 會自動轉化為一個chord:

>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90

由於原語都是用簽名函數,所以可以隨意組合,如:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

有關工作流程的更多資訊:Canvas

五、路由

Celery支援AMQP協議提供的所有路由功能,也支援簡單的路由規則,即:將消息發送的指定的隊列。

task_routes參數可以按名稱路由任務,並將所有內容集中在一個位置:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

然後,可以讓 worker進程 在指定的隊列中拿取任務:

$ celery -A proj worker -Q hipri

還可以為 worker進程 指定多個隊列,例如讓 worker進程 從 hipri隊列 和 默認隊列 中拿取任務(由於歷史原因,celery隊列就是默認隊列):

$ celery -A proj worker -Q hipri,celery

隊列的順序無關緊要,因為 worker進程 將給予隊列同等的權重,要了解有關路由的更多資訊,包括充分利用AMQP路由功能,參閱Routing Guide

六、遠程監控

如果使用RabbitMQ,Redis或Qpid作為代理,則可以在運行時監控 worker進程。

例如,可以查看worker進程當前正在執行的任務:

$ celery -A proj inspect active

這是通過使用廣播消息傳遞實現的,因此群集中的每個 worker 都將接收所有遠程控制命令。如果未提供目的地,那麼每個 worker 都會響應並回復請求,使用--destination選項指定一個或多個 worker 對請求執行操作,這是 worker 主機名的逗號分隔列表:

$ celery -A proj inspect active [email protected]

其他更多監控命令,參考Monitoring Guide

七、時區

Celery內部和消息默認使用UTC時區,當 worker 收到一條消息(例如設置了倒計時)時,它將該UTC時間轉換為本地時間。如果希望使用與系統時區不同的時區,則必須使用以下timezone設置進行配置:

app.conf.timezone = 'Asia/Shanghai'

八、進一步優化

默認配置未針對吞吐量進行優化,默認情況下,它嘗試在許多短期任務和較少的長期任務之間折衷,即吞吐量和公平調度間的折衷。

如果有嚴格的公平調度要求,或者要針對吞吐量進行優化,參閱Optimizing Guide

如果使用RabbitMQ,則可以安裝librabbitmq模組,這是用C實現的AMQP客戶端:

$ pip install librabbitmq