RabbitMQ 學習筆記1 – RabbitMQ簡介和AMQP詳解
- 2020 年 4 月 9 日
- 筆記
0. 背景
消息隊列(Message Queue)提供一個異步通信機制,消息的發送者不必苦苦等待着消息被處理完成,轉而繼續自己的工作。消息中間件負責處理網絡通信,如果網絡連接不可用,消息被暫存於隊列當中,當網絡暢通的時候再用。消息隊列在企業中應用很廣泛,可選擇的有ActiveMQ、RabbitMQ,Kafka,阿里巴巴自主開發RocketMQ等。本文討論 RabbitMQ 。

RabbitMQ
1.介紹
欲了解 RabbitMQ 先要了解 MQ。 RabbitMQ 是 MQ 的一種實現。
1.1 MQ 介紹
MQ(Message Queue)消息隊列,是基礎數據結構中「先進先出」的一種數據結構。一般用來解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。
它由這些組成:
- 生產者:生產者產生消息,並把消息放入隊列。
- 隊列:把要傳輸的數據(消息)放在隊列中,用隊列機制來實現消息傳遞。生產者產生消息並把消息放入隊列,然後由消費者去處理。
- 消費者:消費者可以到指定隊列拉取消息,或者訂閱相應的隊列,由MQ服務端給其推送消息。
作用:
- 解耦:對應用/模塊進行分離,引入了消息代理作為中間消息服務
- 異步:主業務執行中將消息放入MQ後不等待,從業務異步執行返回結果。
- 削峰:高並發情況下,業務異步處理,提供高峰期業務處理能力,避免系統癱瘓
RabbitMQ 是 MQ 的一種實現,下面介紹下 RabBMQ。
1.2 RabbitMQ 介紹
RabBMQ是一個廣泛部署的開源消息代理。

RabbitMQ 流式管道
特點:
- 異步消息傳遞
- 易於部署
- 支持集群,用於高可用性和吞吐量。支持分佈式部署。
- 開發者友好,支持各種流行的開發語言。比如Java,Ruby,GO。
- 方便的 管理與監控 工具
2. AMQP(高級消息隊列協議)概述
RabbitMQ 是一個實現了 AMQP協議 的工具軟件,所以 AMQP 中的概念和準則也適用於 RabbitMQ。下面重點介紹AMQP,它能幫助我們深刻的理解。
AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用 和消息中間件代理之間進行通信。
為什麼會有 AMQP?
軟件系統中存在不同廠商的不兼容產品的問題,異構系統的集成是非常昂貴和複雜的。早期的消息傳遞解決方案也非常昂貴,往往專門用於大公司負擔得起。
AMQP 設計目標:
- 為消息中間件創建開放標準
- 啟用各種技術和平台之間的互操作性
AMQP 不僅使這些不同的系統能夠相互通信,而且能夠實現不同的產品。
消息中間件(經紀人)及其角色
「消息代理」 收到 「消息生產者」並將它們 「路由」 到 「消費者」。
(發佈它們的應用程序) –> 消息代理 —> (處理它們的應用程序)
2. AMQP 模型簡介
2.1 工作過程
它工作過程如下圖:
- 消息(Message)被發佈者(Publisher)發送給交換機(Exchange)
- 交換機(Exchange)可以理解成郵局,交換機將收到的消息根據路由規則分發給綁定的隊列(Queue)
- 最後,AMQP代理會將消息投遞給訂閱了此隊列的消費者(Consumer),或者消費者按照需求自行獲取。

