RabbitMQ之消息模式簡單易懂,超詳細分享~~~

前言

上一篇對RabbitMQ的流程和相關的理論進行初步的概述,如果小夥伴之前對消息隊列不是很了解,那麼在看理論時會有些困惑,這裡以消息模式為切入點,結合理論細節和程式碼實踐的方式一起來學習。

正文

常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通過設置交換機類型和配置參數來實現各個模式;接下來就分別進行實操演示吧。

以下演示都是通過管理員的帳號進行。其實每種模式其實很大一部分操作都是一樣的,所以公共部分不會重複截圖說明,不過會針對不同的配置進行說明。

1. 簡單模式(Simple)

簡單模式顧名思義就是簡單,不用配置太多的東西,如下圖所示:

上圖解析:

  • P:表示生產者,負責推送消息;

  • C:表示消費者,負責接收消息;

  • 中間紅色部分:代表的是隊列(Queue);

小夥伴可能會奇怪,這裡沒有交換機嗎?

其實是有的,上一篇說流程的時候,消息肯定是要通過交換機轉發到隊列中的,這裡沒有指定,那是因為用到了默認的交換機,具體看以下演示。

1.1 Web管理介面進行演示

對於Web介面演示來說,只需要將消息能生產、投遞、消費即可,我們不用去弄一個生產者和消費者,生產者和消費者都是業務處理邏輯用的,所以通常都是根據業務需求就行實現的;話不多說開始演示吧。

根據上圖所示,我們只需要創建一個隊列即可,然後就可以進行消息模擬發送和消費了。

此時並沒有指定交換機綁定,點擊隊列名看詳情中的Bindings,有一個默認的交換機已經和隊列進行綁定

隊列詳情頁面的說明,在上篇文章中就已經標註了,這裡就不再贅述。

有了綁定關係之後,就可以在默認的交換機頁面開始模擬轉發消息;首先進入Exchanges管理頁面,點擊默認交換機(AMQP default)進入詳情開始發布消息:

消息發送成功之後就會在隊列介面看到消息情況:

隊列裡面有了消息之後,就可以模擬消費者進行消息消費,點擊隊列名進入詳情,可在詳情也模擬消費:

如上所示,簡單模式整個消費流程就通過Web頁面模擬完了。但在消費消息時,提供了Ack Mode模式(消息確認模式)選擇來進行消費,可選擇的模式如下:

  • Nack message requeue true:獲取消息,但是不會向Server做ack應答確認(即不告訴伺服器消息被消費了),消息重新入隊。即隊列中的消息不會被刪除掉;
  • Automatic Ack:獲取消息,向Server做應答確認(即會告訴伺服器消息被消費了),消息不重新入隊,將會從隊列中刪除;
  • Reject requeue true:拒絕獲取消息(即拒絕處理消息),消息重新入隊;
  • Reject requeue false:拒絕獲取消息(即拒絕處理消息),消息不重新入隊,將會被刪除;

到這關於簡單模式下的介面演示就結束了,其中描述的細節內容是共用的,在其他模式下的操作也類似,後續不做重複說明。

1.2 程式碼進行演示

這裡就用控制台的方式,一步一步的實現。這裡需要引入Nuget包:RabbitMQ.Client。生產者的整體程式碼如下:

接下來就一步一步來調試,看看消息是怎麼一步一步發出去的;

  • 創建連接

    剛開始沒有任何連接,如下:

    程式碼繼續下一步,連接就有了:

    此時就可以理解為網路連接上了,但通道還沒有創建出來,如下:

  • 根據連接創建通道

    通道根據連接進行創建,目的是為了提高傳輸效率,共用一個連接,不然頻繁的創建和銷毀連接會佔資源,影響性能

  • 定義隊列

    有連接和通道之後理論就可以直接發消息了,但直接通訊會相互依賴比較強,達不到解耦合的效果。所以需要定義一個隊列將消息存放到裡面,客戶端想用了自己來消費就行,另外隊列還可以達到一定的削峰作用

    創建隊列的時候需要傳幾個參數,分別意思如下:

    參數1:queue, 隊列的名稱;

    參數2:durable, 隊列是否持久化;如果為true,伺服器重啟之後隊列還在,不會被清除;否則就被清掉。

    參數3:exclusive ,是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,並且連接自動關閉;

    參數4:autoDelete, 是否自動刪除,當最後一個消費者斷開連接之後是否自動刪除消息;

    參數5:arguments, 用來設置隊列附加參數,如設置隊列的有效期、隊列的消息生命周期、消息的最大長度等;

  • 發送消息

    經過以上步驟,就可以發送消息了,如上沒有定義交換機,那就是綁定了默認交換機。

    發送時的幾個參數意思如下:

    參數1:exchange,交換機,這裡沒有指定交換機。

    參數2:routingKey,路由key,即指定隊列,簡單模式下,路由key默認就是隊列名

    參數3:basicProperties, 配置其他相關屬性

    參數4:body ,需要發送的消息內容

以上的生產者完成了,現在再來一個消費者演示一下消息消費,消費者整體程式碼如下:

效果如下:

在整個過程中,還是會先建立連接,創建通道,指定隊列; 不同的是增加對接收數據的處理。

是不是用起來比較簡單方便,主要是在複雜項目中,消息隊列的作用真的很大。來,接著往下說說其他模式。

2. 工作模式(Work)

工作模式是考慮到多個消費者情況下,消息如何被消費的,主要有兩種方案,輪詢分發和公平分發;

  • 輪詢分發:消費者依次輪著消費消息,直到消息消費完為止,按均分配。
  • 公平分發:根據消費者能力進行分發,即處理快的消費就多,處理慢的就消費就少,能者多勞。

上圖解析:

  • P:表示生產者,負責推送消息;
  • C1、C2:表示多個消費者,都可以從同一個隊列接收消息;
  • 中間紅色部分:代表的是隊列(Queue);

這兩種方式需要多個消費者,在介面不太好模擬,所以就直接上程式碼演示了。

2.1 輪詢分發

在簡單模式基礎上稍微改動一下程式碼即可。在生產者中發多條消息出來,然後啟用多個消費者就可以進行模擬如下:

生產者程式碼整體如下:

消費者程式碼整體如下:

兩個消費者的程式碼都是一樣,沒有變動; 兩個消費者的都是在消費同一個隊列的消息,可以先啟動兩個消費者,然後在啟動生產者,效果如下:

由上可見,默認情況下其實採用的是輪詢方式。

2.2 公平分發

在實際業務中,有些業務處理比較耗時,有些處理耗時不長,如上案例,假如奇數消息需要處理比較耗時,那麼對應的消費者就壓力比較大。這種情況可以通過公平分發的方式進行業務處理,處理快點的就多處理點。

如何才能知道消費者處理業務完成呢?消費者處理完成之後主動上報是最好不過的,所以只需要在消費者端將自動確認機制改為手動確認即可,即:業務處理完成之後,手動上報確認狀態。

生產者的程式碼不需要變動,只需要稍微改改消費者程式碼即可,這裡模擬兩個不同處理能力的消費者:

  • 消費者1處理一條消息業務需要500毫秒;
  • 消費者2處理一條消息業務需要2秒;

消費者1整體程式碼如下:

消費者2整體程式碼如下,主要是模擬時間不一樣:

先啟動兩個消費者,再啟動生產者,看看消費情況,如下:

以上演示就是公平分發模式的演示,其中有兩個關鍵的步驟:

  • 設置每次消費的消息條數,可以根據實際業務情況配置,這裡設置的是每次取一條。通過 channel.BasicQos進行設置。
  • 將消息確認模式改為自動模式,這樣就可以根據實際業務處理情況回饋確認資訊,伺服器就會將消息處理掉,即刪除消息。所以一般在實際業務場景大都會推薦使用手動確認的方式,這樣避免業務未處理導致消息就被伺服器給清掉的情況。

3. 發布訂閱模式(Fanout)

