RabbitMQ從概念到使用、從Docker安裝到RabbitMQ整合Springboot【1.5w字保姆級教學】

@

一、前言

我們先來聊聊消息中間件:
消息中間件利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通訊來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分散式環境下擴展進程間的通訊。(來自百度百科)

我們常見的中間件其實有很多種了,例如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中應用最為廣泛的要數RabbitMQ、RocketMQ、Kafka 這三款。Redis在某種程度上也可以使用list或者Stream來實現消息隊列,但不能算是中間件哈!

如果大家對怎麼選型感興趣,可以看一下小編的這篇文章:四大MQ選型

今天小編帶著大家一起學習一下RabbitMQ,從入門到精通,從無到有!!小編沒有使用Windows安裝,很麻煩,所以使用Docker安裝。如果沒有安裝Docker的可以看一下小編的另一篇文章:Linux安裝Docker

小編其實也是通過雷神的課件和講解後,自己在整理一下,供以後學習和參考,在此感謝尚矽谷雷神哈!

小編覺得在說概念之前,應該知道他的作用,然後再系統的學習概念等!

二、RabbitMQ作用

其實作用還是挺多的,但是主要是以下三條:

  • 非同步處理
  • 應用解耦
  • 流量控制

下面我們進行一個個的簡單描述一下哈,我們還是拿被用了一萬次的例子和圖例哈!!

1. 非同步處理

用戶在某網站註冊成功後,需要向用戶發送郵件和資訊提示其註冊成功(其實沒什麼必要,但是例子說一下還可以,小編自己的理解哈!)。常規的做法是:後台將註冊資訊保存到資料庫,然後再給用戶發郵件發簡訊。

在這裡插入圖片描述
我們看到這樣非常的耗時,其實保存完成後,就可以登錄了,簡訊和郵件過一會接收也是沒有什麼問題的!或者發送失敗,用戶一直沒有收到,這都是沒什麼問題的,用戶已經登錄進去了,管你發不發簡訊,大家說對吧!!

既然存在問題,我們就是有消息隊列來解決這個問題:
我們可以在將註冊資訊保存資料庫之後,把要發送註冊郵件和發送簡訊的消息寫入消息隊列,然後就告知用戶註冊成功。發送郵件和簡訊將由訂閱了消息的應用非同步的去執行。這樣耗時的問題就解決了!!
在這裡插入圖片描述

2. 應用解耦

在大型電商項目中,會將訂單系統和庫存系統分成兩個不同的應用。然後進行服務與服務之間的調用,正常情況下用戶下單後訂單系統會調用庫存系統,然後返回給用戶顯示下單成功。
在這裡插入圖片描述
但是也存在問題,如果庫存系統掛了,這樣就會導致下單失敗;如果你是用戶,你會判斷這個產品不行,以後不用了!!

別著急,這位用戶,我們幫您解決哈:這時我們引入消息隊列進行解耦
在這裡插入圖片描述

現在有的同學會問,怎麼解決的呢?
別著急,小編來和你說一下哈!剛剛出錯的原因就是庫存系統掛了,改處理的請求沒有處理,所以下單失敗;我們引入消息隊列,就是把訂單消息寫入到消息隊列中,然後庫存系統訂閱我們的消息隊列;然後庫存系統去消息隊列中獲取消息,進行處理訂單,來完成減庫存的操作;如果失敗也會有重試機制,真的掛了,也可以持久化,等到庫存系統活了之後繼續處理!!一個宗旨,不能影響用戶的使用體驗呢!!

3. 流量控制

看名字就能知道,肯定是並發很大的情況才會出現的,不用想就是秒殺時刻了!
假設一瓶茅台2萬人搶,這是我們的系統可能會被打垮。所以我們把超過一定並發量時,把超過的請求放在消息隊列中,然後減緩系統壓力,然後慢慢處理;雖然可能降低一下用戶的體驗,但是秒殺就是這樣,只能有一部分人成功,我們要保證好系統可以正常運行哈!!

在這裡插入圖片描述

三、RabbitMQ概念

1. RabbitMQ簡介

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
RabbitMQ 是部署最廣泛的開源消息代理。
RabbitMQ擁有數萬用戶,是最流行的開源消息代理之一。從T-Mobile到Runtastic,RabbitMQ在世界各地的小型初創公司和大型企業中使用。
RabbitMQ是輕量級的,易於在本地和雲中部署。它支援多種消息傳遞協議。RabbitMQ可以在分散式和聯合配置中部署,以滿足高規模、高可用性需求。
RabbitMQ運行在許多作業系統和雲環境上,並為最流行的語言提供了廣泛的開發工具。

2. 核心概念

Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,
這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可
能需要持久性存儲)等。

Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程式。

Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給伺服器中的隊列。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接到這個隊列將其取走。

Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。

Exchange 和Queue的綁定可以是多對多的關係

