RabbitMQ 入門 (Go) – 4. 使用 Fanout Exchange 做服務發現(上)

到目前為止,我們項目的結果大致如下:

 

 

  • 感測器生成的模擬數據(包含感測器名稱、數據、時間戳)是通過感測器在運行時動態創建的 Queue 來發送的。這些 Queue 很難直接被發現。

  • 為了解決這個問題,我創建了另一個消息,它包含各感測器的 Queue 的路由 key,這個消息是在一個「眾所周知」的 Queue 上發布的,所以協調器就可以得到感測器的路由資訊。

  • 感測器的數據是發布在默認的 Direct     Exchange 上,也就是說只有一個消費者可以收到這個消息,這就是我們想要的效果。具體的,無論有多少個協調器,RabbitMQ 會保證只有一個協調器會收到資訊,並且只會收到一次。

  • 然後,用於發現感測器的路徑確有不同的需求,如果存在多個協調器,那麼當感測器上線的時候,所有的協調器都必須得知,所以就不能使用 Direct     Exchange 了。這時使用 Fanout Exchange 就比較合理了,Fanout Exchange 將會同時通知所有附加在 Exchange 上面的 Queue,也就是把感測器的路由資訊發送給所有在線的協調器。

  • 但是這也有其他問題:如果沒有接收者監聽,那麼這些路由資訊不會保留,這個問題稍後再解決,我們先把發布路由資訊的 Exchange 從 Direct 改為 Fanout。

 

使用 Fanout Exchange 發布感測器路由資訊

目前,在感測器項目中,我們使用默認的 Direct Exchange 來發布感測器路由消息:

 

 

看一下管理控制台,可以看到 RabbitMQ 還提供了一個 Fanout Exchange 也就是 amq.fanout: 

 

 

 

 

修改程式碼,暫時改用 amq.fanout 來發布感測器路由資訊:

 

 

 

  1. 首先,刪除第 38 行的程式碼,它原是用來創建一個 Queue 以便協調程式可以接收到感測器的路由資訊。現在,這個工作將由 Exchange 的消費者們來完成,它們會創建自己的 Queue 來監聽這個 Exchange。

  2. 第 43 行,把路由 Key 改為 「」,因為 Fanout Exchange 不需要使用該 Key 來決定消息發往哪裡,它會把消息進行複製並發送到每個綁定到它的 Queue 上面。

  3. 最後,第 42 行,把 exchange 這個參數改為 amq.fanout。

 

運行 sensors 項目查看效果

 

 

 

打開控制台:

 

 

 

可以看到 amq.fanout 確實有數據了,儘管現在的消息傳遞速率為 0。

 

點進去:

 

 

 

可以看到一個路由資訊,但是因為沒有任何 Queue 綁定到這個 Exchange,這個消息就丟失了,因為消息無處可發。

  

重建協調器

在最早幾節內容中,我做了一個非常簡單的協調器程式,它可以簡單的發布和接收消息。為了配合我們的應用場景,我們需要建立一個更健壯一些的協調器。它的主要職責是:通過消息代理(RabbitMQ)與感測器進行交互。

不過首先,為了程式碼復用,我對現有的項目結構進行調整:

 

 

 

我把項目的外層目錄名從 sensors 改為 demo,然後在裡面建立sensors 文件夾,把 main.go 移動到 sensors 裡面,並改名為 sensor.go。

 

然後建立 coordinator 文件夾,在裡面建立 queuelistener.go 文件,內容較多,我分為三個圖展示:

 

 

 

  1. 第 15 行,建立 QueueListener struct,它裡面包含發現感測器數值 Queue 的邏輯,接收它們的消息,並把它們在一個事件聚合器裡面翻譯成事件。不過目前它主要聚焦獲取消息這項工作,所以它有三個欄位:

    1. 到 RabbitMQ 的連接

    2. 在該連接上的 Channel

    3. 一個 Map,當作註冊表,裡面存放著這個協調器所監聽的源,使用 Map 可以防止將同一個感測器註冊兩次,而當感測器下線的時候可以通過這個 Map 來關閉監聽(這個我就不實現了)

  2. 第 21 行,建立一個構造函數,它可以返回一個 *QueueListener

 

 

 第 31 行創建一個方法 ListenForNewSource:

  1. 它可以讓 QueueListener 發現新的感測器,在這裡創建 Queue 的時候,我們不關心 Queue 的名稱,所以 name 參數為「」,這樣的話 RabbitMQ 會為它創建一個唯一的名稱。

  2. 但是當 Queue 被創建時,它會默認綁定到 Direct Exchange。而在之前,我剛把程式碼修改為讓感測器通過 amq.fanout Exchange 來發布它們的資訊,所以我們需要把這個 Queue 重新綁定到那個上面。這裡就使用 Channel 上的 QueueBind 方法來實現(第 33 行)。

  3. QueueBind 方法參數:

    1. 第一個參數是剛剛創建的 Queue 的名稱,這就是要綁定的 Queue

    2. 第二個參數是路由 Key,由於 Fanout Exchange 會忽略這個參數,所以這裡寫「」

    3. 第三個參數是要綁定的 Exchange 的名稱,也就是 amq.fanout

    4. 第四個參數,如果把 noWait 設置為 true,那麼萬一綁定不成功,就會把 Channel 關閉。這裡我把它設為 false,因為我知道 Exchange 和 Queue 都會存在,如果失敗,那麼會關閉 Channel 並發生錯誤。

    5. 第五個參數不需要,設為 nil

  4. 第 40 行,設置消息的接收,返回 Go Channel,這裡的參數需要用到 Queue 的名稱

  5. 第 49 行,通過 for range 來處理通過 Go Channel 發過來的消息。如果接收到消息,表示有新的感測器上線了。

  6. 第 50 行,在有感測器上線後,通過 Consume 方法和 msg.Body(也就是感測器的名稱),來讀取感測器的模擬數據。記得我們把感測器的模擬數據發布到了默認的 Direct Exchange 上面,所以每次只會把消息傳遞給一個接收者,這意味著,當我註冊了多個協調器的時候,它們將共享到這些 Queue 的訪問,當這些發生的時候,RabbitMQ 將會輪流傳送給每一個註冊的接收者。這也就允許我們對協調器進行橫向擴展,而且不影響整個系統其餘的部分。

  7. 第 59 行,判斷感測器是否在該協調器中註冊,如果沒有,那就進行註冊。

  8. 第 62 行,使用 goroutine 來調用 AddListener 方法,該方法程式碼如下:

 

 

  1. 這個方法將會監聽 Go Channel 中的消息

  2. 在裡面使用 for range 來等待 Go Channel 傳送消息

  3. 在這裡,我們把二進位數據轉化為我們可以在程式里使用的數據,也就是     SensorMessage 類型

  4. 然後暫時先列印即可

 

建立協調器的 main

在 coordinator 目錄下建立 exec 文件夾,目的是創建 main package,在裡面創建 main.go 程式碼如下:

 

 

  1. 第 9 行,我們創建一個 QueueListener

  2. 第 10 行,使用 goroutine 讓他進行監聽,防止阻塞主執行緒

  3. 第 12-13 行的目的就是讓程式一直存活,防止 goroutine 停止運行。

最後 sensor.go 裡面有一處程式碼需要修改,在 main 函數的 for 循環裡面,每次使用 encoder 的時候都需要 重新創建一個,所以我添加了 63 行的程式碼:

 

 

運行 

我們運行一下試試,注意:一定要先運行 coordinator 項目,然後再運行 sensors 項目,否則會有問題。 下面左側是 coordinator,右側是 sensors:

 

 

可以看到 coordinator(協調器)可以讀取到感測器的數據了。 

這裡我們使用了一個最簡單最基本的機制來做感測器 Queue 的發現。

 
 
 
Exit mobile version