RabbitMQ 入門 (Go) – 3. 模擬傳感器,生成數據並發佈

現在,我們需要模擬傳感器,生成數據,並發佈到 RabbitMQ

建立傳感器項目

 GOPATH src 下建立文件夾 sensors,使用 go mod init 初始化,並創建 main.go

同時別忘了安裝 amqp 的包:go get -u github.com/streadway/amqp

 

我們要生成一些模擬數據,生成數據有一定的範圍(位於一個最大值和最小值之間),如下圖:

 

因此,我們需要這樣幾個配置參數:

  1. 傳感器的名稱

  2. 傳感器數據的更新頻率

  3. 模擬生成數據的最大值

  4. 模擬生成數據的最小值

  5. 與前一次生成數據的差值的最大值(變化幅度的最大值)

 

設置命令行參數並讀取

在這個項目里,我們需要通過命令行參數來傳遞配置,並在 Go 程序裏面進行解析和讀取。我們可以使用 os.Args 來搞這些命令行參數,但是更好的辦法是使用 flag 這個包(其內部實現使用的也是 os.Args)。

 

我們先看代碼:

  1.  5-9 行,我們聲明了 5 個命令行參數。都是使用 flag 包下相應的函數實現的。

    1. 這幾個命令行參數分別表示傳感器名稱、模擬數據的更新頻率、模擬數據的最大值、最小值以及變化幅度的最大值。

    2. 這些命令行參數的類型分別是 stringuintfloat64float64float64

    3. 這些函數的參數都類似:

      1. 第一個參數是命令行參數的名稱

      2. 第二個參數是命令行參數的默認值

      3. 第三個參數是參數的描述/幫助

  2.  main 函數里,我們調用     flat.Parse() 函數,就可以將命令行的參數值解析到 5-9 行聲明變量裏面。

 

我們測試一下,命令行輸入 go run . –help,其結果如下:

 

生成模擬數據

要生成模擬傳感器的數據,需要使用到 math/rand  time 這兩個包。

先看代碼:

  1.  17 行,我們需要一個 *rand.Rand 類型來生成隨機數,它又需要一個源,這裡使用 time.Now().UnixNano() 生成源,這樣做的好處是因為這個時間納秒數永遠不會重複。

  2.  19 行,聲明 value,它表示傳感器的數值,在這先生成一個初始值。

  3.  20 行,是額定值,在這裡也就是最大值最小值的中間平均值。

  4.  25 行,把更新頻率(每秒更新的次數)轉化為了兩次更新之間的時間間隔(毫秒),並解析成 time.Duration 這個類型。

  5.  26 行,time.Tick 函數會返回一個 time  Channel,該函數會按照提供的時間間隔不斷觸發,並向這個 Channel 發送當前時間。

  6.  28 行,使用 for range 來處理 signal 這個 Channel,每次 Channel 中有數據傳遞過來,我們就使用 calcValue 這個函數來生成新的模擬數據。

  7.  29 行,把生成的最新數據打印一下即可。

 

calcValue 函數

生成模擬數據的邏輯是如果數據偏離額定值,那麼盡量讓下次生成的值向額定值靠攏。

這部分可根據自己的特定需求來實現,不必和我的相同。

先看代碼:

  1.  35 行,聲明了 maxStep  minStep 兩個變量,表示本次更新相比上次所能夠發生的最大變化和最小變化幅度。

  2.  36 – 42     行,區分當前值大於額定值或小於額定值兩種情況,按不同的邏輯得出 maxStep  minStep

  3.  44 行,使用 maxStep      minStep 以及隨機數生成新的 value 數據。

 

運行 sensors 項目

使用 go run . 運行,命令行參數使用默認值即可:

一切正常的話,它就會每秒鐘生成 5 次數據。

 

如何運行多個傳感器

生產環境中,通常會接收來自多個傳感器的數據。

這裡,我們讓每個傳感器都設置自己的路由 Key,所以 RabbitMQ 將會為每個 Key 創建一個 Queue

但是這也會引起問題,就是之前章節裏面的那個協調程序如何發現這些傳感器呢?