Connection
網路連接,比如一個TCP連接。

Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。

Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程式。

Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和許可權機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
類似docker容器和容器之間是相互隔離的,一個壞了,不耽誤另一個使用

Broker
表示消息隊列伺服器實體。

總架構圖
在這裡插入圖片描述

四、JMS與AMQP比較

JMS(Java Message Service) AMQP(Advanced Message Queuing Protocol)
定義 Java api 網路線級協議
跨語言
跨平台
Model 提供兩種消息模型:
(1)、Peer-2-Peer
(2)、Pub/sub
提供了五種消息模型:
(1)、direct exchange
(2)、fanout exchange
(3)、topic change
(4)、headers exchange
(5)、system exchange
本質來講,後四種和JMS的pub/sub模型沒有太大差別,
僅是在路由機制上做了更詳細的劃分;
支援消息類
多種消息類型:
TextMessage
MapMessage
BytesMessage
StreamMessage
ObjectMessage
Message (只有消息頭和屬性)
byte[]
當實際應用時,有複雜的消息,可以將消息序列化後發
送。
實現中間件 ActiveMQ、HornetMQ RabbitMQ
綜合評價 JMS 定義了JAVA API層面的標準;在java體系中,
多個client均可以通過JMS進行交互,不需要應用修
改程式碼,但是其對跨平台的支援較差;
AMQP定義了wire-level層的協議標準;天然具有跨平
台、跨語言特性

五、RabbitMQ運行機制

AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了Exchange 和Binding的角色。生產者把消息發布到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列

在這裡插入圖片描述
Exchange 類型

  • direct
  • fanout
  • topic
  • headers(不建議使用)

RabbitMQ默認七大交換機
全部交換機類型

1. direct

消息中的路由鍵(routing key)如果和Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為”a1.b1″,則只轉發 routing key 標記為”a1.b1″的消息,不會轉發”a1.b2」,也不會轉發”a1.b3″ 等等。它是完全匹配、單播的模式

在這裡插入圖片描述

2. fanout

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每檯子網內的主機都獲得了一份複製的消息。fanout 類型轉發消息是最快的、廣播

在這裡插入圖片描述

3. topic

topic是升級版的fanout模式,做了選擇權,並不是全都會接受,符合條件才會收到!topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。
它同樣也會識別兩個通配符:符號#和符號*#匹配0個或多個單詞,* 匹配一個單詞。

在這裡插入圖片描述

六、Docker安裝RabbitMQ

直接輸入命令,docker會幫助我們自動去拉去鏡像的:

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 \
	-p 15671:15671 -p 15672:15672 rabbitmq:management

我們查詢是否運行成功

docker ps

在這裡插入圖片描述

我們在windows上進行測試是否能夠打開介面:
輸入://192.168.17.130:15672/
在這裡插入圖片描述
用戶名密碼都是:guest

進入介面:
在這裡插入圖片描述

七、整合Springboot

1. 引入依賴

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <!--自定義消息轉化器Jackson2JsonMessageConverter所需依賴-->
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
 </dependency>

2. 主啟動類上添加註解

@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(GulimallOrderApplication.class, args);
    }

}

3. 編寫配置文件

# 指定rabbitmq伺服器主機
spring.rabbitmq.host=192.168.17.130
# 帳號密碼埠號都默認配置了,我們無需配置

在這裡插入圖片描述

八、測試創建交換機、隊列、綁定關係

1. 測試創建Direct交換機

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createExchange() {
    // 第一個參數為交換機名字,第二個參數為是否持久化,第三個參數為不使用交換機時刪除
    DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
    amqpAdmin.declareExchange(directExchange);
    System.out.println("交換機創建成功");
}

2. 打開交換機介面查看

在這裡插入圖片描述

3. 創建Queue

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createQueue() {
    /**
     * 第一個參數為隊列名字,
     * 第二個參數為是否持久化,
     * 第三個參數為是否排他(true:一個連接只能有一個隊列,false:一個連接可以有多個(推薦))
     * 第四個參數為不使用隊列時自動刪除
     */
    Queue queue = new Queue("hello-java-queue",true,false,false);
    amqpAdmin.declareQueue(queue);
    System.out.println("隊列創建成功");
}

4. 打開隊列介面查看

在這裡插入圖片描述

5. 綁定交換機和隊列

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createBinding() {
    /**
     * 第一個參數為目的地,就是交換機或者隊列的名字
     * 第二個參數為目的地類型,交換機還是隊列
     * 第三個參數為交換機,
     * 第四個參數為路由鍵,匹配的名稱
     */
    Binding binding = new Binding("hello-java-queue",
            Binding.DestinationType.QUEUE,
            "hello-java-exchange",
            "hello.java",null);
    amqpAdmin.declareBinding(binding);
    System.out.println("綁定成功");
}

6. 打開交換機中的綁定介面

點擊交換機
在這裡插入圖片描述
綁定列表

在這裡插入圖片描述

九、測試發消息

1. 測試發送消息

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){
	// 消息類型為object 發送對象也是可以的
    String msg = "這是一條消息";
    // 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
    System.out.println("消息發送成功");
}

2. Queue列表中查看消息

在這裡插入圖片描述

3. 手動確認消息

點擊我們的隊列:
在這裡插入圖片描述

進入詳細介面,下滑找到Get messages

在這裡插入圖片描述

在次點擊隊列,消息消失:
在這裡插入圖片描述

4. 測試發送對象

@Data
// 必須序列化,不然報錯
public class User implements Serializable {
    private String name;
    private Integer age;
}
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){
    User user = new User();
    user.setAge(22);
    user.setName("王振軍");
    // 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
    System.out.println("消息發送成功");
}

5. 查看消息發送的對象

在這裡插入圖片描述

6. 書寫消息發送對象轉JSON配置類

編寫配置類

@Configuration
public class MyRabbitmqConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

7. 再次發送九、4中的程式碼,查詢是否正常顯示對象

在這裡插入圖片描述

十、測試收消息

1. 創建接收資訊的方法

方法所在的類必須交給了IOC管理,我們直接寫在service裡面。程式碼如下:

@Service
public class TestService {
	// queues是監聽的隊列名字,可以是多個
    @RabbitListener(queues = {"hello-java-queue"})
    public void reciveMessage(Object message){
        System.out.println("接受的資訊" + message);
    }
}

2. 模擬發送消息

還是用上面的方法進行發送一個對象!

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){
    User user = new User();
    user.setAge(22);
    user.setName("王振軍");
    // 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
    System.out.println("消息發送成功");
}

3. 接收消息查看

接受的資訊:(Body:'{"name":"王振軍","age":22}' 
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User}, 
contentType=application/json, contentEncoding=UTF-8, contentLength=0, 
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, 
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, 
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])

4. 在思考

我們發現剛剛返回的是詳細資訊,我們可以指定消息的類型,就是發送消息發的對象是什麼,我們就可以直接接收就行!看程式碼:

@Service
public class TestService {

    @RabbitListener(queues = {"hello-java-queue"})
    public void reciveMessage(Message message, User user){
        System.out.println("接受的資訊:" + message);
        System.out.println("發送的資訊:" + user);
    }
}

在此發送消息,我們看一下控制台:

接受的資訊:(Body:'{"name":"王振軍","age":22}' 
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User}, 
contentType=application/json, contentEncoding=UTF-8, contentLength=0, 
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, 
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, 
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
發送的資訊:User(name=王振軍, age=22)

這樣就很清晰了哈!

拓展: 接收的還有第三個參數就是通道,每一個連接只會有一個通道哈!,大家可以自己測試一下,列印看看一下!!

public void reciveMessage(Message message, User user, Channel channel)

5. 多個服務監聽同一條隊列

右擊已存在服務,複製一份配置不同埠:

在這裡插入圖片描述

在這裡插入圖片描述
現在有兩個服務監聽同一個隊列!!

6. 模擬發送十條消息,查看會被那個服務接收

調整測試發消息程式碼:

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest(){
    for (int i = 0;i < 10; i++) {
        User user = new User();
        user.setAge(i);
        user.setName("王振軍" + i);
        // 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", user);
        System.out.println("消息發送成功");
    }
}

接收消息的程式碼:

@Service
public class TestService {

    @RabbitListener(queues = {"hello-java-queue"})
    public void reciveMessage(Message message, User user){

        System.out.println("接收的資訊:" + user);
    }
}

7. 多服務接收消息結果查看

我們看到9000服務接收了1,4,7消息
在這裡插入圖片描述
9010服務接收了0,3,6,9消息
在這裡插入圖片描述
總結: 我們可以發現一個消息只會被接收一次!

還有就是發了10條消息,只有7條被接收了,其餘的呢?

別急小編來告訴大家,這是因為我們測試是使用SpringBoot的測試類進行的,有的部分消息被測試的接收了!大家不信可以看一下測試的控制台,找一下:

在這裡插入圖片描述
我們看到消息的2,5,8在這裡呢!!

拓展: 我們一次發送十條消息,每條接收消息假如耗時10s,此時會接收處理完一個消息,才會接收下一個,就是我們說的串列化!!

十一、總結

這樣我們就對RabbitMQ有了新的認識,從入門也算走上了實踐!後面有時間小編再把消息的可靠性發出來,也就是進階版!!

在次感謝雷神的課程哈,看到這裡,小夥伴們點個讚唄,小編整理不易呀!!謝謝大家了!!


有緣人才可以看得到的哦!!!

點擊訪問!小編自己的網站,裡面也是有很多好的文章哦!