分佈式任務隊列–Celery的學習筆記
- 2019 年 10 月 3 日
- 筆記
一、Celery簡介
Celery是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時為操作提供維護此類系統所需的工具。它是一個任務隊列,專註於實時處理,同時還支持任務調度。
所謂任務隊列,是一個邏輯上的概念,可以將抽象中的任務發送到指定的執行任務的組件,任務隊列可以跨線程或機器運行。
Celery是基於Python開發的分佈式異步消息任務隊列,通過它可以輕鬆的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery。
二、Celery使用場景
1.高並發的請求任務,比如需要發送大量請求的網絡爬蟲,就可以使用Celery來加速爬取。
2.異步任務,將耗時的操作交給Celery來完成,比如發送/接收郵件、消息推送等等。
3.定時任務,需要定時運行的程序,比如每天定時執行爬蟲爬取數據。
三、Celery架構
下圖是我找到的一張表示Celery架構的圖:
任務生產者:產生任務並且把任務提交到任務隊列的就是任務生產者。
任務調度Beat:Celery會根據配置文件對任務進行調配,可以按一定時間間隔周期性地執行某些任務。
中間人Broker:Celery使用消息進行通信,需要中間人在客戶端和Worker之間進行傳遞,接收客戶端發送過來的任務,並將任務分配給Worker。
在Celery的文檔中,可以找到官方給出的實現Broker的工具有:
名稱 | 狀態 | 監控 | 遠程控制 |
RabbitMQ | 穩定 | 是 | 是 |
Redis | 穩定 | 是 | 是 |
Amazon SQS | 穩定 | 否 | 否 |
Zookeeper | 實驗性 | 否 | 否 |
消費者Worker:Worker是執行任務的單元,在Celery任務隊列中屬於消費者。Worker會不斷地監聽隊列,一旦有任務添加進來,就會將任務取出來進行執行。Worker還可以運行在多台機器上,只要它們都指向同一個Broker就可以。
結果存儲Backend:結果存儲Backend,顧名思義就是將Worker執行後得到的結果存儲起來。Celery中有幾個內置的結果存儲可供選擇,包括SQLAlchemy / Django ORM、Redis、RabbitMQ、Mamcached等。
四、Celery安裝
Celery4.0版本是支持Python2.7的最後一個版本,所以如果你還在用py2的話,可能要選擇安裝Celery3或者更早的版本。我本人用的Python版本是Python3.7,然後安裝的Celery版本是4.3。安裝的話使用pip安裝就好:
pip install celery
如果pip安裝出錯的話,可以去這個網址進行下載。在使用pip安裝的時候會自動安裝一些相關依賴,如果這些依賴安裝出錯的話,搜一下相應版本的Wheel文件下載安裝即可。
中間件Broker我選擇使用的是Redis,這裡就不說Redis怎麼安裝了,上一篇博客中有Ubuntu下安裝Redis的介紹。
五、Celery使用示例
1.應用
在使用Celery的時候,第一件事是要創建一個Celery實例,一般稱之為應用,簡稱為app。創建一個test.py,其中代碼如下:
1 from celery import Celery 2 3 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379") 5 6 7 @app.task 8 def add(x, y): 9 return x + y
2.運行Celery服務器
在創建好應用之後,就可以使用Celery命令執行程序運行Worker了:
celery -A test worker -l info
運行後可以看到如下圖:
有關可用命令行選項的完整列表,執行如下命令:
celery worker –help
3.調用任務
要調用任務,可以使用delay()方法。
該任務會返回一個AsyncResult實例,可用於查詢任務狀態、獲取任務返回值等。此時查看前面運行的服務器,會看到有如下信息:
Received task: test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9]
Task test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9] succeeded in 0.006505205000166825s: 5
4.查看結果
在前面定義的時候,已經選擇使用Redis作為結果後端了,所以任務執行後的結果會保存到Redis中。而且,在調用任務的時候,還可以進行如下操作:
其中ready()方法會返回該任務是否已經執行,get()方法則會獲取任務返回的結果。
5.配置文件
由於Celery的配置信息比較多,因此一般會創建一個配置文件來保存這些配置信息,通常會命名為celeryconfig.py。在test.py所在文件夾下新建配置文件celeryconfig.py,其中的代碼如下:
1 # broker(消息中間件來接收和發送任務消息) 2 BROKER_URL = 'redis://127.0.0.1:6379' 3 # backend(存儲worker執行的結果) 4 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' 5 6 # 設置時間參照,不設置默認使用的UTC時間 7 CELERY_TIMEZONE = 'Asia/Shanghai' 8 # 指定任務的序列化 9 CELERY_TASK_SERIALIZER = 'json' 10 # 指定執行結果的序列化 11 CELERY_RESULT_SERIALIZER = 'json'
然後修改下test.py中的代碼:
1 from celery import Celery 2 3 4 app = Celery("test") 5 app.config_from_object("celerystudy.celeryconfig") 6 7 8 @app.task 9 def add(x, y): 10 return x + y