首先,我們可以讓每個傳感器使用路由 Key 向一個所有傳感器和協調程序都知曉的路徑中發送一個消息。但這隻能解決問題的一半,另一半我們以後再說。

 

將傳感器數據發佈到 RabbitMQ

創建傳感器的消息類型

這裡會使用到 encoding/gob 包。

看代碼:

  •  sensors 包中創建 model 包,並建立 models.go 文件。

  • 在 models.go 的第 12 行,建立 SensorMessage 作為傳感器傳遞消息的類型,裏面包含三個字段分別是傳感器名稱、數值和時間戳。

  • 很顯然我們不能把 Go  struct 類型直接扔到 RabbitMQ 裏面,但我們項目中的各種客戶端只涉及到 Go 語言,所以在這裡我使用 Go 語言的 gob 來對消息進行編碼,這樣會更高效一些。如果這個項目是跨語言的我可能會使用 JSON  Protocol Buffers

  •  model 包的 init 函數裏面,需要使用 gob 包的 Register 函數把將要編碼的類型進行註冊,這樣依賴於這個包的其它 Go 程序就可以把     SensorMessage 這個類型的消息對象發送過去了

 

建立 Queue 相關的工具包

建立 tools 包,並建立 queuetools.go 文件,其內容如下:

代碼內容與之前的項目類似,就不解釋了。

 

發佈傳感器數據到 RabbitMQ

這裡還會使用到 bytes 包。

回到 main.go,修改代碼:

  1. 前面添加了獲取 Channel  Queue 的代碼。其中第 37 行比較重要,因為我們不能保證在程序運行時,使用 Queue 名稱作為路由 Key  Queue 存在,而使用 GetQueue 函數,就可以保證這個 Queue 會被正確的設置,並準備好被我們使用了。

  2.  42 行,使用 bytes 包創建了一個 *bytes.Buffer,它用來來承載編碼後的數據,這個 Buffer 可以重複利用,所以實在 for range 的外部聲明的。

    1. 但是每次使用 Buffer 都需要進行重置,也就是第 53 行的作用,這樣以前的數據就會被移除,Buffer 的指針會回到初始位置。

  3.  43 行,使用 gob  Buffer 來創建編碼器 

  4.  54 行,使用 編碼器的 Encode 方法對消息進行編碼。

  5.  56 行,創建要發送給 RabbitMQ 的消息(amqp.Publishing 類型),這裡只需要填寫 Body 字段即可,其它的字段根據自己的需求選填即可。

  6.  60 行,使用 Channel 來發佈消息,這裡使用的是默認的 Exchange,路由 key 就是 Queue 的名字,最後一個參數就是發佈的消息。

 

運行程序

運行 sensors 包:

 

打開控制台:

可以看到發送頻率確實是每秒 5 次。

 

打開 sensor Queue

目前已經有 384 條消息了,都沒有被發送。

 

隨便點開一個消息查看其內容:

可以看到 Body 應該是 Base64 編碼的。因為 gob 編碼器使用的是二進制消息格式,儘可能的高效,所以在控制台裏面它沒有一個有意義的表述展示。

 

然後,先停止運行程序。

 

傳感器上線時通知協調程序

最後我們就來處理上面那個問題:當傳感器上線的時候,得讓協調程序知道,並發送數據。

因為每個傳感器都創建了一個自己的 Queue,所以在沒有幫助的情況下,協調程序將無法有效知道這些傳感器。

這個問題實際上具體需要做兩件事,我們先來做第一件事:

多個傳感器他們 Queue 的名稱是不一樣的,是動態的,所以我們需要一個大家都知道的 Queue,它用來將每個新創建的傳感器的 Queue 名稱發送給協調程序。

 

首先,在 queuetools.go 裏面添加這個 Queue 的名稱,使用一個常量保存:

 

然後,在 main.go 里,使用這個名稱創建一個 Queue,並將傳感器的 Queue 的名稱發佈上去:

 

再次運行 sensor 

打開控制台:

可以看到 SensorList Queue 出現了。

 

進入到 SensorList Queue,看它的 Message

 

可以看到當前這一個傳感器的名字 sensor 就在裏面。