RabbitMQ簡介、安裝、基本特性API–Java測試

新的閱讀體驗地址://www.zhouhong.icu/post/141

本篇文章所有的程式碼://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo

一、初識RabbitMQ

是一個開源的消息代理和隊列伺服器,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協議的。
AMQP協議Advanced Message Queuing Protocol(高級消息隊列協議)
 定義:具有現代特徵的二進位協議,是一個提供統一消息服務的應用層標準高級消息隊列協議,
是應用層協議的一個開放標準,為面向消息中間件設計。
AMQP專業術語:
  • Server:又稱broker,接受客戶端的鏈接,實現AMQP實體服務
  • Connection:連接,應用程式與broker的網路連接
  • Channel:網路信道,幾乎所有的操作都在channel中進行,Channel是進行消息讀寫的通道。客戶端可以建立多個channel,每個channel代表一個會話任務。
  • Message:消息,伺服器與應用程式之間傳送的數據,由Properties和Body組成.Properties可以對消息進行修飾,必須消息的優先順序、延遲等高級特性;Body則是消息體內容。
  • virtualhost: 虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個virtual host裡面可以有若干個Exchange和Queue,同一個Virtual Host 裡面不能有相同名稱的Exchange 或 Queue。
  • Exchange:交換機,接收消息,根據路由鍵轉單消息到綁定隊列
  • Binding:  Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key
  • Routing key: 一個路由規則,虛擬機可用它來確定如何路由一個特定消息。(如負載均衡)
RabbitMQ整體架構

Exchange和隊列是多對多關係,實際操作一般為1個exchange對多個隊列,為避免設計過於複雜.

二、單機版快速安裝

  • 1、首先在Linux上進行一些軟體的準備工作
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
wget //github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget //repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget //github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
  • 3、安裝服務命令
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm​
  • 4、啟動
啟動服務
systemctl start rabbitmq-server
查看是否啟動
lsof -i:5672
  • 5、啟動、安裝web管理插件(管控台)
rabbitmq-plugins enable rabbitmq_management
  • 6、查看管理埠有沒有啟動
lsof -i:15672
或者:
netstat -tnlp | grep 15672
  • 7、添加用戶
#添加用戶 用戶名 admin 密碼 admin web管理工具可用此用戶登錄
sudo rabbitmqctl add_user admin admin
#設置用戶角色 管理員
sudo rabbitmqctl set_user_tags admin administrator
#設置用戶許可權(接受來自所有Host的所有操作)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"  
#查看用戶許可權
sudo rabbitmqctl list_user_permissions admin
  • 重新啟動
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management

  • 程式碼測試
  1. 引入依賴
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>

     2.發送端:

package com.zhouhong.rabbitmq.api.helloworld;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
    public static void main(String[] args) throws Exception {
        //    1 創建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //    2 創建Connection
        Connection connection = connectionFactory.newConnection();
        //    3 創建Channel
        Channel channel = connection.createChannel();  
        //    4 聲明
        String queueName = "test001";  
        //    參數: queue名字,是否持久化,獨佔的queue(僅供此連接),不使用時是否自動刪除, 其他參數
        channel.queueDeclare(queueName, false, false, false, null);
        Map<String, Object> headers = new HashMap<String, Object>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .headers(headers).build();
        for(int i = 0; i < 5;i++) {
            String msg = "Hello World RabbitMQ " + i;
            channel.basicPublish("", queueName , props , msg.getBytes());             
        }
    }
}

    3.接收端

