RabbitMQ 入门 (Go) – 3. 模拟传感器,生成数据并发布

现在,我们需要模拟传感器,生成数据,并发布到 RabbitMQ

建立传感器项目

 GOPATH src 下建立文件夹 sensors,使用 go mod init 初始化,并创建 main.go

同时别忘了安装 amqp 的包:go get -u github.com/streadway/amqp

 

我们要生成一些模拟数据,生成数据有一定的范围(位于一个最大值和最小值之间),如下图:

 

因此,我们需要这样几个配置参数:

  1. 传感器的名称

  2. 传感器数据的更新频率

  3. 模拟生成数据的最大值

  4. 模拟生成数据的最小值

  5. 与前一次生成数据的差值的最大值(变化幅度的最大值)

 

设置命令行参数并读取

在这个项目里,我们需要通过命令行参数来传递配置,并在 Go 程序里面进行解析和读取。我们可以使用 os.Args 来搞这些命令行参数,但是更好的办法是使用 flag 这个包(其内部实现使用的也是 os.Args)。

 

我们先看代码:

  1.  5-9 行,我们声明了 5 个命令行参数。都是使用 flag 包下相应的函数实现的。

    1. 这几个命令行参数分别表示传感器名称、模拟数据的更新频率、模拟数据的最大值、最小值以及变化幅度的最大值。

    2. 这些命令行参数的类型分别是 stringuintfloat64float64float64

    3. 这些函数的参数都类似:

      1. 第一个参数是命令行参数的名称

      2. 第二个参数是命令行参数的默认值

      3. 第三个参数是参数的描述/帮助

  2.  main 函数里,我们调用     flat.Parse() 函数,就可以将命令行的参数值解析到 5-9 行声明变量里面。

 

我们测试一下,命令行输入 go run . –help,其结果如下:

 

生成模拟数据

要生成模拟传感器的数据,需要使用到 math/rand  time 这两个包。

先看代码:

  1.  17 行,我们需要一个 *rand.Rand 类型来生成随机数,它又需要一个源,这里使用 time.Now().UnixNano() 生成源,这样做的好处是因为这个时间纳秒数永远不会重复。

  2.  19 行,声明 value,它表示传感器的数值,在这先生成一个初始值。

  3.  20 行,是额定值,在这里也就是最大值最小值的中间平均值。

  4.  25 行,把更新频率(每秒更新的次数)转化为了两次更新之间的时间间隔(毫秒),并解析成 time.Duration 这个类型。

  5.  26 行,time.Tick 函数会返回一个 time  Channel,该函数会按照提供的时间间隔不断触发,并向这个 Channel 发送当前时间。

  6.  28 行,使用 for range 来处理 signal 这个 Channel,每次 Channel 中有数据传递过来,我们就使用 calcValue 这个函数来生成新的模拟数据。

  7.  29 行,把生成的最新数据打印一下即可。

 

calcValue 函数

生成模拟数据的逻辑是如果数据偏离额定值,那么尽量让下次生成的值向额定值靠拢。

这部分可根据自己的特定需求来实现,不必和我的相同。

先看代码:

  1.  35 行,声明了 maxStep  minStep 两个变量,表示本次更新相比上次所能够发生的最大变化和最小变化幅度。

  2.  36 – 42     行,区分当前值大于额定值或小于额定值两种情况,按不同的逻辑得出 maxStep  minStep

  3.  44 行,使用 maxStep      minStep 以及随机数生成新的 value 数据。

 

运行 sensors 项目

使用 go run . 运行,命令行参数使用默认值即可:

一切正常的话,它就会每秒钟生成 5 次数据。

 

如何运行多个传感器

生产环境中,通常会接收来自多个传感器的数据。

这里,我们让每个传感器都设置自己的路由 Key,所以 RabbitMQ 将会为每个 Key 创建一个 Queue

但是这也会引起问题,就是之前章节里面的那个协调程序如何发现这些传感器呢?

