RabbitMQ 入门 (Go) – 2. 发布和接收消息

本文我将使用 Go 语言在 RabbitMQ 上发布和接收消息。 

Go 的标准库本身并没有 RabbitMQ 的原生绑定,但是有一个第三方库确能够支持 RabbitMQ,它的源码在 //github.com/streadway/amqp ,其文档在 //pkg.go.dev/github.com/streadway/amqp 

 

发布消息到 RabbitMQ 

建立一个 Go 的项目,并使用 go mod init 进行初始化: 

mod init 
go: creating new go.mod: module demo 
main .go

 

使用 go get -u github.com/streadway/amqp 命令来安装这个库: 

get —u github . com/streadway/amqp 
go: downloading github . com/streadway/amqp vl.e.e 
go: github . com/streadway/amqp upgrade vl.e.e

 

获取 Queue 

代码如下: 

x 
ma ln.go 
-GO main.go > getQueue 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
package main 
import ( 
" fmt " 
"log" 
" github.com/streadway/amqp" 
func main() { } 
func failOnError(err 
error, 
if 
err 
log. Fatalf("%s: %s", 
string) { 
msg 
err, msg) 
panic(fmt.Sprintf( "%s: , 
err, msg)) 
func getQueue() (*amqp.Connection, *amqp.Channel, *amqp.Queue) { 
amqp. Dial( " amqp : //guestölocalhost : 5672 " ) 
conn, err 
failOnError(err, 
"Failed to connect to RabbitMQ") 
conn. Channel() 
ch, err 
failOnError(err, 
"Failed to open a channel") 
ch . hello " , 
q, err 
false, 
false, 
false, 
false, 
nil) 
failOnError(err, 
"Failed to declare a queue") 
conn, ch, 
return
  1. 首先导入需要的包,主要是 streadway/amqp 

  2.  12 行,编写处理错误的函数 failOnError 

  3.  19 行,编写可以获得 AMQP ConnectionChannelQueue 的帮助函数 getQueue() 

我们知道我们需要将消息发布到 Exchange 上面,但是如果使用默认 Exchange 的话,就可以使用一个捷径:我们可以将消息直接发送到 Queue 的名称上(但实际并不是直接发送到 Queue 上面) 

  1. getQueue() 函数不用任何参数,它返回三个对象: 

    1. *amqp.Connection 表示应用和 RabbitMQ 之间的网络连接 

    2. *amqp.Channel 位于 Connection 之上,它提供了用于双方通信的通道。通过把 Connection  Channel 分开,客户应用中就可以在同一个 Connection 上拥有多个 Channel 用来通信,这样就减少了对资源的需求。 

    3. *amqp.Queue 也就是队列 

  2.  20 行,使用 amqp  Dial 函数可以返回一个 ConnectionDial 函数的参数是 RabbitMQ  URLURL 里面需要包含用户凭证。 

  3.  22 行,通过调用 Connection 对象上的 Channel 方法,创建一个 Channel 

  4.  24 行,通过调用 Channel 对象上的 QueueDeclare 方法,返回一个 Queue。注意:这个 Queue 不一定是被创建的,如果不存在指定名称的 Queue,那么 RabbitMQ 就会创建一个;如果存在指定名称的 Queue,但是和指定的配置不同,那么 RabbitMQ 就会拒绝这个请求,并抛出错误。 

  5. QueueDeclare 方法参数: 

    1. 第一个参数是 Queue 的名称:我们就写死一个 hello 

    2. 第二个参数是 durable bool,表示是否将添加到 Queue 的消息存储在硬盘上。如果这个参数值为 true,那么 RabbitMQ 服务器重启之后消息依然会存在。但是它会导致处理消息的能力明显下降。这里我把它设为 false 

    3. 第三个参数 autoDelete bool,它会告诉 RabbitMQ 如果消息没有消费者应该怎么做: 

      1. true:消息就会从 Queue 中删除 

      2. false:将消息保留直到某个消费者前来获取该消息。这里我把它设为 false 

    4. 第四个参数 exclusive bool,它允许我们把这个 Queue 设置为只能从请求它的那个 Connection 上进行访问。 

      1. 如果它为 true,但想创建一个来自其它 Connection 的同名 Queue,那么就会报错 

      2. 如果它是 false,那么想创建一个来自其它 Connection 的同名 Queue 的结果就是:两个 Connection 都连接到同一个 Queue,两个 Connection 会共享它。这里我把它设为 false 

    5. 第五个参数 noWait bool 

      1. 如果为 true,这个 Queue 就被认为已经在服务器上存在了,将它返回即可,如果它不存在,那么就会报错 

      2. 所以这里设置为 false,因为我要创建的 Queue 在服务器上不存在。 

    6. 第六个参数 args amqp.Table,这个参数用于某些特定场景,例如声明一些要被这个 Queue 匹配的 Headers,如果这个 Queue 被绑定到 Header Exchange 的话。这里我传的是 nil 

  6. 如果第 24 行的 QueueDeclare 方法调用成功,那么就会得到一个绑定到 Default Exchange  Queue 

  7. 注意:Default Exchange 的类型是 Direct,也就是说任何没有路由 Key(和 Queue 的名称相同,在这里就是 hello)的消息传进来,将会被直接通过 exchange 送往输出的 Queue 

  8.  31 行,将 3 个对象返回即可,注意 q 我们返回的是指针 

 

发布消息 

10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
func main() { 
for { 
server() 
func server() 
getQueue( ) 
conn, ch, q 
conn. Close() 
defer 
ch . Close() 
defer 
amqp. Publishing{ 
msg 
"text/plain" , 
Content Type: 
Body : 
ch . Publish( 
q. Name , 
false, 
false, 
msg) 
[]byte( "Hello RabbitMQ"),
  1. 16 行,编写了一个 server 函数 

  2. 17 行,通过 getQueue 来获得 ConnectionChannel  Queue 

  3. 1819 行,按顺序 defer 关闭 Connection  Channel 

  4. 21 行,创建一个消息 amqp.Publishing 结构体,很多参数都是可选的,这里我设置两个: 

    1. ContentType 会指明消息的类型。RabbitMQ 会把数据变成字节流来传输,所以它其实并不关心消息的类型。但是如果你往同一个 Queue 发送不同类型的消息,那么还是设置一下这个字段比较好,便于区分消息的类型。 

    2. Body 可能是该结构体中最重要的字段:它的类型是 Byte Slice,里面包含着要传送的数据。 

  5.  26 行,将消息发布到消息代理上。这里我们使用 Channel 上的 Publish 方法,其参数有: 

    1. 第一个参数 Exchange:“”表示使用 Default Exchange,它没有名称 

    2. 第二个参数是路由 Key:本例中,需要把它设置为 Queue 的名称 

    3. 第三、四各参数 mandatory boolimmediate bool:用于发生者需要确认消息是否被传递成功,以及什么时候传递成功的。 

    4. 第五个参数就是消息本身:也就是 msg 

  6.  main 函数中调用 server 函数。这里循环调用是为了看看 RabbitMQ 的性能,你可以只调用一次。 

 

运行程序 

运行 go run . 命令: 

C: go run .

 

打开管理控制台: 

Connections 
Overview 
v Totals 
Queued messages last minute 
10.0 k 
7.5k 
5.0 k 
2.5 k 
0.0 k 
Message rates 
300 Is 
200 Is 
100 Is 
last minute 
RabbitMQ 3.8.11 
Channels 
Exchanges: 8 
Erlang 22.3 
Exchanges 
Queues: 10 
Queues 
Ready 
Unacked 
Total 
Publish 
Publisher 
confirm 
Deliver 
(manual 
ack) 
Admin 
. 9,636 
• 9,636 
• 2101s 
0.00/s 
• 0.00/s 
Deliver 
(auto ack) 
Consumer 
Redelivered 
Memory ? 
137 Mia 
13 Gig high watæ-rrMk 
• 0.00/s 
• 0.00/s 
• 0.00/s 
Disk space 
62 GiB 
48 Mia low 
Get 
(manual 
ack) 
Get (auto 
ack) 
Get 
(empty) 
• 0.00/s 
• 0.00/s 
• 0.00/s 
disc 
Unroutable 
(return) 
Unroutable 
(drop) 
Disk read 
Disk write 
• 0.00/s 
0.00/s 
. 0.00/s 
• 0.00/s 
Global counts ? 
Connections: O 
Nodes 
Name 
Channels: O 
Consumers: O 
File descriptors 
Socket descriptors 
58893 available 
Erlang processes 
1048576 a 
Uptime 
10d Oh 
I nfo 
basic 
rss 
Reset stats 
This node 
rabbit@DESKTOP-NMTR5KP 
65336 
Churn statistics 
All nodes

 

可以看到目前有 8  Exchange10  Queue,有 9636 个消息 

 

切换到 Exchange 画面: 

Overview 
Connections 
Channels 
Exchanges 
D Regex ? 
Queues 
Admin 
Exchanges 
All exchanges (8) 
Pagination 
Page 1 
Name 
of 1 
- Filter: 
Type 
direct 
fanout 
direct 
fanout 
headers 
headers 
topic 
topic 
Features 
Message rate in Message ra ut 
(AMQP default) 
SensorDiscoverv 
amq.direct 
amq.fanout 
amq.headers 
amq.match 
amq.rabbitmq.trace 
amq.topic 
215/s 
0.00/s 
0.00/s 
215/s 
0.00/s 
Add a new exchange

可以看到 Default Exchange 的消息速率。 

 

切换到 Queues 画面: 

Overview 
Queues 
Connections 
Channels 
Exchanges 
D Regex ? 
Queues 
Admin 
All queues (10) 
Pagination 
Page 1 
Overview 
Name 
SensorList 
of 1 
- Filter: 
Message rates 
incoming deliver / get 
Type 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
Features 
amq.gen—ZetkP61eFdklOsxGgdasw 
amq.gen-3svxjPR-ed94XPEmTTNQZg 
amq.gen-AMvrYwyt1 VxL6LQ91cbN6g 
amq.gen-fzyopkh14TJhYvNRvnG78Q 
amq.gen-hyqzs7 K9SEW9WOz3LcIaUw 
amq.gen-zJ8h6tps7fQ_LJ4ejOKD6LQ 
Sta te 
idle 
idle 
idle 
idle 
idle 
idle 
idle 
idle 
idle 
Messages 
Ready 
Unacked 
50,595 
Total 
50 595 
hello 
sensor 
Add a new queue 
HTTP API server Docs 
Tutorials 
Community Support 
Community Slack 
Commercial Support 
o.oo,'s 
0.00/s 
0.00/s 
198/s 
0.0 /s 
Plugins 
0.00/s 
0.00/s 
0.00/s 
0.00/s 
GitHub 
ack 
o.oo,'s 
0.00/s 
0.00/s 
0.00/s 
Changelog

可以看到 hello 这个 Queue 的运行信息。 

 

打开 hello Queue 

Overview 
Connections 
RabbitMQ 3.8.11 
Channels 
Erlang 22.3 
Exchanges 
Queue hello 
Overview 
Queued messages last minute 
75 k 
25k 
Message rates last minute ? 
300 Is 
200 Is 
100 Is 
Details 
Features 
Policy 
Operator policy 
Effective policy definition 
Consumers 
. no consumers 
Bindings 
From 
Ready 
Unacked 
Total 
Publish 
runmng 
Admin 
. 69,004 
• 69,004 
• 1261s 
Messages 
Message body bytes 
Process memory 
State 
Consumers 
Consumer utilisation ? 
Routing key Arguments 
7 
Total 
69,004 
943 kiB 
54 MiB 
Ready 
69,004 
943 kiB 
Unacked 
In memory 
69,004 
943 kiB 
Persistent 
Transient, Paged Out 
(Default exchange binding) 
This queue

可以看到 hello Queue 的运行信息。 

 

移动到下面, 

点击 Get Messages 按钮 

RabbitMQ 3.8.11 Erlang 22.3 
Overview 
Connections 
Channels 
Exchanges 
Admin 
Get messages 
Warning: getting messages from a queue is a destructive action. 
Ack Mode: 
Encoding: 
Messages: 
Nack message requeue true v 
Auto string / base64 v 
Get Message(s) 
Message 1 
The server reported 80252 messages remaining. 
Exchange 
Routing Key 
Redelivered 
Properties 
Payload 
14 bytes 
(AMQP default) 
hello 
content_type: text/plain 
Hello RabbitMQ 
Move messages 
To move messages, the shovel plugin must be enabled, try: 
S rabbiting-plugins enable rabbitmq_shovel 
Delete

 

就可以看到 Queue 里面的消息。 

 

接收消息

 RabbitMQ 接收消息与向 RabbitMQ 发布消息很类似。 接收消息仍然需要 ConnectionChannelQueue,但是交互方式略有不同。 

 

我将之前的程序代码更新一下,以便可以在同一个 Queue 里同时发送和接收消息: 

9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
func main() { 
go client() 
go server() 
var a string 
fmt.Scanln(8a) 
func client() { 
— getQueue() 
conn, ch, q 
conn.Close() 
defer 
ch . Close() 
defer 
msgs, err 
q. Name , 
true, 
false, 
false, 
false, 
nil) 
— ch. Consume( 
// queue string 
// consumer string 
// autoAck bool 
// exclusive bool 
// noLocdl bool 
// noWait bool 
// args amqp. Table 
failOnError(err, 
"Failed to register a consumer") 
msgs { 
for msg 
range 
log. Printf( "Receive message: 
%s\n" , 
msg. Body)
  1. 首先在 18 行建立 client 函数,用于处理接收消息的逻辑 

  2. 19 行,通过 getQueue() 获得 ConnectionChannelQueue 

  3. 23 行,调用 Channel 上的 Consume 方法。这个方法返回一个 Go  Channel,每从服务器接收到消息,就可以通过这个 Go Channel 获得。 

  4. Consume 方法的参数: 

    1. 第一个参数是接收消息的 queue 的名称,注意接收消息不需要关心 exchangeexchange 只有在发布消息时才用到。 

    2. 第二个参数 consumer string,它会唯一的识别出 Queue 对应的 Connection。它被 RabbitMQ 内部使用,来确定谁正在监听这个 Queue。当 Direct Exchange 的多个客户端都从同一个 Queue 接收消息的时候,这个就很重要了。RabbitMQ 会把消息分发到各个客户端,粗略的看作是一种负载均衡。它对于想要取消连接的客户端也很重要,这就可以让 RabbitMQ 知道以后不要再向这个接收者发送消息了。这里我们传进一个“”,让 RabbitMQ 为我们赋一个值,因为我们没有追踪的需求。 

    3. 第三个参数 autoAck bool,表示是否想在成功接收消息后自动确认。这个值通常设置为 true,从而让服务器可以立即将这个消息移除,以便节省资源。但是如果接收的消息需要做其它可能失败的动作,例如存储到数据库,那么可能最好将它设置为 false,然后手动进行确认。 

    4. 第四个参数 exclusive bool,它可以确认这个客户端是这个 Queue 的唯一消费者。如果它设为 true,而其它客户端已经注册了,或稍后有其它 Queue 将要监听这个 Queue,就会发生错误。这里设为 false 

    5. 第五个参数 noLocal bool,防止 RabbitMQ 发送消息给与发送者处于同一个 Connection 的客户端。这里设为 false 

    6. 第六个参数 noWait bool,如果为 true,就不会等待服务器确认请求,而会立即进行传送。这里设为 false 

    7. 第七个参数 args amqp.Table,对于 Queue  Server 有特定语义的参数可以放在这里。此例中,我放的是 nil 

  5.  33 行,使用 for 循环来处理 msgs 这个 Go Channel,来接收消息。这里在接收消息后,我只简单的将其打印。 

  6.  1112 行,分别使用 goroutine 来运行 client  server,注意目前必须把 client 放在前面。 

  7.  15 行的目的就是让这两个 goroutine 保持存活。 

 

运行 

我把 server 函数做了调整,让其持续不断的发布消息: 

38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
func server() { 
getQueue( ) 
conn, ch, q 
conn. Close() 
defer 
ch . Close() 
defer 
msg 
for 
amqp. Publishing{ 
"text/plain" , 
Content Type: 
Body : 
ch . Publish( 
q. Name , 
false, 
false, 
msg) 
[]byte( "Hello RabbitMQ"),

 

然后使用 go run . 运行以后,你将看到终端里不断在刷新这样的消息: 

2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
Go src- go run . 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
x 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
messaqe : 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMO

 

通过管理控制台,我们可以看到发布和接收消息的速率: 

Connections 
Overview 
v Totals 
Queued messages last minute 
Message rates last minute ? 
30 k/s 
20 Vs 
10 
Global counts ? 
Channels 
Exchanges 
Queues 
Ready 
Unacked 
Total 
Publish 
Publisher 
confirm 
Deliver 
(manual 
ack) 
Admin 
. 712 
• 712 
. 21,343/s 
0.00/s 
• 0.00/s 
Deliver 
(auto ack) 
Consumer 
Redelivered 
• 21,343/s 
• 0.00/s 
• 0.00/s

 

Overview 
Connections 
Channels 
Exchanges 
D Regex ? 
Queues 
Admin 
Exchanges 
All exchanges (8) 
Pagination 
Page 1 
Name 
of 1 
- Filter: 
Type 
direct 
fanout 
direct 
fanout 
headers 
headers 
topic 
topic 
Features 
Message rate in Message rate out 
(AMQP default) 
SensorDiscoverv 
amq.direct 
amq.fanout 
amq.headers 
amq.match 
amq.rabbitmq.trace 
amq.topic 
20,939/s 
0.00/s 
0.00/s 
20,956/s 
0.00/s