消息隊列之RabbitMQ介紹與運用

RabbitMQ 說明
本章,我們主要從RabbitMQ簡介RabbitMQ安裝RabbitMQ常用命令RabbitMQ架構模式RabbitMQ使用Quick.RabbitMQPlus的使用RabbitMQ總結這幾個方面對
RabbitMQ進行介紹!

1、🍇RabbitMQ 簡介

RabbitMQ 是使用Erlang語言開發的開源消息隊列系統,基於 AMQP 協議來實現。

AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性和安全。AMQP 協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。

MQ 全稱為 Message Queue,消息隊列(MQ)是一種應用程序應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。

消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。


2、🍈RabbitMQ 安裝

以下我們主要介紹 RabbitMQ 在 Windows 環境中的安裝過程。

2.1、🍉 下載 OTP

由於 RabbitMQ 使用 Erlang 技術開發,所以需要先安裝 Erlang 運行環境後,才能安裝消息隊列服務。

我們到//www.erlang.org/downloads下載相應版本的安裝包,如這裡我們下載//github.com/erlang/otp/releases/download/OTP-25.0.4/otp_win64_25.0.4.exe這個版本,如下圖所示:

erlang

2.2、🍊 下載 RabbitMQ

我們到//www.rabbitmq.com/download.html下載相應版本的安裝包,如這裡我們下載//github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.7/rabbitmq-server-3.10.7.exe這個版本,如下圖所示:

rabbitmq

2.3、🍋 安裝 Erlang 和 RabbitMQ

我們在 D 盤分別創建目錄D:\Net_Program\Net_RabbitMQErlangD:\Net_Program\Net_RabbitMQ實際環境中根據自己的需求新建目錄即可),用於安裝 Erlang 和 RabbitMQ。

雙擊下載下來的 otp_win64_25.0.4.exe 和 rabbitmq-server-3.10.7.exe 進行安裝,安裝過程中將安裝目錄選擇剛才創建的目錄即可,其他按照默認設置即可。

2.4、🍌 激活 Rabbit MQ’s Management Plugin

激活步驟如下所示:

  • 以管理員身份運行 CMD

  • 然後切換到 RabbitMQ 的安裝目錄 sbin 下,D:\Net_Program\Net_RabbitMQ\rabbitmq_server-3.10.7\sbin

  • 然後輸入如下命令並執行

    rabbitmq-plugins.bat enable rabbitmq_management
    

    如出現如下所示的提示信息,原因是安裝了 Erlang 沒有重啟電腦導致的環境變量沒有生效,重啟電腦即可:

    erlang

  • 然後輸入如下命令重啟 RabbitMQ 服務

    net stop rabbitmq && net start rabbitmq
    
  • 最後即可訪問 RabbitMQ 的管理控制台了,訪問地址(默認賬戶和密碼為 guest)://localhost:15672

激活過程如下圖所示:

rabbitmq

rabbitmq

rabbitmq

2.5、🍍 遠程設置

如果希望 RabbitMQ 允許遠程連接,比如在 Windows Service2012 服務器上安裝了 RabbitMQ,其他客戶端想連接此服務器的 RabbitMQ,則需要設置防火牆開放端口。

具體設置步驟(以 Windows Service2012 為例):

  • 打開防火牆 → 入站規則 → 新建規則 → 選擇「端口」,下一步 → 選擇 TCP,並在特定本地端口中填入 15672,5671-5672,下一步 → 選擇「允許連接」,下一步 → 下一步 → 輸入名稱或描述 → 完成。

2.6、🍵 Docker 中安裝 RabbitMQ

如果你電腦上沒安裝 Docker,請先安裝 Docker,可參考:Docker 的安裝

RabbitMQ 在 Docker 中的鏡像地址://hub.docker.com/_/rabbitmq

rabbitmqhub

2.6.1、拉取鏡像容器並安裝

  • 拉取 RabbitMQ 鏡像

    以管理員身份運行 CMD,執行如下命令拉取 RabbitMQ 鏡像:

    docker pull rabbitmq
    

    rabbitmqpull

    rabbitmqpull

    拉取完成後,我們可以打開 Docker Desktop 客戶端查看就多了一個名稱為 rabbitmq 的鏡像了,如下圖所示:

    rabbitmqpull

  • 新建目錄

    D:\Net_Program\Net_Docker\RabbitMQ下分別新建DataLog文件夾,用於存放 RabbitMQ 數據和日誌:

    D:\Net_Program\Net_Docker\RabbitMQ\Data

    D:\Net_Program\Net_Docker\RabbitMQ\Log

  • 創建並啟動容器

    以管理員身份運行 CMD,執行如下命令創建並啟動容器:

    docker run -d --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq
    

    rabbitmqrun

    :::tip 參數說明:

    完整執行命令如下:

    docker run -d --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq
    
    • -d:表示在後台運行容器;

    • –volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq:映射 RabbitMQ 數據存儲目錄;

    • –volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq:映射 RabbitMQ 日誌存儲目錄;

    • –name rabbitmq:設置容器名稱;

    • –hostname:指定主機名(RabbitMQ 的一個重要注意事項是它根據所謂的 節點名稱 存儲數據,默認為主機名);

    • -p:將容器的端口 5672(應用訪問端口)和 15672 (控制台 Web 端口號)映射到主機中;

    • -e:指定環境變量:

      • RABBITMQ_DEFAULT_VHOST:默認虛擬機名;

      • RABBITMQ_DEFAULT_USER:默認的用戶名;

      • RABBITMQ_DEFAULT_PASS:默認的用戶密碼;

    :::

  • 啟動 Docker 的時候自動啟動 RabbitMQ

    以管理員身份運行 CMD,執行如下命令:

    docker update rabbitmq --restart=always
    

    rabbitmqstart

2.6.2、安裝 Rabbit MQ’s Management Plugin

  • 方式 1:

    以管理員身份運行 CMD,執行如下命令先進入 RabbitMQ 容器:

    docker exec -it rabbitmq /bin/bash
    

    再執行如下命令:

    rabbitmq-plugins enable rabbitmq_management
    

    插件

    這時候在瀏覽器中打開//localhost:15672即可查看 RabbitMQ 的 Web 管理端了。

    插件

  • 方式 2:

    以管理員身份運行 CMD,執行如下命令即可:

    docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
    
  • 注意事項:

    當我們安裝好插件後,打開//localhost:15672Web 管理界面,使用 guest 賬戶登錄進去後,點擊Channels標籤,會出現如下圖所示的警告提醒:

    插件

    1. 以管理員身份運行 CMD,執行如下命令先進入 RabbitMQ 容器:

      docker exec -it rabbitmq /bin/bash
      
    2. 再執行如下命令切換到以下路徑:

      cd /etc/rabbitmq/conf.d/
      
    3. 再執行如下命令修改management_agent.disable_metrics_collector = false

      echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
      
    4. 再執行如下命令退出容器:

      exit
      
    5. 再執行如下命令重啟容器:

      docker restart rabbitmq
      

    執行命令過程:

    channels

    界面效果:

    channels


