python—Celery非同步分散式

一、Celery非同步分散式

Celery  是一個python開發的非同步分散式任務調度模組,是一個消息傳輸的中間件,可以理解為一個郵箱,每當應用程式調用celery的非同步任務時,會向broker傳遞消息,然後celery的worker從中取消息

Celery  用於存儲消息以及celery執行的一些消息和結果

對於brokers,官方推薦是rabbitmq和redis

對於backend,也就是指資料庫,為了簡單一般使用redis

使用redis連接url格式:

redis://:password@hostname:port/db_number

1)定義連接腳本tasks.py

#!/usr/bin/env python  from celery import Celery  broker = "redis://192.168.2.230:6379/1"  backend = "redis://192.168.2.230:6379/2"  app = Celery("tasks", broker=broker, backend=backend)    @app.task  def add(x,y):      return x+y

2)安裝啟動celery

pip install celery

pip install redis

啟動方式:celery -A huang tasks -l info  #-l 等同於 –loglevel

3)執行測試 huang.py 

#!/usr/bin/env python  from tasks import add    re = add.delay(10,20)    print(re.result)   #任務返回值  print(re.ready)     #如果任務被執行返回True,其他情況返回False    print(re.get(timeout=1))  #帶參數的等待,最後返回結果  print(re.status)  #任務當前狀態

運行結果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS

4)根據成功返回的key或celery介面輸出的資訊,查看redis存儲

說明:停止celery服務,執行完huang.py之後,再啟動celery服務也是有保存數據的

二、celery多進程

1)配置文件 celeryconfig.py

#!/usr/bin/env python  #-*- coding:utf-8 -*-    from kombu import Exchange,Queue    BROKER_URL = "redis://192.168.2.230:6379/3"  CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4"    CELERY_QUEUES = (  Queue("default",Exchange("default"),routing_key="default"),  Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),  Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")  )    CELERY_ROUTES = {  'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},  'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}  }

2)tasks.py

#!/usr/bin/env python  #-*- coding:utf-8 -*-    from celery import Celery    app = Celery()  app.config_from_object("celeryconfig")    @app.task      def taskA(x,y):      return x+y        @app.task      def taskB(x,y,z):      return x+y+z

3)啟動celery

celery -A tasks worker –loglevel info

4)執行腳本huang2.py

#!/usr/bin/env python  #-*- coding:utf-8 -*-    from tasks import taskA,taskB    re = taskA.delay(10,20)    print(re.result)   #任務返回值  print(re.ready)     #如果任務被執行返回True,其他情況返回False  print(re.get(timeout=1))  #帶參數的等待,最後返回結果  print(re.status)  #任務當前狀態    re2 = taskB.delay(10,20,30)  print(re2.result)  print(re2.ready)  print(re2.get(timeout=1))  print(re2.status)

5)運行結果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS