RabbitMQ 入門 (Go) – 5. 使用 Fanout Exchange 做服務發現(下)
到目前為止,我一直專註於如何讓消息進出消息代理,也就是RabbitMQ。
實際上,我們可以繼續使用 RabbitMQ 和它的 Exchanges 來連接這個應用程式的其他部分,但是我想探索一個稍微不同的模型:我想使用協調器來跟蹤哪些類型的消費者得到消息通知。
這樣的話,我斷開了感測器數據生成器和數據使用者之間的連接。
同時為了處理這些數據通訊,我決定使用事件(event)來通知用戶系統中正在發生的事情,並讓他們決定是否要處理數據。
其原理大致如下:
-
在協調器內部,我們有構建好的 QueueListener。
-
我還需要構建另外一個類型,我叫它 EventAggregator。
-
來自RabbitMQ 的消息,它將通過一個非同步的goroutine 進入QueueListener
-
goroutine 將把消息傳輸到一個事件對象(event object)中,並通過事件聚合對象(event aggregation object)進行廣播。
-
該對象將維護任何對事件感興趣的使用者的註冊表,並向其發送事件對象的副本。
-
這使我們能夠通過將數據轉儲到下游的 Queue 來為這些事件註冊其他應用程式,但它也可以讓使用者能夠在協調器內部進行設置,例如日誌系統。
-
最後,如果使用者最終要通過 Queue 將數據發送到另一個應用程式,則可以對其進行預處理,以添加有用的附加數據,而最終使用者不必知道這些附加資訊是如何到達那裡的。
編寫程式碼
創建 EventAggregator
在 coordinator 目錄下添加 eventaggregator.go,程式碼如下:
-
第 28 行,建立 EventData struct,目前它的欄位碰巧和 SensorMessage 是一樣的,但是兩個 struct 的職責不同,所以我們不復用 SensorMessage,而是單獨建立 EventData,以便它們以後可以獨立的進化;
-
第 5 行,建立了 EventAggregator struct,也就是事件聚合,它只有一個 listeners 欄位,是一個 map,它的 key 是事件的名稱,它的值是回調函數的集合。當事件發生的時候,EventAggregator 就輪流調用為該事件註冊的回調函數;
-
第 9 行,就是 EventAggregator 的構造函數;
-
第 16 行,AddListener 方法,使用者通過該方法可以向 EventAggregator 註冊回調函數;
-
第 20 行,PublishEvent 方法用來發布事件。它接收事件名稱和事件的數據作為參數。這裡需要判斷 EventAggregator 里是否已經註冊了該事件,如果註冊了,那麼遍歷其對應的回調函數,並使用事件數據進行調用。
-
調用回調函數時,使用的不是 EventData 的指針,而是 EventData 的副本,這可以保證使用者不會把事件數據搞亂,影響其它使用者
-
取消訂閱的功能我就不做了。
把 EventAggregator 連接到 QueueListener
打開 queuelistener.go,添加程式碼:
-
第19 行,在QueueListener struct 裡面添加欄位ea,類型是 *EventAggregator;
-
第 25 行,在 QueueListener 的構造函數里為 ea 自讀賦初始值。
在 AddListener 方法里,原來只是把原始數據列印到控制台。現在添加如下程式碼:
-
創建一個 EventData,其欄位內容目前和感測器的消息內容一樣;
-
使用 QueueListener 上的 EventAggregator 發布事件:
-
事件的名稱是 MessageReceived_感測器名稱
-
第二個參數就是事件數據
發現早已運行的感測器
最後我們要做的就是如何讓協調器發現在協調器上線前就已經在運行的感測器。
目前我們的做法是這樣的:首先協調器先運行,然後感測器在上線的時候立即把它們的數據Queue 發送過去,使用的是 Fanout Exchange,這樣多個協調器都可以被通知到。
但是,如果感測器先運行,協調器後運行,那麼協調器就無法知道感測器的存在,為了解決這個問題,我這樣做:
-
我在消息代理中也就是 RabbitMQ 里,建立一個新的 Exchange,它是一個 Fanout Exchange,它和其它資訊流的方向正好相反。
-
在這裡,協調器將會向這個 Fanout Exchange 發出一個「發現」請求,這個資訊將會發送給所有的感測器。
-
感測器接收到這個「發現」請求資訊後,將會響應,將它們的數據 Queue 的名稱發送給我們以前建立的那個 Fanout Exchange(中間黃色的)。
-
這裡會出現一些冗餘的資訊,但協調器里有過濾機制,所以就這樣吧。
我們首先測試一下先運行感測器項目,再運行協調器項目的效果:
可以看到,協調器運行起來以後,沒有接收到該感測器的數據。
修改 queuetools
我們要解決的就是這個問題,下面看程式碼,首先看 queuetools.go:
這裡改動不多,就是把要新建立的 Fanout Exchange 的名稱作為常量存在這裡。
注意之前在這裡定義的 SensorListQueue 已經不需要了,可以刪掉。
修改 queuelistener
然後看 queuelistener.go,在這裡為 QueueListener 添加一個DiscoverSensors 方法:
該方法中首先我使用了 ExchangeDeclare 方法來聲明這個新的 Exchange,並進行設置。
雖然項目中還沒用過這個方法,但是裡面大多數參數的作用你應該能夠猜得出來:
-
name:Exchange 的名稱
-
kind:Exchange 的類型,可以是 direct、topic、header 或者 fanout,這裡使用 fanout
-
durable:表示這個 Exchange 是否可持久
-
autoDelete:表示在沒有綁定的情況下是否刪除 Exchange
-
internal:這個參數我們還沒見過,如果想拒絕外部的發布請求,就把這個設為 true。這可以在高級場景中使用,在高級場景中,Exchange 綁定在一起,在消息代理中形成更複雜的拓撲。
-
noWait 和 args 就不介紹了。
現在,協調器可以向這個 Exchange 發布消息了。而我們只需要向它發送一個消息即可,並沒有什麼具體的內容要發送,所以我發布了一個空的 Publishing,這就可以告訴瀏覽器我在尋找它們了。
修改感測器
下面我們讓感測器(sensor.go)對上面發布的「發現」請求進行響應,不過首先,需要重構一下。
把 main 函數裡面當感測器上面時,發布數據 Queue 名稱那部分程式碼提取出來放在單獨的一個函數裡面:
然後在 main 函數相應的位置進行調用:
-
第 39 行,對重構的函數進行調用。
-
第 41 行,創建一個 Queue
-
第 42 行,使用 QueueBind 方法將這個 Queue 和 SensorDiscovery Exchange
-
第 48 行,創建goroutine 運行一個將要新建的函數 listenForDiscoveryRequests。通過使用 goroutine,無論當請求什麼時候進來,這部分邏輯都將可用,而且不會阻塞系統的其餘部分。這裡需要傳入 Queue 的名稱和 Channel。
然後看一下 listenForDiscoveryRequests 函數:
這裡使用 Channel 的 Consume 方法對 Channel進行設置以便能接收「發現」請求。
然後用 for range 來接收「發現」請求。這裡忽略消息本身即可,因為該消息就是一個觸發而已。當消息進來時,調用剛剛重構出來的 publishQueueName 函數即可。
在 queuelistener 里調用發現方法
在 queuelistener.go 的 ListenForNewSource 方法里,在如下位置調用 DiscoverSensors 方法:
為什麼在這裡調用?因為這是可以保證協調器正在監聽感測器路由的消息的第一個地方。
運行測試
先運行一個感測器,然後在運行協調器:
感測器這裡我使用了 freq 參數,讓其每兩秒鐘生成一個數據。
可以看到,在這種情況下協調器也可以發現已經運行的感測器並接收數據了。
你可以運行多個感測器和多個協調器,應該也會好用的。
這也是一種非常簡單的分散式應用吧。