輕量級消息隊列 Django-Q 輕度體驗

前言

最近做的這個項目(基於Django),需要做個功能,實現定時採集車輛定位。

這讓我想起來幾年前那個OneCat項目,當時我用的是Celery這個很重的組件

Celery實在是太重了,後來我做公眾號採集平台的時候,又接觸了Django-RQ和Django-Q這倆,前者是對RQ的封裝,讓RQ和Django更好的結合在一起;後者是一個全新的「多進程任務隊列」組件,相比起celery很輕量,當時使用的時候就給我留下不錯的印象。

於是這個項目我決定繼續使用Django-Q來實現一些非同步操作和定時任務。

關於Django-Q

官方介紹:

A multiprocessing task queue for Django

快速開始

安裝

pip install django-q

添加到 INSTALLED_APPS

INSTALLED_APPS = (
    # other apps
    'django_q',
)

資料庫遷移

由於Django-Q會把執行結果放到資料庫里,所以要執行一下資料庫遷移的操作

python manage.py migrate

這個操作會生成 django_q_ormqdjango_q_scheduledjango_q_task 三個表

配置

因為本身項目用的快取就是Redis,所以我直接用Redis作為消息隊列的後端(broker)

Django-Q支援很多種後端,除了Redis還有Disque、IronMQ、Amazon SQS、MongoDB或者是Django的ORM~

settings.py 中添加以下配置:

Q_CLUSTER = {
    'name': 'project_name',
    'workers': 4,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'cpu_affinity': 1,
    'save_limit': 250,
    'queue_limit': 500,
    'label': 'Django Q',
    'redis': {
        'host': 127.0.0.1',
        'port': 6379,
        'db': 0,
    }
}

啟動服務

python manage.py qcluster

搞定,現在消息隊列服務已經跑起來了

我們可以添加非同步任務或者定時任務

非同步任務

最簡單的方式是使用它提供的 async_task 方法,添加一個新的非同步任務到隊列中

來寫個例子,輸入一個數,求階乘之後開平方

import math

def demo_task(number: int):
    return math.sqrt(math.factorial(number))

啟動任務

然後來添加一個非同步任務

from django_q.tasks import async_task, Task

def task_finish(task: Task):
    print(f'任務 {task.name}(ID:{task.id})完成!')

task_id = async_task(
    demo_task, 10,
    task_name='任務名稱',
    hook=task_finish,
)

可以看到,直接調用 async_task 方法就行

這個方法的定義是

async_task(func: Any, *args: Any, **kwargs: Any)

傳入要非同步執行的方法之後,可以把該方法的參數跟在後面傳進去,也可以用 kwargs 的方式傳入

這兩種方式都可以的:

  • async_task(demo_task, 10)
  • async_task(demo_task, number=10)

我個人比較喜歡第一種,因為Django-Q本身有幾個命名參數,比如 task_namehooktimeout之類的,用第一種方式傳參不容易和Django-Q默認的命名參數衝突。

獲取執行結果

有兩種方式獲取任務的執行結果:

  • admin後台
  • 使用 result 方法,在程式碼中獲取

第一種方式無需贅述,在安裝Django-Q組件後執行了資料庫遷移,就會生成 Failed tasksScheduled tasksSuccessful tasks 三個admin模組,顧名思義,在 Failed tasksSuccessful tasks 中可以看到任務的執行結果,也就是我們寫在 demo_task 里的返回值。

第二種方式,程式碼如下:

from django_q.tasks import result

task_result = result(task_id)

task_id 傳入就可以查詢任務執行的結果,如果任務還沒執行完,那結果就是 None

這個 result 方法還有個 wait 參數,可以設置等待時間,單位是毫秒

執行完成回調

上面程式碼中,我們還設置了 hook 參數

作用就是任務執行完成之後,執行 task_finish 這個函數

task_finish 里可以通過 task 參數獲取任務資訊

就是這樣~

async_task 的其他參數

創建非同步任務的這個方法還有很多參數,官網文檔寫得還算可以,很多參數都是 Q_CLUSTER 配置裡面有的,在 async_task 里設置這些參數就會覆蓋默認的配置。

我直接搬運一波,權當翻譯文檔了~

除了上面介紹到的 task_namehook 還有這些參數:

  • group: str 任務的分組名稱
  • save 配置任務運行結果的存儲後端,不過文檔里只是一句話的介紹,具體如何配置還得研究一下。(稍微看了一下源碼,沒搞懂,動態語言太折磨人了)
  • timeout: int 任務超時時間,單位是秒。回顧一下前面的 Q_CLUSTER 配置,裡面有 timeout 配置,設置這個參數可以覆蓋前面的配置,如果任務運行超出了這個時間,就會被直接終止。
  • ack_failures: bool 設置為True時,也承認失敗的任務。這會導致失敗的任務被視為成功交付,從而將其從任務隊列中刪除。默認值為False。(說實話我沒看懂是啥意思)
  • sync: bool 設置為True的時候,所有非同步任務會變成同步執行,這個功能在測試的時候比較有用。默認是False。
  • cached 這個參數既可以設置為True,也可以傳入數字,代表快取過期時間。根據文檔描述,非同步任務的執行結果會存在資料庫里,當這個參數設置為True的時候,結果不寫入資料庫,而是保存在快取里。這個功能在短時間內要大量執行非同步任務,且不需要把結果立刻寫入資料庫的情況下比較有用,可以提高性能。
  • broker 需要傳入一個 Broker 對象的實例,用來控制這個非同步任務在哪個Broker里執行。
  • q_options: dict 這是最後一個參數了。我下面單獨介紹一下

q_options 參數

根據前面啟動任務的部分,我們啟動非同步任務的時候,可以通過命名參數向任務方法傳遞參數,比如:

async_task(demo_task, number=10)

async_task 這個方法本身又有很多參數,如果這個參數名稱和我們要執行的任務 demo_task 參數重名的話,這些參數就被 async_task 拿走了,我們的任務 demo_task 就拿不到這些參數了。

怎麼辦?

q_options 參數就是為了解決這個問題

可以把要傳給 async_task 的參數都包裝在一個 dict 裡面,然後通過 q_options 參數傳入

假如我們的 demo_task 是這樣的:

def demo_task(number: int, timeout: int):
  ...

除了 number 這個參數,還要接收一個跟 async_task 自有參數重名的 timeout 參數,使用 q_options 的解決方案如下

opts = {
    'hook': 'hooks.print_result',
    'group': 'math',
    'timeout': 30
}

async_task(demo_task, number=10, timeout=100, q_options=opts)

這樣既能……又能……,完美啊~

當然我還是建議用 *args 的方式傳參,這樣就沒有參數重名的問題了。

定時任務

有兩種方式添加定時任務

  • 在程式碼添加
  • admin後台

在程式碼中添加

比較簡單,直接上程式碼

from django_q.tasks import schedule

schedule(
  'demo_task',
  schedule_type=Schedule.MINUTES,
  minutes=1,
  task_name='任務名稱',
)

有一點注意的是,因為添加後的定時任務是要保存在資料庫中的

所以需要把要執行的方法(包含完整包名),以字元串的形式傳入

假如在我們的Django項目中,要執行的是在 apps/test/tasks.py 文件中的 demo_task 方法

那麼需要把 apps.test.tasks.demo_task 這個完整的名稱傳入

在admin中添加也是一樣

時間間隔設置

Django-Q的定時任務有很多類型:

  • 一次性
  • 按x分鐘執行一次
  • 每小時一次
  • 每天
  • 每周
  • 每月
  • 每季度
  • 每年
  • Cron表達式

注意,即使是Cron表達式,定時任務執行的最短間隔也是1分鐘

這點我一開始不知道,用Cron表達式寫了個15秒的任務,但執行時間根本不對,然後我翻了一下github上的issues,看到作者的解答才知道~

那個Issues的地址://github.com/Koed00/django-q/issues/179

作者的回復:

The current design has a heartbeat of 30 seconds, which means the schedule table can’t have schedules below that. Most of this is explained in the architecture docs. Because of the way the internal loop is set up, a resolution under a dozen seconds or so, quickly becomes unreliable.

I always imagined tasks that need accuracy measured in seconds, would use a delayed tasks strategy where a few seconds delay is either added through the broker or inside the task itself.

The problem with all this, is that a task is currently always added to the back of the queue.
So even with a 1 second resolution on the schedule, the task still has to wait it’s execution time. Which can of course vary wildly depending on the broker type, worker capacity and current workload.

這點感覺有些雞肋,如果要高頻執行的任務,那隻能選擇Celery了

在admin後台添加

這個更簡單,傻瓜式操作

所以這部分略過了~

docker部署

現在後端服務基本是用docker部署的

為了能在docker中使用Django-Q

我們需要在原有Django容器的基礎上,再起一個同樣的容器,然後入口改成qcluster的啟動命令

這裡有個issues也有討論這個問題://github.com/Koed00/django-q/issues/513

來個 docker-compose.yml 的例子

version: "3.9"
services:  
  redis:
    image: redis:alpine
    ports:
      - 6379:6379
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - django_q
  django_q:
    build: .
    command: python manage.py qcluster
    volumes:
      - .:/code
    depends_on:
      - redis

一個簡單的例子

其他的類似環境變數這些,根據實際情況來

注意:

  • Django容器有的東西(環境變數、依賴),Django-Q也要同步加進去
  • Django項目程式碼修改之後,如果是通過uwsgi之類的自動重啟服務,那要注意Django-Q不會自動重啟,需要手動執行 docker-compose restart django_q ,才能使修改的程式碼生效

其他

命令行工具

Django-Q還提供了一些命令行工具

  • 監控cluster執行情況:python manage.py qmonitor
  • 監控內容:python manage.py qmemory
  • 查看當前狀態資訊:python manage.py qinfo

除了使用命令監控,還可以在程式碼里做監控,不過我暫時沒用到,所以還沒研究,有需要的同學可以直接看文檔

admin自定義

安裝完Django-Q後,會在admin出現三個菜單,跟普通的Django app一樣,這些也是通過 admin 註冊進去的,因此我們可以重新註冊這些 ModelAdmin 來自定義admin上的操作介面

來一段官方關於失敗任務介面的程式碼:

from django_q import models as q_models
from django_q import admin as q_admin

admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
    list_display = (
        'name',
        'func',
        'result',
        'started',
        # add attempt_count to list_display
        'attempt_count'
    )

跟普通的 ModelAdmin 是一樣的

我們可以自行添加搜索框、過濾欄位之類的。記得要先執行 admin.site.unregister([q_models.Failure]) 取消之前Django-Q自己註冊的 ModelAdmin 對象。

訊號

Django內置訊號系統,我之前有寫過一篇簡單的文章介紹:3分鐘看懂Python後端必須知道的Django的訊號機制

Django-Q提供了兩類訊號:

  • 任務加入消息隊列前
  • 任務執行前

例子程式碼如下:

from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute

@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
    print("Task {} will be enqueued".format(task["name"]))

@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
    print("Task {} will be executed by calling {}".format(
          task["name"], func))

有需要的話可以註冊消息接收器,做一些處理。(不過我暫時是沒用上)

小結

搞定~

Django-Q使用下來的體驗還是不錯的,足夠輕量,部署足夠方便,足以應付大部分場景了~

參考資料