剖析nsq消息隊列(一) 簡介及去中心化實現原理
- 2019 年 10 月 3 日
- 筆記
分散式消息隊列nsq,簡單易用,去中心化的設計使nsq
更健壯,nsq
充分利用了go
語言的goroutine
和channel
來實現的消息處理,程式碼量也不大,讀不了多久就沒了。後期的文章我會把nsq
的源碼分析給大家看。
主要的分析路線如下
- 分析
nsq
的整體框架結構,分析如何做到的無中心化分散式拓撲結構,如何處理的單點故障。 - 分析
nsq
是如何保證消息的可靠性,如何保證消息的處理,對於消息的持久化是如何處理和擴展的。 - 分析
nsq
是如何做的消息的負載處理,即如何把合理的、不超過客戶端消費能力的情況下,把消息分發到不同的客戶端。 - 分析
nsq
提供的一些輔助組件。
這篇帖子,介紹nsq
的主體結構,以及他是如何做到去中心化的分散式拓撲結構,如何處理的單點故障。
幾個組件是需要先大概說一下
nsqd
消息隊列的主體,對消息的接收,處理和把消息分發到客戶端。
nsqlookupd
nsq
拓撲結構資訊的管理者,有了他才能組成一個簡單易用的無中心化的分散式拓撲網路結構。
go-nsq
nsq
官方的go語言客戶端,基本上市面上的主流程式語言都有相應的客戶端在這裡
還有可視化的組件nsqadmin
和一些工具像nsq_to_file
、nsq_stat
、等等,這些在後期的帖子里會介紹
使用方式
兩種方式一種是直接連接另一種是通過nsqlookupd
進行連接
直連方式
nsqd
是獨立運行的,我們可以直接使用部署幾個nsqd
然後使用客戶端直連的方式使用
例子
目前資源有限,我就都在一台機器上模擬了
啟動兩個nsqd
./nsqd -tcp-address ":8000" -http-address ":8001" -data-path=./a
./nsqd -tcp-address ":7000" -http-address ":7001" -data-path=./b
正常啟動會有類似下面的輸出
[nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7) [nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538 [nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat [nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000 [nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001
簡單使用
func main() { adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"} config := nsq.NewConfig() topicName := "testTopic1" c, _ := nsq.NewConsumer(topicName, "ch1", config) testHandler := &MyTestHandler{consumer: c} c.AddHandler(testHandler) if err := c.ConnectToNSQDs(adds); err != nil { panic(err) } stats := c.Stats() if stats.Connections == 0 { panic("stats report 0 connections (should be > 0)") } stop := make(chan os.Signal) signal.Notify(stop, os.Interrupt) fmt.Println("server is running....") <-stop } type MyTestHandler struct { consumer *nsq.Consumer } func (m MyTestHandler) HandleMessage(message *nsq.Message) error { fmt.Println(string(message.Body)) return nil }
方法 c.ConnectToNSQDs(adds)
,連接多個nsqd
服務
然後運行多個客戶端實現
這時,我們發送一個消息,
curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1'
nsqd會根據他的演算法,把消息分配到一個客戶端
客戶端的輸入如下
2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd 2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd server is running.... hello world 2
但是這種做的話,需要客戶端做一些額外的工作,需要頻繁的去檢查所有nsqd
的狀態,如果發現出現問題需要客戶端主動去處理這些問題。
總結
我使用的客戶端庫是官方庫 go-nsq
,使用直接連nsqd
的方式,
- 如果有
nsqd
出現問題,現在的處理方式,他會每隔一段時間執行一次重連操作。想去掉這個連接資訊就要額外做一些處理了。 - 如果對
nsqd
進行橫向擴充,只能是自己民額外的寫一些程式碼調用ConnectToNSQDs
或者ConnectToNSQD
方法
去中心化連接方式 nsqlookupd
官方推薦使用連接nsqlookupd
的方式,nsqlookupd
用於做服務的註冊和發現,這樣可以做到去中心化。
圖中我們運行著多個nsqd
和多個nsqlookupd
的實例,客戶端去連接nsqlookupd
來操作nsqd
例子
我們要先啟動nsqlookupd
,為了演示方便,我啟動兩個nsqlookupd
實例, 三個nsqd
實例
./nsqlookupd -tcp-address ":8200" -http-address ":8201"
./nsqlookupd -tcp-address ":7200" -http-address ":7201"
為了演示橫向擴充,先啟動兩個,客戶端連接後,再啟動第三個。
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
./nsqd -tcp-address ":7000" -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./b
--lookupd-tcp-address
用於指定lookup
的連接地址
客戶端簡單程式碼
package main import ( "fmt" "os" "os/signal" "time" "github.com/nsqio/go-nsq" ) func main() { adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"} config := nsq.NewConfig() config.MaxInFlight = 1000 config.MaxBackoffDuration = 5 * time.Second config.DialTimeout = 10 * time.Second topicName := "testTopic1" c, _ := nsq.NewConsumer(topicName, "ch1", config) testHandler := &MyTestHandler{consumer: c} c.AddHandler(testHandler) if err := c.ConnectToNSQLookupds(adds); err != nil { panic(err) } stop := make(chan os.Signal) signal.Notify(stop, os.Interrupt) fmt.Println("server is running....") <-stop } type MyTestHandler struct { consumer *nsq.Consumer } func (m MyTestHandler) HandleMessage(message *nsq.Message) error { fmt.Println(string(message.Body)) return nil }
方法ConnectToNSQLookupds
就是用於連接nsqlookupd
的,但是需要注意的是,連接的是http
埠7201
和8201
,庫go-nsq
是通過請求其中一個nsqlookupd
的 http 方法http://127.0.0.1:7201/lookup?topic=testTopic1
來得到所有提供topic=testTopic1
的nsqd
列表資訊,然後對所有的nsqd進行
連接,
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd
目前我們已經連接了兩個。
我們演示一下橫向擴充,啟動第三個nsqd
./nsqd -tcp-address ":6000" -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./c
這裡會有一個問題,當我啟動了一個親的nsqd
但是他的topic是空的,我們需指定這新的nsqd
處理哪些topic。
我們可以用nsqadmin
查看所有的topic
./nsqadmin --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201
然後去你的nsqd
上去建topic
curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1'
當然也可以自己寫一些自動化的角本
查看客戶端的日誌輸出
2019/08/30 14:56:01 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1 2019/08/30 14:56:01 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:6000) connecting to nsqd
已經連上我們的新nsqd
了
我手動關閉一個nsqd
實例
客戶端的日誌輸出已經斷開了連接
2019/08/30 15:04:20 ERR 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) IO error - EOF 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) beginning close 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) readLoop exiting 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) breaking out of writeLoop 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) writeLoop exiting 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) finished draining, cleanup exiting 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) clean close complete 2019/08/30 15:04:20 WRN 1 [testTopic1/ch1] there are 2 connections left alive
並且nsqd
和nsqlookupd
也斷開了連接,客戶端再次從nsqlookupd
取所有的nsqd
的地址時得到的總是可用的地址。
去中心化實現原理
nsqlookupd
用於管理整個網路拓撲結構,nsqd用他實現服務的註冊,客戶端使用他得到所有的nsqd服務節點資訊,然後所有的consumer端連接
實現原理如下,
nsqd
把自己的服務資訊廣播給一個或者多個nsqlookupd
客戶端
連接一個或者多個nsqlookupd
,通過nsqlookupd
得到所有的nsqd
的連接資訊,進行連接消費,- 如果某個
nsqd
出現問題,down機了,會和nsqlookupd
斷開,這樣客戶端
從nsqlookupd
得到的nsqd
的列表永遠是可用的。客戶端
連接的是所有的nsqd
,一個出問題了就用其他的連接,所以也不會受影響。