package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
    public static void main(String[] args) throws Exception {        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;          
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();        
        Channel channel = connection.createChannel();          
        String queueName = "test001";  
        //    durable 是否持久化消息
        channel.queueDeclare(queueName, false, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //    參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //    循環獲取消息  
        while(true){  
            //    獲取消息,如果沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

    4.結果(先啟動接收端進行監控,再啟動發送端)

收到消息:Hello World RabbitMQ 0
收到消息:Hello World RabbitMQ 1
收到消息:Hello World RabbitMQ 2
收到消息:Hello World RabbitMQ 3
收到消息:Hello World RabbitMQ 4

三、RabbitMQ—-交換機

  1. Name:交換機名稱。
  2. Type:交換機類型 direct、topic、fanout、headers。
  3. Durability:是否持久化,ture為持久化。
  4. Auto Delete :當最後一個綁定道Exchange上的隊列刪除後,自動刪除該Exchange。
  5. Internal:當前Exchange是否用於RabbitMQ內部使用,默認為False。
  6. Arguments:擴展參數,用於擴展AMQP協議自製定化使用。
  7. DirectExchange的消息被轉發道RouteKey中指定的Queue。
交換機—–Direct exchange
Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何綁定操作,消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄。

程式碼:
  • 發送端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");
        //2 創建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        //必須要和接收端 routingKey 一一對應
        String routingKey = "test_direct_routingKey";
        //5 發送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());                 
    }
}
  •  接收端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {    
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;          
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");    
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test_direct_routingKey";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環獲取消息  
        while(true){  
            //獲取消息,如果沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
交換機—–topic exchange
exchange 將Routekey和某個topic進行一個模糊匹配,發送給對應隊列、可以用通配符進行匹配

比如下面例子
程式碼:
  • 接收端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
    public static void main(String[] args) throws Exception {        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;         
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();       
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // 只能匹配一個 例如:user.txt、user.py都可以,但是user.txt.py 不行
        //String routingKey = "user.*";
        // user.txt、user.py 、user.txt.py 都可以匹配到
        String routingKey = "user.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //    參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        System.err.println("consumer1 start.. ");
        //    循環獲取消息  
        while(true){  
            //    獲取消息,如果沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());  
        } 
    }
}
  • 發送端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {    
    public static void main(String[] args) throws Exception {        
        //1 創建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        //2 創建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 發送        
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }    
}
交換機—–Fanout exchange 廣播模式
1.不處理路由鍵,只需要簡單的將隊列綁定到交換機上。
2.發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。
3.Fanout交換機轉發消息是最快的。
程式碼:見示例文章開始GitHub地址

四、RabbitMQ高級特性

1、消息如何保障 100% 的投遞成功
生產端的可靠性投遞的標誌:
1、消息成功發出
2、mq節點成功接收
3、發送端MQ節點確認應答
4、完善的消息補償機制
解決:消息資訊落庫,對消息狀態進行打標
冪等性
    1、 select count(1) from t_order where id = 唯一id(或)指紋碼
    2、唯一id或指紋碼機制,利用資料庫主鍵去重
2、Confirm
第一步:再channel上開啟確認模式:channel.confirmSelect();
第二步:再channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行重新發送、或記錄日期等後續處理!
3、return消息機制
ReturnListener用於處理不可路由的消息
我們的消息生產者,通過指定一個Exchage和Routingkey,把消息送達某一個隊列中去,然後我們的消費者監聽隊列,進行消費處理操作,如果沒有合適的隊列,則會由returnListener進行接受。
Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然後進行後續處理,如果為false,那麼broker端自動刪除該消息。
4、消費端ACK與重回隊列
消費端ACK:
  • 在工作的時候一般不會選擇自動ack
  • 消費端的手工ack分為兩種ACK和NACK
  • 消費端進行消費的時候,如果由於業務異常我們可以進行日誌的記錄,然後進行補償。這種建議回復NACK,不要重回隊列
  • 如果由於伺服器宕機等嚴重問題,那我們就需要手工進行ACK保障消費端消費成功
消費端的重回隊列
  • 是為了對沒有處理成功的消息,把消息重新會投遞給broker。
  • 重回隊列,會回到隊列的尾部
  • 也會造成一條消息一直重複投遞,死循環了
  • 在實際應用中,都會關閉重回隊列,也就是設置為false
5、TTL隊列和消息
TTL: time to live的縮寫,也就是生存時間。
  • RabbitMQ 支援消息過期時間,在消息發送時可以進行指定
  • RabbitMQ支援隊列過期時間,從消息入隊列開始計算,只要超過了隊列的超時間時間配置,那麼消息會自動的清除
死隊列: DLX,Dead-Letter-Exchange
  • 利用DLX,當消息在一個隊列中變成死信(dead message)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX.
消息變成死信的幾種情況
  • 消息被拒絕 並且requeue = false
  • 消息TTL過期
  • 隊列達到最大長度
DLX也是一個正常的Exchange,實際上是一個屬性控制
  • 當隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上,進而被路由到另一個隊列.
  • 可以監聽這個隊列中消息做相應的處理,這個特性可以彌補rabbitMQ3.0以前的immediate參數功能。
  • 在正常隊列上添加參數:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);這樣消息過期、requeue、隊列達到最大長度時,就可以直接路由到死信隊列。