[RabbitMQ]AMQP 0-9-1:模型

上一篇文章(RabbitMQ:下載 & 安裝)中,我們下載並且安裝了RabbitMQ,並且成功註冊了RabbitMQ服務。本文我們將學習RabbitMQ中最基礎、最重要的概念:AMQP 0-9-1協議模型。

0 前言

要學好一項技術,千萬不要死記硬背那些調用API,而是要理解它的執行邏輯。

RabbitMQ的執行邏輯本質上是它所支援的通訊協議。

RabbitMQ支援很多通訊協議,包括AMQP 0-9-1、AMQP 1.0、MQTT和STOMP等。其中,最重要、最常用的是AMQP 0-9-1(默認)。我們只需要充分理解這個協議,就能夠解決日常工作中絕大部分RabbitMQ相關的問題。

AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是二進位消息協議,即底層傳輸的消息是二進位數據。它的版本劃分方式為major-minor[-revision]major.minor[.revision]。AMQP 0-9-1是AMQP的0-9-1版本(major=0,minor=9,rivision=1)。

從本質上來看,AMQP 0-9-1具備兩個核心功能:

  • 接收消息。
  • 轉發消息。

為了實現這兩個功能,AMQP 0-9-1提出了兩方面規範:

  • 在服務端層面,提出了AMQ Model(Advanced Message Queuing Protocol
    Model,高級消息隊列協議模型):由一系列具備路由/存儲消息功能的組件,以及組件之間交互的規則組成。
  • 在網路傳輸層面,提出了在AMQ模型下客戶端與服務端以及組件之間交互的規範,具體可以分成兩部分:
    • 方法層(Functional Layer):規範了客戶端與服務端之間、AMQ模型之間的命令。
    • 傳輸層(Transport Layer):規範了二進位數據(包括命令和消息)的傳輸格式。

因此,學習AMQP 0-9-1的核心內容可以分成上述三個部分:AMQ 模型、方法和傳輸數據格式。

本文介紹的是第一部分:AMQ 模型。其餘兩部分可查看後續文章。

1 AMQ模型

再次回顧AMQ Model的定義:由一系列具備路由/存儲消息功能的組件,以及組件之間交互的規則組成。

其中,核心組件包括:

  • Exchange(交換機)。
  • Message Queue(消息隊列)。
  • Binding(綁定)

其他重要的基本概念包括:Message、Connection、Channel以及Virtual Host。

消息中間件工作方式是典型的C/S模式,通常將伺服器中運行的消息中間件程式稱為Broker,而發布和接收消息的客戶端應用程式分別稱為Publisher和Consumer。

AMQ Model的組件就工作於Broker的消息中間件(如rabbitmq-server)進程中。為了方便統一管理,在創建時需要為這些組件指定對應的Virtual Host(類似於Java中的包)。

因此,AMQ模型的結構圖以及工作流程如下:

AMQP0-9-1

  1. Publisher和Consumer使用客戶端工具(如rabbitmq的amqp-client),通過TCP/IP協議連接到指定的Broker。
  2. 服務端或客戶端創建Exchange和Queue,並使用Binding組件進行綁定。
  3. Consumer監聽Queue。
  4. Publisher指定某個Virtual Host下的Exchange和路由規則,發送消息。
  5. Broker中對應的Exchange接收到消息後,根據路由規則將消息轉發給所綁定的某個/某些Queue。
  6. Queue將消息推送給正在監聽它的Consumer。

AMQP協議的主要特點是將路由和存儲消息的功能分給交給Exchange和Message Queue組件,而在AMQP之前的消息隊列協議通常將路由和存儲消息定義在同一個功能組件中。這樣做的好處在於可以實現更靈活、多樣和健壯的消息中間件系統。

2 Exchange

2.1 工作流程

Exchange的作用是:接收從Publisher發來的消息,然後根據路由規則轉發消息給Message Queue。

需要注意的是,在這個過程中Exchange並不會保存消息。

如果能找到匹配的Message Queue,那麼消息就能夠成功轉發。但是如果匹配失敗,Exchange會根據自身是否存在替補交換機(Alternate Exchange)進行重新分發消息,也會根據Publisher發布消息是否指定為mandatory進行丟棄或返回給Publisher。

同時,為了保證數據安全,如果手動開啟了Publisher確認機制,當消息被Broker中的Exchange接收時會返回一個確認消息basic.ack,如果沒有Exchange能夠接收則會響應異常資訊。後續文章會深入討論確認機制的細節。

因此,考慮到所有常見情況,RabbitMQ中Exchange的基本工作流程是:

