RabbitMQ簡介、安裝、基本特性API–Java測試
新的閱讀體驗地址://www.zhouhong.icu/post/141
本篇文章所有的程式碼://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo
一、初識RabbitMQ
是一個開源的消息代理和隊列伺服器,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協議的。
AMQP協議Advanced Message Queuing Protocol(高級消息隊列協議)
定義:具有現代特徵的二進位協議,是一個提供統一消息服務的應用層標準高級消息隊列協議,
是應用層協議的一個開放標準,為面向消息中間件設計。

AMQP專業術語:
- Server:又稱broker,接受客戶端的鏈接,實現AMQP實體服務
- Connection:連接,應用程式與broker的網路連接
- Channel:網路信道,幾乎所有的操作都在channel中進行,Channel是進行消息讀寫的通道。客戶端可以建立多個channel,每個channel代表一個會話任務。
- Message:消息,伺服器與應用程式之間傳送的數據,由Properties和Body組成.Properties可以對消息進行修飾,必須消息的優先順序、延遲等高級特性;Body則是消息體內容。
- virtualhost: 虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個virtual host裡面可以有若干個Exchange和Queue,同一個Virtual Host 裡面不能有相同名稱的Exchange 或 Queue。
- Exchange:交換機,接收消息,根據路由鍵轉單消息到綁定隊列
- Binding: Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key
- Routing key: 一個路由規則,虛擬機可用它來確定如何路由一個特定消息。(如負載均衡)
RabbitMQ整體架構
Exchange和隊列是多對多關係,實際操作一般為1個exchange對多個隊列,為避免設計過於複雜.
二、單機版快速安裝
- 1、首先在Linux上進行一些軟體的準備工作
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
wget //github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget //repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget //github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 3、安裝服務命令
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 4、啟動
啟動服務
systemctl start rabbitmq-server
查看是否啟動
lsof -i:5672
- 5、啟動、安裝web管理插件(管控台)
rabbitmq-plugins enable rabbitmq_management
- 6、查看管理埠有沒有啟動
lsof -i:15672
或者:
netstat -tnlp | grep 15672
- 7、添加用戶
#添加用戶 用戶名 admin 密碼 admin web管理工具可用此用戶登錄
sudo rabbitmqctl add_user admin admin
#設置用戶角色 管理員
sudo rabbitmqctl set_user_tags admin administrator
#設置用戶許可權(接受來自所有Host的所有操作)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"
#查看用戶許可權
sudo rabbitmqctl list_user_permissions admin
- 重新啟動
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
- 訪問://192.168.2.121:15672/ 使用 admin 登錄
- 程式碼測試
- 引入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
2.發送端:
package com.zhouhong.rabbitmq.api.helloworld; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Sender { public static void main(String[] args) throws Exception { // 1 創建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 2 創建Connection Connection connection = connectionFactory.newConnection(); // 3 創建Channel Channel channel = connection.createChannel(); // 4 聲明 String queueName = "test001"; // 參數: queue名字,是否持久化,獨佔的queue(僅供此連接),不使用時是否自動刪除, 其他參數 channel.queueDeclare(queueName, false, false, false, null); Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers).build(); for(int i = 0; i < 5;i++) { String msg = "Hello World RabbitMQ " + i; channel.basicPublish("", queueName , props , msg.getBytes()); } } }
3.接收端
package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
// durable 是否持久化消息
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循環獲取消息
while(true){
// 獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
4.結果(先啟動接收端進行監控,再啟動發送端)
收到消息:Hello World RabbitMQ 0
收到消息:Hello World RabbitMQ 1
收到消息:Hello World RabbitMQ 2
收到消息:Hello World RabbitMQ 3
收到消息:Hello World RabbitMQ 4
三、RabbitMQ—-交換機
- Name:交換機名稱。
- Type:交換機類型 direct、topic、fanout、headers。
- Durability:是否持久化,ture為持久化。
- Auto Delete :當最後一個綁定道Exchange上的隊列刪除後,自動刪除該Exchange。
- Internal:當前Exchange是否用於RabbitMQ內部使用,默認為False。
- Arguments:擴展參數,用於擴展AMQP協議自製定化使用。
- DirectExchange的消息被轉發道RouteKey中指定的Queue。
交換機—–Direct exchange
Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何綁定操作,消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄。
程式碼:
- 發送端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
//必須要和接收端 routingKey 一一對應
String routingKey = "test_direct_routingKey";
//5 發送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
- 接收端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test_direct_routingKey";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
交換機—–topic exchange
exchange 將Routekey和某個topic進行一個模糊匹配,發送給對應隊列、可以用通配符進行匹配
比如下面例子

程式碼:
- 接收端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// 只能匹配一個 例如:user.txt、user.py都可以,但是user.txt.py 不行
//String routingKey = "user.*";
// user.txt、user.py 、user.txt.py 都可以匹配到
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer1 start.. ");
// 循環獲取消息
while(true){
// 獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}
- 發送端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
交換機—–Fanout exchange 廣播模式
1.不處理路由鍵,只需要簡單的將隊列綁定到交換機上。
2.發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。
3.Fanout交換機轉發消息是最快的。

程式碼:見示例文章開始GitHub地址
四、RabbitMQ高級特性
1、消息如何保障 100% 的投遞成功
生產端的可靠性投遞的標誌:
1、消息成功發出
2、mq節點成功接收
3、發送端MQ節點確認應答
4、完善的消息補償機制
解決:消息資訊落庫,對消息狀態進行打標

冪等性
1、 select count(1) from t_order where id = 唯一id(或)指紋碼
2、唯一id或指紋碼機制,利用資料庫主鍵去重
2、Confirm

第一步:再channel上開啟確認模式:channel.confirmSelect();
第二步:再channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行重新發送、或記錄日期等後續處理!
3、return消息機制
ReturnListener用於處理不可路由的消息

我們的消息生產者,通過指定一個Exchage和Routingkey,把消息送達某一個隊列中去,然後我們的消費者監聽隊列,進行消費處理操作,如果沒有合適的隊列,則會由returnListener進行接受。
Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然後進行後續處理,如果為false,那麼broker端自動刪除該消息。
4、消費端ACK與重回隊列
消費端ACK:
- 在工作的時候一般不會選擇自動ack
- 消費端的手工ack分為兩種ACK和NACK
- 消費端進行消費的時候,如果由於業務異常我們可以進行日誌的記錄,然後進行補償。這種建議回復NACK,不要重回隊列
- 如果由於伺服器宕機等嚴重問題,那我們就需要手工進行ACK保障消費端消費成功
消費端的重回隊列
- 是為了對沒有處理成功的消息,把消息重新會投遞給broker。
- 重回隊列,會回到隊列的尾部
- 也會造成一條消息一直重複投遞,死循環了
- 在實際應用中,都會關閉重回隊列,也就是設置為false
5、TTL隊列和消息
TTL: time to live的縮寫,也就是生存時間。
- RabbitMQ 支援消息過期時間,在消息發送時可以進行指定
- RabbitMQ支援隊列過期時間,從消息入隊列開始計算,只要超過了隊列的超時間時間配置,那麼消息會自動的清除
死隊列: DLX,Dead-Letter-Exchange
- 利用DLX,當消息在一個隊列中變成死信(dead message)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX.
消息變成死信的幾種情況
- 消息被拒絕 並且requeue = false
- 消息TTL過期
- 隊列達到最大長度
DLX也是一個正常的Exchange,實際上是一個屬性控制
- 當隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上,進而被路由到另一個隊列.
- 可以監聽這個隊列中消息做相應的處理,這個特性可以彌補rabbitMQ3.0以前的immediate參數功能。
- 在正常隊列上添加參數:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);這樣消息過期、requeue、隊列達到最大長度時,就可以直接路由到死信隊列。