Fanout模式是一種發布訂閱模式,是一種廣播機制,不需要指定路由Key。這種模式的交換機就會將消息廣播到綁定的所有隊列上去,只要有消費者訂閱對應的隊列,就會收到消息。如下圖:

上圖解析:

  • P:表示生產者,負責推送消息;
  • X:表示交換機,圖中表示一個交換機綁定了多個隊列;
  • C1、C2:表示多個消費者,都可以從同一個隊列接收消息;
  • 中間紅色部分:代表的是隊列(Queue);

在這種模式下,消息會一次性被多個消費者消費。

3.1 Web管理介面進行演示
  • 先創建一個Fanout模式的交換機;

  • 創建兩個隊列(根據實際需要創建多個),並將隊列綁定到上一步創建的交換機上;

    這裡演示就創建兩個隊列,分別是FanoutQ1和FanoutQ2

    在隊列詳情頁或交換機詳情頁都可以進行交換機和隊列的綁定,這裡分別在FanoutQ1和FanoutQ2隊列詳情頁進行綁定,如下:

    同樣的方式綁定FanoutQ2, 綁定完成之後,可以在交換機詳情頁看到對應的綁定隊列:

  • 通過交換機上投遞消息,看效果;

    此時通過FanoutExchange交換機投遞消息,綁定到此交換機上的隊列都能收到:

    查看隊列消息情況,可以看到兩個隊列都接收到消息了。

3.2 程式碼進行演示

其實程式碼和操作Web一樣,只是用程式碼實現生產者和消費者而已。

  • 生產者的程式碼

    因為Fanout交換機不用關注RoutingKey,所以在發布消息時,第二個參數不需要傳遞RoutingKey。

  • 消費者1的程式碼

    消費者比較關注的是交換機需要和生產者指定的是同一個,隊列和交換機有綁定。

  • 消費者2的程式碼,其實和消費者1基本一樣,只是定義的隊列不一樣

  • 先啟動消費者,然後啟動生產者,看效果

    這裡是控制台程式,為了顯示方便就先啟動消費者,後期的生產者,實際應用場景先啟動誰都行。

4. 路由模式(Direct)

Direct模式是在Fanout基礎增加RoutingKey條件, 即交換機不會將消息現全部投遞到所有隊列,而是只投遞到對應RoutingKey下的隊列。如圖:

上圖解析:

  • P:表示生產者,負責推送消息;
  • X:表示交換機,指定類型為direct,圖中表示一個交換機綁定了多個隊列;
  • 中間箭頭上的error、info、warning代表具體的RoutingKey;
  • C1、C2:表示多個消費者,都可以從同一個隊列接收消息;
  • 中間紅色部分:代表的是隊列(Queue);

Direct模式其實在實際應用場景中用的比較多的,默認的Exhanges也是Direct模式, 很多關於消息隊列的框架,默認也是採用這種模式,主要原因是根據RoutingKey精確的處理對應的業務,不會由於考慮不周到,導致消息處理有不確定性,性能相對也不錯

4.1 Web管理介面進行演示
  • 先創建一個Direct模式的交換機;

  • 創建兩個隊列,然後將隊列綁定到上一步創建的交換機上,並指定對應的RoutingKey;

    創建隊列完成之後,還需要綁定到交換機上,上一種模式演示的是從隊列詳情中維護綁定關係,這次從交換機詳情中進行演示,如下:

    綁定第1個隊列,指定RoutingKey是order,模擬處理訂單的:

    綁定第2個隊列,指定RoutingKey是msg,模擬處理消息的:

    有了綁定關係,就可以測試驗證了。

    註:這裡的RoutingKey可以根據實際情況隨意指定的。

  • 向交換機上投遞消息,看效果;

    模擬發布一個RoutingKey為order的消息:

    再發布一個消息,指定RoutingKey為msg,如下:

    兩個消息都發布成功,而是指定對應的RoutingKey進行投遞,所以現在綁定到此交換機上兩個隊列中分別有一條數據,查看隊列的消息概況:

    可以進入隊列詳情,通過GetMessage獲取到具體的消息內容,這裡就不截圖了,上面已經演示過。

