Celery:進一步探索
- 2021 年 3 月 9 日
- 筆記
- Celery, Middleware, Python
一、創建Celery專用模組
對於大型項目,一般需要創建一個專用模組,便於管理。
1.1 模組結構
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='rpc://',
include=['proj.tasks'])
app.conf.update(
result_expires=3600,
)
# include參數是 worker 進程啟動時要導入的模組的列表。需要在此處添加我們的任務模組,以便 workers 能夠找到我們的任務。
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
1.2 啟動 worker 進程
$ celery -A proj worker -l INFO
- Concurrency:是用於同時處理任務的最大工作進程數,默認是CPU核心數,詳情見
celery worker -c
。Celery還支援使用Eventlet,Gevent並在單個執行緒中運行(請參閱Concurrency) - task events:是否發送Celery的監控資訊,用於如Flower之類的實時監控程式,詳見Monitoring and Management guide.
- queues:worker進程可以拿取任務的隊列集合,可以規定worker一次從多個隊列中拿取任務,訂製更高級的生產者消費者模型時參見Routing Guide、Workers Guide.
1.3 停止 worker 進程
當worker進程已經在前台運行時,使用 Control-c 即可停止運行。
1.4 後台運行 worker 進程
在生產環境,一般將Celery worker在後台運行,且使用celery multi
命令在後台啟動一個或多個worker進程。
啟動:
$ celery multi start w1 -A proj -l INFO
重啟:
$ celery multi restart w1 -A proj -l INFO
停止:
# 停止,但不會等待工作程式關閉
$ celery multi stop w1 -A proj -l INFO
# 停止,並確保在退出之前已完成所有當前正在執行的任務
$ celery multi stopwait w1 -A proj -l INFO
二、調用 Celery 任務
可以使用delay()
函數調用任務:
>>> from proj.tasks import add
>>> add.delay(2, 2)
delay()
方法實際上是apply_async()
方法的快捷方式:
>>> add.apply_async((2, 2))
使用apply_async()
方法可以指定選項,如運行時間,應發送到的隊列等:
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
# 任務將被發送到名為的隊列中lopri,並且任務將最早在消息發送後10秒鐘執行
若直接應用任務將在當前進程中執行任務,不會發送任何消息:
>>> add(2, 2)
4
使用上述三種方法組成了 Celery 任務調用的API介面, Calling User Guide 中有更詳盡的描述。
三、查看任務狀態
每個任務調用都將被賦予一個唯一的標識符(UUID)即任務ID:
>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114
result.get()
出錯時默認情況下會拋出異常,傳遞propagate
參數可以不拋出異常,而是返回一個異常對象
>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")
查看任務執行結果:
>>> res.failed()
True
>>> res.successful()
False
>>> res.state
'FAILURE'
- 關於查看任務狀態的詳細設置: States
- 關於執行任務的詳細設置: Calling Guide
四、設計任務工作流
4.1 函數簽名
是有我們可能希望將一個 「任務調用」 傳遞給另一個進程,或者作為另一個函數的參數,於是Celery為此使用了一種稱為signature的函數。它包裝單個「 任務調用」 的參數和執行選項,這個 signature 可以傳遞給函數,甚至可以序列化並通過網路發送。
創建一個任務簽名:
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
# 快捷方式:
>>> add.s(2, 2)
tasks.add(2, 2)
一個簽名也是一個任務,也可以調用delay
和apply_async
方法執行,區別在於簽名可能已經指定了參數簽名,完整的簽名可以直接執行:
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
也可以創建不完整的簽名,在在調用簽名時補全其他參數(重複的參數會被新參數替代):
>>> s2 = add.s(2)
>>> res = s2.delay(8)
>>> res.get()
10
所以,創建一個函數簽名到底用來幹啥呢?下面要說的 canvas 原語會用到。
4.2 原語
所謂原語,一般是指由若干條指令組成的程式段,用來實現某個特定功能,在執行過程中不可被中斷。我們執行非同步任務時,也可能會遇到這樣的業務場景,即一組任務要麼全部成功,要麼全部失敗。 canvas 原語就是用來定義這一組任務的執行,通過多種方式組合它們以構成複雜的工作流程。
canvas 原語包括以下六種:
Groups
一個 group 會並行調用任務列表,返回一個特殊的結果實例,該實例讓我們可以將結果作為一個組進行檢查,並按順序檢索返回值。
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
使用不完整簽名函數的group:
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
鏈式調用,前者的結果作為後者的一個輸入:
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
partial chain:
>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
可以直接省略chain關鍵字:
>>> (add.s(4, 4) | mul.s(8))().get()
64
Chords
chords 用於調用一個 group 的結果:
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90
一個連接到 group 的 chain 會自動轉化為一個chord:
>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90
由於原語都是用簽名函數,所以可以隨意組合,如:
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
有關工作流程的更多資訊:Canvas
五、路由
Celery支援AMQP協議提供的所有路由功能,也支援簡單的路由規則,即:將消息發送的指定的隊列。
task_routes
參數可以按名稱路由任務,並將所有內容集中在一個位置:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
然後,可以讓 worker進程 在指定的隊列中拿取任務:
$ celery -A proj worker -Q hipri
還可以為 worker進程 指定多個隊列,例如讓 worker進程 從 hipri隊列 和 默認隊列 中拿取任務(由於歷史原因,celery隊列就是默認隊列):
$ celery -A proj worker -Q hipri,celery
隊列的順序無關緊要,因為 worker進程 將給予隊列同等的權重,要了解有關路由的更多資訊,包括充分利用AMQP路由功能,參閱Routing Guide。
六、遠程監控
如果使用RabbitMQ,Redis或Qpid作為代理,則可以在運行時監控 worker進程。
例如,可以查看worker進程當前正在執行的任務:
$ celery -A proj inspect active
這是通過使用廣播消息傳遞實現的,因此群集中的每個 worker 都將接收所有遠程控制命令。如果未提供目的地,那麼每個 worker 都會響應並回復請求,使用--destination
選項指定一個或多個 worker 對請求執行操作,這是 worker 主機名的逗號分隔列表:
$ celery -A proj inspect active [email protected]
其他更多監控命令,參考Monitoring Guide
七、時區
Celery內部和消息默認使用UTC時區,當 worker 收到一條消息(例如設置了倒計時)時,它將該UTC時間轉換為本地時間。如果希望使用與系統時區不同的時區,則必須使用以下timezone
設置進行配置:
app.conf.timezone = 'Asia/Shanghai'
八、進一步優化
默認配置未針對吞吐量進行優化,默認情況下,它嘗試在許多短期任務和較少的長期任務之間折衷,即吞吐量和公平調度間的折衷。
如果有嚴格的公平調度要求,或者要針對吞吐量進行優化,參閱Optimizing Guide。
如果使用RabbitMQ,則可以安裝librabbitmq模組,這是用C實現的AMQP客戶端:
$ pip install librabbitmq