.Net RabbitMQ實戰指南——進階(一)
備份交換器
備份交換器,英文名稱為Alternate Exchange,簡稱AE。通過在聲明交換器(調用channel.ExchangeDeclare方法)時添加alternate-exchange參數來實現。
備份交換器工作流程如下:
using (var channel = connection.CreateModel()) { //設置備胎交換器參數 var arguments = new Dictionary<string, object>(); arguments.Add("alternate-exchange","myAe"); channel.ExchangeDeclare("normalExchange", "direct",true,false, arguments); channel.ExchangeDeclare("myAe", "fanout", true, false); channel.QueueDeclare("nromalQueue",true,false,false); channel.QueueBind("nromalQueue", "normalExchange", "normalKey"); channel.QueueDeclare("unroutedQueue", true, false, false); channel.QueueBind("unroutedQueue", "myAe","ae"); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; string message = "RabbitMQ Test"; //傳遞的消息內容 channel.BasicPublish("normalExchange", "normalKey", properties, Encoding.UTF8.GetBytes(message)); //生產消息 channel.BasicPublish("normalExchange", "un-routkey", properties, Encoding.UTF8.GetBytes(message)); //生產消息 Console.WriteLine($"Send:{message}"); }
程式碼中聲明了兩個交換器normalExchange和myAe,分別綁定了normalQueue和unroutedQueue這兩個隊列,同時將myAe設置為normalExchange的備份交換器。myAe的交換器類型為fanout。同時生成兩條消息,其中「un-routkey」並沒有定義對應路由。
運行效果:
切換到queues視圖,可以看到兩個隊列分別有一條需要消費的消息。
備份交換器與普通的交換器沒有太大的區別,為了方便使用,建議設置為fanout類型
過期時間(TTL)
TTL,Time to Live的簡稱,即過期時間。RabbitMQ可以對隊列和消息設置TTL。
設置隊列的TTL
channel.QueueDeclare方法中的x-expires參數可以設置隊列被自動刪除前處於未使用狀態的時間。未使用是指隊列上沒有任何的消費者,隊列也沒有被重新聲明過,並且其間內也未調用過Basic.Get命令。
var arguments = new Dictionary<string, object>(); arguments.Add("x-expires", 10000); //單位毫秒 channel.QueueDeclare(arguments: arguments);
RabbitMQ重啟後,持久化的隊列的過期時間會被重新計算。
設置消息的TTL
消息的過期時間有兩種設置方式。第一種是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息本身進行設置。
如果兩種方法一起使用,則消息的TTL以兩者之間較小的那個數值為準。消息在隊列中的生存時間一旦超過設置的TTL值,就會變成「死信」(Dead Message)
通過隊列屬性設置消息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl參數實現的,這個參數的單位是毫秒。
arguments.Add("x-message-ttl", 6000); //單位毫秒 channel.QueueDeclare(arguments: arguments);
對消息本身進行過期時間設置前面程式碼有涉及過,是通過BasicPublish方法的basicProperties參數指定。
var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; properties.Priority = 2; properties.Expiration = "6000"; var message = "RabbitMQ Test"; //傳遞的消息內容 channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產消息
死信隊列
DLX,全稱為Dead-Letter-Exchange,即死信交換器。當一個消息在隊列中變成死信(dead message),它就會被重新被發送到DLX(死信交換器),綁定DLX的隊列就稱之為死信隊列。
通過在channel.QueueDeclare方法中設置x-dead-letter-exchange參數來為這個隊列添加DLX.
示例程式碼:
var arguments = new Dictionary<string, object>(); arguments.Add("x-expires", 10000); //單位毫秒 arguments.Add("x-message-ttl", 6000); //單位毫秒 channel.QueueDeclare(arguments: arguments); channel.ExchangeDeclare("dlx_exchange", "direct"); var argumentsDlx = new Dictionary<string, object>(); argumentsDlx.Add("x-dead-letter-exchange", "dlx_exchange"); argumentsDlx.Add("x-dead-letter-routing-key", "dlx-routing-key"); //為DLX指定路由鍵,如果沒有特殊指定,則使用原隊列的路由鍵 channel.QueueDeclare("dlx_queue",false,false,false,argumentsDlx);
可以看到dlx_queue有標記了DLX和DLK(Dead-Letter-Key)。
DLX是一個非常有用的特性。異常情況下,消息不能夠被消費者正確消費(消費者調用了Basic.Nack或者Basic.Reject)而被置入死信隊列中,後續分析程式可以通過死信隊列中的內容來分析當時所遇到的異常情況,從而改善和優化系統。
延遲隊列
延遲隊列存儲的是對應的延遲消息,「延遲消息」並不想讓消費者立刻消息,而是等待特定時間後,消費者才能拿到這個消息進行消費。如一個訂單系統中,用戶下單30分鐘內沒有完成支付那麼就對該訂單進行異常處理,就可以使用延遲隊列來處理這些訂單。
RabbitMQ並支援延遲隊列的功能,但是我們可以通過DLX和TTL模擬出延遲隊列的功能。如下圖所示:
實例中設置了5秒、10秒、30秒、1分鐘四個延時等級。根據應用需求的不同,生產者在發送消息的時候設置不同的路由鍵,從而將消息發送到與交換器綁定的對應隊列中。這些隊列分別設置了過期時間為5秒、10秒、30秒、1分鐘,同時也分別配置了DLX和相應的死信隊列。隊列中消息過期時,就會轉存到相應的死信隊列(即延遲隊列)中,在根據業務自身的情況,分別選擇不同延遲等級的延遲隊列進行消費。
優先順序隊列
優先順序隊列,有高優先順序的隊列具有高的優先權,優先順序高的消息有優先被消費的特權。
可以通過設置隊列的x-max-priority參數來實現。並在發送消息時通過Priority設置消息優先順序。
using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("priority_exchange", "direct"); var arguments = new Dictionary<string, object>(); arguments.Add("x-max-priority", 10); // channel.QueueDeclare("priority_queue", false, false, false, arguments); channel.QueueBind("priority_queue", "priority_exchange", "priority_key"); var properties = channel.CreateBasicProperties(); properties.Priority = 2; string message = "Priority 2"; //傳遞的消息內容 channel.BasicPublish("priority_exchange", "priority_key", properties, Encoding.UTF8.GetBytes(message)); //生產消息 properties.Priority = 5; message = "Priority 5"; //傳遞的消息內容 channel.BasicPublish("priority_exchange", "priority_key", properties, Encoding.UTF8.GetBytes(message)); //生產消息 var result = channel.BasicGet("priority_queue", false); channel.BasicAck(result.DeliveryTag, true); Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}"); }
運行上面的程式碼,不管我們如何調整生成消息的順序,channel.BasicGet取出來的始終是Priority為5的那條消息。
Github
示例程式碼地址://github.com/MayueCif/RabbitMQ