乾貨!基於SpringBoot的RabbitMQ多種模式隊列實戰

環境準備


安裝RabbitMQ

由於RabbitMQ的安裝比較簡單,這裡不再贅述。可自行到官網下載//www.rabbitmq.com/download.html

依賴

SpringBoot項目導入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
連接配置

配置文件添加如下配置(根據自身情況修改配置)

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.virtual-host=acelin

五種隊列模式實現


1 點對點的隊列

在java配置文件DirectRabbitConfig中先聲明一個隊列用於接收資訊

public static final String PEER_TO_PEER_QUEUE = "peer-to-peer-queue"; // 點對點隊列

/******************************* Peer-to-peer ******************************/
@Bean
public Queue peer2peerQueue() {
    return new Queue(PEER_TO_PEER_QUEUE,true);
}

創建一個消費者類Peer2PeerConsumers。用@RabbitListener對聲明的隊列進行監聽

@Component
public class Peer2PeerConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.PEER_TO_PEER_QUEUE)
    public void consumer2(Object testMessage) {
        logger.debug("peer-to-peer消費者收到消息  : " + testMessage.toString());
    }
}

創造一個消息生產者。在編碼形式上,直接把消息發發送給接收的消息隊列

/**
 * 【點對點模式】
 * @param task 消息內容
 **/
@PostMapping("/peer-to-peer/{task}")
public String peerToPeer(@PathVariable("task") String task){
    rabbitTemplate.convertAndSend(DirectRabbitConfig.PEER_TO_PEER_QUEUE,task);
    return "ok";

}

啟動項目。隊列綁定到默認交換機

image

調用生產者介面產生消息,可看到的消費者立即接收到資訊

peer-to-peer消費者收到消息  : (Body:'hi mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=peer-to-peer-queue, deliveryTag=1, consumerTag=amq.ctag-vuKWCYLNLn3GwRJKJO5-Mg, consumerQueue=peer-to-peer-queue])

這裡要說明一點的是,點對點模式雖然編碼形式只與隊列交互,但其本質上還是要跟交換機交互的,本質跟下面要介紹的路由模式其實是一樣的。

查看convertAndSend方法的源碼,可以看到我們雖然沒有進行交換機和隊列的綁定,發送消息是也沒指定交換機,但是程式會為我們綁定默認的交換機。

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.

默認交換機會隱式綁定到每個隊列,路由鍵等於隊列名稱。我們無法明確綁定到默認交換機或從默認交換中解除綁定。它也無法刪除。

且我們第一個參數傳遞的是隊列的名稱,但實際上程式是以這個名字作為路由,將同名隊列跟默認交換機做綁定。所以的消息會根據該路由資訊,通過默認交換機分發到同名隊列上。(我們通過接收的資訊receivedRoutingKey=peer-to-peer-queueconsumerQueue=peer-to-peer-queue也可以看的出來)

2 工作隊列模式Work Queue

在java配置文件DirectRabbitConfig中先聲明一個工作隊列

public static final String WORK_QUEUE = "work-queue"; // 工作隊列


/******************************* Work Queue ******************************/
@Bean
public Queue workQueue() {
    return new Queue(WORK_QUEUE,true);
}

創建一個消費者類WorkConsumers。同樣用@RabbitListener對聲明的隊列進行監聽

@Component
public class WorkConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer1(Object testMessage) {
        logger.debug("work消費者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer2(Object testMessage) {
        logger.debug("work消費者[2]收到消息  : " + testMessage.toString());
    }
}

創造一個消息生產者。在編碼形式上,直接把消息發發送給接收的消息隊列

/**
 * 【工作隊列模式】
 * @param task 消息內容
 **/
@PostMapping("/work/{task}")
public String sendWorkMessage(@PathVariable("task") String task){
	
	rabbitTemplate.convertAndSend(DirectRabbitConfig.WORK_QUEUE,task);
	return "ok";

}

啟動項目,同樣的,工作隊列也是綁定到默認交換機。

image

調用生產者介面連續發送幾次消息,可看到兩個消費者競爭對隊列消息進行消費,一條消息只被一個消費者消費,不會出現重複消費的情況,因此工作隊列模式也被稱為競爭消費者模式。

- work消費者[1]收到消息  : (Body:'task1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=1, consumerTag=amq.ctag-PUYjfVq56aEn-7a9DzLNzQ, consumerQueue=work-queue])

- work消費者[2]收到消息  : (Body:'task2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=1, consumerTag=amq.ctag-1IVtDalFUCKVvYpFr_GF8A, consumerQueue=work-queue])

- work消費者[1]收到消息  : (Body:'task3' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=2, consumerTag=amq.ctag-PUYjfVq56aEn-7a9DzLNzQ, consumerQueue=work-queue])

- work消費者[2]收到消息  : (Body:'task4' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=2, consumerTag=amq.ctag-1IVtDalFUCKVvYpFr_GF8A, consumerQueue=work-queue])

事實上,競爭消費者模式本質就是多個消費者對同一個隊列消息進行消費。另外,與點對點模式一樣,工作隊列模式的也是用到了默認交換機進行消息分發。因此於基於的Direct交換機的路由模式的原理本質上都是一樣的,因此,某種程度上,我們也可以用路由模式實現工作隊列模式,這點我們下面介紹路由模式再進行展開

3 路由模式Routing

在java配置文件DirectRabbitConfig中先聲明2個隊列和一個direct類型的交換機,然後將隊列1和與交換機用一個路由鍵1進行綁定,隊列2用路由鍵2與隊列進行綁定

public static final String DIRECT_QUEUE_ONE = "directQueue-1"; // Direct隊列名稱1
public static final String DIRECT_QUEUE_TWO = "directQueue-2"; // Direct隊列名稱2

public static final String MY_DIRECT_EXCHANGE = "myDirectExchange"; // Direct交換機名稱

public static final String ROUTING_KEY_ONE = "direct.routing-key-1"; // direct路由標識1
public static final String ROUTING_KEY_ONE = "direct.routing-key-2"; // direct路由標識2

/******************************* Direct ******************************/
@Bean
public Queue directQueueOne() {
    return new Queue(DIRECT_QUEUE_ONE,true);
}

@Bean
public Queue directQueueTwo() {
    return new Queue(DIRECT_QUEUE_TWO,true);
}

@Bean
public DirectExchange directExchange() {
    return new DirectExchange(MY_DIRECT_EXCHANGE,true,false);
}

@Bean
public Binding bindingDirectOne() {
    return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(ROUTING_KEY_ONE);
}

@Bean
public Binding bindingDirectTwo() {
    return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(ROUTING_KEY_TWO);
}

創建一個消費者類DirectConsumers。在每個消費者上,我們用3個消費者註解@RabbitListener對聲明的隊列進行監聽。消費者1和3監聽隊列1,消費者2監聽隊列2

@Component
public class DirectConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_ONE)
    public void consumer1(Object testMessage) {
        logger.debug("Direct消費者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_TWO)
    public void consumer2(Object testMessage) {
        logger.debug("Direct消費者[2]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_ONE)
    public void consumer3(Object testMessage) {
        logger.debug("Direct消費者[3]收到消息  : " + testMessage.toString());
    }
}

創造一個消息生產者。發送消息時,帶上路由鍵1資訊

/**
 * 【Direct路由模式】
 * @param message 消息內容
 **/
@PostMapping("/direct/{message}")
public String sendDirectMessage(@PathVariable("message") String message) {

    Map<String, Object> map = new HashMap<>();
    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message);

    /* 設置路由標識MY_ROUTING_KEY,發送到交換機MY_DIRECT_EXCHANGE */
    rabbitTemplate.convertAndSend(DirectRabbitConfig.MY_DIRECT_EXCHANGE,DirectRabbitConfig.ROUTING_KEY_ONE, map);
    return "ok";
}

啟動項目,查看該交換機的綁定情況

image

發送多條資訊,可以看到,由於隊列2沒有通過路由鍵1跟交換機進行綁定,所以對於監控隊列2的消費者2,其無法結束到的帶有路由鍵1的消息,而消費者1和3則競爭消費隊列1的消息

- Direct消費者[3]收到消息  : (Body:'{messageId=54682b16-0142-46af-be0c-1156df1f27a7, messageData=msg-1}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=15, consumerTag=amq.ctag-CsuZL9KKByH9IDtqTKe-fg, consumerQueue=directQueue-1])

- Direct消費者[1]收到消息  : (Body:'{messageId=66cd296a-9a60-4458-8e87-72ed13f9964b, messageData=msg-2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=2, consumerTag=amq.ctag-hWmdY04YuLL0O2rgeSlxsw, consumerQueue=directQueue-1])

- Direct消費者[3]收到消息  : (Body:'{messageId=48c0830e-2207-47ec-bd3e-a958fec48118, messageData=msg-3}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=16, consumerTag=amq.ctag-CsuZL9KKByH9IDtqTKe-fg, consumerQueue=directQueue-1])

我們如果對新增一個隊列3,通過路由鍵1與交換機進行綁定,消費者獨立監聽隊列3,那麼我們不難猜到,隊列3將和隊列1同樣拿到一條消息,相當於廣播的概念,但我們會發現如果要這麼做,似乎路由鍵無足輕重,因此rabbitmq提供了一種特殊的交換機來處理這種場景,不需要路由鍵的參與。我們接著往下看

4 發布/訂閱模式Publish/Subscribe

在java配置文件DirectRabbitConfig中先聲明Fanout交換機和兩隊列,並將兩個隊列與該交換機進行綁定

public static final String MY_FANOUT_EXCHANGE = "myFanoutExchange"; // Fanout交換機名稱

public static final String FANOUT_QUEUE_ONE = "fanout-queue-1"; // Fanout隊列名稱1
public static final String FANOUT_QUEUE_TWO = "fanout-queue-2"; // Fanout隊列名稱2

/******************************* Fanout ******************************/
@Bean
public Queue fanoutQueueOne() {
    return new Queue(FANOUT_QUEUE_ONE,true);
}

@Bean
public Queue fanoutQueueTwo() {
    return new Queue(FANOUT_QUEUE_TWO,true);
}

@Bean
public FanoutExchange fanoutExchange(){
    return new FanoutExchange(MY_FANOUT_EXCHANGE,true,false);
}

@Bean
public Binding bindingFanoutOne() {
    return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}

@Bean
public Binding bindingFanoutTwo() {
    return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}

創建一個消費者類FanoutConsumers。創建兩個消費者,分表對兩個隊列進行監聽

@Component
public class FanoutConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.FANOUT_QUEUE_ONE)
    public void consumer1(Object testMessage) {
        logger.debug("FANOUT消費者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.FANOUT_QUEUE_TWO)
    public void consumer2(Object testMessage) {
        logger.debug("FANOUT消費者[2]收到消息  : " + testMessage.toString());
    }
}

創造一個消息生產者。將消息發送給Fanout交換機

/**
 * 【工作隊列模式】
 * @param task 消息內容
 **/
@PostMapping("/work/{task}")
public String sendWorkMessage(@PathVariable("task") String task){
	
	rabbitTemplate.convertAndSend(DirectRabbitConfig.WORK_QUEUE,task);
	return "ok";

}

啟動項目,我們可以看到交換機與兩個隊列進行了綁定,但是路由鍵那一欄是空的。

image

發送兩條消息。

/**
 * 【Fanout發布訂閱模式】
 * @param message 消息內容
 **/
@PostMapping("/fanout/{message}")
public String sendFanoutMessage(@PathVariable("message") String message) {

    Map<String, Object> map = new HashMap<>();
    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message);

    /* 直接跟交換機MY_FANOUT_EXCHANGE交互 */
    rabbitTemplate.setExchange(DirectRabbitConfig.MY_FANOUT_EXCHANGE);
    rabbitTemplate.convertAndSend(map);
    return "ok";
}

可以看到,兩個消費者都拿到了同樣的數據,達到了廣播的效果。

- FANOUT消費者[2]收到消息  : (Body:'{messageId=a4bf1931-1db8-4cb9-8b01-397f43a82660, messageData=Hi Fanout}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-ncVmsRM7xHLZ0iAJT2tSTg, consumerQueue=fanout-queue-2])

- FANOUT消費者[1]收到消息  : (Body:'{messageId=a4bf1931-1db8-4cb9-8b01-397f43a82660, messageData=Hi Fanout}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-zR3Oi0MVESq8qushlAMa3Q, consumerQueue=fanout-queue-1])

- FANOUT消費者[1]收到消息  : (Body:'{messageId=51f66720-35dd-4abf-9d33-24acf7786ed8, messageData=666}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-zR3Oi0MVESq8qushlAMa3Q, consumerQueue=fanout-queue-1])

- FANOUT消費者[2]收到消息  : (Body:'{messageId=51f66720-35dd-4abf-9d33-24acf7786ed8, messageData=666}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-ncVmsRM7xHLZ0iAJT2tSTg, consumerQueue=fanout-queue-2])

5 通配符模式Topics

在java配置文件DirectRabbitConfig中先聲明一個Topic交換機、兩個工作隊列和三個通配綁定鍵,其中一個隊列通過兩個不同通配綁定鍵與交換機綁定,另外一個隊列用第三個綁定鍵進行綁定。

public static final String WORK_QUEUE = "work-queue"; // 工作隊列


/******************************* Work Queue ******************************/
@Bean
public Queue workQueue() {
    return new Queue(WORK_QUEUE,true);
}

通過rabbitmq管理頁面我們可以看到交換機與隊列的綁定變化,可以看到隊列1車工綁定了兩個通配鍵

image

創建一個消費者類TopicConsumers。創建兩個消費者分別對兩個隊列做監聽。

@Component
public class WorkConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer1(Object testMessage) {
        logger.debug("work消費者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer2(Object testMessage) {
        logger.debug("work消費者[2]收到消息  : " + testMessage.toString());
    }
}

創造一個消息生產者。發送3條不同的消息,分別帶上三個不同的路由鍵

/**
 * 【Topic通配符模式】
 * @param message 消息內容
 **/
@PostMapping("/topic/{message}")
public String sendTopicMessage(@PathVariable("message") String message) {

    Map<String, Object> map = new HashMap<>();

    /* 直接跟交換機MY_FANOUT_EXCHANGE交互 */
    rabbitTemplate.setExchange(DirectRabbitConfig.MY_TOPIC_EXCHANGE);

    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message + "TEST1");
    rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_ONE,map);

    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message + "TEST2");
    rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_TWO,map);

    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message + "TEST3");
    rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_THREE,map);

    return "ok";
}

路由鍵聲明如下:

public static final String TOPIC_ROUTING_KEY_ONE = "topic.a1.b1.c1"; // topic路由鍵1
public static final String TOPIC_ROUTING_KEY_TWO = "topic.a1.b1";    // topic路由鍵2
public static final String TOPIC_ROUTING_KEY_THREE = "topic.a2.b1";  // topic路由鍵3

啟動項目,調用生產者的介面,查看兩個消費者的消費情況。

- TOPIC消費者[2]收到消息  : (Body:'{messageId=82abd282-1110-4f1a-b09e-ae9a43c560c3, messageData=hi topic! TEST1}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1.c1, deliveryTag=1, consumerTag=amq.ctag-wlRVC5xWiN8glrtA2_i6uA, consumerQueue=topic-queue-2])

- TOPIC消費者[1]收到消息  : (Body:'{messageId=b2039557-75d8-47d5-93a0-2a03a38fabc7, messageData=hi topic! TEST2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1, deliveryTag=1, consumerTag=amq.ctag-F6ByjknEnCjh7XVolNfmcg, consumerQueue=topic-queue-1])

- TOPIC消費者[2]收到消息  : (Body:'{messageId=b2039557-75d8-47d5-93a0-2a03a38fabc7, messageData=hi topic! TEST2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1, deliveryTag=2, consumerTag=amq.ctag-wlRVC5xWiN8glrtA2_i6uA, consumerQueue=topic-queue-2])

- TOPIC消費者[1]收到消息  : (Body:'{messageId=3a8f3164-706f-4523-bd2a-4fee73595fbb, messageData=hi topic! TEST3}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a2.b1, deliveryTag=2, consumerTag=amq.ctag-F6ByjknEnCjh7XVolNfmcg, consumerQueue=topic-queue-1])

可以看到,路由鍵前綴為topic.a1的資訊都可以被綁定了topic.a1.#的隊列接收到,而綁定了topic.a1.*的隊列只能接收到topic.a1後面帶一個單詞的資訊,由於隊列1還通過topic.*.b1綁定交換機,因此攜帶路由鍵"topic.a2.b1"的資訊同樣也被隊列1接收

topic交換機是direct交換機做的改造的。兩者的區別主要體現在路由鍵和綁定鍵格式上的限制不同。

路由鍵:必須是由點分隔的單詞列表。單詞形式不限。比如一個主題建:<主題1>.<主題2>.<主題3>

綁定鍵:格式上和路由鍵一致,但多了兩個通配符*##代表任意數量的單詞,包括0個。*標識一個單詞。

使用上,一個綁定鍵,我們可以看成是對一類具有多個特徵的物體的一個抽象,由點分割的每個單詞,我們可以看成一個主題或是一個特徵。因此只要做好消息特徵的歸納抽象,加上通配符的使用,我們就有很高的自由度去處理任意類型的消息

總結


以上就是關於RabbitMQ五種隊列模式的實戰演練,關於RabbitMQ其它實戰與知識理解後續會相繼分享,感興趣的同學歡迎留言討論

Tags: