《RabbitMQ》什麼是死信隊列
一 什麼是死信隊列
當一條消息在隊列中出現以下三種情況的時候,該消息就會變成一條死信。
- 消息被拒絕(basic.reject / basic.nack),並且requeue = false
- 消息TTL過期
- 隊列達到最大長度
當消息在一個隊列中變成一個死信之後,如果配置了死信隊列,它將被重新publish到死信交換機,死信交換機將死信投遞到一個隊列上,這個隊列就是死信隊列。
二 實現死信隊列
2.1 原理圖
2.2 創建消費者
創建一個消費者,綁定消費隊列及死信交換機,交換機默認為direct
模型,死信交換機也是,arguments綁定死信交換機和key。(註解支援的具體參數文末會附上)
public class DirectConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
System.out.println("消費者1"+message);
}
2.3 創建生產者
public void publishMessage(String message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatripDirect","info",message);
}
三 造成死信的三種情況
3.1 拒絕消息,並且禁止重新入隊
- 設置yml為手動簽收模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
- 設置拒絕消息並禁止重新入隊
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicNack(deliverTag,false,false);
- 綁定死信隊列
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "javatripDead"),
exchange = @Exchange(value = "deadExchange"),
key = "deadKey"
)
})
public void receive2(String message){
System.out.println("我是一條死信:"+message);
}
3.2 消息TTL過期
綁定業務隊列的時候,增加消息的過期時長,當消息過期後,消息將被轉發到死信隊列中。
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey"),
@Argument(name = "x-message-ttl",value = "3000")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
System.out.println("消費者1"+message);
}
3.3 隊列達到最大長度
設置消息隊列長度,當隊列中的消息達到最大長度後,繼續發送消息,消息將被轉發到死信隊列中。
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey"),
@Argument(name = "x-max-length",value = "3")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
System.out.println("消費者1"+message);
}
四 Spring Boot整合RabbitMQ用到的幾個註解
- @QueueBinding作用就是將隊列和交換機進行綁定,主要有以下三個參數:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {
/**
* @return the queue.
*/
Queue value();
/**
* @return the exchange.
*/
Exchange exchange();
/**
* @return the routing key or pattern for the binding.
* Multiple elements will result in multiple bindings.
*/
String[] key() default {};
}
- @Queue是聲明隊列及隊列的一些屬性,主要參數如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
/**
* @return the queue name or "" for a generated queue name (default).
*/
@AliasFor("name")
String value() default "";
/**
* @return the queue name or "" for a generated queue name (default).
* @since 2.0
*/
@AliasFor("value")
String name() default "";
/**
* 是否持久化
*/
String durable() default "";
/**
* 是否獨享、排外的.
*/
String exclusive() default "";
/**
* 是否自動刪除;
*/
String autoDelete() default "";
/**
* 隊列的其他屬性參數
* (1)x-message-ttl:消息的過期時間,單位:毫秒;
*(2)x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;
*(3)x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;
*(4)x-max-length-bytes:隊列消息內容佔用最大空間,受限於記憶體大小,超過該閾值則從隊列頭部開始刪除消息;
*(5)x-overflow:設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發生什麼。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支援drop-head;
*(6)x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發送到該交換器中;
*(7)x-dead-letter-routing-key:死信消息路由鍵,在消息發送到死信交換器時會使用該路由鍵,如果不設置,則使用消息的原來的路由鍵值
*(8)x-single-active-consumer:表示隊列是否是單一活動消費者,true時,註冊的消費組內只有一個消費者消費消息,其他被忽略,false時消息循環分發給所有消費者(默認false)
*(9)x-max-priority:隊列要支援的最大優先順序數;如果未設置,隊列將不支援消息優先順序;
*(10)x-queue-mode(Lazy mode):將隊列設置為延遲模式,在磁碟上保留儘可能多的消息,以減少RAM的使用;如果未設置,隊列將保留記憶體快取以儘可能快地傳遞消息;
*(11)x-queue-master-locator:在集群模式下設置鏡像隊列的主節點資訊。
*/
Argument[] arguments() default {};
}
- @Exchange是聲明交換及交換機的一些屬性,
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
String TRUE = "true";
String FALSE = "false";
/**
* @return the exchange name.
*/
@AliasFor("name")
String value() default "";
/**
* @return the exchange name.
* @since 2.0
*/
@AliasFor("value")
String name() default "";
/**
* 交換機類型,默認DIRECT
*/
String type() default ExchangeTypes.DIRECT;
/**
* 是否持久化
*/
String durable() default TRUE;
/**
* 是否自動刪除
*/
String autoDelete() default FALSE;
/**
* @return the arguments to apply when declaring this exchange.
* @since 1.6
*/
Argument[] arguments() default {};
}