python—Celery非同步分散式
- 2020 年 1 月 6 日
- 筆記
一、Celery非同步分散式
Celery 是一個python開發的非同步分散式任務調度模組,是一個消息傳輸的中間件,可以理解為一個郵箱,每當應用程式調用celery的非同步任務時,會向broker傳遞消息,然後celery的worker從中取消息
Celery 用於存儲消息以及celery執行的一些消息和結果
對於brokers,官方推薦是rabbitmq和redis
對於backend,也就是指資料庫,為了簡單一般使用redis
使用redis連接url格式:
redis://:password@hostname:port/db_number
1)定義連接腳本tasks.py
#!/usr/bin/env python from celery import Celery broker = "redis://192.168.2.230:6379/1" backend = "redis://192.168.2.230:6379/2" app = Celery("tasks", broker=broker, backend=backend) @app.task def add(x,y): return x+y
2)安裝啟動celery
pip install celery
pip install redis
啟動方式:celery -A huang tasks -l info #-l 等同於 –loglevel
3)執行測試 huang.py
#!/usr/bin/env python from tasks import add re = add.delay(10,20) print(re.result) #任務返回值 print(re.ready) #如果任務被執行返回True,其他情況返回False print(re.get(timeout=1)) #帶參數的等待,最後返回結果 print(re.status) #任務當前狀態
運行結果:
30
<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>
30
SUCCESS
4)根據成功返回的key或celery介面輸出的資訊,查看redis存儲
說明:停止celery服務,執行完huang.py之後,再啟動celery服務也是有保存數據的
二、celery多進程
1)配置文件 celeryconfig.py
#!/usr/bin/env python #-*- coding:utf-8 -*- from kombu import Exchange,Queue BROKER_URL = "redis://192.168.2.230:6379/3" CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4" CELERY_QUEUES = ( Queue("default",Exchange("default"),routing_key="default"), Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"), Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B") ) CELERY_ROUTES = { 'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"}, 'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"} }
2)tasks.py
#!/usr/bin/env python #-*- coding:utf-8 -*- from celery import Celery app = Celery() app.config_from_object("celeryconfig") @app.task def taskA(x,y): return x+y @app.task def taskB(x,y,z): return x+y+z
3)啟動celery
celery -A tasks worker –loglevel info
4)執行腳本huang2.py
#!/usr/bin/env python #-*- coding:utf-8 -*- from tasks import taskA,taskB re = taskA.delay(10,20) print(re.result) #任務返回值 print(re.ready) #如果任務被執行返回True,其他情況返回False print(re.get(timeout=1)) #帶參數的等待,最後返回結果 print(re.status) #任務當前狀態 re2 = taskB.delay(10,20,30) print(re2.result) print(re2.ready) print(re2.get(timeout=1)) print(re2.status)
5)運行結果
None
<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>
30
SUCCESS
None
<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>
60
SUCCESS