還在用crontab? 分散式定時任務了解一下
- 2021 年 3 月 8 日
- 筆記
前言
日常任務開放中,我們會有很多非同步、批量、定時、延遲任務要處理,go-zero中有 go-queue
,推薦使用 go-queue
去處理,go-queue
本身也是基於 go-zero
開發的,其本身是有兩種模式:
dq
:依賴於beanstalkd
,適合延時、定時任務執行;kq
:依賴於kafka
,適用於非同步、批量任務執行;
本篇就先從 dq
開始,慢慢探究 go-queue
背後執行的邏輯。
dq 簡介
dq
封裝底層 beanstalkd
操作,分散式存儲,延遲、定時設置。重啟服務可以重新執行,但是消息不會丟失,因為消息的處理都交由 beanstalkd
完成。
可以看出使用非常簡單,同時 dq
中使用了 redis setnx
保證了每個消息只被消費一次。但是在生產者端沒有使用 redis
做消息存儲,這個和前面描述的一致。
對 dq
的整體架構做了簡單介紹,下面就開始正式的探索 🔨
生產者 example
func main() {
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
for i := 1000; i < 1005; i++ {
// Delay:延遲執行
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
// At:在某一個時刻執行
//_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
if err != nil {
fmt.Println(err)
}
}
}
從使用上,簡單分為兩步:
NewProducer(opts)
:將本地隊列的埠配置和主題配置傳入生產者;producer.Delay()
:使用剛創建好的 生產者,調用它的Delay()
。將需要非同步發送的消息傳入,Delay
還需要傳入延遲執行的時間。
需要說明的是:創建的 producer
是一個介面,Delay()
只是介面其中的一個方法。後續會其他的方法和內部設計。那我們就繼續往下探索吧~~~
深入生產者執行流程
下面從 example
的程式碼進去,看整個函數的調用鏈。
初始化
dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生產者
|- NewProducerNode(endpoint, tube) // endpoint,tube 來自傳入的配置數組
緊接著就到 producerNode.go
,這個部分就會牽涉到 beanstalk
的初始化:
NewProducerNode(endpoint, tube)
|- conn: newConnection(endpoint, tube)
|- return &connection{}
這就涉及到 beanstalk
:connection.conn -> *beanstalk.Conn
。
但是在 newConnection()
中並沒有對 beanstalk.Conn
進行初始化,這屬於 延遲初始化
Delay
首先是生產者端調用 producer.Delay(data, timesecond)
,就把消息插入到內部隊列,timesecond
就是延遲執行的時間。我們來看看 Delay()
到底做了什麼?
p.Delay(data, timesecond)
|- p.wrap(data, time) // 將 data 和 time 包裝到一塊
|- p.insert(nodeFn)
|- node.Delay() // for rangre p.node 每一個node都執行一遍 `Delay()`
而 p.insert
就是將上一步封裝好的 data 傳遞給 p{cluster}
的每一個node去執行 node.Delay
。
在前面的 初始化 說過,最開始是沒有對 conn
進行初始化,那現在要插入數據,總不能不初始化這個 conn
。
node.Delay() // 配置中的每個node都執行 `Delay()`
|- node.conn.get() // 獲取node中的conn【conn==nil,就初始化一個conn】
|- _, err := conn.Put(data, deplay, opts...)
|- node.conn.reset() // 出現err情況下,如OOM/Timeout等情況 -> 關閉conn,防止泄漏
所以最後 Delay
實際上是執行 tube.Put(data, delay)
:
tube.Put(data, delay)
|- tube.Conn.cmd("put", ...) // 生產者發布job
這裡就涉及到 beanstalk
的 Put
操作:首先看看生產者 Put
指令參數說明:
put <pri> <delay> <ttr> <bytes> <data>
<pri>
:優先順序,值越小優先順序越高,默認為1024;<delay>
:延遲ready
秒數,在這段時間 job 為delayed
狀態;<ttr>
:time to run
,允許 worker 執行的最大秒數,如果 worker 在這段時間不能 delete,release,bury job,那麼當 job 超時,伺服器將自動 release 此job;<bytes>
:job body
的長度,不包含\r\n
;<data>
: job body data;
OK。那插入 job
成功,響應什麼呢?
INSERTED <id>\r\n
返回的 id
是插入 job
的任務標識。到此 Put
分析完畢,跟著程式碼走一遍:
tube.Put(data, priority, daley, ttr)
|- tube.Conn.cmd("put", ...)
|- tube.Conn.readResp("INSERTED id")
|- return id, err // 將id返回
這樣我們在 example
中直接可以看到的 生產者 執行的操作就介紹完了。上圖,圖更好說話:
producer interface
那麼除了 example
中使用的 Delay()
,還有其餘幾個方法:
Producer interface {
At(body []byte, at time.Time) (string, error)
Close() error
Delay(body []byte, delay time.Duration) (string, error)
Revoke(ids string) error
}
At
:指定某個時間執行【實質也是執行Delay()
】Close
:關閉全部node的連接Delay
:延遲執行。傳入延遲的時間。Revoke
:實質上是當出現最小寫入節點<2時,觸發添加失敗,將添加成功的job刪除掉。
當然,事實上 dq
使用上,開發者只需要使用 At/Delay
就行了。也就是你只要知道你的任務是定時觸發還是延遲觸發即可。剩下的,dq
內部的封裝都已經幫你做好了。
框架地址
//github.com/tal-tech/go-queue
同時在 go-queue
也大量使用 go-zero
的流式處理庫 fx
。
歡迎使用 go-queue
並 star 支援我們!一起構建 go-zero
生態!👍
go-zero 系列文章見『微服務實踐』公眾號