RabbitMQ的六種工作模式

一、普通隊列模式

1.  一個消費者,一個隊列,一個消費者。
2.  消息產生消息放入隊列,消息的消費者(consumer) 監聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走後,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經從隊列中消失了,造成消息的丟失)應用場景:聊天(中間有一個過度的伺服器;p端,c端)

image
22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)

  • 獲取RabbitMQ連接幫助類

    後面程式碼,這部分創建連接共用

     public class RabbitMQHelper
        {
            /// <summary>
            /// 獲取RabbitMQ連接
            /// </summary>
            /// <returns></returns>
            public static IConnection GetConnection()
            {
                //實例化連接工廠
                var factory = new ConnectionFactory
                {
                    HostName = "127.0.0.1", //ip
                    Port = 5672, // 埠
                    UserName = "Admin", // 賬戶
                    Password = "Admin", // 密碼
                    VirtualHost = "/"   // 虛擬主機
                };
    
                return factory.CreateConnection();
            }
        }
    
  • 生產者

    public class Send
    {
    
        public static void SendMessage()
        {
            string queueName = "normal";
    
            //1.創建鏈接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                // 2.創建信道
                using(var channel = connection.CreateModel())
                {
                    // 3.聲明隊列
                    channel.QueueDeclare(queueName, false, false, false, null);
                    // 沒有綁定交換機,怎麼找到路由隊列的呢?
                    for (int i = 1; i <= 30; i++)
                    {
                        //4.構建Byte消息數據包
                        string message =$"第{i}條消息";
                        var body = Encoding.UTF8.GetBytes(message);//消息以二進位形式傳輸
    
                        // 發送消息到rabbitmq,使用rabbitmq中默認提供交換機路由,默認的路由Key和隊列名稱完全一致
                        //5.發送數據包
                        channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
                        Thread.Sleep(1000);//添加延遲
                        Console.WriteLine("生產:" + message);
                    }
                }
            }
    
        } 
    }
    
  • 消費者

    public class Receive
    {
        public static void ReceiveMessage()
        {
            // 消費者消費是隊列中消息
            string queueName = "normal";
            //1.建立鏈接鏈接
            var connection = RabbitMQHelper.GetConnection();
            {
                //2.建立信道
                var channel = connection.CreateModel();
                {
                    //3.聲明隊列:如果你先啟動是消費端就會異常
                    channel.QueueDeclare(queueName, false, false, false, null);
                    //4.創建一個消費者實例
                    var consumer = new EventingBasicConsumer(channel);
                    //5.綁定消息接收後的事件委託
                    consumer.Received +=(model, ea) => {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Thread.Sleep(1000);
                        Console.WriteLine(" Normal Received => {0}", message);
                    }; 
                    //6.啟動消費者
                    channel.BasicConsume( queue: queueName, autoAck:true, consumer);//開始消費
                }
    
            }
    
        } 
    }
    

二、工作隊列模式

  1. 一個消費者,一個隊列,多個消費者。但多個消費者中只會有一個會成功地消費消息

  2. 消息產生者將消息放入隊列消費者可以有多個,消費者1,消費者2,同時監聽同一個隊列,消息被消費?C1 C2共同爭搶當前的消息隊列內容,誰先拿到誰負責消費消息(隱患,高並發情況下,默認會產生某一個消息被多個消費者共同使用。

  3. 應用場景:紅包;大項目中的資源調度(任務分配系統不需知道哪一個任務執行系統在空閑,直接將任務扔到消息隊列中,空閑的系統自動爭搶)
    image

  • 生產者

     public class WorkerSend
        {
    
            public static void SendMessage()
            {
                string queueName = "Worker_Queue";
    
                using (var connection = RabbitMQHelper.GetConnection())
                {
                    using(var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queueName, false, false, false, null);
                        for (int i = 0; i < 30; i++)
                        {
                            string message = $"RabbitMQ Worker {i + 1} Message";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", queueName, null, body);
                            Console.WriteLine("send Task {0} message",i + 1);
                        }
                       
                    }
                }
                
            } 
        }
    
  • 消費者

      public class WorkerReceive
        {
            public static void ReceiveMessage()
            {
                string queueName = "Worker_Queue";
                var connection = RabbitMQHelper.GetConnection();
                {
                    var channel = connection.CreateModel();
                    {
                        channel.QueueDeclare(queueName, false, false, false, null);
                        var consumer = new EventingBasicConsumer(channel);
                        //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時,不再分配任務。
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                        consumer.Received +=(model, ea) => {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            Console.WriteLine(" Worker Queue Received => {0}", message);
                        }; 
                        channel.BasicConsume(queueName,true, consumer);
                    }
                   
                }
              
            } 
        }
    

三、扇形隊列模式(發布/訂閱模式)

  1. 一個消息生產者,一個交換機,多個隊列,多個消息消費者。每個消費隊列中消息一致,且每個消息消費者都從自己的消息隊列的第一個消息開始消費,直到最後。

  2. 交換機為rabbitMQ中內部組件。消息生產者將消息發送給rabbitMQ後,rabbitMQ會根據訂閱的消費者個數,生成對應數目的消息隊列,這樣每個消費者都能獲取生產者發送的全部消息。

  3. 一旦消費者斷開與rabbitMQ的連接,隊列就會消失。如果消費者數目很多,對於rabbitMQ而言,也是個重大負擔,訂閱模式是個長連接,佔用並發數,且每個消費者一個隊列會佔用大量空間

  4. 相關應用場景:郵件群發,群聊,廣播
    image

  • 生產者
 public static void SendMessage()
        {
            //1.創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.創建信道
                using(var channel = connection.CreateModel())
                {
                    // 3.聲明交換機對象
                    channel.ExchangeDeclare("fanout_exchange", "fanout");
                   
                    // 4.創建隊列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    
                    // 5.綁定到交互機
                    // fanout_exchange 綁定了 3個隊列 
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交換機
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    for (int i = 0; i < 10; i++)
                    {
                        //6.構建消息byte數組
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        //7.發送消息
                        channel.BasicPublish("fanout_exchange", "", null, body);//同時把消息發送到訂閱的三個隊列
                        Console.WriteLine("Send Fanout {0} message",i + 1);
                    }
                }
            }
            
        } 
    }
  • 消費者
 public class FanoutConsumer
    {
        public static void ConsumerMessage()
        {
            //1.創建連接
            var connection = RabbitMQHelper.GetConnection();
            {
                //2,。創建信道
                var channel = connection.CreateModel();
                {
                    //3.申明exchange
                    channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
                    
                    // 4.創建隊列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    
                    // 5.綁定到交互機
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    Console.WriteLine("[*] Waitting for fanout logs.");

                    //6.申明consumer
                    var consumer = new EventingBasicConsumer(channel);
                    //綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine("[x] {0}", message);

                    };
                    //7.啟動消費者
                    channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只會消費隊列queueName1中的消息,其他隊列中訂閱的消息仍然存在
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }

四、直接隊列模式(Routing路由模式)

  1. 一個消息生產者,一個交換機,多個隊列,多個消息消費者。一個交換機綁定多個消息隊列,每個消息隊列都有自己唯一的Routekey,每一個消息隊列有一個消費者監聽。

  2. 消息生產者將消息發送給交換機,交換機按照路由判斷,將路由到的RouteKey的消息,推送與之綁定的隊列,交換機根據路由的key,只能匹配上路由key對應的消息隊列,對應的消費者才能消費消息;
    image

  • 生產者:
public static void SendMessage()
        {
            //1.創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.創建信道
                using(var channel = connection.CreateModel())
                {
                    // 3.聲明Direct交換機
                    channel.ExchangeDeclare("direct_exchange", "direct");

                    // 4.創建隊列
                    string queueName1 = "direct_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "direct_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "direct_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);

                    // 5.綁定到交互機 指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
                    channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
                    channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");

                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message =>green";
                        var body = Encoding.UTF8.GetBytes(message);
                        // 發送消息的時候需要指定routingKey發送
                        channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只發布到RouteKey:green的隊列
                        Console.WriteLine("Send Direct {0} message",i + 1);
                    }
                }
            }
            
        } 
    }
  • 消費者
   public class DirectConsumer
    {
        public static void ConsumerMessage()
        {
            //1.創建連接
            var connection = RabbitMQHelper.GetConnection();
            //2.創建通訊
            var channel = connection.CreateModel();
            //3.聲明交換機
            channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
            //4.綁定交換機
            var queueName = "direct_queue2";//隊列direct_queue3綁定有red,yellow,green共3個RouteKey
            channel.QueueDeclare(queueName, false, false, false, null);
            //此處消費通訊沒有必要綁定所有的RouteKey,根據前生產者通訊的路由規則,每個隊列中只會路由到一種消息
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "red");
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "yellow");
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "green");

            Console.WriteLine(" [*] Waiting for messages.");

            //5.實例化消費者
            var consumer = new EventingBasicConsumer(channel);
            //6.為消費者綁定消費委託事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                // 消費完成後需要手動簽收消息,如果不寫該程式碼就容易導致重複消費問題
                //7.手動確認簽收消息
                channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次簽收性能損耗
            };

            // 消息簽收模式
            // 手動簽收 保證正確消費,不會丟消息(基於客戶端而已)
            // 自動簽收 容易丟消息 
            // 簽收:意味著消息從隊列中刪除
            channel.BasicConsume(queue: queueName,
                                 autoAck: false,
                                 consumer: consumer);//設置為不自動簽收,進行手動簽收

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

五、模糊匹配隊列模式(Topic 主題模式)

  1. 一個消息生產者,一個交換機,多個隊列,多個消息消費者。一個交換機綁定多個消息隊列,每個消息隊列都有自己唯一的Routekey,每一個消息隊列有一個消費者監聽。

  2. 此時的自己唯一的Routekey,不是一個確定值,像我們熟悉的正則表達式對應的匹配規則。

  3. 生產者產生消息,把消息交給交換機,交換機根據RouteKey的模糊匹配到對應的隊列,由隊列監聽消費者消費消息。

  4. 規則:

    和* 都是通配符,命名規則是多個單詞用頓號(.)分隔開

    代表代表一個單詞

    *代表多個單詞
    image

  • 生產者:
      public static void SendMessage()
        {
            //1.創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.創建信道
                using (var channel = connection.CreateModel())
                {
                    //3.聲明交換機
                    channel.ExchangeDeclare("topic_exchange", "topic");
                    //4.聲明隊列
                    string queueName1 = "topic_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "topic_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "topic_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    //5.綁定到交互機
                    channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");
                    channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
                    channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
                
                    for (int i = 0; i < 10; i++)
                    {
                        //6.準備發送位元組數組
                        string message = $"RabbitMQ Topic {i + 1} Delete Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        //7.根據RouteKey發布消息
                        channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//會發布到queueName1,queueName2
                        Console.WriteLine("Send Topic {0} message", i + 1);
                    }
                }
            }

        }
  • 消費者:
 public static void ConsumerMessage()
        {
            //1.創建連接
            var connection = RabbitMQHelper.GetConnection();
            //2.創建通訊
            var channel = connection.CreateModel();
            //3.聲明交換機
            channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
            //4.聲明隊列
            var queueName = "topic_queue3";
            channel.QueueDeclare(queueName, false, false, false, null);
            //5.綁定交換機
            channel.QueueBind(queue: queueName,
                                      exchange: "topic_exchange",
                                      routingKey: "user.data.*");

            Console.WriteLine(" [*] Waiting for messages.");
            //6.創建消費者
            var consumer = new EventingBasicConsumer(channel);
            //7.綁定消費委託事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
            };

            //8.啟動消費
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

六、RPC 模式(了解)

RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的非同步調用,基於Direct交換機實現,流程如下:

1、客戶端即是生產者也是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。

2、服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,得到方法返回的結果。

3、服務端將RPC方法 的結果發送到RPC響應隊列。

4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。

Tags: