RabbitMQ 入門 (Go) – 4. 使用 Fanout Exchange 做服務發現(上)
到目前為止,我們項目的結果大致如下:
-
感測器生成的模擬數據(包含感測器名稱、數據、時間戳)是通過感測器在運行時動態創建的 Queue 來發送的。這些 Queue 很難直接被發現。
-
為了解決這個問題,我創建了另一個消息,它包含各感測器的 Queue 的路由 key,這個消息是在一個「眾所周知」的 Queue 上發布的,所以協調器就可以得到感測器的路由資訊。
-
感測器的數據是發布在默認的 Direct Exchange 上,也就是說只有一個消費者可以收到這個消息,這就是我們想要的效果。具體的,無論有多少個協調器,RabbitMQ 會保證只有一個協調器會收到資訊,並且只會收到一次。
-
然後,用於發現感測器的路徑確有不同的需求,如果存在多個協調器,那麼當感測器上線的時候,所有的協調器都必須得知,所以就不能使用 Direct Exchange 了。這時使用 Fanout Exchange 就比較合理了,Fanout Exchange 將會同時通知所有附加在 Exchange 上面的 Queue,也就是把感測器的路由資訊發送給所有在線的協調器。
-
但是這也有其他問題:如果沒有接收者監聽,那麼這些路由資訊不會保留,這個問題稍後再解決,我們先把發布路由資訊的 Exchange 從 Direct 改為 Fanout。
使用 Fanout Exchange 發布感測器路由資訊
目前,在感測器項目中,我們使用默認的 Direct Exchange 來發布感測器路由消息:
看一下管理控制台,可以看到 RabbitMQ 還提供了一個 Fanout Exchange 也就是 amq.fanout:
修改程式碼,暫時改用 amq.fanout 來發布感測器路由資訊:
-
首先,刪除第 38 行的程式碼,它原是用來創建一個 Queue 以便協調程式可以接收到感測器的路由資訊。現在,這個工作將由 Exchange 的消費者們來完成,它們會創建自己的 Queue 來監聽這個 Exchange。
-
第 43 行,把路由 Key 改為 「」,因為 Fanout Exchange 不需要使用該 Key 來決定消息發往哪裡,它會把消息進行複製並發送到每個綁定到它的 Queue 上面。
-
最後,第 42 行,把 exchange 這個參數改為 amq.fanout。
運行 sensors 項目查看效果
打開控制台:
可以看到 amq.fanout 確實有數據了,儘管現在的消息傳遞速率為 0。
點進去:
可以看到一個路由資訊,但是因為沒有任何 Queue 綁定到這個 Exchange,這個消息就丟失了,因為消息無處可發。
重建協調器
在最早幾節內容中,我做了一個非常簡單的協調器程式,它可以簡單的發布和接收消息。為了配合我們的應用場景,我們需要建立一個更健壯一些的協調器。它的主要職責是:通過消息代理(RabbitMQ)與感測器進行交互。
不過首先,為了程式碼復用,我對現有的項目結構進行調整:
我把項目的外層目錄名從 sensors 改為 demo,然後在裡面建立sensors 文件夾,把 main.go 移動到 sensors 裡面,並改名為 sensor.go。
然後建立 coordinator 文件夾,在裡面建立 queuelistener.go 文件,內容較多,我分為三個圖展示:
-
第 15 行,建立 QueueListener struct,它裡面包含發現感測器數值 Queue 的邏輯,接收它們的消息,並把它們在一個事件聚合器裡面翻譯成事件。不過目前它主要聚焦獲取消息這項工作,所以它有三個欄位:
-
到 RabbitMQ 的連接
-
在該連接上的 Channel
-
一個 Map,當作註冊表,裡面存放著這個協調器所監聽的源,使用 Map 可以防止將同一個感測器註冊兩次,而當感測器下線的時候可以通過這個 Map 來關閉監聽(這個我就不實現了)
-
第 21 行,建立一個構造函數,它可以返回一個 *QueueListener
第 31 行創建一個方法 ListenForNewSource:
-
它可以讓 QueueListener 發現新的感測器,在這裡創建 Queue 的時候,我們不關心 Queue 的名稱,所以 name 參數為「」,這樣的話 RabbitMQ 會為它創建一個唯一的名稱。
-
但是當 Queue 被創建時,它會默認綁定到 Direct Exchange。而在之前,我剛把程式碼修改為讓感測器通過 amq.fanout Exchange 來發布它們的資訊,所以我們需要把這個 Queue 重新綁定到那個上面。這裡就使用 Channel 上的 QueueBind 方法來實現(第 33 行)。
-
QueueBind 方法參數:
-
第一個參數是剛剛創建的 Queue 的名稱,這就是要綁定的 Queue
-
第二個參數是路由 Key,由於 Fanout Exchange 會忽略這個參數,所以這裡寫「」
-
第三個參數是要綁定的 Exchange 的名稱,也就是 amq.fanout
-
第四個參數,如果把 noWait 設置為 true,那麼萬一綁定不成功,就會把 Channel 關閉。這裡我把它設為 false,因為我知道 Exchange 和 Queue 都會存在,如果失敗,那麼會關閉 Channel 並發生錯誤。
-
第五個參數不需要,設為 nil
-
第 40 行,設置消息的接收,返回 Go Channel,這裡的參數需要用到 Queue 的名稱
-
第 49 行,通過 for range 來處理通過 Go Channel 發過來的消息。如果接收到消息,表示有新的感測器上線了。
-
第 50 行,在有感測器上線後,通過 Consume 方法和 msg.Body(也就是感測器的名稱),來讀取感測器的模擬數據。記得我們把感測器的模擬數據發布到了默認的 Direct Exchange 上面,所以每次只會把消息傳遞給一個接收者,這意味著,當我註冊了多個協調器的時候,它們將共享到這些 Queue 的訪問,當這些發生的時候,RabbitMQ 將會輪流傳送給每一個註冊的接收者。這也就允許我們對協調器進行橫向擴展,而且不影響整個系統其餘的部分。
-
第 59 行,判斷感測器是否在該協調器中註冊,如果沒有,那就進行註冊。
-
第 62 行,使用 goroutine 來調用 AddListener 方法,該方法程式碼如下:
-
這個方法將會監聽 Go Channel 中的消息
-
在裡面使用 for range 來等待 Go Channel 傳送消息
-
在這裡,我們把二進位數據轉化為我們可以在程式里使用的數據,也就是 SensorMessage 類型
-
然後暫時先列印即可
建立協調器的 main
在 coordinator 目錄下建立 exec 文件夾,目的是創建 main package,在裡面創建 main.go 程式碼如下:
-
第 9 行,我們創建一個 QueueListener
-
第 10 行,使用 goroutine 讓他進行監聽,防止阻塞主執行緒
-
第 12-13 行的目的就是讓程式一直存活,防止 goroutine 停止運行。
最後 sensor.go 裡面有一處程式碼需要修改,在 main 函數的 for 循環裡面,每次使用 encoder 的時候都需要 重新創建一個,所以我添加了 63 行的程式碼:
運行
我們運行一下試試,注意:一定要先運行 coordinator 項目,然後再運行 sensors 項目,否則會有問題。 下面左側是 coordinator,右側是 sensors:
可以看到 coordinator(協調器)可以讀取到感測器的數據了。
這裡我們使用了一個最簡單最基本的機制來做感測器 Queue 的發現。