RabbitMQ 入門 (Go) – 6. 數據持久化(上)
從本節開始,我介紹一下如何將相關數據持久化到資料庫,也就是上圖中藍色的部分。
目前的問題
我先運行 6 個感測器和2 個協調器,這裡我使用了批處理文件:
運行後,看一下 RabbitMQ 的管理控制台:
注意上面前面幾個 Queue,這些 Queue 就是我們讓感測器和協調器監聽那兩個 Fanout Exchange 時創建的,因為這兩個 Exchange 不使用路由 Key 來決定接收者,我使用了空字元串「」作為這些 Queue 的名稱,而RabbitMQ 就會為它們賦予一個唯一的名字。
因為目前創建的 Queue 都是臨時的,如果我重新啟動系統,RabbitMQ會創建另一套不同的 Queue 來完成工作,這樣的話系統資源就會被慢慢的耗盡,所以這個問題需要解決。
調整 autoDelete 參數
首先修改 tools 包下的 queuetools.go 裡面的GetQueue 函數,添加一個 autoDelete 參數:
GetQueue 函數會確保創建一個Queue 從而能接收到消息。剛創建它的時候,我的意圖是讓它作用於 Direct Exchange 和命名的 Queue。後來我對它進行了擴展使用,也可以應用於匿名的 Queue。
再說一下 autoDelete 參數的作用是:若值為 true,那麼如果一個 Queue 它沒有被註冊任何的使用者,這個 Queue 就會被刪除。針對上述問題中的臨時 Queue,這就是我想要的效果。但是針對感測器的數據 Queue,我還是希望在系統重啟後,這些 Queue 能夠保留。
所以,我為該函數添加了一個 autoDelete 參數,在創建 Queue 的時候,可以對 autoDelete 進行設置。
有三個調用該函數的地方需要調整程式碼,先打開 sensors.go:
-
針對感測器傳送數據的 Queue,我要讓它能夠保留下來,所以 autoDelete 就是 false
-
而 discoveryQueue 是用來監聽協調器的「發現」請求的,我想讓每個感測器每次上線都會得到一個新的 Queue,這裡 autoDelete 就設置為 true,這樣的話 RabbitMQ 就會把舊的 Queue 自動清理掉。
調整 queuelistener.go 裡面的調用:
這裡得到的臨時 Queue 是用來監聽感測器上線時或響應協調器發現請求時來發布數據 Queue 名稱的。
這裡函數調用的 autoDelete 參數也設置為 true,從而讓它們可以自動被清除掉。
測試運行
把之前的 Queue 都刪掉:
然後再運行 5 個感測器和2 個協調器:
現在又是很多的 Queue。
然後我們再停掉所有的感測器和協調器:
可以看到感測器傳送數據的 Queue 被保留了,而其它的臨時 Queue 都自動刪除掉了,這就是我們想要的效果。
泛化事件數據
到目前為止,系統中只發布了一種類型的事件(接收到感測器數據時的事件),而且目前還沒有任何使用者監聽這個事件。接下來我們就要完善事件這部分功能了,但首先必須做出一些優化修改,以便能真正滿足需求。
目前 eventaggregator.go 裡面包含了所有添加監聽者以及向監聽者發布事件的方法。
但現在的情況是事件的使用者也知道如何自行發布事件,這點不太好,因為它們不需要這樣做。程式碼修改如下:
-
為了盡量少的暴露功能,我為事件的使用者創建了 EventRaiser 這個介面,它裡面只有一個 AddListener 方法,與已經實現的 AddListener 方法相幾乎完全匹配。
-
但是我把介面里 AddListener 的第二個參數,也就是回調函數裡面的參數類型改為了 interface{},從而可以接收多種類型的數據。
-
相應的,後邊所有涉及事件數據參數的地方都改為 interface{}
現在 EventAggregator 被泛化了,我也可以發布其它類型的事件了。
來到 queuelistener.go,我想在協調器發現數據源之後,發布一個事件:
這個事件的名稱叫做 DataSourceDiscovered,事件數據就是 Queue 的名稱,由於這個參數的類型是 interface{},所以它可以正常的傳遞進去。
創建數據的使用者
目前,我們整個系統的設計一共有三層,而數據源和數據的使用者是通過協調器分開的。這樣做的好處是,關於如何處理消息的業務邏輯都集中在協調器這一層上面了,而數據源和數據使用者層僅關注它們自身的任務即可。
為了達到這個目的,需要在 coordinator 目錄下創建一個 databaseconsumer.go 文件:
這個文件的作用是監聽整個系統發出的事件,並決定哪些事件可以轉發到數據管理包(我一會要建立的)。
dataconsumer.go
首先看一下 dataconsumer.go 文件的內容:
-
第 15 行建立 DatabaseConsumer struct,它有5個欄位:
-
第一個欄位類型是 EventRaiser 介面,該介面只能用於監聽,而不能發布事件,這就是該介面的目的。
-
接下來三個欄位都是與 RabbitMQ 相關的。
-
最後一個欄位是註冊的監聽器的 Queue 名稱的集合。
-
第 23 行,為 DatabaseConsumer 創建一個構造函數。它接收 EventRaiser 作為參數,並創建 RabbitMQ 相關的連接、Channel、Queue 為 DatabaseConsumer 各欄位賦值。
-
第 29 行創建 Queue 時用到了一個 Queue 的名稱,這個 Queue 是用來做持久化的,它是眾所周知的,它的名稱存放在 queuetools.go 文件里:
-
第 31 行就是監聽數據源被發現的事件,回調函數的參數類型是空介面(其實就是事件的名稱)。在回調函數內,調用我隨後要建立的 SubscribeToDataEvent 方法,把 eventData 轉化為字元串傳遞進去。
下面看一看 SubscribeToDataEvent 方法:
-
該方法的參數是事件的名稱。
-
第 39 行,對已註冊的監聽器進行遍歷,如果傳進來的事件名稱已註冊,return 即可。
-
否則的話,需要註冊這個數據源,這個事件的名稱是 MessageReceived_+Queue 的名稱。
-
第 45 行的回調函數,我將傳入一個立即執行的匿名函數,它會返回我們真正需要使用的回調函數,也就是閉包。這種做法的好處就是返回的函數可以捕獲其被定義的作用域內的變數,這樣的話真正的回調函數就可以擁有一些可持續的「狀態」(也就是 prevTime 和 buf)。這裡我的需求是至少要間隔 5 秒鐘以上,才記錄一次(到資料庫)。
-
回調函數內其它的邏輯都很簡單,就不逐行介紹了。
-
第 67 行,發布消息使用的是 Default Exchange,並路由到持久化的那個 Queue。
修改 queuelistener.go 裡面的構造函數
讓其傳入 EventAggregator 作為參數並賦值給 QueueListener 的 ea 欄位。
修改協調器的 main 函數
-
創建包級共享的 DatabaseConsumer 變數,在 main 里用構造函數進行創建並賦值。
-
創建 EventAggregator,並傳遞給 DatabaseConsumer 和 QueueListener,讓他們共享同一個 EventAggregator。