RabbitMQ六種工作模式有哪些?怎樣用SpringBoot整合RabbitMQ
- 2021 年 1 月 23 日
- 筆記
目錄
- 一、RabbitMQ入門程序
- 二、Work queues 工作模式
- 三、Publish / Subscribe 發佈/訂閱模式
- 四、Routing 路由模式
- 五、Topics
- 六、Header
- 七、RPC
- 八、Spring Data Elasticsearch
一、RabbitMQ入門程序
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
</dependencies>
application.yml
server:
port: 44000
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: /
消息發送者
/**
* Description: rabbitmq入門程序
*
* @author zygui
* @date Created on 2020/5/13 15:34
*/
public class Producer01 {
// 聲明一個消息隊列名稱
private static final String QUEUE_NAME = "helloworld";
public static void main(String[] args) {
// 通過連接工廠創建新的連接與mq建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
connectionFactory.setVirtualHost("/"); // 默認為 / 即可
// 建立連接
Connection connection = null;
// 建立信道(目的是為了復用連接)
Channel channel = null;
try {
//建立新連接
connection = connectionFactory.newConnection();
//創建會話通道,生產者和mq服務所有通信都在channel通道中完成
channel = connection.createChannel();
//聲明隊列,如果隊列在mq 中沒有則要創建
//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
* 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 發送消息
//參數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細:
* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")
* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱
* 3、props,消息的屬性
* 4、body,消息內容
*/
//消息內容
String message = "hello world 桂朝陽";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("send to mq "+message);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 關閉信道
channel.close();
// 關閉連接
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消息接收者
/**
* Description: rabbitmq入門程序
*
* @author zygui
* @date Created on 2020/5/13 15:45
*/
public class Consumer01 {
private static final String QUEUE_NAME = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連接工廠創建新的連接和mq建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連接
Connection connection = connectionFactory.newConnection();
//創建會話通道,生產者和mq服務所有通信都在channel通道中完成
Channel channel = connection.createChannel();
//監聽隊列
//聲明隊列,如果隊列在mq 中沒有則要創建
//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
* 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 實現消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/**
* 當接收到消息後此方法將被調用
* @param consumerTag 消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 消息屬性
* @param body 消息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
long deliveryTag = envelope.getDeliveryTag();
//消息內容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽隊列
//參數:String queue, boolean autoAck, Consumer callback
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
* 3、callback,消費方法,當消費者接收到消息要執行的方法
*/
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
入門程序步驟
二、Work queues 工作模式
三、Publish / Subscribe 發佈/訂閱模式
消息生產者
public class Producer02_publish {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
// 交換機名稱
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args) {
//通過連接工廠創建新的連接和mq建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//建立新連接
connection = connectionFactory.newConnection();
//創建會話通道,生產者和mq服務所有通信都在channel通道中完成
channel = connection.createChannel();
//聲明隊列,如果隊列在mq 中沒有則要創建
//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
* 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//聲明一個交換機
//參數:String exchange, String type
/**
* 參數明細:
* 1、交換機的名稱
* 2、交換機的類型
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//進行交換機和隊列綁定
//參數:String queue, String exchange, String routingKey
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
//發送消息
//參數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細:
* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")
* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱
* 3、props,消息的屬性
* 4、body,消息內容
*/
for(int i=0;i<5;i++){
//消息內容
String message = "send inform message to user";
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
System.out.println("send to mq "+message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//關閉連接
//先關閉通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e