3、🥭RabbitMQ 常用命令

3.1、🍎 用戶管理

  • 增加用戶:

    rabbitmqctl add_user user1 pwd1
    
  • 刪除用戶:

    rabbitmqctl delete_user user1
    
  • 修改密碼:

    rabbitmqctl change_password user1 123456
    
  • 查看用戶列表:

    rabbitmqctl list_users
    

3.2、🍏 用戶角色

  1. 設置用戶角色:

    rabbitmqctl set_user_tags user1 Tag
    

    user1 為用戶名稱

    Tag 為角色名稱,如:administrator、monitoring、policymaker、management、impersonator

  2. 設置多個角色:

    rabbitmqctl set_user_tags user1 Tag1 Tag2
    

執行命令如下圖所示:

![ml]image

![ml]image

![ml]image


4、🍐RabbitMQ 架構模式

4.1、🍑RabbitMQ 架構模式

RabbitMQ

架構模式說明:

  1. 首先建立生產者與 MQ 之間的連接;

  2. 然後生產者將消息發送到 MQ 的交換機中;

  3. 交換機將消息分別存儲到與之綁定的隊列中;

  4. 建立消費者與 MQ 之間的連接;

  5. 消費者指定消費哪個隊列的消息;

  6. 最後隊列將消息推送給對應的消費者。

4.2、🍒RabbitMQ 中幾個核心概念

  1. Message(消息):消息是不透明的,是由一系列的可選屬性組成,如:路由鍵(RoutingKey)、相對其他消息的優先權(Priority)、指出該消息是否需要永久存儲(DeliveryMode)等;

  2. Producer(生產者):生產者是向交換機發佈消息的客戶端應用程序;

  3. Exchange(交換機):用來接受消息並將消息路由(存儲)給服務器中的隊列。交換機有四種類型,即決定消息發佈到那個隊列,具體有以下的類型:

    • Fanout:發佈訂閱(廣播模式),每個發送到 Fanout 類型的交換器消息,交換器會將消息發送到它綁定的所有隊列中,它轉發消息是最快的,也是目前使用最多的類型。

    • Direct:路由模式,路由模式下,在發佈消息時指定不同的 RouteKey,交換機會根據不同的 RouteKey 分發消息到不同的隊列中,簡單點說其實就是在 Fanout 基礎上多增加了一個 RoutingKey 條件

    • Topic:通配符模式(主題),通配符模式和路由模式其實差不多,不同之處在於通配符模式中的路由可以聲明為模糊查詢。符號#匹配一個或多個詞,符號*匹配一個詞。RabbitMQ 中通配符的通配符是用.來分割字符串的,比如a.*只能匹配到 a.b、a.c,而a.#可以匹配到 a.a.c、a.a.b。

    • Headers根據消息內容中的 Headers 屬性匹配(性能差,不實用,使用少),該模式基本不使用

  4. Queue(隊列):消息的存放容器,一個消息可以放在一個或者多個隊列中;

  5. Binding(綁定):如果想要將消息存放到具體的隊列中,就需要先將隊列和交換機進行綁定,交換機跟隊列的綁定可以是多對多的關係;

  6. Connection(連接):如一個 Tcp 連接;

  7. Channel(通道):多路復用連接中的一條獨立的雙向數據流通道,通道是建立在真實的 TCP 連接內的虛擬通道,AMQP 命令都是通過通道發出去的,不管是發佈消息、訂閱隊列還是接收消息,都是通過通道完成的,因為對於操作系統來說創建和銷毀一個 TCP 連接都是很昂貴的開銷,所以使用通道以實現復用一條 TCP 連接;

  8. Consumer(消費者):接收和消費消息的客戶端應用程序;

  9. Virtual Host(虛擬主機):即小型的 RabbitMQ 服務器,它表示一批交換器,消息隊列和相關對象,連接時必須指定,默認是:/(以路徑區分);

  10. Broker:消息隊列服務器實體。

4.3、🍓RabbitMQ 幾種模式

4.3.1、簡單隊列模式

在該模式下,不用顯示聲明交換機,只需聲明一個隊列即可。

生產者指定隊列名稱發送消息給 MQ,然後會有一個默認的交換機將消息轉發給這個隊列。

消費者只需要監聽這個隊列,一有消息就會得到通知做出響應。

如下圖所示:

簡單隊列模式

4.3.2、工作隊列模式(Work Queues)

和簡單隊列模式基本一樣,不過有一點不同,該模式有多個消費者在監聽隊列。

RabbitMQ 會以輪詢的方式將消息發給多個消費者確保一條消息只會被一個消費者消費,即:在該模式下一條消息只會被其中一個消費者消費

4.3.3、Exchange – 發佈訂閱模式(Fanout)

和上面 2 種模式默認提供交換機不同的是,該模式需要顯示聲明交換機,然後可以創建多個隊列和這個交換機進行綁定。

生產者發消息給 MQ 時需要指定交換機,然後交換機將消息轉發給與自己綁定的所有隊列.

消費者監聽指定的隊列獲得消息,每個隊列可以有多個消費者監聽,同樣也是以輪詢的機制發給消費者。

如下圖所示:

簡單隊列模式

該模式是目前使用最多的模式。

4.3.4、Exchange – 路由模式(Direct)

和發佈訂閱模式不同的是,隊列綁定交換機時需要指定一個 RoutingKey。

那麼生產者發送消息時不僅需要指定交換機還需要指定 RoutingKey。

這樣的話交換機就會把消息轉發給跟自己綁定並且 RoutingKey 相匹配的隊列。

如下圖所示:

簡單隊列模式

PS:當生產者發送了一個消息且發送的 RoutingKey 為 success 時,交換機會根據該 RoutingKey 匹配並轉發消息到 Queue1 和 Queue2,兩個隊列都滿足了路由規則;當 RoutingKey 為 error 時,僅 Queue2 滿足,則將消息轉發給 Queue2。

4.3.5、Exchange – 通配符模式(Topic)

和路由模式唯一的不同就是可以設置帶有通配符進行模糊匹配的 RoutingKey。

設定的 RoutingKey(不論是 BindingKey 還是 RoutingKey)都需要為帶.的字符串。比如 a.b、c.d.e、fff.gggg.hhhh 等,最多為 255 個位元組.

在交換機和隊列綁定時,給定的 RoutingKey 可以依照如下來設置:

  • :匹配 0~N 個單詞;

  • *:匹配 1 個單詞。

比如兩個 RoutingKey 分別為 index.和#.crt,當生產者發送消息時給定的 RoutingKey 為 index.a、index.b 或 index.c 等都滿足 index.的規則,a.crt、aa.crt 或是 b.crt 等都滿足#.crt 的規則。

如下圖所示:

簡單隊列模式


5、🥝RabbitMQ 使用

針對 RabbitMQ 的使用,這裡我們主要介紹在.Net Core(.Net6)中的簡單使用,其他平台或語言類似,僅作參考。

5.1、🍅 安裝 RabbitMQ.Client 包

我們將使用RabbitMQ.Client這個包來實現,當然也可以使用其他包,如:EasyNetQ

使用如下命令安裝 RabbitMQ.Client 包即可:

Install-Package RabbitMQ.Client -Version 6.4.0

5.2、🥑 生產者實現

首先我們定義 RabbitMQHelper.cs 幫助類,該類需要實現泛型 T 的定義(T 為發送和接收的消息實體)。

然後在該類中定義消息通道消息連接交換機名稱隊列名稱集合路由規則變量。

定義構造函數 RabbitMQHelper,並實現連接工廠的定義、消息連接的初始化、消息通道的初始化以及交換機類型(此處我們以發佈訂閱模式 Fanout 的實現為例)的定義。

注意:以下實現代碼僅僅為了展示使用說明,沒有進一步進行封裝。

具體幫助類和調用示例代碼如下所示:


using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;

namespace Quick.RabbitMQPlus
{
    public class RabbitMQHelper<T> where T : class
    {
        /// <summary>
        /// 消息通道
        /// </summary>
        readonly IModel _channel;

        /// <summary>
        /// 消息連接
        /// </summary>
        readonly IConnection _connection;

        /// <summary>
        /// 交換機名稱
        /// </summary>
        readonly string _exchangeName = "TestExchangeName";

        /// <summary>
        /// 隊列名稱集合
        /// </summary>
        readonly List<string> _queueNames = new() { "Queue1", "Queue2" };

        /// <summary>
        /// 路由規則
        /// </summary>
        string _routeKey = "TestRouteName";

        /// <summary>
        /// 構造函數
        /// </summary>
        public RabbitMQHelper()
        {
            //創建連接工廠
            var connectionFactory = new ConnectionFactory
            {
                HostName = "192.168.1.1",
                UserName = "admin",
                Password = "123456",
                Port = 5672
            };

            //創建連接
            _connection = connectionFactory.CreateConnection();

            //創建通道
            _channel = _connection.CreateModel();

            /*
             * 定義一個Fanout類型交換機:
             * 參數1:交換機名稱
             * 參數2:交換機類型
             * 參數3:是否開啟消息持久化
             * 參數4:是否設置如果這個隊列沒有其他消費者消費,隊列自動刪除
             * 參數5:指定隊列攜帶的信息
             */
            _channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, false, false, null);
        }

        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="data"></param>
        /// <returns></returns>
        public async Task<(bool, string)> Send(T data)
        {
            return await Task.Run(() =>
            {

                if (_channel == null)
                {
                    return (false, "RabbitMQ初始化錯誤,請檢查連接配置!");
                }

                var ret = true;
                var errMsg = string.Empty;

                try
                {
                    if (string.IsNullOrWhiteSpace(_routeKey))
                    {
                        _routeKey = _exchangeName;
                    }

                    //將多個隊列綁定到交換機上
                    foreach (var item in _queueNames)
                    {
                        /*
                         * 定義隊列:
                         * 參數1:隊列名稱
                         * 參數2:是否持久化,true為持久化。隊列會存儲到磁盤,服務器重啟時可以保證不丟失信息
                         * 參數3:是否排他,true為排他。如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除
                         * 參數4:是否自動刪除,true為自動刪除。自動刪除的前提是:致少有一個消費者連接到這個隊列,之後所有與這個隊列連接的消費者都斷開時,才會自動刪除
                         * 參數5:指定隊列攜帶的信息
                         */
                        _channel.QueueDeclare(item, true, false, false, null);

                        //將隊列綁定到交換機
                        _channel.QueueBind(item, _exchangeName, _routeKey, null);
                    }

                    //將實體序列化為字符串,該方法(ObjectToJson)需自己實現
                    var msg = ObjectToJson(data);

                    //將字符串轉換為byte[]
                    var msgBody = Encoding.UTF8.GetBytes(msg);

                    /*
                     * 發佈消息:
                     * 參數1:交換機名稱,如果傳"",將使用RabbitMQ默認的交換機名稱
                     * 參數2:指定路由的規則,使用具體的隊列名稱,交換機為""時,消息直接發送到隊列中
                     * 參數3:指定傳遞的消息攜帶的properties
                     * 參數4:指定傳遞的消息,byte[]類型
                     */
                    _channel.BasicPublish(_exchangeName, _routeKey, null, msgBody);
                }
                catch (Exception e)
                {
                    ret = false;
                    errMsg = e.Message;
                }

                return (ret, errMsg);
            });
        }

        /// <summary>
        /// 關閉通道和連接
        /// </summary>
        public void Close()
        {
            _channel.Close();
            _connection.Close();
        }
    }
}


//消息實體
var msgModel = new TestRabbitMQModel
{
    UserId = rand.Next(1, 9999),
    UserName = "Quber",
    UserAge = rand.Next(20, 80),
    CreateTime = DateTime.Now
};

//定義發送對象
var sendInstance = new RabbitMQHelper<TestRabbitMQModel>();

//發送消息
var sendRet = await sendInstance.Send(msgModel);

if (sendRet.Item1)
{
    //發送成功
    //……

    //消息發送完成後,關閉通道(根據實際情況自行決定要不要調用關閉方法)
    sendInstance.Close();
}
else
{
    //發送失敗
    var errMsg = $"失敗原因:{sendRet.Item2}";
}


5.3、🍆 消費者實現

消費者的實現和生產者的實現基本一樣,直接上代碼,如下所示:


using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;

namespace Quick.RabbitMQPlus
{
    public class RabbitMQHelper<T> where T : class
    {
        /// <summary>
        /// 消息通道
        /// </summary>
        readonly IModel _channel;

        /// <summary>
        /// 消息連接
        /// </summary>
        readonly IConnection _connection;

        /// <summary>
        /// 交換機名稱
        /// </summary>
        readonly string _exchangeName = "TestExchangeName";

        /// <summary>
        /// 隊列名稱集合
        /// </summary>
        readonly List<string> _queueNames = new() { "Queue1", "Queue2" };

        /// <summary>
        /// 路由規則
        /// </summary>
        string _routeKey = "TestRouteName";

