剛體驗完RabbitMQ?一文帶你SpringBoot+RabbitMQ方式收發消息
- 2020 年 8 月 10 日
- 筆記
- JAVA, RabbitMQ, springboot
人生終將是場單人旅途,孤獨之前是迷茫,孤獨過後是成長。
楔子
這篇是消息隊列RabbitMQ的第二彈。
上一篇的結尾我也預告了本篇的內容:利用RabbitTemplate和註解進行收發消息,還有一個我臨時加上的內容:消息的序列化轉換。
本篇會和SpringBoot做整合,採用自動配置的方式進行開發,我們只需要聲明RabbitMQ地址就可以了,關於各種創建連接關閉連接的事都由Spring幫我們了~
交給Spring幫我們管理連接可以讓我們專註於業務邏輯,就像聲明式事務一樣易用,方便又高效。
Tip:上一篇的程式碼都放在prototype
包下,本篇的程式碼都放在auto
包下面。
1. 🔍環境配置
第一節我們先來搞一下環境的配置,上一篇中我們已經引入了自動配置的包,我們既然使用了自動配置的方式,那RabbitMQ
的連接資訊我們直接放在配置文件中就行了,就像我們需要用到JDBC連接的時候去配置一下DataSource
一樣。
如圖所示,我們只需要指明一下連接的IP+埠號和用戶名密碼就行了,這裡我用的是默認的用戶名與密碼,不寫的話默認也都是guest,埠號也是默認5672。
主要我們需要看一下手動確認消息的配置,需要配置成manual
才是手動確認,日後還會有其他的配置項,眼下我們配置這一個就可以了。
接下來我們要配置一個Queue
,上一篇中我們往一個名叫erduo
的隊列中發送消息,當時是我們手動定義的此隊列,這裡我們也需要手動配置,聲明一個Bean
就可以了。
@Configuration
public class RabbitmqConfig {
@Bean
public Queue erduo() {
// 其三個參數:durable exclusive autoDelete
// 一般只設置一下持久化即可
return new Queue("erduo",true);
}
}
就這麼簡單聲明一下就可以了,當然了RabbitMQ
畢竟是一個獨立的組件,如果你在RabbitMQ
中通過其他方式已經創建過一個名叫erduo
的隊列了,你這裡也可以不聲明,這裡起到的一個效果就是如果你沒有這個隊列,會按照你聲明的方式幫你創建這個隊列。
配置完環境之後,我們就可以以SpringBoot的方式來編寫生產者和消費者了。
2. 📕生產者與RabbitTemplate
和上一篇的節奏一樣,我們先來編寫生產者,不過這次我要引入一個新的工具:RabbitTemplate
。
聽它的這個名字就知道,又是一個拿來即用的工具類,Spring家族這點就很舒服,什麼東西都給你封裝一遍,讓你用起來更方便更順手。
RabbitTemplate
實現了標準AmqpTemplate介面,功能大致可以分為發送消息和接受消息。
我們這裡是在生產者中來用,主要就是使用它的發送消息功能:send
和convertAndSend
方法。
// 發送消息到默認的Exchange,使用默認的routing key
void send(Message message) throws AmqpException;
// 使用指定的routing key發送消息到默認的exchange
void send(String routingKey, Message message) throws AmqpException;
// 使用指定的routing key發送消息到指定的exchange
void send(String exchange, String routingKey, Message message) throws AmqpException;
send
方法是發送byte數組的數據的模式,這裡代表消息內容的對象是Message
對象,它的構造方法就是傳入byte數組數據,所以我們需要把我們的數據轉成byte數組然後構造成一個Message
對象再進行發送。
// Object類型,可以傳入POJO
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend
方法是可以傳入POJO對象作為參數,底層是有一個MessageConverter
幫我們自動將數據轉換成byte類型或String或序列化類型。
所以這裡支援的傳入對象也只有三種:byte類型,String類型和實現了Serializable
介面的POJO。
介紹完了,我們可以看一下程式碼:
@Slf4j
@Component("rabbitProduce")
public class RabbitProduce {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String message = "Hello 我是作者和耳朵,歡迎關注我。" + LocalDateTime.now().toString();
System.out.println("Message content : " + message);
// 指定消息類型
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();
rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
System.out.println("消息發送完畢。");
}
public void convertAndSend() {
User user = new User();
System.out.println("Message content : " + user);
rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);
System.out.println("消息發送完畢。");
}
}
這裡我特意寫明了兩個例子,一個用來測試send,另一個用來測試convertAndSend。
send
方法里我們看下來和之前的程式碼是幾乎一樣的,定義一個消息,然後直接send,但是這個構造消息的構造方法可能比我們想的要多一個參數,我們原來說的只要把數據轉成二進位數組放進去即可,現在看來還要多放一個參數了。
MessageProperties
,是的我們需要多放一個MessageProperties
對象,從他的名字我們也可以看出它的功能就是附帶一些參數,但是某些參數是少不了的,不帶不行。
比如我的程式碼這裡就是設置了一下消息的類型,消息的類型有很多種可以是二進位類型,文本類型,或者序列化類型,JSON類型,我這裡設置的就是文本類型,指定類型是必須的,也可以為我們拿到消息之後要將消息轉換成什麼樣的對象提供一個參考。
convertAndSend
方法就要簡單太多,這裡我放了一個User對象拿來測試用,直接指定隊列然後放入這個對象即可。
Tips:User必須實現Serializable
介面,不然的話調用此方法的時候會拋出IllegalArgumentException
異常。
程式碼完成之後我們就可以調用了,這裡我寫一個測試類進行調用:
@SpringBootTest
public class RabbitProduceTest {
@Autowired
private RabbitProduce rabbitProduce;
@Test
public void sendSimpleMessage() {
rabbitProduce.send();
rabbitProduce.convertAndSend();
}
}
效果如下圖~
同時在控制台使用命令rabbitmqctl.bat list_queues
查看隊列-erduo
現在的情況:
如此一來,我們的生產者測試就算完成了,現在消息隊列里兩條消息了,而且消息類型肯定不一樣,一個是我們設置的文本類型,一個是自動設置的序列化類型。
3. 📗消費者與RabbitListener
既然隊列裡面已經有消息了,接下來我們就要看我們該如何通過新的方式拿到消息並消費與確認了。
消費者這裡我們要用到@RabbitListener
來幫我們拿到指定隊列消息,它的用法很簡單也很複雜,我們可以先來說簡單的方式,直接放到方法上,指定監聽的隊列就行了。
@Slf4j
@Component("rabbitConsumer")
public class RabbitConsumer {
@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息已確認");
}
}
這段程式碼就代表onMessage
方法會處理erduo
(Producer.QUEUE_NAME是常量字元串”erduo”)隊列中的消息。
我們可以看到這個方法裡面有兩個參數,Message
和Channel
,如果用不到Channel
可以不寫此參數,但是Message
消息一定是要的,它代表了消息本身。
我們可以想想,我們的程式從RabbitMQ
之中拉回一條條消息之後,要以怎麼樣的方式展示給我們呢?
沒錯,就是封裝為一個個Message
對象,這裡面放入了一條消息的所有資訊,數據結構是什麼樣一會我一run你就能看到了。
同時這裡我們使用Channel
做一個消息確認的操作,這裡的DeliveryTag代表的是這個消息在隊列中的序號,這個資訊存放在MessageProperties
中。
4. 📖SpringBoot 啟動!
編寫完生產者和消費者,同時已經運行過生產者往消息隊列裡面放了兩條資訊,接下來我們可以直接啟動消息,查看消費情況:
在我紅色框線標記的地方可以看到,因為我們有了消費者所以項目啟動後先和RabbitMQ建立了一個連接進行監聽隊列。
隨後就開始消費我們隊列中的兩條消息:
第一條資訊是contentType=text/plain
類型,所以直接就在控制台上列印出了具體內容。
第二條資訊是contentType=application/x-java-serialized-object
,在列印的時候只列印了一個記憶體地址+位元組大小。
不管怎麼說,數據我們是拿到了,也就是代表我們的消費是沒有問題的,同時也都進行了消息確認操作,從數據上看,整個消息可以分為兩部分:body
和MessageProperties
。
我們可以單獨使用一個註解拿到這個body的內容 – @Payload
@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, Channel channel) throws Exception {
System.out.println("Message content : " + body);
}
也可以單獨使用一個註解拿到MessageProperties
的headers屬性,headers屬性在截圖裡也可以看到,只不過是個空的 – @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception {
System.out.println("Message content : " + body);
System.out.println("Message headers : " + headers);
}
這兩個註解都算是擴展知識,我還是更喜歡直接拿到全部,全都要!!!
上面我們已經完成了消息的發送與消費,整個過程我們可以再次回想一下,一切都和我畫的這張圖上一樣的軌跡:
只不過我們一直沒有指定Exchage
一直使用的默認路由,希望大家好好記住這張圖。
5. 📘@RabbitListener與@RabbitHandler
下面再來補一些知識點,有關@RabbitListener
與@RabbitHandler
。
@RabbitListener
上面我們已經簡單的進行了使用,稍微擴展一下它其實是可以監聽多個隊列的,就像這樣:
@RabbitListener(queues = { "queue1", "queue2" })
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息已確認");
}
還有一些其他的特性如綁定之類的,這裡不再贅述因為太硬編碼了一般用不上。
下面來說說這節要主要講的一個特性:@RabbitListener和@RabbitHandler的搭配使用。
前面我們沒有提到,@RabbitListener
註解其實是可以註解在類上的,這個註解在類上標誌著這個類監聽某個隊列或某些隊列。
這兩個註解的搭配使用就要讓@RabbitListener
註解在類上,然後用@RabbitHandler
註解在方法上,根據方法參數的不同自動識別並去消費,寫個例子給大家看一看更直觀一些。
@Slf4j
@Component("rabbitConsumer")
@RabbitListener(queues = Producer.QUEUE_NAME)
public class RabbitConsumer {
@RabbitHandler
public void onMessage(@Payload String message){
System.out.println("Message content : " + message);
}
@RabbitHandler
public void onMessage(@Payload User user) {
System.out.println("Message content : " + user);
}
}
大家可以看看這個例子,我們先用@RabbitListener
監聽erduo
隊列中的消息,然後使用@RabbitHandler
註解了兩個方法。
-
第一個方法的body類型是String類型,這就代表著這個方法只能處理文本類型的消息。
-
第二個方法的body類型是User類型,這就代表著這個方法只能處理序列化類型且為User類型的消息。
這兩個方法正好對應著我們第二節中測試類會發送的兩種消息,所以我們往RabbitMQ中發送兩條測試消息,用來測試這段程式碼,看看效果:
都在控制台上如常列印了,如果@RabbitHandler
註解的方法中沒有一個的類型可以和你消息的類型對的上,比如消息都是byte數組類型,這裡沒有對應的方法去接收,系統就會在控制台不斷的報錯,如果你出現這個情況就證明你類型寫的不正確。
假設你的erduo
隊列中會出現三種類型的消息:byte,文本和序列化,那你就必須要有對應的處理這三種消息的方法,不然消息發過來的時候就會因為無法正確轉換而報錯。
而且使用了@RabbitHandler
註解之後就不能再和之前一樣使用Message
做接收類型。
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息已確認");
}
這樣寫的話會報類型轉換異常的,所以二者選其一。
同時上文我的@RabbitHandler
沒有進行消息確認,大家可以自己試一下進行消息確認。
6. 📙消息的序列化轉換
通過上文我們已經知道,能被自動轉換的對象只有byte[]
、String
、java序列化對象
(實現了Serializable介面的對象),但是並不是所有的Java對象都會去實現Serializable介面,而且序列化的過程中使用的是JDK自帶的序列化方法,效率低下。
所以我們更普遍的做法是:使用Jackson先將數據轉換成JSON格式發送給RabbitMQ
,再接收消息的時候再用Jackson將數據反序列化出來。
這樣做可以完美解決上面的痛點:消息對象既不必再去實現Serializable介面,也有比較高的效率(Jackson序列化效率業界應該是最好的了)。
默認的消息轉換方案是消息轉換頂層介面-MessageConverter
的一個子類:SimpleMessageConverter
,我們如果要換到另一個消息轉換器只需要替換掉這個轉換器就行了。
上圖是MessageConverter
結構樹的結構樹,可以看到除了SimpleMessageConverter
之外還有一個Jackson2JsonMessageConverter
,我們只需要將它定義為Bean,就可以直接使用這個轉換器了。
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter(jacksonObjectMapper);
}
這樣就可以了,這裡的jacksonObjectMapper
可以不傳入,但是默認的ObjectMapper
方案對JDK8的時間日期序列化會不太友好,具體可以參考我的上一篇文章:從LocalDateTime序列化探討全局一致性序列化,總的來說就是定義了自己的ObjectMapper
。
同時為了接下來測試方便,我又定義了一個專門測試JSON序列化的隊列:
@Bean
public Queue erduoJson() {
// 其三個參數:durable exclusive autoDelete
// 一般只設置一下持久化即可
return new Queue("erduo_json",true);
}
如此之後就可以進行測試了,先是生產者程式碼:
public void sendObject() {
Client client = new Client();
System.out.println("Message content : " + client);
rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client);
System.out.println("消息發送完畢。");
}
我又重新定義了一個Client
對象,它和之前測試使用的User對象成員變數都是一樣的,不一樣的是它沒有實現Serializable介面。
同時為了保留之前的測試程式碼,我又新建了一個RabbitJsonConsumer
,用於測試JSON序列化的相關消費程式碼,裡面定義了一個靜態變數:JSON_QUEUE = "erduo_json"
;
所以這段程式碼是將Client
對象作為消息發送到"erduo_json"
隊列中去,隨後我們在測試類中run一下進行一次發送。
緊著是消費者程式碼:
@Slf4j
@Component("rabbitJsonConsumer")
@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)
public class RabbitJsonConsumer {
public static final String JSON_QUEUE = "erduo_json";
@RabbitHandler
public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception {
System.out.println("Message content : " + client);
System.out.println("Message headers : " + headers);
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
System.out.println("消息已確認");
}
}
有了上文的經驗之後,這段程式碼理解起來也是很簡單了吧,同時給出了上一節沒寫的如何在@RabbitHandler
模式下進行消息簽收。
我們直接來看看效果:
在列印的Headers裡面,往後翻可以看到contentType=application/json
,這個contentType
是表明了消息的類型,這裡正是說明我們新的消息轉換器生效了,將所有消息都轉換成了JSON類型。
後記
這兩篇講完了RabbitMQ
的基本收發消息,包括手動配置和自動配置的兩種方式,這些大家仔細研讀之後應該會對RabbitMQ
收發消息沒什麼疑問了~
不過我們一直以來發消息時都是使用默認的交換機,下篇將會講述一下RabbitMQ
的幾種交換機類型,以及其使用方式。
講完了交換機之後,這些RabbitMQ
的常用概念基本就完善了。
最近這段時間壓力挺大,優狐令我八月底之前升級到三級,所以各位讀者的贊對我很重要,希望大家能夠高抬貴手,幫我一哈~
好了,以上就是本期的全部內容,感謝你能看到這裡,歡迎對本文點贊收藏與評論,👍你們的每個點贊都是我創作的最大動力。
我是耳朵,一個一直想做知識輸出的偽文藝程式設計師,我們下期見。