image.png
一些其他情況: 消息屬性: 發佈消息時可以給消息指定各種消息屬性(message meta-data)。有些屬性會被消息代理(brokers)使用,有些只能被接收消息的應用所使用。
回執(acknowledgement): 網絡是不可靠的,有可能在處理消息的時候失敗。AMQP 包含了一個消息確認的概念:當一個消息成功到達消費者後(consumer),消費者會通知一下消息代理(broker),這個可以是自動的也可以由開發者執行。
當「消息確認」被啟用的時候,消息代理不會完全將消息從隊列中刪除,直到它收到來自消費者的確認回執(acknowledgement)。
無法到達 當一個消息無法被成功路由時,消息或許會被返回給發佈者並被丟棄。或者,如果消息代理執行了延期操作,消息會被放入一個所謂的死信隊列中。此時,消息發佈者可以選擇某些參數來處理這些特殊情況。
2.2 AMQP 內部模型

image.png
3. 交換機(Exchange)
交換機是要掌握的重點,這一章節重點來講。
交換機 用來傳輸消息的,交換機拿到一個消息之後將它路由給一個隊列。
它的傳輸策略是由交換機類型和被稱作綁定(bindings)的規則所決定的。
四種交換機:
Name(交換機類型) |
默認名稱 |
---|---|
Direct exchange(直連交換機) |
(空字符串) , amq.direct |
Fanout exchange(扇型交換機) |
amq.fanout |
Topic exchange(主題交換機) |
amq.topic |
Headers exchange(頭交換機) |
amq.match (and amq.headers in RabbitMQ) |
交換機狀態 交換機可以有兩個狀態:持久(durable)、暫存(transient)。持久化的交換機會在消息代理(broker)重啟後依舊存在,而暫存的交換機則不會。並不是所有的應用場景都需要持久化的交換機。
下面分別說明四種交換機類型
3.1 直連型交換機( Direct Exchange)
消息可以攜帶一個屬性 「路由鍵(routing key)」,以輔助標識被路由的方式。直連型交換機(direct exchange)根據消息攜帶的路由鍵將消息投遞給對應隊列的。
它如何工作:
- 將一個隊列綁定到某個交換機上,同時賦予該綁定(Binding)一個路由鍵(routing key)
- 當一個攜帶着路由鍵為 「key1」 的消息被發送給直連交換機時,交換機會把它路由給 「Binding名稱等於 key1」 的隊列。

直連型交換機圖例
總結: Binding 的 Routing Key 要和 消息的 Routing Key 完全匹配
3.2 扇型交換機 ( Fanout Exchange)
扇型交換機將消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。
如果N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這所有的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。
案例:
- MMO遊戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網站可以用它來實時地將比分更新分發給多端
- 在群聊的時候,它被用來分發消息給參與群聊的用戶。

扇型交換機圖例
總結 不管 消息的Routing Key,廣播給這個交換機下的所有綁定隊列。
3.3 主題交換機( Topic Exchanges)
主題交換機通過對消息的路由鍵
和 「綁定的主題名稱」 進行模式匹配,將消息路由給匹配成功的隊列。
它的工作方式:
- 為綁定的 Routing Key 指定一個 「主題」。模式匹配用用 *, # 等字符進行模糊匹配。比如 usa.# 表示 以 usa.開頭的多個消息 到這裡來。
- 交換機將按消息的 Routing Key 的值的不同路由到 匹配的主題隊列。
主題交換機經常用來實現各種分發/訂閱模式及其變種。主題交換機通常用來實現消息的多播路由(multicast routing)。

image.png
使用案例:
- 由多個人完成的後台任務,每個人負責處理某些特定的任務
- 股票價格更新涉及到分類或者標籤的新聞更新(
總結: 綁定 的 Routing Key 和 消息的 Routing Key 進行字符串的模糊匹配。
3.4 頭交換機 (Headers exchange)
頭交換機使用多個消息屬性來代替路由鍵建立路由規則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則。
在實際中並不常用。
4. 相關實體概念
4.1 隊列( Queue )
隊列 存儲着即將被應用消費掉的消息。
名稱 可以為隊列指定一個名稱。
隊列持久化
- 持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啟的時候,它可以被重新恢復。
- 沒有被持久化的隊列稱作暫存隊列(Transient queues)
4.2 綁定(Binding)
綁定是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則。
如果要指示交換機「E」將消息路由給隊列「Q」,那麼「Q」就需要與「E」進行綁定。綁定操作需要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。
路由鍵的意義在於從發送給交換機的眾多消息中選擇出某些消息,將其路由給綁定的隊列。
4.3 消費者 ( Consumer )
消費者即使用消息的客戶。
消費者標識 每個消費者(訂閱者)都有一個叫做消費者標籤的標識符。它可以被用來退訂消息。
一個隊列可以註冊多個消費者,也可以註冊一個獨享的消費者(當獨享消費者存在時,其他消費者即被排除在外)。
4.4 消息確認 (acknowledgement)
什麼時候刪除消息才是正確的?有兩種情況
- 自動確認模式:當消息代理(broker)將消息發送給應用後立即刪除。
- 顯式確認模式:待應用(application)發送一個確認回執(acknowledgement)後再刪除消息。
在顯式模式下,由消費者來選擇什麼時候發送確認回執(acknowledgement)。
- 應用可以在收到消息後立即發送
- 或將未處理的消息存儲後發送
- 或等到消息被處理完畢後再發送確認回執。
如果一個消費者在尚未發送確認回執的情況下掛掉了,那代理會將消息重新投遞給另一個消費者。如果當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,然後再次嘗試投遞。
拒絕消息
當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。
應用可以向消息代理表明,本條消息由於「拒絕消息(Rejecting Messages)」的原因處理失敗了(或者未能在此時完成)。當拒絕某條消息時,應用可以告訴消息代理如何處理這條消息——銷毀它或者重新放入隊列。
4.5 消息 ( Message )
消息的組成:
- 消息屬性
- 消息主體(有效載荷)
消息屬性(Attributes)常見的有:
- Content type(內容類型)
- Content encoding(內容編碼)
- Routing key(路由鍵)
- Delivery mode (persistent or not) 投遞模式(持久化 或 非持久化)
- Message priority(消息優先權)
- Message publishing timestamp(消息發佈的時間戳)
- Expiration period(消息有效期)
- Publisher application id(發佈應用的ID)
消息體:
- 消息體即消息實際攜帶的數據,消息代理不會檢查或者修改有效載荷。
- 消息可以只包含屬性而不攜帶有效載荷。
- 它通常會使用類似JSON這種序列化的格式數據。
- 常常約定使用"content-type" 和 "content-encoding" 這兩個字段分辨消息。
4.5 連接 (Connection)
AMQP 連接通常是長連接。AMQP是一個使用TCP提供可靠投遞的應用層協議。AMQP使用認證機制並且提供TLS(SSL)保護。
當一個應用不再需要連接到AMQP代理的時候,需要優雅的釋放掉AMQP連接,而不是直接將TCP連接關閉。
4.6 通道 (channels)
AMQP 提供了通道(channels)來處理多連接,可以把通道理解成共享一個TCP連接的多個輕量化連接。
這可以應對有些應用需要建立多個連接的情形,開啟多個TCP連接會消耗掉過多的系統資源。
在多線程/進程的應用中,為每個線程/進程開啟一個通道(channel)是很常見的,並且這些通道不能被線程/進程共享。
通道號 通道之間是完全隔離的,因此每個AMQP方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道準備的。
4.7 虛擬主機 (vhost)
為了在一個單獨的代理上實現多個隔離的環境(用戶、用戶組、交換機、隊列 等),AMQP提供了一個虛擬主機(virtual hosts – vhosts)的概念。
這跟Web servers虛擬主機概念非常相似,這為AMQP實體提供了完全隔離的環境。當連接被建立的時候,AMQP客戶端來指定使用哪個虛擬主機。
5. 參考:
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html
https://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/AMQP091ModelExplained.textile
https://www.rabbitmq.com/blog/tag/amqp-10/
https://github.com/rabbitmq/rabbitmq-amqp1.0
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
https://www.cnblogs.com/sgh1023/p/11217017.html
END