Go之NSQ簡介,原理和使用
NSQ簡介
NSQ是Go語言編寫的一個開源的實時分散式記憶體消息隊列,其性能十分優異。
NSQ 是實時的分散式消息處理平台,其設計的目的是用來大規模地處理每天數以十億計級別的消息。它具有分散式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特徵
適合小型項目使用,用來學習消息隊列實現原理、學習 golang channel知識以及如何用 go 來寫分散式,為什麼說適合小型小型項目使用因為,nsq 如果沒有能力進行二次開發的情況存在的問題還是很多的。
NSQ優勢
/*
1. NSQ提倡分散式和分散的拓撲,沒有單點故障,支援容錯和高可用性, 並提供可靠的消息交付保證.
2. NSQ支援橫向擴展,沒有任何集中式代理
3. NSQ易於配置和部署,並且內置了管理介面
*/
NSQ特性
/*
1. 支援無 SPOF 的分散式拓撲
2. 水平擴展(沒有中間件,無縫地添加更多的節點到集群)
3. 低延遲消息傳遞 (性能)
4. 結合負載均衡和多播消息路由風格
5. 擅長面向流媒體(高通量)和工作(低吞吐量)工作負載
6. 主要是記憶體中(除了高水位線消息透明地保存在磁碟上)
7. 運行時發現消費者找到生產者服務(nsqlookupd)
8. 傳輸層安全性 (TLS)
9. 數據格式不可知
10. 一些依賴項(容易部署)和健全的,有界,默認配置
11. 任何語言都有簡單 TCP 協議支援客戶端庫
12. HTTP 介面統計、管理行為和生產者(不需要客戶端庫發布)
13. 為實時檢測集成了 statsd
14. 健壯的集群管理介面 (nsqadmin)
*/
注意點
/*
1. 消息默認不持久化, 可以配置成持久化模式, nsq採用的方式是記憶體+硬碟的模式,當記憶體到一定程度就會持久化到硬碟.
如果將 --mem-queue-size設置為0, 所有的消息將會存儲到磁碟.
伺服器重啟時也會將在記憶體中的消息持久化
2. 每條消息至少傳遞一次
3. 消息不保證有序.
*/
NSQ應用場景
參考上圖利用消息隊列把業務流程中的非關鍵流程非同步化, 從而顯著降低業務請求的響應時間
應用解耦
通過使用消息隊列將不同業務邏輯解耦,降低系統間耦合,提高系統的健壯性, 後續有其他的業務要使用訂單數據可直接訂閱消息隊列, 提高系統的靈活性.
流量削峰
類似秒殺(大秒)等場景下,某一時間可能會產生大量的請求,使用消息隊列能夠為後端處理請求提供一定的緩衝區,保證後端服務的穩定性。
NSQ架構
NSQ模組介紹
nsqd:
是一個進程監聽了http,tcp兩種協議, 用來創建topic,channel, 分發消息給消費者,向nsqlooup 註冊自己的元數據資訊(topic、channel、consumer),自己的服務資訊,最核心模組。
nsqd 是一個守護進程,負責接收,排隊,投遞消息給客戶端。也就是說這個服務是幹活的。它可以獨立運行,不過通常它是由 nsqlookupd 實例所在集群配置的。
/*
特性:
1. 對訂閱了同一個topic,同一個channel的消費者使用負載均衡策略(不是輪詢)
2. 只要channel存在,即使沒有該channel的消費者,也會將生產者的message快取到隊列中(注意消息的過期處理)
3. 保證隊列中的message至少會被消費一次,即使nsqd退出,也會將隊列中的消息暫存磁碟上(結束進程等意外情況除外)
4. 限定記憶體佔用,能夠配置nsqd中每個channel隊列在記憶體中快取的message數量,一旦超出,message將被快取到磁碟中
5. topic,channel一旦建立,將會一直存在,要及時在管理台或者用程式碼清除無效的topic和channel,避免資源的浪費
*/
nsqlookup:
存儲了nsqd的元數據和服務資訊(endpoind),向消費者提供服務發現功能, 向nsqadmin提供數據查詢功能.
nsqlookupd 是守護進程負責管理拓撲資訊。客戶端通過查詢 nsqlookupd 來發現指定話題(topic)的生產者,並且 nsqd 節點廣播話題(topic)和通道(channel)資訊。也就是說nsqlookupd是管理者。
/*
特性:
1. 唯一性,在一個Nsq服務中只有一個nsqlookupd服務。當然也可以在集群中部署多個nsqlookupd,但它們之間是沒有關聯的.
2. 去中心化,即使nsqlookupd崩潰,也會不影響正在運行的nsqd服務
3. 充當nsqd和naqadmin資訊交互的中間件
4. 提供一個http查詢服務,給客戶端定時更新nsqd的地址目錄.
*/
nsqadmin:
簡單的管理介面,展示了topic, channel以及channel上的消費者,也可以創建topic,channel
/*
特性:
1. 提供一個對topic和channel統一管理的操作介面以及各種實時監控數據的展示,介面設計的很簡潔,操作也很簡單
2. 展示所有message的數量
3. 能夠在後台創建topic和channel
4. nsqadmin的所有功能都必須依賴於nsqlookupd,nsqadmin只是向nsqlookupd傳遞用戶操作並展示來自nsqlookupd的數據
*/
NSQ工作模式
Topic和Channel
每個nsqd實例旨在一次處理多個數據流。這些數據流稱為
「topics」
,一個topic
具有1個或多個「channels」
。每個channel
都會收到topic
所有消息的副本,實際上下游的服務是通過對應的channel
來消費topic
消息。
topic
和channel
不是預先配置的。topic
在首次使用時創建,方法是將其發布到指定topic
,或者訂閱指定topic
上的channel
。channel
是通過訂閱指定的channel
在第一次使用時創建的。
topic
和channel
都相互獨立地緩衝數據,防止緩慢的消費者導致其他chennel
的積壓(同樣適用於topic
級別)。
channel
可以並且通常會連接多個客戶端。假設所有連接的客戶端都處於準備接收消息的狀態,則每條消息將被傳遞到隨機客戶端。例如:
生產者向某個topic中發送消息,如果topic有一個或者多個channle,那麼該消息會被複制多分發送到每一個channel中。類似 rabbitmq中的fanout類型,channle類似隊列。 官方說 nsq 是分散式的消息隊列服務,但是在我看來只有channel到消費者這部分提現出來分散式的感覺,nsqd 這個模組其實就是單點的,nsqd 將 topic、channel、以及消息都存儲在了本地磁碟,官方還建議一個生產者使用一個 nsqd,這樣不僅浪費資源還沒有數據備份的保障。一旦 nsqd 所在的主機磁損壞,數據都將丟失。
總而言之,消息是從topic--> channel (每個channel接受該topic的所有消息的副本)多播的,但是從channel --> consumers均勻分布 (每個消費者接受該channel的一部分消息)
NSQ接受和發送消息流程
Centos安裝NSQ
下載
wget //s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
tar xf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz -C /usr/local/
本地解析Hosts
[root@nsq-47 ~]# tail -1 /etc/hosts
192.168.43.47 nsq-47
啟動
# 開三個終端,分別按順序啟動
./nsqlookupd
./nsqd --lookupd-tcp-address=192.168.43.47:4160
./nsqadmin --lookupd-http-address=192.168.43.47:4161
# 訪問
//192.168.43.47:4171
Go操作NSQ
安裝go客戶端
/*
go get -u github.com/nsqio/go-nsq
*/
生產者
// nsq_producer/main.go
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
// NSQ Producer Demo
var producer *nsq.Producer
// 初始化生產者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%v\n", err)
return err
}
return nil
}
func main() {
nsqAddress := "192.168.43.47:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%v\n", err)
return
}
reader := bufio.NewReader(os.Stdin) // 從標準輸入讀取
for {
data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read string from stdin failed, err:%v\n", err)
continue
}
data = strings.TrimSpace(data)
if strings.ToUpper(data) == "Q" { // 輸入Q退出
break
}
// 向 'topic_demo' publish 數據
err = producer.Publish("topic_demo", []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err)
continue
}
}
}
消費者
在
/nodes
這個頁面我們能夠很方便的查看當前接入lookupd
的nsqd
節點。
這個
/counter
頁面顯示了處理的消息數量,因為我們沒有接入消費者,所以處理的消息數量為0。
在
/lookup
介面支援創建topic
和channel
。
// nsq_consumer/main.go
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/nsqio/go-nsq"
)
// NSQ Consumer Demo
// MyHandler 是一個消費者類型
type MyHandler struct {
Title string
}
// HandleMessage 是需要實現的處理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
// 初始化消費者
func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err:%v\n", err)
return
}
consumer := &MyHandler{
Title: "沙河1號",
}
c.AddHandler(consumer)
// if err := c.ConnectToNSQD(address); err != nil { // 直接連NSQD
if err := c.ConnectToNSQLookupd(address); err != nil { // 通過lookupd查詢
return err
}
return nil
}
func main() {
err := initConsumer("topic_demo", "first", "192.168.43.47:4161")
if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err)
return
}
c := make(chan os.Signal) // 定義一個訊號的通道
signal.Notify(c, syscall.SIGINT) // 轉發鍵盤中斷訊號到c
<-c // 阻塞
}