        /// <summary>
        /// 構造函數
        /// </summary>
        public RabbitMQHelper()
        {
            //創建連接工廠
            var connectionFactory = new ConnectionFactory
            {
                HostName = "192.168.1.1",
                UserName = "admin",
                Password = "123456",
                Port = 5672
            };

            //創建連接
            _connection = connectionFactory.CreateConnection();

            //創建通道
            _channel = _connection.CreateModel();

            /*
             * 定義一個Fanout類型交換機:
             * 參數1:交換機名稱
             * 參數2:交換機類型
             * 參數3:是否開啟消息持久化
             * 參數4:是否設置如果這個隊列沒有其他消費者消費,隊列自動刪除
             * 參數5:指定隊列攜帶的信息
             */
            _channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, false, false, null);
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="received">回調方法</param>
        /// <param name="prefetchCount">設置RabbitMQ一次最多推送多少條消息給消費者,默認為10</param>
        public async Task<(bool, string)> Receive(string queueName, Func<T?, string, Task<bool>> received, ushort prefetchCount = 10)
        {
            return await Task.Run(() =>
            {
                if (_channel == null)
                {
                    return (false, "RabbitMQ初始化錯誤,請檢查連接配置!");
                }

                var ret = true;
                var errMsg = string.Empty;

                try
                {
                    /*
                     * 設置限流機制
                     * 參數1:消息本身的大小,如果設置為0,那麼表示對消息本身的大小不限制
                     * 參數2:設置RabbitMQ一次最多推送多少條消息給消費者
                     * 參數3:是否將上面的設置應用於整個通道,false表示只應用於當前消費者
                     */
                    _channel.BasicQos(0, prefetchCount, false);

                    //創建消費者對象
                    var consumer = new EventingBasicConsumer(_channel);

                    //接收到消息事件
                    consumer.Received += async (_, ea) =>
                    {
                        //獲取消息以及反序列化為實體(JsonToObject方法需自己實現)
                        var msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                        var data = JsonToObject<T>(msg);

                        var retRec = true;

                        try
                        {
                            //接收消費事件,如果返回true則代表處理成功,以便告知RabbitMQ該消息已消費並處理成功
                            retRec = await received(data, msg);
                        }
                        catch (Exception e)
                        {
                            retRec = false;
                        }

                        //業務處理成功的時候
                        if (retRec)
                        {
                            //告知RabbitMQ該消息成功處理,可以從隊列中刪除該消息了
                            _channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            /*
                             * 告知RabbitMQ該消息處理失敗,重新加入隊列,以便後續可再次消費該消息
                             * 參數1:
                             * 參數2:是否將該消息重新加入隊列,true為重新加入隊列
                             *
                             * 需要注意的是:
                             *     假設await received(data, msg);一直對某些消息都處理失敗(即retRec=false),
                             *     那麼這些數據(這一批次的所有數據)會重新進入隊列,並在下次重新消費,
                             *     如果業務方法received不做處理的話,有可能會造成一直循環消費該批次的消息
                             */
                            _channel.BasicReject(ea.DeliveryTag, true);
                        }
                    };

                    //啟動消費者,設置為手動應答消息
                    /*
                     * 啟動消費者:
                     * 參數1:指定要消費哪個隊列的名稱
                     * 參數2:指定是否自動告訴RabbitMQ該消息已收到
                     * 參數3:指定消息的回調
                     */
                    _channel.BasicConsume(queueName, false, consumer);
                }
                catch (Exception e)
                {
                    ret = false;
                    errMsg = e.Message;
                }

                return (ret, errMsg);
            });
        }

        /// <summary>
        /// 關閉通道和連接
        /// </summary>
        public void Close()
        {
            _channel.Close();
            _connection.Close();
        }
    }
}


//定義接收對象
var recInstance = new RabbitMQHelper<TestRabbitMQModel1>();

//接收隊列Queue1的消息
var retRec = await recInstance.Receive("Queue1", async (data, msg) =>
{
    await Task.Delay(5000);

    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"\r\n隊列1消息:{msg}");

    //返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
    //返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
    return true;
}, 1);

//接收消息失敗
if (!retRec.Item1)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine($"\r\n隊列1接收失敗:{retRec1.Item2}");
}

//關閉通道(根據實際情況自行決定要不要調用關閉方法)
recInstance.Close();



6、🥔Quick.RabbitMQPlus.Furion 的使用

為了更好更簡單的在.Net Core 中使用 RabbitMQ,特此基於RabbitMQ.Client封裝了Quick.RabbitMQPlus.FurionQuick.RabbitMQPlus組件。

  • Quick.RabbitMQPlus.Furion:依賴於.Net6+、Furion

  • Quick.RabbitMQPlus:依賴於.Net6+

Quick.RabbitMQPlus.Furion 包地址為://www.nuget.org/packages/Quick.RabbitMQPlus.Furion

關於 Quick.RabbitMQPlus.Furion 的詳細使用說明,如下所示:

6.1、🍹 更新日誌

  • 1.0.9

    • 在路由模式消費的時候,去掉了調用實例化方法Instance()必須傳入參數true的參數;

    • 提供了兩種使用方式,第一種就是v1.0.8之前的通過實例化的方式進行初始化使用,第二種就是v1.0.9之後增加了可以通過依賴注入的方式進行初始化使用(推薦使用依賴注入的方式);

    • 新增加了AddRabbitMQPlusGetInstance方法,這兩個方法分別有 2 個重載,具體說明參見Quick.RabbitMQPlus方法

    • 增加了 IQuickRabbitMQPlus 接口,如果使用依賴注入的方式,就可以實現該接口並使用其中的各個方法。

  • 1.0.8

    • 去掉了通過實體特性去配置各個屬性,目的在於簡化各個配置,都通過配置文件進行設置;

    • 在接收數據的時候,可以通過實體特性QuickRabbitMQPlusReceive去設置接收的消息隊列(如[QuickRabbitMQPlusReceive(“TestRabbitMQName1”)]),如果不要實體特性,那麼就會通過配置中的QueueNames屬性去控制;

    • 對接收數據的方法進行了重載;

    • 配置中增加了Default屬性,用於設置默認連接配置(如果配置中有多個連接配置,並且沒有一個配置中設置 Default 為 true,那麼默認會使用第一個配置);

    • 配置中增加了PrefetchCount屬性,用於全局設置 RabbitMQ 一次最多推送多少條消息給消費者,默認為 10;

    • 配置中去掉了RouteKey配置,如果使用的是路由模式,請使用RouteKeys屬性進行配置,前提是RouteKeysQueueNames集合的數量需要保持一致;

    • 實例化組件對象的時候,去掉了泛型的定義,如var sendInstance = QuickRabbitMQPlusInstance.Instance();

    • 發送和接收的方法,增加了泛型的定義,這樣做的目的是不受限於在組件實例化的時候指定只能是某個泛型,將泛型的定義設置到方法上更靈活,如同一個組件實例化對象可發送不同的泛型實體數據。

  • 1.0.7

    • 去掉了必須要設置實體特性的控制(如果沒有實體特性,那麼就需要在配置文件中將相關屬性配置齊全),默認使用的是配置中的第一個配置;

    • 在路由模式下,可指定將消息發送到對應的隊列中,需要配置QueueNamesRouteKeys的集合數量保持一一對應的關係;

    • Send 方法增加了第二個參數,路由 Key 名稱;

    • 實例化對象的方法 Instance 增加了參數Instance(bool isReceive = false),當前實例化對象是否為接收消息。

  • 1.0.6

    • 新增加了可動態切換連接的方法ChangeConn

    • 去掉了Furion的依賴;

    • 去掉了Newtonsoft.Json的依賴;

    • 同時將原來的Quick.RabbitMQPlus分為了Quick.RabbitMQPlusQuick.RabbitMQPlus.Furion這兩個版本。