Exchange工作流程

  1. Broker監聽伺服器的5682(non-ssl)或5671(ssl)埠。
  2. Publisher指定Exchange和路由規則,發送消息給Broker。
  3. Broker接收到消息:
    1. 未找到Exchange:響應Channel級別異常給Publisher,終止流程。
    2. 找到Exchange:分發消息給對應的Exchange,同時響應basic.ack給Publisher,進行下一步。
  4. Exchange解析路由規則,遍歷Binding列表:
    1. 找到Message Queue:轉發消息到對應的Message Queue,終止流程。
    2. 未找到Message Queue,判斷是否存在替補交換機:
      1. 存在:重新分發消息給替補交換機,替補交換機按照自己的方式解析路由規則。
      2. 不存在,判斷mandatory屬性:
        1. false,丟棄消息,終止流程。
        2. true:返回消息給Publisher,終止流程。

2.2 交換機類型

Exchange本質上是一種路由演算法,它會分析發布消息時指定的routingKey或headers,從Binding列表中找到匹配的Queue進行轉發。

為了適應不同的路由情況,AMQP 0-9-1預先定義了幾種不同類型的Exchange,分別對應不同的路由演算法:

  • Direct Exchange
  • Faout Exchange
  • Topic Exchange
  • Headers Exchange
  • System Exchange

這些交換機類型足以滿足日常工作中的各種場景。

RabbitMQ實現了前四種,並在Broker中默認創建了如下Exchange實例:

  • (AMQP default):Direct Exchange,發布消息不指定交換機時會默認發送到該交換機。
  • amq.direct:Direct Exchange。
  • amq.faout:Faout Exchange。
  • amq.headers:Headers Exchange。
  • amq.match:Headers Exchange。
  • amq.rabbitmq.trace:Topic Exchange。
  • amq.topic:Topic Exchange。

1、Direct Exchange

Direct Exchange工作原理:

DirectExchange

  1. Queue綁定到Direct Exchange時需要指定routingKey
  2. Publisher發布消息給Direct Exchange時需要指定routingKey的值。
  3. Direct Exchange接收到消息時,會執行如下路由演算法:
    1. 讀取routingKey值。
    2. 遍歷Binding列表,找到所有routingKey匹配的Queue。
    3. 轉發消息給匹配Queue。

很明顯,上面原理圖中,Direct Exchange會將消息轉發給routingKey=red的Queue。

特別需要注意的是,Direct Exchange路由演算法的唯一指標是routingKey。同時,它會將消息轉發給所有匹配的Queue,而不是說找到了一個匹配的Queue就停止遍歷了。

RabbitMQ默認創建了名為 (空字元串)的默認Direct Exchange,它會自動與所有Queue綁定,並指定routingKey為隊列名。因此,在向默認交換機發送消息時可以直接將routingKey指定為隊列名。

此外,日常工作中也習慣將routingKey與隊列名使用相同值,所以容易引起混淆,讓人們誤以為Direct Exchange是根據隊列名進行路由的。

2、Faout Exchange

Faout Exchange工作原理:

FaoutExchange

  1. Queue綁定到Faout Exchange時不需要指定參數。
  2. Publisher發布消息給Faout Exchange時也不需要指定參數。
  3. 遍歷Binding列表,找到與之綁定的Queue。
  4. Faout Exchange會無條件將消息轉發給所有與它綁定的Queue。

很明顯,上面原理圖中,Faout Exchange會將消息同時轉發給與它綁定的三個Queue。

特別需要注意的是,Faout Exchange路由演算法沒有路由指標,它會將消息轉發給所有與它綁定的Queue。

Faout Exchange的原理與電腦網路中的組播類似,通常用於實現發布/訂閱場景。

3、Topic Exchange

Topic Exchange工作原理:

TopicExchange

  1. Queue綁定到Topic Exchange時需要指定routingKey,其值通常為以「.」分隔的多個單詞。使用通配符#*進行模糊匹配:
    1. #:匹配零或多個單詞。
    2. *:匹配一個單詞。
  2. Publisher發布消息給Topic Exchange時需要指定routingKey,其值為確定值(即沒有通配符的概念)。
  3. Topic Exchange接收到消息時,會執行如下路由演算法:
    1. 讀取routingKey值。
    2. 遍歷Binding列表,找到所有routingKey匹配的Queue。
    3. 轉發消息給匹配Queue。

很明顯,上面原理圖中,Topic Exchange會將兩條消息都轉發給routingKey=#.black.*的Queue。

特別需要注意的是,Topic Exchange路由演算法的唯一指標也是routingKey。同時,它會將消息轉發給所有匹配的Queue,而不是說找到了一個匹配的Queue就停止遍歷了。

將Direct Exchange和Topic Exchange進行對比,可以很明顯地發現:

  • Direct Exchange是低配版的Topic Exchange,routingKey與Queue之間為一對一關係:一個Queue只能接收routingKey唯一對應的消息。
  • Topic Exchange是高配版的Direct Exchange,routingKey與Queue之間為多對一關係:一個Queue可以接收多種routingKey的消息。

