RabbitMQ 入門 (Go) – 3. 模擬傳感器,生成數據並發佈
現在,我們需要模擬傳感器,生成數據,並發佈到 RabbitMQ。
建立傳感器項目
在 GOPATH src 下建立文件夾 sensors,使用 go mod init 初始化,並創建 main.go。
同時別忘了安裝 amqp 的包:go get -u github.com/streadway/amqp
我們要生成一些模擬數據,生成數據有一定的範圍(位於一個最大值和最小值之間),如下圖:
因此,我們需要這樣幾個配置參數:
-
傳感器的名稱
-
傳感器數據的更新頻率
-
模擬生成數據的最大值
-
模擬生成數據的最小值
-
與前一次生成數據的差值的最大值(變化幅度的最大值)
設置命令行參數並讀取
在這個項目里,我們需要通過命令行參數來傳遞配置,並在 Go 程序裏面進行解析和讀取。我們可以使用 os.Args 來搞這些命令行參數,但是更好的辦法是使用 flag 這個包(其內部實現使用的也是 os.Args)。
我們先看代碼:
-
第 5-9 行,我們聲明了 5 個命令行參數。都是使用 flag 包下相應的函數實現的。
-
這幾個命令行參數分別表示傳感器名稱、模擬數據的更新頻率、模擬數據的最大值、最小值以及變化幅度的最大值。
-
這些命令行參數的類型分別是 string,uint,float64,float64,float64。
-
這些函數的參數都類似:
-
第一個參數是命令行參數的名稱
-
第二個參數是命令行參數的默認值
-
第三個參數是參數的描述/幫助
-
在 main 函數里,我們調用 flat.Parse() 函數,就可以將命令行的參數值解析到 5-9 行聲明變量裏面。
我們測試一下,命令行輸入 go run . –help,其結果如下:
生成模擬數據
要生成模擬傳感器的數據,需要使用到 math/rand 和 time 這兩個包。
先看代碼:
-
第 17 行,我們需要一個 *rand.Rand 類型來生成隨機數,它又需要一個源,這裡使用 time.Now().UnixNano() 生成源,這樣做的好處是因為這個時間納秒數永遠不會重複。
-
第 19 行,聲明 value,它表示傳感器的數值,在這先生成一個初始值。
-
第 20 行,是額定值,在這裡也就是最大值最小值的中間平均值。
-
第 25 行,把更新頻率(每秒更新的次數)轉化為了兩次更新之間的時間間隔(毫秒),並解析成 time.Duration 這個類型。
-
第 26 行,time.Tick 函數會返回一個 time 的 Channel,該函數會按照提供的時間間隔不斷觸發,並向這個 Channel 發送當前時間。
-
第 28 行,使用 for range 來處理 signal 這個 Channel,每次 Channel 中有數據傳遞過來,我們就使用 calcValue 這個函數來生成新的模擬數據。
-
第 29 行,把生成的最新數據打印一下即可。
calcValue 函數
生成模擬數據的邏輯是如果數據偏離額定值,那麼盡量讓下次生成的值向額定值靠攏。
這部分可根據自己的特定需求來實現,不必和我的相同。
先看代碼:
-
第 35 行,聲明了 maxStep 和 minStep 兩個變量,表示本次更新相比上次所能夠發生的最大變化和最小變化幅度。
-
第 36 – 42 行,區分當前值大於額定值或小於額定值兩種情況,按不同的邏輯得出 maxStep 和 minStep
-
第 44 行,使用 maxStep 和 minStep 以及隨機數生成新的 value 數據。
運行 sensors 項目
使用 go run . 運行,命令行參數使用默認值即可:
一切正常的話,它就會每秒鐘生成 5 次數據。
如何運行多個傳感器
生產環境中,通常會接收來自多個傳感器的數據。
這裡,我們讓每個傳感器都設置自己的路由 Key,所以 RabbitMQ 將會為每個 Key 創建一個 Queue:
但是這也會引起問題,就是之前章節裏面的那個協調程序如何發現這些傳感器呢?
首先,我們可以讓每個傳感器使用路由 Key 向一個所有傳感器和協調程序都知曉的路徑中發送一個消息。但這隻能解決問題的一半,另一半我們以後再說。
將傳感器數據發佈到 RabbitMQ
創建傳感器的消息類型
這裡會使用到 encoding/gob 包。
看代碼:
-
在 sensors 包中創建 model 包,並建立 models.go 文件。
-
在 models.go 的第 12 行,建立 SensorMessage 作為傳感器傳遞消息的類型,裏面包含三個字段分別是傳感器名稱、數值和時間戳。
-
很顯然我們不能把 Go 的 struct 類型直接扔到 RabbitMQ 裏面,但我們項目中的各種客戶端只涉及到 Go 語言,所以在這裡我使用 Go 語言的 gob 來對消息進行編碼,這樣會更高效一些。如果這個項目是跨語言的我可能會使用 JSON 或 Protocol Buffers。
-
在 model 包的 init 函數裏面,需要使用 gob 包的 Register 函數把將要編碼的類型進行註冊,這樣依賴於這個包的其它 Go 程序就可以把 SensorMessage 這個類型的消息對象發送過去了
建立 Queue 相關的工具包
建立 tools 包,並建立 queuetools.go 文件,其內容如下:
代碼內容與之前的項目類似,就不解釋了。
發佈傳感器數據到 RabbitMQ
這裡還會使用到 bytes 包。
回到 main.go,修改代碼:
-
前面添加了獲取 Channel 和 Queue 的代碼。其中第 37 行比較重要,因為我們不能保證在程序運行時,使用 Queue 名稱作為路由 Key 的 Queue 存在,而使用 GetQueue 函數,就可以保證這個 Queue 會被正確的設置,並準備好被我們使用了。
-
第 42 行,使用 bytes 包創建了一個 *bytes.Buffer,它用來來承載編碼後的數據,這個 Buffer 可以重複利用,所以實在 for range 的外部聲明的。
-
但是每次使用 Buffer 都需要進行重置,也就是第 53 行的作用,這樣以前的數據就會被移除,Buffer 的指針會回到初始位置。
-
第 43 行,使用 gob 和 Buffer 來創建編碼器 。
-
第 54 行,使用 編碼器的 Encode 方法對消息進行編碼。
-
第 56 行,創建要發送給 RabbitMQ 的消息(amqp.Publishing 類型),這裡只需要填寫 Body 字段即可,其它的字段根據自己的需求選填即可。
-
第 60 行,使用 Channel 來發佈消息,這裡使用的是默認的 Exchange,路由 key 就是 Queue 的名字,最後一個參數就是發佈的消息。
運行程序
運行 sensors 包:
打開控制台:
可以看到發送頻率確實是每秒 5 次。
打開 sensor Queue:
目前已經有 384 條消息了,都沒有被發送。
隨便點開一個消息查看其內容:
可以看到 Body 應該是 Base64 編碼的。因為 gob 編碼器使用的是二進制消息格式,儘可能的高效,所以在控制台裏面它沒有一個有意義的表述展示。
然後,先停止運行程序。
傳感器上線時通知協調程序
最後我們就來處理上面那個問題:當傳感器上線的時候,得讓協調程序知道,並發送數據。
因為每個傳感器都創建了一個自己的 Queue,所以在沒有幫助的情況下,協調程序將無法有效知道這些傳感器。
這個問題實際上具體需要做兩件事,我們先來做第一件事:
多個傳感器他們 Queue 的名稱是不一樣的,是動態的,所以我們需要一個大家都知道的 Queue,它用來將每個新創建的傳感器的 Queue 名稱發送給協調程序。
首先,在 queuetools.go 裏面添加這個 Queue 的名稱,使用一個常量保存:
然後,在 main.go 里,使用這個名稱創建一個 Queue,並將傳感器的 Queue 的名稱發佈上去:
再次運行 sensor 包
打開控制台:
可以看到 SensorList Queue 出現了。
進入到 SensorList Queue,看它的 Message:
可以看到當前這一個傳感器的名字 sensor 就在裏面。