首先,我们可以让每个传感器使用路由 Key 向一个所有传感器和协调程序都知晓的路径中发送一个消息。但这只能解决问题的一半,另一半我们以后再说。

 

将传感器数据发布到 RabbitMQ

创建传感器的消息类型

这里会使用到 encoding/gob 包。

看代码:

  •  sensors 包中创建 model 包,并建立 models.go 文件。

  • 在 models.go 的第 12 行,建立 SensorMessage 作为传感器传递消息的类型,里面包含三个字段分别是传感器名称、数值和时间戳。

  • 很显然我们不能把 Go  struct 类型直接扔到 RabbitMQ 里面,但我们项目中的各种客户端只涉及到 Go 语言,所以在这里我使用 Go 语言的 gob 来对消息进行编码,这样会更高效一些。如果这个项目是跨语言的我可能会使用 JSON  Protocol Buffers

  •  model 包的 init 函数里面,需要使用 gob 包的 Register 函数把将要编码的类型进行注册,这样依赖于这个包的其它 Go 程序就可以把     SensorMessage 这个类型的消息对象发送过去了

 

建立 Queue 相关的工具包

建立 tools 包,并建立 queuetools.go 文件,其内容如下:

代码内容与之前的项目类似,就不解释了。

 

发布传感器数据到 RabbitMQ

这里还会使用到 bytes 包。

回到 main.go,修改代码:

  1. 前面添加了获取 Channel  Queue 的代码。其中第 37 行比较重要,因为我们不能保证在程序运行时,使用 Queue 名称作为路由 Key  Queue 存在,而使用 GetQueue 函数,就可以保证这个 Queue 会被正确的设置,并准备好被我们使用了。

  2.  42 行,使用 bytes 包创建了一个 *bytes.Buffer,它用来来承载编码后的数据,这个 Buffer 可以重复利用,所以实在 for range 的外部声明的。

    1. 但是每次使用 Buffer 都需要进行重置,也就是第 53 行的作用,这样以前的数据就会被移除,Buffer 的指针会回到初始位置。

  3.  43 行,使用 gob  Buffer 来创建编码器 

  4.  54 行,使用 编码器的 Encode 方法对消息进行编码。

  5.  56 行,创建要发送给 RabbitMQ 的消息(amqp.Publishing 类型),这里只需要填写 Body 字段即可,其它的字段根据自己的需求选填即可。

  6.  60 行,使用 Channel 来发布消息,这里使用的是默认的 Exchange,路由 key 就是 Queue 的名字,最后一个参数就是发布的消息。

 

运行程序

运行 sensors 包:

 

打开控制台:

可以看到发送频率确实是每秒 5 次。

 

打开 sensor Queue

目前已经有 384 条消息了,都没有被发送。

 

随便点开一个消息查看其内容:

可以看到 Body 应该是 Base64 编码的。因为 gob 编码器使用的是二进制消息格式,尽可能的高效,所以在控制台里面它没有一个有意义的表述展示。

 

然后,先停止运行程序。

 

传感器上线时通知协调程序

最后我们就来处理上面那个问题:当传感器上线的时候,得让协调程序知道,并发送数据。

因为每个传感器都创建了一个自己的 Queue,所以在没有帮助的情况下,协调程序将无法有效知道这些传感器。

这个问题实际上具体需要做两件事,我们先来做第一件事:

多个传感器他们 Queue 的名称是不一样的,是动态的,所以我们需要一个大家都知道的 Queue,它用来将每个新创建的传感器的 Queue 名称发送给协调程序。

 

首先,在 queuetools.go 里面添加这个 Queue 的名称,使用一个常量保存:

 

然后,在 main.go 里,使用这个名称创建一个 Queue,并将传感器的 Queue 的名称发布上去:

 

再次运行 sensor 

打开控制台:

可以看到 SensorList Queue 出现了。

 

进入到 SensorList Queue,看它的 Message

 

可以看到当前这一个传感器的名字 sensor 就在里面。