4、Headers Exchange

Headers Exchange工作原理:

HeadersExchange

  1. Queue綁定到Headers Exchange時需要指定arguement作為匹配條件,其值為key-value鍵值對。多個key-value鍵值對時,可以使用x-match指定多條件匹配關係:
    1. all:所有key-value鍵值對都要匹配才會進行轉發,即匹配條件之間為「且」的關係。默認值。
    2. any:只要有一個key-value鍵值對匹配就會進行轉發,即匹配條件之間為「或」的關係。
  2. Publisher發布消息給Headers Exchange時需要指定headers,此時不需要添加x-match
  3. headers Exchange接收到消息時,會執行如下路由演算法:
    1. 讀取請求headers
    2. 遍歷Binding列表,讀取綁定arguement
    3. 判斷綁定arguementx-match值:
      1. all或沒有聲明x-match:綁定arguement中所有key-value在請求headers中都存在且匹配則成功,否則失敗。
      2. any:綁定arguement中只要有一個key-value鍵值對在請求headers中存在且匹配就成功,所有綁定arguement的key-value鍵值對在請求headers中都不存在或不匹配才失敗。
    4. 轉發消息給匹配Queue。

很明顯,上面原理圖中,Headers Exchange會將消息轉發給bigOrBlackblack隊列。

5、System Exchange

System Exchange的工作原理為:

  1. Publisher向System Exchange發送routingKey=S的消息。
  2. System Exchange會將該消息轉發給名為S的系統服務。

RabbitMQ默認沒有支援該類型交換機,所以在這裡不進行過多講解。

2.3 交換機屬性

通過導出RabbitMQ的Definitions,我們可以得到Broker中的許多配置資訊,從中我們可以找到交換機數據結構的存儲格式如下:

"exchanges": [
  {
    "name": "test.exchange",
    "vhost": "/",
    "type": "direct",
    "durable": true,
    "auto_delete": false,
    "internal": false,
    "arguments": {
      "alternate-exchange": "amq.direct",
      "testArgumentsName": "testArgumentsValue"
    }
  }
]
  • exchanges:存放交換機實例的數組,內部每一個對象表示一個交換機實例。
  • name:交換機名字。
  • vhost:交換機所屬Virtual Host。
  • type:交換機類型,RabbitMQ中可選值為directfaouttopicheaders
  • durable:是否可以持久化,可選值為true(持久化)和false(非持久化)。持久化交換機會保存到本地磁碟,Broker重啟後能獲取原有交換機數據。
  • auto_delete:是否自動刪除,可選值為truefalse
    • true:當該交換機沒有與之綁定的消息隊列時,會被自動刪除。
    • false:當該交換機沒有與之綁定的消息隊列時,不會被刪除,仍然可以獨立存在。
  • internal:是否時內部使用的交換機,可選值為truefalse
    • true:內部使用交換機,Publisher不能指定發送消息給內部交換機。
    • false:外部使用交換機,Publisher可以將消息發送給外部交換機。通常我們聲明的都是外部使用交換機。
  • arguments:可選參數,內部為key-value鍵值對,可用於完成Exchange的特定功能。例如,alternate-exchange可指定替補交換機。

3 Message Queue

3.1 工作流程

Message Queue是FIFO(First In First Out,先進先出)隊列,它的作用是:

  • 接收消息(from Exchange)
  • 保存消息
  • 發送消息(to Consumer)

RabbitMQ中Message Queue的基本工作流程是:

MessageQueue工作流程

  1. Message Queue接收到Exchange轉發的消息後,將消息保存到記憶體/磁碟中。
  2. Consumer通過訂閱/拉取的方式向Message Queue獲取消息。
  3. Message Queue將隊列頭部消息複製並發送給Consumer。
  4. 在開啟手動確認模式後,Consumer會響應ack/reject/nack給Message Queue(可以在業務處理之前或之後):
    1. ack:確認接收並處理消息(Message Queue會刪除隊列中保存的該消息)。
    2. reject:拒絕一條消息。
    3. nack:拒絕一條/多條消息。
  5. 接收reject/nack響應後,Message Queue會根據響應是否requeue進行下一步處理:
    1. true:不刪除隊列中該消息,之後可以將該消息發給另外的Consumer處理。
    2. false:刪除隊列中該消息,可能會造成消息丟失。
  6. 接收reject/nack響應後,如果設置x-dead-letter-exchange還可以重新轉發給替補交換機。

3.2 消息隊列屬性

通過導出RabbitMQ的Definitions,我們可以得到Broker中的許多配置資訊,從中我們可以找到消息隊列數據結構的存儲格式如下(其中不包含消息隊列的內容):

"queues": [
    {
        "name": "testQueue",
        "vhost": "/",
        "durable": true,
        "auto_delete": false,
        "arguments": {
            "x-queue-type": "classic"
        }
    }
]
  • queues:存放消息隊列實例的數組,內部每一個對象表示一個消息隊列實例。

  • name:消息隊列名字。

  • vhost:消息隊列所屬Virtual Host。

  • durable:是否可以持久化,可選值為true(持久化)和false(非持久化)。持久化消息隊列會保存到本地磁碟,Broker重啟後能獲取原有消息隊列數據(包括其中的消息)。

  • exclusive:是否排他,可選值為truefalse

    • true:消息隊列獨屬於當前Channel,Channel關閉時消息隊列會被刪除。
    • false:消息隊列可以多個Channel共享。
  • auto_delete:是否自動刪除,可選值為truefalse

    • true:當沒有Consumer訂閱該消息隊列時,會被自動刪除。
    • false:當沒有Consumer訂閱該消息隊列時,不會被刪除,仍然可以獨立存在。
  • arguments:可選參數,內部為key-value鍵值對,可用於完成Message Queue的特定功能。例如:

    • x-message-ttl:沒有被Consumer消費的情況下,消息能夠在隊列中存活的時間(毫秒)。
    • x-expires:沒有被Consumer訂閱的情況下,消息隊列能夠存活的時間(毫秒)。
    • x-single-active-consumer:值為true/false,一次只有一個Consumer從隊列中獲取消息。
    • x-dead-letter-exchange:指定消息過期或被reject(死信)後,用來重新轉發消息的交換機的名字。
    • x-dead-letter-routing-key:死信在交換機路由過程中所使用的routingKey,如果沒有指定則使用原先的routingKey
    • x-max-length:消息隊列最大能存放消息的數量。
    • x-max-length-bytes:消息隊列最大能存放消息的位元組長度。
    • x-max-priority:消息隊列能提供的最小優先順序。如果沒有指定則不能提供消息優先順序功能。
    • x-queue-mode:是否為懶模式:
      • lazy:消息隊列會儘可能將消息保存到磁碟,而不是記憶體中。
      • default(未設置):消息隊列儘可能將消息保存到記憶體中,以實現消息的快速發送。
    • x-queue-master-locator:設置消息隊列集群中主節點的地址。
    • x-queue-type:設置消息隊列的類型,可選值為classicquorumstream

4 Binding

Exchange和Message Queue並沒有存儲對方的資訊,那麼Exchange在轉發過程中是如何找到正確的Message Queue的呢?這需要藉助Binding組件。

Binding中保存著sourcedestination屬性,可以將交換機作為消息源,交換機/消息隊列作為轉發地址。當交換機路由消息時,會遍歷Binding數組,找到source為自身的綁定關係,判斷消息屬性是否滿足routing_keyarguments進行轉發。

Binding

通過導出RabbitMQ的Definitions,我們可以得到Broker中的許多配置資訊,從中我們可以找到Binding數據結構的存儲格式如下:

"bindings": [
    {
        "source": "amq.headers",
        "vhost": "/",
        "destination": "bigAndBlue",
        "destination_type": "queue",
        "routing_key": "",
        "arguments": {
            "color": "blue",
            "size": "big",
            "x-match": "all"
        }
    }
]
  • source:數據源,只能是交換機。
  • vhost:綁定關係所屬Virtual Host。。
  • destination:數據轉發地址,可以是交換機或消息隊列。
  • destination_type:數據轉發地址類型。queue-消息隊列,exchange-交換機。
  • routing_keyroutingKey
  • arguments:路由參數。

5 Message

AMQP 0-9-1中傳輸數據的基本結構是Frame(幀),分成三類:

  • 方法幀(Method Frame),RabbitMQ伺服器會按照以下步驟執行方法:
    1. 讀取方法幀載荷。
    2. 解析載荷到對應數據結構。
    3. 校驗許可權和參數格式。
    4. 執行方法。
  • 內容幀(Content Frame),封裝具體業務消息,包含消息頭幀(content header frame)和消息體幀(content body frame)。
  • 心跳幀(Heartbeat Frame),維持TCP/IP連接的心跳數據。

6 Connection & Channel

簡單來說,Connection表示客戶端與RabbitMQ伺服器一個TCP連接。

為了對TCP連接進行多路復用,一個Connection內部可以創建多個Channel,用於不同的業務。客戶端中不同Channel在發送或接收數據時,使用的都是同一個Connection

7 Virtual Host

Virtual Host的作用類似Java中Package(包)的概念,它的作用是:建立命名空間,用來分隔交換機和消息隊列。我們可以在不同Virtual Host下聲明同名的交換機或消息隊列。

Virtual Host本質上只是一個字元串:

"vhosts": [
    {
        "name": "/"
    }
]
Tags: