RabbitMQ的六種工作模式
一、普通隊列模式
1. 一個消費者,一個隊列,一個消費者。
2. 消息產生消息放入隊列,消息的消費者(consumer) 監聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走後,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經從隊列中消失了,造成消息的丟失)應用場景:聊天(中間有一個過度的伺服器;p端,c端)
22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)
-
獲取RabbitMQ連接幫助類
後面程式碼,這部分創建連接共用
public class RabbitMQHelper { /// <summary> /// 獲取RabbitMQ連接 /// </summary> /// <returns></returns> public static IConnection GetConnection() { //實例化連接工廠 var factory = new ConnectionFactory { HostName = "127.0.0.1", //ip Port = 5672, // 埠 UserName = "Admin", // 賬戶 Password = "Admin", // 密碼 VirtualHost = "/" // 虛擬主機 }; return factory.CreateConnection(); } }
-
生產者
public class Send { public static void SendMessage() { string queueName = "normal"; //1.創建鏈接 using (var connection = RabbitMQHelper.GetConnection()) { // 2.創建信道 using(var channel = connection.CreateModel()) { // 3.聲明隊列 channel.QueueDeclare(queueName, false, false, false, null); // 沒有綁定交換機,怎麼找到路由隊列的呢? for (int i = 1; i <= 30; i++) { //4.構建Byte消息數據包 string message =$"第{i}條消息"; var body = Encoding.UTF8.GetBytes(message);//消息以二進位形式傳輸 // 發送消息到rabbitmq,使用rabbitmq中默認提供交換機路由,默認的路由Key和隊列名稱完全一致 //5.發送數據包 channel.BasicPublish(exchange: "", routingKey: queueName, null, body); Thread.Sleep(1000);//添加延遲 Console.WriteLine("生產:" + message); } } } } }
-
消費者
public class Receive { public static void ReceiveMessage() { // 消費者消費是隊列中消息 string queueName = "normal"; //1.建立鏈接鏈接 var connection = RabbitMQHelper.GetConnection(); { //2.建立信道 var channel = connection.CreateModel(); { //3.聲明隊列:如果你先啟動是消費端就會異常 channel.QueueDeclare(queueName, false, false, false, null); //4.創建一個消費者實例 var consumer = new EventingBasicConsumer(channel); //5.綁定消息接收後的事件委託 consumer.Received +=(model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Thread.Sleep(1000); Console.WriteLine(" Normal Received => {0}", message); }; //6.啟動消費者 channel.BasicConsume( queue: queueName, autoAck:true, consumer);//開始消費 } } } }
二、工作隊列模式
-
一個消費者,一個隊列,多個消費者。但多個消費者中只會有一個會成功地消費消息
-
消息產生者將消息放入隊列消費者可以有多個,消費者1,消費者2,同時監聽同一個隊列,消息被消費?C1 C2共同爭搶當前的消息隊列內容,誰先拿到誰負責消費消息(隱患,高並發情況下,默認會產生某一個消息被多個消費者共同使用。
-
應用場景:紅包;大項目中的資源調度(任務分配系統不需知道哪一個任務執行系統在空閑,直接將任務扔到消息隊列中,空閑的系統自動爭搶)
-
生產者
public class WorkerSend { public static void SendMessage() { string queueName = "Worker_Queue"; using (var connection = RabbitMQHelper.GetConnection()) { using(var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, false, false, false, null); for (int i = 0; i < 30; i++) { string message = $"RabbitMQ Worker {i + 1} Message"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queueName, null, body); Console.WriteLine("send Task {0} message",i + 1); } } } } }
-
消費者
public class WorkerReceive { public static void ReceiveMessage() { string queueName = "Worker_Queue"; var connection = RabbitMQHelper.GetConnection(); { var channel = connection.CreateModel(); { channel.QueueDeclare(queueName, false, false, false, null); var consumer = new EventingBasicConsumer(channel); //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時,不再分配任務。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); consumer.Received +=(model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(" Worker Queue Received => {0}", message); }; channel.BasicConsume(queueName,true, consumer); } } } }
三、扇形隊列模式(發布/訂閱模式)
-
一個消息生產者,一個交換機,多個隊列,多個消息消費者。每個消費隊列中消息一致,且每個消息消費者都從自己的消息隊列的第一個消息開始消費,直到最後。
-
交換機為rabbitMQ中內部組件。消息生產者將消息發送給rabbitMQ後,rabbitMQ會根據訂閱的消費者個數,生成對應數目的消息隊列,這樣每個消費者都能獲取生產者發送的全部消息。
-
一旦消費者斷開與rabbitMQ的連接,隊列就會消失。如果消費者數目很多,對於rabbitMQ而言,也是個重大負擔,訂閱模式是個長連接,佔用並發數,且每個消費者一個隊列會佔用大量空間
-
相關應用場景:郵件群發,群聊,廣播
- 生產者
public static void SendMessage()
{
//1.創建連接
using (var connection = RabbitMQHelper.GetConnection())
{
//2.創建信道
using(var channel = connection.CreateModel())
{
// 3.聲明交換機對象
channel.ExchangeDeclare("fanout_exchange", "fanout");
// 4.創建隊列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 5.綁定到交互機
// fanout_exchange 綁定了 3個隊列
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交換機
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
for (int i = 0; i < 10; i++)
{
//6.構建消息byte數組
string message = $"RabbitMQ Fanout {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
//7.發送消息
channel.BasicPublish("fanout_exchange", "", null, body);//同時把消息發送到訂閱的三個隊列
Console.WriteLine("Send Fanout {0} message",i + 1);
}
}
}
}
}
- 消費者
public class FanoutConsumer
{
public static void ConsumerMessage()
{
//1.創建連接
var connection = RabbitMQHelper.GetConnection();
{
//2,。創建信道
var channel = connection.CreateModel();
{
//3.申明exchange
channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
// 4.創建隊列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 5.綁定到交互機
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
Console.WriteLine("[*] Waitting for fanout logs.");
//6.申明consumer
var consumer = new EventingBasicConsumer(channel);
//綁定消息接收後的事件委託
consumer.Received += (model, ea) => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("[x] {0}", message);
};
//7.啟動消費者
channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只會消費隊列queueName1中的消息,其他隊列中訂閱的消息仍然存在
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
四、直接隊列模式(Routing路由模式)
-
一個消息生產者,一個交換機,多個隊列,多個消息消費者。一個交換機綁定多個消息隊列,每個消息隊列都有自己唯一的Routekey,每一個消息隊列有一個消費者監聽。
-
消息生產者將消息發送給交換機,交換機按照路由判斷,將路由到的RouteKey的消息,推送與之綁定的隊列,交換機根據路由的key,只能匹配上路由key對應的消息隊列,對應的消費者才能消費消息;
- 生產者:
public static void SendMessage()
{
//1.創建連接
using (var connection = RabbitMQHelper.GetConnection())
{
//2.創建信道
using(var channel = connection.CreateModel())
{
// 3.聲明Direct交換機
channel.ExchangeDeclare("direct_exchange", "direct");
// 4.創建隊列
string queueName1 = "direct_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "direct_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "direct_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 5.綁定到交互機 指定routingKey
channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Direct {i + 1} Message =>green";
var body = Encoding.UTF8.GetBytes(message);
// 發送消息的時候需要指定routingKey發送
channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只發布到RouteKey:green的隊列
Console.WriteLine("Send Direct {0} message",i + 1);
}
}
}
}
}
- 消費者
public class DirectConsumer
{
public static void ConsumerMessage()
{
//1.創建連接
var connection = RabbitMQHelper.GetConnection();
//2.創建通訊
var channel = connection.CreateModel();
//3.聲明交換機
channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
//4.綁定交換機
var queueName = "direct_queue2";//隊列direct_queue3綁定有red,yellow,green共3個RouteKey
channel.QueueDeclare(queueName, false, false, false, null);
//此處消費通訊沒有必要綁定所有的RouteKey,根據前生產者通訊的路由規則,每個隊列中只會路由到一種消息
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "red");
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "yellow");
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "green");
Console.WriteLine(" [*] Waiting for messages.");
//5.實例化消費者
var consumer = new EventingBasicConsumer(channel);
//6.為消費者綁定消費委託事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
// 消費完成後需要手動簽收消息,如果不寫該程式碼就容易導致重複消費問題
//7.手動確認簽收消息
channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次簽收性能損耗
};
// 消息簽收模式
// 手動簽收 保證正確消費,不會丟消息(基於客戶端而已)
// 自動簽收 容易丟消息
// 簽收:意味著消息從隊列中刪除
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);//設置為不自動簽收,進行手動簽收
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
五、模糊匹配隊列模式(Topic 主題模式)
-
一個消息生產者,一個交換機,多個隊列,多個消息消費者。一個交換機綁定多個消息隊列,每個消息隊列都有自己唯一的Routekey,每一個消息隊列有一個消費者監聽。
-
此時的自己唯一的Routekey,不是一個確定值,像我們熟悉的正則表達式對應的匹配規則。
-
生產者產生消息,把消息交給交換機,交換機根據RouteKey的模糊匹配到對應的隊列,由隊列監聽消費者消費消息。
-
規則:
和* 都是通配符,命名規則是多個單詞用頓號(.)分隔開
代表代表一個單詞
*代表多個單詞
- 生產者:
public static void SendMessage()
{
//1.創建連接
using (var connection = RabbitMQHelper.GetConnection())
{
//2.創建信道
using (var channel = connection.CreateModel())
{
//3.聲明交換機
channel.ExchangeDeclare("topic_exchange", "topic");
//4.聲明隊列
string queueName1 = "topic_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "topic_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "topic_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
//5.綁定到交互機
channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");
channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
for (int i = 0; i < 10; i++)
{
//6.準備發送位元組數組
string message = $"RabbitMQ Topic {i + 1} Delete Message";
var body = Encoding.UTF8.GetBytes(message);
//7.根據RouteKey發布消息
channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//會發布到queueName1,queueName2
Console.WriteLine("Send Topic {0} message", i + 1);
}
}
}
}
- 消費者:
public static void ConsumerMessage()
{
//1.創建連接
var connection = RabbitMQHelper.GetConnection();
//2.創建通訊
var channel = connection.CreateModel();
//3.聲明交換機
channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
//4.聲明隊列
var queueName = "topic_queue3";
channel.QueueDeclare(queueName, false, false, false, null);
//5.綁定交換機
channel.QueueBind(queue: queueName,
exchange: "topic_exchange",
routingKey: "user.data.*");
Console.WriteLine(" [*] Waiting for messages.");
//6.創建消費者
var consumer = new EventingBasicConsumer(channel);
//7.綁定消費委託事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
};
//8.啟動消費
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
六、RPC 模式(了解)
RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的非同步調用,基於Direct交換機實現,流程如下:
1、客戶端即是生產者也是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。
2、服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,得到方法返回的結果。
3、服務端將RPC方法 的結果發送到RPC響應隊列。
4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。