6.2、🥕 Quick.RabbitMQPlus.Furion 使用說明

該組件是基於RabbitMQ.ClientFurion組件進行封裝使用的,目的在於結合.Net Core 更快、更簡單和更靈活的使用 RabbitMQ!!!

功能說明:

  • 支持發佈訂閱模式路由模式,通配符模式Headers屬性模式

  • 可根據配置文件讀取 RabbitMQ 連接的各個配置(如:RabbitMQ 服務地址、賬號、密碼和交換機名稱等);

  • 支持配置多個 RabbitMQ 的連接配置;

  • 支持動態切換 RabbitMQ 的連接配置;

  • 可根據實體定義的特性發佈和訂閱消息(已廢棄),目前只針對接收消息定義了實體特性,並且只能指定接收消息的隊列(v1.0.8 調整)

  • 支持配置將多個隊列綁定到交換機;

  • 一個消費端支持可以同時消費多個多列的消息等;

  • 支持使用同一個實體,將不同的消息發送到不同的隊列中(使用路由模式,同時在發送的時候將路由 Key 傳入);

  • 支持全局設置接收消息一次性接收多少條的配置(v1.0.8 新增)

  • 支持兩種使用方式,第一種就是v1.0.8之前的通過實例化的方式進行初始化使用,第二種就是v1.0.9之後增加了可以通過依賴注入的方式進行初始化使用(推薦使用依賴注入的方式)

6.3、🌽 安裝

安裝命令如下所示:

Install-Package Quick.RabbitMQPlus.Furion

該組件的命名空間為:Quick.RabbitMQPlus,包括Quick.RabbitMQPlus原生組件。

Quick.RabbitMQPlus.Furion

Quick.RabbitMQPlus.Furion

6.4、🌶 生產端

6.4.1、配置appsettings.json

appsettings.json配置文件中創建節點QuickRabbitMQPlus>PrefetchCountQuickRabbitMQPlusConfigs,PrefetchCount 為設置 RabbitMQ 一次最多推送多少條消息給消費者(默認為 10),QuickRabbitMQPlusConfigs 為數組類型(即可配置多個 RabbitMQ 服務地址),具體配置如下所示:

{
    "QuickRabbitMQPlus": {
        "PrefetchCount": 1,
        "QuickRabbitMQPlusConfigs": [
            {
                "Default": false,
                "ConnId": 1,
                "UserName": "quber",
                "Password": "0807_quberONE",
                "HostName": "127.0.0.1",
                "Port": 5672,
                "ExchangeType": "direct",
                "ExchangeName": "TestExchangeName",
                "QueueNames": ["TestRabbitMQName1", "TestRabbitMQName2"],
                "RouteKeys": ["TestRouteKey1", "TestRouteKey2"] //ExchangeType=direct才起作用,並且和QueueNames一一對應
                //"ExchangeDurable": true,
                //"QueueDurable": true,
                //"MessageDurable": true
            },
            //fanout模式
            {
                "Default": true,
                "ConnId": 2,
                "UserName": "quber",
                "Password": "0807_quberONE",
                "HostName": "127.0.0.1",
                "Port": 5672,
                "ExchangeType": "fanout",
                "ExchangeName": "TestExchangeName",
                "QueueNames": ["TestRabbitMQName1", "TestRabbitMQName2"],
                "RouteKeys": ["TestRouteKey1", "TestRouteKey2"] //ExchangeType=direct才起作用,並且和QueueNames一一對應
                //"ExchangeDurable": true,
                //"QueueDurable": true,
                //"MessageDurable": true
            }
        ]
    }
}

配置說明(消費端通用):

屬性名稱 屬性說明 是否必填 備註
PrefetchCount 全局設置 RabbitMQ 一次最多推送多少條消息給消費者,默認為 10 消費端才使用的屬性
Default 是否為默認連接 默認為 false
ConnId 連接 Id(請確保該 Id 的唯一性) 如果要動態切換連接配置,請確保該 Id 有值並且唯一
UserName RabbitMQ 連接賬戶
Password RabbitMQ 連接密碼
HostName RabbitMQ 連接 IP
Port RabbitMQ 連接端口 不填就是默認端口5672
ExchangeType 交換機類型(fanout:發佈訂閱模式、direct:路由模式、topic:通配符模式、headers:屬性匹配模式)
ExchangeName 交換機名稱
QueueNames 隊列名稱集合(與交換機 ExchangeName 進行綁定) 此處為集合,目的是在發佈消息時將消息存儲到該隊列集合中去
RouteKeys 路由名稱集合(或通配符名稱集合) ExchangeType=direct才起作用,並且和 QueueNames 是一一對應的關係,這樣配置目的是可以實現將消息 1 發送到隊列 1,將消息 2 發送到隊列 2
ExchangeDurable 交換機是否持久化,默認為 true 如果採用默認的設置,配置文件可以不要該屬性
QueueDurable 隊列是否持久化,默認為 true 如果採用默認的設置,配置文件可以不要該屬性
MessageDurable 消息是否持久化,默認為 true 如果採用默認的設置,配置文件可以不要該屬性

6.4.2、配置 Program.cs

由於我們使用的是Furion,因此,我們可在程序啟動文件中配置如下代碼(具體可參考Furion 入門指南),目的是註冊RabbitMQ 服務配置選項 QuickRabbitMQPlusOptions

  1. 依賴注入方式-WinForm 中使用:


    [STAThread]
    static void Main()
    {
    	ApplicationConfiguration.Initialize();
    
    	//初始化Furion
    	Serve.Run(GenericRunOptions.DefaultSilence);
    }


    public void ConfigureServices(IServiceCollection services)
    {
    	//註冊FrmMain窗體類
    	services.AddScoped<FrmMain>();
    
    	//注入IQuickRabbitMQPlus的方式
    	//通過AddRabbitMQPlus添加依賴注入
    	services.AddRabbitMQPlus();
    
    	////使用構造函數獲取實例的方式:
    	////通過AddRabbitMQPlus添加依賴注入,並註冊TestConsumerClassForDI類
    	//services.AddRabbitMQPlus<TestConsumerClassForDI>()
    
    	//DI容器生成serviceProvider
    	var serviceProvider = services.BuildServiceProvider();
    
    	//通過serviceProvider獲取MainForm的註冊實例
    	var frmMain = serviceProvider.GetRequiredService<FrmMain>();
    	//var frmMain = (FrmMain)serviceProvider.GetService(typeof(FrmMain));
    
    	Application.Run(frmMain);
    }


    說明:上述的關鍵點就在於調用.AddRabbitMQPlus()或者.AddRabbitMQPlus<T>()方法對服務進行註冊。

  2. 實例化方式-WinForm 中使用:


    [STAThread]
    static void Main()
    {
    	//初始化Furion
    	Serve.Run(GenericRunOptions.DefaultSilence);
    
    	//或者
    	//Serve.Run(RunOptions.DefaultSilence.ConfigureBuilder(builder =>
    	//{
    		//註冊RabbitMQ連接配置對象
    		//builder.Services.AddConfigurableOptions<QuickRabbitMQPlusOptions>();
    	//}).Configure(app =>
    	//{
    	//}));
    
    	ApplicationConfiguration.Initialize();
    	Application.Run(new FrmMain());
    }


    public void ConfigureServices(IServiceCollection services)
    {
    	//註冊RabbitMQ連接配置對象
    	services.AddConfigurableOptions<QuickRabbitMQPlusOptions>();
    }


  3. Quick.RabbitMQPlus 組件,依賴注入方式-WinForm 中使用:

    Program.cs 的 Main 方法:

    ApplicationConfiguration.Initialize();
    
    using IHost host = Host.CreateDefaultBuilder()
    	.ConfigureServices((_, services) =>
    		{
    			//註冊FrmMain窗體類
    			services.AddScoped<FrmMain>();
    
    			//注入IQuickRabbitMQPlus的方式
    			//通過AddRabbitMQPlus添加依賴注入
    			services.AddRabbitMQPlus();
    
    			////使用構造函數獲取實例的方式:
    			////通過AddRabbitMQPlus添加依賴注入,並註冊TestConsumerClassForDI類
    			//services.AddRabbitMQPlus<TestConsumerClassForDI>()
    
    			//DI容器生成serviceProvider
    			var serviceProvider = services.BuildServiceProvider();
    
    			//通過serviceProvider獲取MainForm的註冊實例
    			var frmMain = serviceProvider.GetRequiredService<FrmMain>();
    			//var frmMain = (FrmMain)serviceProvider.GetService(typeof(FrmMain));
    
    			Application.Run(frmMain);
    		}
    	)
    	.Build();
    
    host.RunAsync();
  4. Quick.RabbitMQPlus 組件,實例化方式-WinForm 中使用:

    Program.cs 的 Main 方法:

    [STAThread]
    static void Main()
    {
    	ApplicationConfiguration.Initialize();
    	Application.Run(new FrmMain());
    }
    

其他庫的使用方式也基本類似,就不一一介紹了。

6.4.3、定義發送消息實體

如下所示我們可以定義一個消息實體:

namespace Quick.RabbitMQPlus.Publisher
{
    public class TestRabbitMQModel
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}

6.4.4、發送消息 Demo

定義發送對象:


public partial class FrmMain : Form
{
	private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;

	public FrmMain(IQuickRabbitMQPlus quickRabbitMqPlus)
	{
		InitializeComponent();

		//定義發送對象
		_quickRabbitMqPlus = quickRabbitMqPlus;
	}
}


//定義發送對象
var sendInstance = QuickRabbitMQPlusInstance.Instance();


發送單條消息:


//發送10條數據
for (int i = 0; i < 10; i++)
{
	var msgModel = new TestRabbitMQModel
	{
		UserId = rand.Next(1, 9999),
		UserName = "Quick" + (i + 1),
		UserAge = rand.Next(20, 80),
		CreateTime = DateTime.Now
	};

	var sendRet = await _quickRabbitMqPlus.Send(msgModel);

	if (sendRet.Item1)
	{
		//發送成功
	}
	else
	{
		//發送失敗
		var errMsg = $"失敗原因:{sendRet.Item2}";
	}

	//間隔2秒發送一次
	await Task.Delay(2000);
}

//消息發送完成後,關閉通道
_quickRabbitMqPlus.Close();


//當i % 2為0時,發送給路由TestRouteKey1對應的隊列TestRabbitMQName1,否則發送給路由TestRouteKey2對應的隊列TestRabbitMQName2
//此處就實現了在路由模式下,將不同的消息發送給不同的隊列
//需要注意的時候,此方式需要將交換機類型配置為direct路由模式,同時需要設置配置的QueueNames和RouteKeys屬性(這兩屬性的集合數量需要保持一致,一一對應的關係)
var sendRet = await _quickRabbitMqPlus.Send(msgModel, i % 2 == 0 ? "TestRouteKey1" : "TestRouteKey2");


發送多條消息:

var sendList = new List<TestRabbitMQModel>{
	new TestRabbitMQModel(),
	new TestRabbitMQModel()
};

var sendRet = await _quickRabbitMqPlus.Send(sendList);

切換連接:

//切換到connId=2的配置
_quickRabbitMqPlus.ChangeConn(2);

var sendRetConn2 = await _quickRabbitMqPlus.Send(msgModel);

//切換到connId=3的配置
_quickRabbitMqPlus.ChangeConn(3);

var sendRetConn3 = await _quickRabbitMqPlus.Send(msgModel);

6.5、🥦 消費端

6.5.1、配置appsettings.json與實體特性QuickRabbitMQPlusReceive

  • 配置說明:

    具體配置請參見生產端(和生產端完全一致)。

    需要注意的是,消費端中,增加了PrefetchCount配置,目的用於全局設置 RabbitMQ 一次最多推送多少條消息給消費者,默認為 10。

    需要注意的是,如果消費端中的 QueueNames 屬性設置了多個隊列,就代表該消費端同時接收多個隊列的消息

  • 實體特性配置說明(消費端使用):

    屬性名稱 屬性說明 是否必填 備註
    queueName 隊列名稱(多個隊列名稱請使用英文逗號,分隔) 如果同時設置了實體特性的隊列名稱和配置中的QueueNames屬性,那麼會優先採用實體的隊列名稱

    如下所示:

    namespace Quick.RabbitMQPlus.Publisher
    {
        [QuickRabbitMQPlusReceive("TestRabbitMQName1")]
        //[QuickRabbitMQPlusReceive("TestRabbitMQName1,TestRabbitMQName2")]
        public class TestRabbitMQModel
        {
            public int UserId { get; set; }
    
            public string UserName { get; set; }
    
            public int UserAge { get; set; }
    
            public DateTime CreateTime { get; set; }
        }
    }

6.5.2、配置 Program.cs

由於我們使用的是Furion,因此,我們可在程序啟動文件中配置如下代碼(具體可參考Furion 入門指南),目的是註冊RabbitMQ 服務配置選項 QuickRabbitMQPlusOptions

  1. 依賴注入方式-Worker Service 中使用:


    //初始化Furion
    Serve.Run(GenericRunOptions.Default);
    


    public void ConfigureServices(IServiceCollection services)
    {
    	//通過AddRabbitMQPlus添加依賴注入
    	services.AddRabbitMQPlus();
    }


    說明:上述的關鍵點就在於調用.AddRabbitMQPlus()或者.AddRabbitMQPlus<T>()方法對服務進行註冊。

  2. 依賴注入方式-控制台中使用:


    //初始化Furion
    Serve.Run(GenericRunOptions.DefaultSilence);
    


    public void ConfigureServices(IServiceCollection services)
    {
    	////通過AddRabbitMQPlus添加依賴注入
    	//services.AddRabbitMQPlus();
    
    	//使用構造函數獲取實例的方式:
    	//通過AddRabbitMQPlus添加依賴注入,並註冊TestConsumerClassForDI類
    	services.AddRabbitMQPlus<TestConsumerClassForDI>();
    }


  3. Quick.RabbitMQPlus 組件,依賴注入方式-Worker Service 中使用:

    Program.cs 的 Main 方法:

    IHost host = Host.CreateDefaultBuilder(args)
    	.ConfigureServices(services =>
    	{
    		services.AddHostedService<Worker>();
    
    		//通過AddRabbitMQPlus添加依賴注入
    		services.AddRabbitMQPlus();
    	})
    	.Build();
    
    await host.RunAsync();
  4. Quick.RabbitMQPlus 組件,依賴注入方式-控制台中使用:

    Program.cs 的 Main 方法:

    using IHost host = Host.CreateDefaultBuilder(args)
    	.ConfigureServices((_, services) =>
    	//注入IQuickRabbitMQPlus的方式
    	//通過AddRabbitMQPlus添加依賴注入
    	services.AddRabbitMQPlus()
    
    	////使用構造函數獲取實例的方式:
    	////通過AddRabbitMQPlus添加依賴注入,並註冊TestConsumerClassForDI類
    	//services.AddRabbitMQPlus<TestConsumerClassForDI>()
    )
    .Build();

6.5.3、定義接收消息實體

如下所示我們可以定義 3 個消息實體(第一個用於接收隊列TestRabbitMQName1的消息,第二個用於接收隊列TestRabbitMQName2的消息,第三個用於接收隊列TestRabbitMQName1TestRabbitMQName2):


namespace Quick.RabbitMQPlus.Consumer
{
    [QuickRabbitMQPlusReceive("TestRabbitMQName1")]
    public class TestRabbitMQModel1
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}


namespace Quick.RabbitMQPlus.Consumer
{
    [QuickRabbitMQPlusReceive("TestRabbitMQName2")]
    public class TestRabbitMQModel2
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}


namespace Quick.RabbitMQPlus.Consumer
{
    [QuickRabbitMQPlusReceive("TestRabbitMQName1,TestRabbitMQName2")]
    public class TestRabbitMQModel3
    {
        public int UserId { get; set; }

        public string UserName { get; set; }

        public int UserAge { get; set; }

        public DateTime CreateTime { get; set; }
    }
}


6.5.4、接收消息 Demo

定義接收對象(依賴注入方式):


public class Worker : BackgroundService
{
	private readonly ILogger<Worker> _logger;
	private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;

	public Worker(ILogger<Worker> logger, IQuickRabbitMQPlus quickRabbitMqPlus)
	{
		_logger = logger;
		_quickRabbitMqPlus = quickRabbitMqPlus;
	}
}


//獲取IQuickRabbitMQPlus的實例(App是Furion中的靜態類)
var _quickRabbitMqPlus = App.GetService<IQuickRabbitMQPlus>();


//獲取IQuickRabbitMQPlus的實例(其中的host為IHost對象,GetInstance方法為封裝的擴展方法)
//var _quickRabbitMqPlus = host.Services.GetInstance<IQuickRabbitMQPlus>();

//獲取IQuickRabbitMQPlus的實例(其中的host為IHost對象,GetInstance方法為封裝的擴展方法)
var _quickRabbitMqPlus = host.GetInstance<IQuickRabbitMQPlus>();


定義接收對象(實例化方式):

//定義接收對象
var recInstance = QuickRabbitMQPlusInstance.Instance();

定義兩個消費端,一個消費端消費一個隊列,具體的接收消息代碼如下所示(接收單條消息):


namespace Quick.RabbitMQPlus.ConsumerServiceFurion
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;

        public Worker(ILogger<Worker> logger, IQuickRabbitMQPlus quickRabbitMqPlus)
        {
            _logger = logger;
            _quickRabbitMqPlus = quickRabbitMqPlus;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                //接收隊列1的消息
                var retRec1 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel1>(async (data, msg) =>
                {
                    await Task.Delay(1000);

                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"\r\n隊列1消息:{msg}");

                    //返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
                    //返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
                    return true;
                }, async (errMsg, msg) =>
                {
                    await Task.Delay(3000);

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"\r\n隊列1接收錯誤:{errMsg}\r\n原始數據:{msg}");
                }, 1);
                if (!retRec1.Item1)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"\r\n隊列1接收失敗:{retRec1.Item2}");
                }

                //接收隊列2的消息
                var retRec2 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel2>(async (data, msg) =>
                {
                    await Task.Delay(2500);

                    Console.ForegroundColor = ConsoleColor.Magenta;
                    Console.WriteLine($"\r\n隊列2消息:{msg}");

                    //返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
                    //返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
                    return true;
                }, async (errMsg, msg) =>
                {
                    await Task.Delay(3000);

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"\r\n隊列1接收錯誤:{errMsg}\r\n原始數據:{msg}");
                }, 10);
                if (!retRec2.Item1)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"\r\n隊列2接收失敗:{retRec2.Item2}");
                }
            }
        }
    }
}


//接收隊列1的消息
var retRec1 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel1>(async (data, msg) =>
{
    await Task.Delay(1000);

    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"\r\n隊列1消息:{msg}");

    //返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
    //返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
    return true;
});
if (!retRec1.Item1)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine($"\r\n隊列1接收失敗:{retRec1.Item2}");
}

//接收隊列2的消息
var retRec2 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel2>(async (data, msg) =>
{
    await Task.Delay(2500);

    Console.ForegroundColor = ConsoleColor.Magenta;
    Console.WriteLine($"\r\n隊列2消息:{msg}");

    //返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
    //返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
    return true;
});
if (!retRec2.Item1)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine($"\r\n隊列2接收失敗:{retRec2.Item2}");
}


效果如下所示:

6.4.4-1

定義一個消費端,同時消費兩個隊列,具體的接收消息代碼如下所示(接收單條消息):

//接收隊列1的消息
var retRec = await _quickRabbitMqPlus.Receive<TestRabbitMQModel3>(async (data, msg) =>
{
    await Task.Delay(1000);

    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"\r\n隊列1、2消息:{msg}");

    //返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
    //返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
    return true;
});
if (!retRec.Item1)
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine($"\r\n隊列1、2接收失敗:{retRec.Item2}");
}

效果如下所示:

6.4.4-2

如果需要接收多條消息,請使用Receives方法:

//接收隊列1的消息
var retRec = await _quickRabbitMqPlus.Receives<TestRabbitMQModel3>(async (dataList, msg) =>
{
	//此處的dataList為List<TestRabbitMQModel3>

    return true;
});

6.6、🥙 Quick.RabbitMQPlus.Furion 方法

  • 首先聲明 Quick.RabbitMQPlus 的實例化對象,有兩種方式可以得到 Quick.RabbitMQPlus 的實例化對象,一種是通過依賴注入在構造函數中得到,另一種是直接通過實例化對象的方式,具體可參照上述文檔中的相關示例。

  • 依賴注入方法:

    方法名稱 方法說明 方法參數 備註
    AddRabbitMQPlus 添加依賴注入服務 重載 1:()
    重載 2:<T>()
    該方法為IServiceCollection的擴展方法,目的是實現IQuickRabbitMQPlus接口的註冊。
    重載 1 代表註冊的是IQuickRabbitMQPlus服務;
    重載 2 傳入了泛型 T,代表的是註冊了IQuickRabbitMQPlus服務的同時,也註冊了 T 這個服務(T這個泛型類中,在構造函數中實現了IQuickRabbitMQPlus接口服務,該方法可能在控制台程序使用的情況較多)。
    GetInstance 獲取某接口服務的實例 重載 1:()
    重載 2:()
    該方法為IServiceProviderIHost的擴展方法,目的是獲取某接口或類的實例。
    重載 1 是基於IServiceProvider的擴展;
    重載 2 是基於IHost的擴展。
  • 其次就可以使用使用該實例化對象中的發送和接收方法了,具體說明如下所示:

    方法名稱 方法說明 方法參數 備註
    Send 發送消息方法,支持單條消息和多條消息的發送 (data,routeKey) 方法的第一個參數 data 可以為 T 或 List<T>
    方法的第二個參數為路由名稱(當交換機類型為路由模式的時候,該參數起作用,如可以實現使用同一個實體將不同的消息發送到不同的隊列中)
    Receive 接收消息(單條消息),該方法有 3 個重載 重載 1:(received)
    重載 2:(received, receivedError)
    重載 3:(received, receivedError, prefetchCount)
    方法的第一個參數為回調函數,該回調函數包含 2 個返回數據(第一個為T,第二個為T 對應的字符串),並且該回調函數需要返回 bool 類型(以便告訴 RabbitMQ 服務該消息是否處理成功);

    方法的第二個參數為消費出錯的回調函數,該回調函數包括 2 個返回數據(第一個為錯誤提示信息,第二個為T 對應的字符串
    方法的第三個參數為設置 RabbitMQ 一次最多推送多少條消息給消費者,默認為 10

    Receives 接收消息(多條消息),該方法有 3 個重載 重載 1:(received)
    重載 2:(received, receivedError)
    重載 3:(received, receivedError, prefetchCount)
    方法的第一個參數為回調函數,該回調函數包含 2 個返回數據(第一個為List<T>,第二個為List<T>對應的字符串),並且該回調函數需要返回 bool 類型(以便告訴 RabbitMQ 服務該消息是否處理成功);
    方法的第二個參數為消費出錯的回調函數,該回調函數包括 2 個返回數據(第一個為錯誤提示信息,第二個為List<T>對應的字符串
    方法的第三個參數為設置 RabbitMQ 一次最多推送多少條消息給消費者,默認為 10
    ChangeConn 切換連接(切換配置文件中某個連接配置) (connId) 方法參數 connId 為 int 類型,即與配置中的 ConnId 保持一致
    Close 關閉連接 () 注意,如果調用了該方法,又想重新使用實例化對象 mqInstance 發送或接收消息,需要重新實例化該對象。

    如下所示為接收消息的使用方式:

    //接收隊列1的消息
    var retRec1 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel1>(async (data, msg) =>
    {
    	await Task.Delay(1000);
    
    	Console.ForegroundColor = ConsoleColor.Green;
    	Console.WriteLine($"\r\n隊列1消息:{msg}");
    
    	//返回true代表業務邏輯處理成功,會告知MQ這條消息已經接收成功,會從MQ隊列中刪除
    	//返回false代表業務邏輯處理失敗,會告知MQ這條消息沒有處理成功,則MQ會繼續推送這條消息
    	return true;
    }, async (errMsg, msg) =>
    {
    	await Task.Delay(3000);
    
    	Console.ForegroundColor = ConsoleColor.Red;
    	Console.WriteLine($"\r\n隊列1接收錯誤:{errMsg}\r\n原始數據:{msg}");
    }, 1);
    if (!retRec1.Item1)
    {
    	Console.ForegroundColor = ConsoleColor.Red;
    	Console.WriteLine($"\r\n隊列1接收失敗:{retRec1.Item2}");
    }
    

7、🍟Quick.RabbitMQPlus 使用說明

Quick.RabbitMQPlus組件的使用方式和Quick.RabbitMQPlus.Furion組件完全一致(包括配置、實體特性和方法等),唯一不同的就是Quick.RabbitMQPlus.Furion需要在啟動程序中通過依賴注入註冊服務(services.AddRabbitMQPlus()、services.AddRabbitMQPlus<T>())註冊 RabbitMQ 連接配置對象builder.Services.AddConfigurableOptions<QuickRabbitMQPlusOptions>();)。


8、🌰RabbitMQ 總結

經過以上對 RabbitMQ 的介紹和運用,簡單總結幾點關於 RabbitMQ 的重點注意事項:

  • 需要了解掌握 RabbitMQ 的幾個核心概念:生產者、交換機、隊列、綁定、連接、通道和消費者;

  • 需要了解掌握 RabbitMQ 的幾種模式:簡單隊列模式、工作隊列模式、發佈訂閱模式(使用最多)、路由模式和通配符模式;

  • 在消費數據完成後,需要響應給 MQ 服務器,因此要理解自動響應和手動響應的原理和區別(一般都使用手動響應,為了確保消息不丟失並且可再次消費);

  • 如果想要消息持久化,需要根據實際情況對交換機隊列消息設置其持久化配置;

  • ……