還在用crontab? 分散式定時任務了解一下

前言

日常任務開放中,我們會有很多非同步、批量、定時、延遲任務要處理,go-zero中有 go-queue,推薦使用 go-queue 去處理,go-queue 本身也是基於 go-zero 開發的,其本身是有兩種模式:

  • dq:依賴於 beanstalkd ,適合延時、定時任務執行;
  • kq:依賴於 kafka ,適用於非同步、批量任務執行;

本篇就先從 dq 開始,慢慢探究 go-queue 背後執行的邏輯。

dq 簡介

dq 封裝底層 beanstalkd 操作,分散式存儲,延遲、定時設置。重啟服務可以重新執行,但是消息不會丟失,因為消息的處理都交由 beanstalkd 完成。

可以看出使用非常簡單,同時 dq 中使用了 redis setnx 保證了每個消息只被消費一次。但是在生產者端沒有使用 redis 做消息存儲,這個和前面描述的一致。

dq 的整體架構做了簡單介紹,下面就開始正式的探索 🔨

生產者 example

func main() {
	producer := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	})
	for i := 1000; i < 1005; i++ {
    // Delay:延遲執行
		_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
    // At:在某一個時刻執行
		//_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
		if err != nil {
			fmt.Println(err)
		}
	}
}

從使用上,簡單分為兩步:

  1. NewProducer(opts):將本地隊列的埠配置和主題配置傳入生產者;
  2. producer.Delay():使用剛創建好的 生產者,調用它的 Delay() 。將需要非同步發送的消息傳入,Delay 還需要傳入延遲執行的時間。

需要說明的是:創建的 producer 是一個介面,Delay() 只是介面其中的一個方法。後續會其他的方法和內部設計。那我們就繼續往下探索吧~~~

深入生產者執行流程

下面從 example 的程式碼進去,看整個函數的調用鏈。

初始化

dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...})	// 初始化生產者
	|- NewProducerNode(endpoint, tube)								// endpoint,tube 來自傳入的配置數組

緊接著就到 producerNode.go ,這個部分就會牽涉到 beanstalk 的初始化:

NewProducerNode(endpoint, tube)
	|- conn: newConnection(endpoint, tube)
		|- return &connection{}	

這就涉及到 beanstalkconnection.conn -> *beanstalk.Conn

但是在 newConnection() 中並沒有對 beanstalk.Conn 進行初始化,這屬於 延遲初始化

Delay

首先是生產者端調用 producer.Delay(data, timesecond) ,就把消息插入到內部隊列,timesecond 就是延遲執行的時間。我們來看看 Delay() 到底做了什麼?

p.Delay(data, timesecond)
	|- p.wrap(data, time)			// 將 data 和 time 包裝到一塊
		|- p.insert(nodeFn)
			|- node.Delay() 			// for rangre p.node 每一個node都執行一遍 `Delay()`

p.insert 就是將上一步封裝好的 data 傳遞給 p{cluster} 的每一個node去執行 node.Delay

在前面的 初始化 說過,最開始是沒有對 conn 進行初始化,那現在要插入數據,總不能不初始化這個 conn

node.Delay()									// 配置中的每個node都執行 `Delay()`
	|- node.conn.get()					// 獲取node中的conn【conn==nil,就初始化一個conn】
	|- _, err := conn.Put(data, deplay, opts...)
		|- node.conn.reset() 			// 出現err情況下,如OOM/Timeout等情況 -> 關閉conn,防止泄漏

所以最後 Delay 實際上是執行 tube.Put(data, delay)

tube.Put(data, delay)
	|- tube.Conn.cmd("put", ...)		// 生產者發布job

這裡就涉及到 beanstalkPut 操作:首先看看生產者 Put 指令參數說明:

put <pri> <delay> <ttr> <bytes> <data>
  • <pri> :優先順序,值越小優先順序越高,默認為1024;
  • <delay> :延遲 ready 秒數,在這段時間 job 為 delayed 狀態;
  • <ttr>time to run ,允許 worker 執行的最大秒數,如果 worker 在這段時間不能 delete,release,bury job,那麼當 job 超時,伺服器將自動 release 此job;
  • <bytes>job body的長度,不包含\r\n
  • <data>: job body data;

OK。那插入 job 成功,響應什麼呢?

INSERTED <id>\r\n

返回的 id 是插入 job 的任務標識。到此 Put 分析完畢,跟著程式碼走一遍:

tube.Put(data, priority, daley, ttr)
	|- tube.Conn.cmd("put", ...)
	|- tube.Conn.readResp("INSERTED id")
|- return id, err			// 將id返回

這樣我們在 example 中直接可以看到的 生產者 執行的操作就介紹完了。上圖,圖更好說話:

producer interface

那麼除了 example 中使用的 Delay() ,還有其餘幾個方法:

Producer interface {
  At(body []byte, at time.Time) (string, error)
  Close() error
  Delay(body []byte, delay time.Duration) (string, error)
  Revoke(ids string) error
}
  • At:指定某個時間執行【實質也是執行 Delay()
  • Close:關閉全部node的連接
  • Delay:延遲執行。傳入延遲的時間。
  • Revoke:實質上是當出現最小寫入節點<2時,觸發添加失敗,將添加成功的job刪除掉。

當然,事實上 dq 使用上,開發者只需要使用 At/Delay 就行了。也就是你只要知道你的任務是定時觸發還是延遲觸發即可。剩下的,dq 內部的封裝都已經幫你做好了。

框架地址

//github.com/tal-tech/go-queue

同時在 go-queue 也大量使用 go-zero 的流式處理庫 fx

//github.com/tal-tech/go-zero

歡迎使用 go-queuestar 支援我們!一起構建 go-zero 生態!👍

go-zero 系列文章見『微服務實踐』公眾號