4.2 程式碼進行演示

在Fanout的程式碼基礎上稍微改動即可,主要改動點就是改變交換機的類型,並在隊列和交換機綁定時設置對應的RoutingKey,發布消息的時候指定交換機和RoutingKey。

  • 生產者程式碼

  • 消費者1程式碼,主要是綁定隊列時指定的RoutingKey為order

  • 消費者2程式碼,主要是綁定隊列時指定的RoutingKey為msg

  • 運行起來看效果:

    如上演示效果,和Web演示一樣,只有精確匹配到RoutingKey才能消費到對應的消息數據。

5. 主題模式(Topic)

Topic模式是在Direct模式基礎增加模糊匹配RoutingKey,Direct精確匹配RoutingKey,Topic可以通*或#進行模糊匹配,從而把消息投遞到對應的隊列中,如圖:

上圖解析:

  • P:表示生產者,負責推送消息;
  • X:表示交換機,指定類型為topic,圖中表示一個交換機綁定了多個隊列;
  • 中間箭頭上的文字代表模糊匹配的RoutingKey;其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞,匹配符需要與點號(.)搭配使用。 *.orange.test.# 示例中orange算一個詞,test算一個詞,即通過點號(.)分開的就稱為一個詞。
  • C1、C2:表示多個消費者,都可以從同一個隊列接收消息;
  • 中間紅色部分:代表的是隊列(Queue);
5.1 Web管理介面進行演示
  • 先創建一個Topic模式的交換機;

  • 創建兩個隊列,然後將隊列綁定到上一步創建的交換機上,並指定對應的RoutingKey;

    將隊列綁定到交換機上,這裡還是在交換機詳情中進行演示,如下:

    同樣的步驟分別對TopicQ1和TopicQ2進行規則綁定,如下:

    現在的TopicExchange的綁定關係如下:

    有了關係之後就可以進行驗證效果了。

  • 向交換機上投遞消息,會根據RoutingKey的模糊匹配規則將消息投遞到對應的隊列中,看效果;

    先指定order.create.test發布消息,看看會匹配哪些隊列:

    TopicQ2接收到消息,匹配到路由規則order.#

    再指定RoutingKey 為order.update 發布一個消息,如下:

    TopicQ1和TopicQ2都收到消息了,匹配到路由規則order.#和order.*,如下:

    再指定RoutingKey 為order發布一個消息,就會匹配到order.#,這裡就不截圖了。

    以上測試說明:在Topic類型交換機和隊列綁定關係時,可以指定RoutingKey的匹配規則,星號、#號、點號搭配使用,其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞

    更多情況,小夥伴們自己動手試試。

5.2 程式碼進行演示

在Direct的程式碼基礎上稍微改動即可,主要改動點就是改變交換機的類型,並在隊列和交換機綁定時設置對應的RoutingKey,這裡的RoutingKey是一個規則,是星號、#號、點號和每個詞的組合,發布消息的時候指定交換機和RoutingKey,RoutingKey會去匹配綁定的規則。

  • 生產者程式碼

  • 消費者1程式碼,指定路由匹配規則為order.#

  • 消費者2程式碼,指定路由匹配規則為order.*

  • 演示效果,將生產者和消費者都啟動

    如上圖,和Web演示一樣,#號匹配0個和多個詞,*號只能匹配一個詞。 符號可以與詞任意組合,小夥伴可以根據業務情況自行發揮。

6. 參數模式(Headers)

Headers模式不是通過RoutingKey進行匹配投遞消息,而是匹配請求頭中所帶的鍵值進行消息投遞,所以創建隊列是需要設置綁定的頭部資訊,有兩種模式:全部匹配和部分匹配。

  • 全部匹配:x-match=all,表示所有的鍵值都匹配了才行。
  • 部分匹配:x-match=any,表示只要其中有鍵值對匹配就行。
