RabbitMQ 入門 (Go) – 2. 發布和接收消息

本文我將使用 Go 語言在 RabbitMQ 上發布和接收消息。 

Go 的標準庫本身並沒有 RabbitMQ 的原生綁定,但是有一個第三方庫確能夠支援 RabbitMQ,它的源碼在 //github.com/streadway/amqp ,其文檔在 //pkg.go.dev/github.com/streadway/amqp 

 

發布消息到 RabbitMQ 

建立一個 Go 的項目,並使用 go mod init 進行初始化: 

mod init 
go: creating new go.mod: module demo 
main .go

 

使用 go get -u github.com/streadway/amqp 命令來安裝這個庫: 

get —u github . com/streadway/amqp 
go: downloading github . com/streadway/amqp vl.e.e 
go: github . com/streadway/amqp upgrade vl.e.e

 

獲取 Queue 

程式碼如下: 

x 
ma ln.go 
-GO main.go > getQueue 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
package main 
import ( 
" fmt " 
"log" 
" github.com/streadway/amqp" 
func main() { } 
func failOnError(err 
error, 
if 
err 
log. Fatalf("%s: %s", 
string) { 
msg 
err, msg) 
panic(fmt.Sprintf( "%s: , 
err, msg)) 
func getQueue() (*amqp.Connection, *amqp.Channel, *amqp.Queue) { 
amqp. Dial( " amqp : //guestölocalhost : 5672 " ) 
conn, err 
failOnError(err, 
"Failed to connect to RabbitMQ") 
conn. Channel() 
ch, err 
failOnError(err, 
"Failed to open a channel") 
ch . hello " , 
q, err 
false, 
false, 
false, 
false, 
nil) 
failOnError(err, 
"Failed to declare a queue") 
conn, ch, 
return
  1. 首先導入需要的包,主要是 streadway/amqp 

  2.  12 行,編寫處理錯誤的函數 failOnError 

  3.  19 行,編寫可以獲得 AMQP ConnectionChannelQueue 的幫助函數 getQueue() 

我們知道我們需要將消息發布到 Exchange 上面,但是如果使用默認 Exchange 的話,就可以使用一個捷徑:我們可以將消息直接發送到 Queue 的名稱上(但實際並不是直接發送到 Queue 上面) 

  1. getQueue() 函數不用任何參數,它返回三個對象: 

    1. *amqp.Connection 表示應用和 RabbitMQ 之間的網路連接 

    2. *amqp.Channel 位於 Connection 之上,它提供了用於雙方通訊的通道。通過把 Connection  Channel 分開,客戶應用中就可以在同一個 Connection 上擁有多個 Channel 用來通訊,這樣就減少了對資源的需求。 

    3. *amqp.Queue 也就是隊列 

  2.  20 行,使用 amqp  Dial 函數可以返回一個 ConnectionDial 函數的參數是 RabbitMQ  URLURL 裡面需要包含用戶憑證。 

  3.  22 行,通過調用 Connection 對象上的 Channel 方法,創建一個 Channel 

  4.  24 行,通過調用 Channel 對象上的 QueueDeclare 方法,返回一個 Queue。注意:這個 Queue 不一定是被創建的,如果不存在指定名稱的 Queue,那麼 RabbitMQ 就會創建一個;如果存在指定名稱的 Queue,但是和指定的配置不同,那麼 RabbitMQ 就會拒絕這個請求,並拋出錯誤。 

  5. QueueDeclare 方法參數: 

    1. 第一個參數是 Queue 的名稱:我們就寫死一個 hello 

    2. 第二個參數是 durable bool,表示是否將添加到 Queue 的消息存儲在硬碟上。如果這個參數值為 true,那麼 RabbitMQ 伺服器重啟之後消息依然會存在。但是它會導致處理消息的能力明顯下降。這裡我把它設為 false 

    3. 第三個參數 autoDelete bool,它會告訴 RabbitMQ 如果消息沒有消費者應該怎麼做: 

      1. true:消息就會從 Queue 中刪除 

      2. false:將消息保留直到某個消費者前來獲取該消息。這裡我把它設為 false 

    4. 第四個參數 exclusive bool,它允許我們把這個 Queue 設置為只能從請求它的那個 Connection 上進行訪問。 

      1. 如果它為 true,但想創建一個來自其它 Connection 的同名 Queue,那麼就會報錯 

      2. 如果它是 false,那麼想創建一個來自其它 Connection 的同名 Queue 的結果就是:兩個 Connection 都連接到同一個 Queue,兩個 Connection 會共享它。這裡我把它設為 false 

    5. 第五個參數 noWait bool 

      1. 如果為 true,這個 Queue 就被認為已經在伺服器上存在了,將它返回即可,如果它不存在,那麼就會報錯 

      2. 所以這裡設置為 false,因為我要創建的 Queue 在伺服器上不存在。 

    6. 第六個參數 args amqp.Table,這個參數用於某些特定場景,例如聲明一些要被這個 Queue 匹配的 Headers,如果這個 Queue 被綁定到 Header Exchange 的話。這裡我傳的是 nil 

  6. 如果第 24 行的 QueueDeclare 方法調用成功,那麼就會得到一個綁定到 Default Exchange  Queue 

  7. 注意:Default Exchange 的類型是 Direct,也就是說任何沒有路由 Key(和 Queue 的名稱相同,在這裡就是 hello)的消息傳進來,將會被直接通過 exchange 送往輸出的 Queue 

  8.  31 行,將 3 個對象返回即可,注意 q 我們返回的是指針 

 

發布消息 

10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
func main() { 
for { 
server() 
func server() 
getQueue( ) 
conn, ch, q 
conn. Close() 
defer 
ch . Close() 
defer 
amqp. Publishing{ 
msg 
"text/plain" , 
Content Type: 
Body : 
ch . Publish( 
q. Name , 
false, 
false, 
msg) 
[]byte( "Hello RabbitMQ"),
  1. 16 行,編寫了一個 server 函數 

  2. 17 行,通過 getQueue 來獲得 ConnectionChannel  Queue 

  3. 1819 行,按順序 defer 關閉 Connection  Channel 

  4. 21 行,創建一個消息 amqp.Publishing 結構體,很多參數都是可選的,這裡我設置兩個: 

    1. ContentType 會指明消息的類型。RabbitMQ 會把數據變成位元組流來傳輸,所以它其實並不關心消息的類型。但是如果你往同一個 Queue 發送不同類型的消息,那麼還是設置一下這個欄位比較好,便於區分消息的類型。 

    2. Body 可能是該結構體中最重要的欄位:它的類型是 Byte Slice,裡面包含著要傳送的數據。 

  5.  26 行,將消息發布到消息代理上。這裡我們使用 Channel 上的 Publish 方法,其參數有: 

    1. 第一個參數 Exchange:「」表示使用 Default Exchange,它沒有名稱 

    2. 第二個參數是路由 Key:本例中,需要把它設置為 Queue 的名稱 

    3. 第三、四各參數 mandatory boolimmediate bool:用於發生者需要確認消息是否被傳遞成功,以及什麼時候傳遞成功的。 

    4. 第五個參數就是消息本身:也就是 msg 

  6.  main 函數中調用 server 函數。這裡循環調用是為了看看 RabbitMQ 的性能,你可以只調用一次。 

 

運行程式 

運行 go run . 命令: 

C: go run .

 

打開管理控制台: 

Connections 
Overview 
v Totals 
Queued messages last minute 
10.0 k 
7.5k 
5.0 k 
2.5 k 
0.0 k 
Message rates 
300 Is 
200 Is 
100 Is 
last minute 
RabbitMQ 3.8.11 
Channels 
Exchanges: 8 
Erlang 22.3 
Exchanges 
Queues: 10 
Queues 
Ready 
Unacked 
Total 
Publish 
Publisher 
confirm 
Deliver 
(manual 
ack) 
Admin 
. 9,636 
• 9,636 
• 2101s 
0.00/s 
• 0.00/s 
Deliver 
(auto ack) 
Consumer 
Redelivered 
Memory ? 
137 Mia 
13 Gig high watæ-rrMk 
• 0.00/s 
• 0.00/s 
• 0.00/s 
Disk space 
62 GiB 
48 Mia low 
Get 
(manual 
ack) 
Get (auto 
ack) 
Get 
(empty) 
• 0.00/s 
• 0.00/s 
• 0.00/s 
disc 
Unroutable 
(return) 
Unroutable 
(drop) 
Disk read 
Disk write 
• 0.00/s 
0.00/s 
. 0.00/s 
• 0.00/s 
Global counts ? 
Connections: O 
Nodes 
Name 
Channels: O 
Consumers: O 
File descriptors 
Socket descriptors 
58893 available 
Erlang processes 
1048576 a 
Uptime 
10d Oh 
I nfo 
basic 
rss 
Reset stats 
This node 
rabbit@DESKTOP-NMTR5KP 
65336 
Churn statistics 
All nodes

 

可以看到目前有 8  Exchange10  Queue,有 9636 個消息 

 

切換到 Exchange 畫面: 

Overview 
Connections 
Channels 
Exchanges 
D Regex ? 
Queues 
Admin 
Exchanges 
All exchanges (8) 
Pagination 
Page 1 
Name 
of 1 
- Filter: 
Type 
direct 
fanout 
direct 
fanout 
headers 
headers 
topic 
topic 
Features 
Message rate in Message ra ut 
(AMQP default) 
SensorDiscoverv 
amq.direct 
amq.fanout 
amq.headers 
amq.match 
amq.rabbitmq.trace 
amq.topic 
215/s 
0.00/s 
0.00/s 
215/s 
0.00/s 
Add a new exchange

可以看到 Default Exchange 的消息速率。 

 

切換到 Queues 畫面: 

Overview 
Queues 
Connections 
Channels 
Exchanges 
D Regex ? 
Queues 
Admin 
All queues (10) 
Pagination 
Page 1 
Overview 
Name 
SensorList 
of 1 
- Filter: 
Message rates 
incoming deliver / get 
Type 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
classic 
Features 
amq.gen—ZetkP61eFdklOsxGgdasw 
amq.gen-3svxjPR-ed94XPEmTTNQZg 
amq.gen-AMvrYwyt1 VxL6LQ91cbN6g 
amq.gen-fzyopkh14TJhYvNRvnG78Q 
amq.gen-hyqzs7 K9SEW9WOz3LcIaUw 
amq.gen-zJ8h6tps7fQ_LJ4ejOKD6LQ 
Sta te 
idle 
idle 
idle 
idle 
idle 
idle 
idle 
idle 
idle 
Messages 
Ready 
Unacked 
50,595 
Total 
50 595 
hello 
sensor 
Add a new queue 
HTTP API server Docs 
Tutorials 
Community Support 
Community Slack 
Commercial Support 
o.oo,'s 
0.00/s 
0.00/s 
198/s 
0.0 /s 
Plugins 
0.00/s 
0.00/s 
0.00/s 
0.00/s 
GitHub 
ack 
o.oo,'s 
0.00/s 
0.00/s 
0.00/s 
Changelog

可以看到 hello 這個 Queue 的運行資訊。 

 

打開 hello Queue 

Overview 
Connections 
RabbitMQ 3.8.11 
Channels 
Erlang 22.3 
Exchanges 
Queue hello 
Overview 
Queued messages last minute 
75 k 
25k 
Message rates last minute ? 
300 Is 
200 Is 
100 Is 
Details 
Features 
Policy 
Operator policy 
Effective policy definition 
Consumers 
. no consumers 
Bindings 
From 
Ready 
Unacked 
Total 
Publish 
runmng 
Admin 
. 69,004 
• 69,004 
• 1261s 
Messages 
Message body bytes 
Process memory 
State 
Consumers 
Consumer utilisation ? 
Routing key Arguments 
7 
Total 
69,004 
943 kiB 
54 MiB 
Ready 
69,004 
943 kiB 
Unacked 
In memory 
69,004 
943 kiB 
Persistent 
Transient, Paged Out 
(Default exchange binding) 
This queue

可以看到 hello Queue 的運行資訊。 

 

移動到下面, 

點擊 Get Messages 按鈕 

RabbitMQ 3.8.11 Erlang 22.3 
Overview 
Connections 
Channels 
Exchanges 
Admin 
Get messages 
Warning: getting messages from a queue is a destructive action. 
Ack Mode: 
Encoding: 
Messages: 
Nack message requeue true v 
Auto string / base64 v 
Get Message(s) 
Message 1 
The server reported 80252 messages remaining. 
Exchange 
Routing Key 
Redelivered 
Properties 
Payload 
14 bytes 
(AMQP default) 
hello 
content_type: text/plain 
Hello RabbitMQ 
Move messages 
To move messages, the shovel plugin must be enabled, try: 
S rabbiting-plugins enable rabbitmq_shovel 
Delete

 

就可以看到 Queue 裡面的消息。 

 

接收消息

 RabbitMQ 接收消息與向 RabbitMQ 發布消息很類似。 接收消息仍然需要 ConnectionChannelQueue,但是交互方式略有不同。 

 

我將之前的程式程式碼更新一下,以便可以在同一個 Queue 里同時發送和接收消息: 

9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
func main() { 
go client() 
go server() 
var a string 
fmt.Scanln(8a) 
func client() { 
— getQueue() 
conn, ch, q 
conn.Close() 
defer 
ch . Close() 
defer 
msgs, err 
q. Name , 
true, 
false, 
false, 
false, 
nil) 
— ch. Consume( 
// queue string 
// consumer string 
// autoAck bool 
// exclusive bool 
// noLocdl bool 
// noWait bool 
// args amqp. Table 
failOnError(err, 
"Failed to register a consumer") 
msgs { 
for msg 
range 
log. Printf( "Receive message: 
%s\n" , 
msg. Body)
  1. 首先在 18 行建立 client 函數,用於處理接收消息的邏輯 

  2. 19 行,通過 getQueue() 獲得 ConnectionChannelQueue 

  3. 23 行,調用 Channel 上的 Consume 方法。這個方法返回一個 Go  Channel,每從伺服器接收到消息,就可以通過這個 Go Channel 獲得。 

  4. Consume 方法的參數: 

    1. 第一個參數是接收消息的 queue 的名稱,注意接收消息不需要關心 exchangeexchange 只有在發布消息時才用到。 

    2. 第二個參數 consumer string,它會唯一的識別出 Queue 對應的 Connection。它被 RabbitMQ 內部使用,來確定誰正在監聽這個 Queue。當 Direct Exchange 的多個客戶端都從同一個 Queue 接收消息的時候,這個就很重要了。RabbitMQ 會把消息分發到各個客戶端,粗略的看作是一種負載均衡。它對於想要取消連接的客戶端也很重要,這就可以讓 RabbitMQ 知道以後不要再向這個接收者發送消息了。這裡我們傳進一個「」,讓 RabbitMQ 為我們賦一個值,因為我們沒有追蹤的需求。 

    3. 第三個參數 autoAck bool,表示是否想在成功接收消息後自動確認。這個值通常設置為 true,從而讓伺服器可以立即將這個消息移除,以便節省資源。但是如果接收的消息需要做其它可能失敗的動作,例如存儲到資料庫,那麼可能最好將它設置為 false,然後手動進行確認。 

    4. 第四個參數 exclusive bool,它可以確認這個客戶端是這個 Queue 的唯一消費者。如果它設為 true,而其它客戶端已經註冊了,或稍後有其它 Queue 將要監聽這個 Queue,就會發生錯誤。這裡設為 false 

    5. 第五個參數 noLocal bool,防止 RabbitMQ 發送消息給與發送者處於同一個 Connection 的客戶端。這裡設為 false 

    6. 第六個參數 noWait bool,如果為 true,就不會等待伺服器確認請求,而會立即進行傳送。這裡設為 false 

    7. 第七個參數 args amqp.Table,對於 Queue  Server 有特定語義的參數可以放在這裡。此例中,我放的是 nil 

  5.  33 行,使用 for 循環來處理 msgs 這個 Go Channel,來接收消息。這裡在接收消息後,我只簡單的將其列印。 

  6.  1112 行,分別使用 goroutine 來運行 client  server,注意目前必須把 client 放在前面。 

  7.  15 行的目的就是讓這兩個 goroutine 保持存活。 

 

運行 

我把 server 函數做了調整,讓其持續不斷的發布消息: 

38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
func server() { 
getQueue( ) 
conn, ch, q 
conn. Close() 
defer 
ch . Close() 
defer 
msg 
for 
amqp. Publishing{ 
"text/plain" , 
Content Type: 
Body : 
ch . Publish( 
q. Name , 
false, 
false, 
msg) 
[]byte( "Hello RabbitMQ"),

 

然後使用 go run . 運行以後,你將看到終端里不斷在刷新這樣的消息: 

2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
2921/93/24 
Go src- go run . 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
22:26. 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
• 91 
x 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
Receive 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
message : 
messaqe : 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
Hello 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMQ 
RabbitMO

 

通過管理控制台,我們可以看到發布和接收消息的速率: 

Connections 
Overview 
v Totals 
Queued messages last minute 
Message rates last minute ? 
30 k/s 
20 Vs 
10 
Global counts ? 
Channels 
Exchanges 
Queues 
Ready 
Unacked 
Total 
Publish 
Publisher 
confirm 
Deliver 
(manual 
ack) 
Admin 
. 712 
• 712 
. 21,343/s 
0.00/s 
• 0.00/s 
Deliver 
(auto ack) 
Consumer 
Redelivered 
• 21,343/s 
• 0.00/s 
• 0.00/s

 

Overview 
Connections 
Channels 
Exchanges 
D Regex ? 
Queues 
Admin 
Exchanges 
All exchanges (8) 
Pagination 
Page 1 
Name 
of 1 
- Filter: 
Type 
direct 
fanout 
direct 
fanout 
headers 
headers 
topic 
topic 
Features 
Message rate in Message rate out 
(AMQP default) 
SensorDiscoverv 
amq.direct 
amq.fanout 
amq.headers 
amq.match 
amq.rabbitmq.trace 
amq.topic 
20,939/s 
0.00/s 
0.00/s 
20,956/s 
0.00/s