5.1 Web管理介面進行演示
  • 先創建一個Headers模式的交換機;

  • 創建兩個隊列,然後將隊列綁定到上一步創建的交換機上,可以指定Headers的參數;

    將隊列綁定到交換機上,這裡還是在交換機詳情中進行演示,如下:

    這裡不使用RoutingKey的方式,而是通過設置參數的形式進行綁定,後續投遞消息的時候就匹配參數,如果能匹配上,就將消息投遞到對應的隊列。

    綁定HeaderQ1隊列:

    同樣的方式綁定HeaderQ2隊列,只是只添加了一個鍵值對,order:111,最後HeaderExchange交換機的綁定關係如下:

    關係綁定好之後就可以進行測試效果了。

  • 向交換機上投遞消息,會根據檢查Headers參數的條件是否符合,若符合將消息投遞到對應的隊列中,看效果;

    設置兩個參數進行發布,如下:

    可以看到兩個隊列都匹配到了,因為order和msg鍵值對匹配到HeaderQ1,order的鍵值對匹配到HeaderQ2,如果只設置一個order簡直對呢:

    此時只有HeaderQ2才能精確匹配,HeaderQ1沒有全部匹配,所以對應隊列沒有收到消息,如下:

    由此可見,在介面上沒有指定x-match綁定的話,默認是all,就是要全部匹配才投遞消息到對應隊列

    這裡繼續新增一個HeaderQ3的隊列,創建方式和上面不一樣,只是在綁定交換機的時候增加x-match 為 any,如下:

    綁定成功之後,現在關係如下,其中order的鍵值對是在每個綁定中都有,如下:

    測試發消息之前,把之前消息都清空了,也就是隊列中的消息都是空的,這次我們再指定order為111的參數進行發布消息,看看有哪些隊列收到消息呢:

    消息發出後,之後HeaderQ2和HeaderQ3收到消息,HeaderQ2隻有一個order參數,精確匹配上了,HeaderQ3有多個參數,但設置了x-match為any,所以只要匹配其中一個即可。 HeaderQ1多個參數需要全部匹配才行,所以沒有接收到消息:

5.2 程式碼進行演示

Headers模式是根據參數進行匹配,不是通過RoutingKey,所以只需要在綁定隊列時設置好參數,在發送消息的時候也設置好參數,這樣就會根據匹配原則去匹配參數,如果匹配上,消息就投遞到對應的隊列,供消費者進行消費。這裡為了演示比較清晰一點,使用一個生產者,三個消費者的形式進行演示。

  • 生產者

    程式碼中關鍵的部分是指定交換機的類型為Headers,然後模擬了兩類參數的形式投遞消息,方便用於測試參數匹配模式的測試。

  • 消費者1,綁定參數為order:111和msg:222

  • 消費者2,和消費者1代表基本一樣,只是綁定參數不一樣;

  • 消費者3,代表基本和消費者1一樣,只是在參數指定的時候增加了x-match來指定匹配模式,這裡指定為any,也就是只要其中有部分匹配上也可以消費到消息。

  • 演示效果,啟動生產者和消費者:

    如上圖,消費者3指定了匹配模式為部分匹配,所以可以接收到一個參數的消息,而消費者1需要精確匹配,所以不能接收到。

關於常用的消息模式就聊到這吧,小夥伴們可以根據自己的業務場景進行使用,相關演示程式碼的地址如下:

碼云://gitee.com/CodeZoe/dot-net-core-study-demo/tree/main/RabbitMQDemo

總結

RabbitMQ提供了多種模式應對各種業務,但匹配條件越是模糊或者參數化,那性能相對比較弱。再回顧一些常用的消息隊列組件,是不是很多都是默認使用Direct模式,精確匹配的RoutingKey可以針對具體不同業務處理,匹配性能也相對比較高; 當然其他模式小夥伴也可以針對業務情況進行使用。

後續的文章將繼續分享RabbitMQ消息確認機制、死信隊列、磁碟監控等相關知識點的應用,關注「Code綜藝圈」,和我一起學習吧。

Tags: