RabbitMQ簡介
RabbitMQ簡介
官網也講述的清楚而且還有例子,只不過是英文的,很多人看到英文就不明白說什麼了吧,即使有翻譯成中文,總覺得哪裡怪怪的,有些翻譯並不流暢。我還是支援多看官網,官網://www.rabbitmq.com/ 。下面是自己做的一點下筆記,有參考其他文檔。如有什麼不對的地方,希望大家能夠告訴我,通過留言板,像消息隊列一樣,你發送消息,我接收消息。
一、消息隊列(MQ)
- 什麼是消息隊列:即MQ,Message Queue。是一種應用程式對應用對應用程式的通訊方式。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通訊,而無需專用連接來連接他們。消息隊列是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷從隊列中獲取消息。【因為消息的生產和消費都是非同步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦】
- AMQP和JMS
- AMQP:即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準(高級消息隊列協議)。【限定了數據傳輸的格式和方式,跨語言跨平台,和http協議類似】。是應用層協議的一個開放標準,為面向消息的中間件設計。
- JMS:Java MessageService,實際上指JMS API。JMS是SUN公司早期提出的消息標準,旨在為Java應用提供統一的消息操作,包括create、send、recieve等。JMS已經成為java Enterprise Edition的一部分。
- 兩者間的區別和聯繫:
- JMS是定義了統一的介面,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式。
- JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
- JMS規定了兩種消息模型;而AMQP的消息模型更加豐富。
- 常見MQ產品:
- ActiveMQ:基於JMS, Apache旗下的
- RabbitMQ:基於AMQP協議,erlang(一種通用的面向並發的程式語言)語言開發,穩定性好
- RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會
- Kafka:分散式消息系統,高吞吐量
二、RabbitMQ:
- 下載安裝:先安裝erlang,在安裝rabbitmq。(這裡不詳細說明,如果有不是很清楚的,可以去搜索,相信很多夥伴已經分享過很多這樣的部落格啦)
- 啟動步驟【window系統下的cmd命令行】:cd \RabbitMQ Server\rabbitmq_server-3.7.15\sbin(進入到你安裝的rabbitmq目錄的sbin目錄) –> rabbitmq-plugins enable rabbitmq_management –> rabbitmq-server
- 五種消息模型:RabbitMQ提供了6中消息模型,但是第6種RPC,並不是MQ。其中3/4/5這三種屬於訂閱模式,只不過進行路由的方式不同。
- 基本消息模型:RabbitMQ是一個消息的代理者(Message broker):它接收消息並且傳遞消息。你可以認為它是一個郵局,當你投遞郵件到一個郵箱,你很肯定郵遞員終究會將郵件遞交給你的收件人。與此類似,RabbitMQ可以是一個郵箱、郵局、同時還是郵遞員。不同之處在於:RabbitMQ不是傳遞紙質郵件,而是二進位的數據。
- 問題:那麼RabbitMQ怎麼知道消息被接收了呢?(轉換思維,即如何避免消息的丟失?)【答:消費者的消息確認機制Acknowlege,當消費者獲取消息後,會向RabbitMQ發送回執ACK,告知消息已經被接收。不過這種回執ACK分兩種情況:① 自動ACK:消息一旦被接收,消費者自動發送ACK;② 手動ACK:消息接收後,不會發送ACK,需要手動調用。很好理解,望名知意嘛,嘿嘿嘿!!!】。
-
public class ConnectionUtil { /** * 建立與RabbitMQ的鏈接 */ public static Connection getConnection() throws IOException, TimeoutException { // 定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設置服務地址 factory.setHost("127.0.0.1"); // 埠 factory.setPort(5672); // 設置帳號資訊,用戶名、密碼、vhost factory.setVirtualHost("/demo"); factory.setUsername("guest"); factory.setPassword("guest"); // 通過工廠獲取連接 Connection connection = factory.newConnection(); return connection; } }
public class Send { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 從連接中創建通道,使用通道才能完成消息相關的操作 Channel channel = connection.createChannel(); // 聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello word!"; // 向指定的隊列中發送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // 關閉通道和連接 channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [x] received: " + msg + "!"); } }; // 監聽隊列,第二個參數:是否自動進行消息確認 channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Recv2 { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [x] received: " + msg + "!"); // 手動進行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數false,手動進行ACK channel.basicConsume(QUEUE_NAME, false, consumer); } }
- work消息模型:也稱為Task queue任務模型。當消息處理比較耗時的時候,可能產生消息的速度會遠遠大於消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用該模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重複執行的。
- 問題:又會存在疑惑,任務是如何分配的,可以和顯示生活中任務的分配聯想起來呦?【答案:① 默認是任務平分,一次分配完全(即公平分配);② channel.basicQos(int num); 設置每個消費者同時只能處理num條數據(即能者多勞,耗時小的多處理些,你懂滴)】。
-
public class Send { private final static String QUEUE_NAME = "task_work_queue"; public static void main(String[] args) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 從連接中創建通道,使用通道才能完成消息相關的操作 final Channel channel = connection.createChannel(); // 聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循環發布任務 for (int i=0; i<50; i++) { // 消息內容 String message = "task ... " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 2); } // 關閉通道和連接 channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "task_work_queue"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 設置每個消費同時只能處理一條消息 channel.basicQos(1); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [x] received: " + msg + "!"); try { // 模擬完成任務的耗時:1000ms Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數:是否自動進行消息確認 channel.basicConsume(QUEUE_NAME, false, consumer); } }
/* * 對比上個消費者:耗時小,完成任務多些 */ public class Recv2 { private final static String QUEUE_NAME = "task_work_queue"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 設置每個消費同時只能處理一條消息 channel.basicQos(1); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [x] received: " + msg + "!"); // 手動進行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數false,手動進行ACK channel.basicConsume(QUEUE_NAME, false, consumer); } }
- 訂閱模型分類:
- 訂閱模型 – Fanout:廣播。一條消息,會被所有訂閱的隊列消費。
-
public class Send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為fanout channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 消息內容 String message = "Hello everyone"; // 發布消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生產者] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "fanout_exchange_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到鏈接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [消費者1] received: " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到鏈接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [消費者2] received: " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
- 訂閱模型 – Direct:不同的消息被不同的隊列消費。在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定,而是要指定至少一個 RoutingKey(路由key);
- 消息的發送方向 Exchange 發送消息時,也必須指定消息的 RoutingKey;
- Exchange 不再把消息交給每一個綁定的隊列,而是根據消息的 RoutingKey 進行判斷,只有隊列的RoutingKey與消息的RoutingKey完全一致,才會接收到消息。
-
public class Send { private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型為direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 消息內容 String message = "商品增加了,id = 1002"; // 發布消息到Exchange,並且指定routing key為:delete,代表刪除商品 channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); System.out.println(" [商品服務] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到鏈接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [消費者1] received: " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Recv2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到鏈接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [消費者2] received: " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
- 訂閱模型 – Topic:可以根據RoutingKey把消息路由到不同的隊列,Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符。
- RoutingKey 一般都是有一個或多個單片語成,多個單詞之間以「 . 」分割,例:item.insert。
- 通配符規則:
- #:匹配一個或多個詞
- *:匹配不多不少恰好一個詞
-
public class Send { private final static String EXCHANGE_NAME = "topic_durable_exchange_test"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 開啟生產者確認 // channel.confirmSelect(); // 聲明exchange,指定類型為topic, 並且設置durable為true,持久化 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); // 消息內容 String message = "商品新增了,id = 1002"; // 發布消息到Exchange,並且指定routing key,消息持久化 channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [商品服務] Sent '" + message + "'"); // 等待rabbitmq的確認消息,true為確認收到,false為發出有誤 // channel.waitForConfirms(); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "topic_durable_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_durable_exchange_test"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到鏈接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列, 第二個參數:true代表聲明為持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [消費者1] received: " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Recv2 { private final static String QUEUE_NAME = "topic_durable_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_durable_exchange_test"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取到鏈接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [消費者2] received: " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
- 訂閱模型 – Fanout:廣播。一條消息,會被所有訂閱的隊列消費。
- 基本消息模型:RabbitMQ是一個消息的代理者(Message broker):它接收消息並且傳遞消息。你可以認為它是一個郵局,當你投遞郵件到一個郵箱,你很肯定郵遞員終究會將郵件遞交給你的收件人。與此類似,RabbitMQ可以是一個郵箱、郵局、同時還是郵遞員。不同之處在於:RabbitMQ不是傳遞紙質郵件,而是二進位的數據。
- 當然啦,中國這麼hot的rabbitMQ自然也是有集成到了springboot中滴,開心。
- maven坐標依賴:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
- application.yml:
spring: rabbitmq: host: 127.0.0.1 username: guest password: guest virtual-host: /demo
-
@Component public class Listener { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", type = ExchangeTypes.TOPIC), key = {"#.#"})) public void listen(String msg) { System.out.println("接收到的消息: " + msg); } }
- maven坐標依賴:
- 持久化:要將消息持久化,前提是:隊列、Exchange都持久化。
- 交換機持久化:channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); // 參數三:設置durable為true。
- 隊列持久化:channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 參數二:設置為true,表示設置隊列持久化。
- 消息持久化:channel.basicPublish(EXCHANGE_NAME, “item.insert”, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
- 解決消息丟失?
- ACK(消費者確認,由消費者向mq發送,防止消息丟失於消費者)
- 持久化(防止rabbitmq把消息丟失)
- 生產者確認機制publisher confirm(由mq向生產者發送,有些mq包含,有些不包含,比如:activeMQ不包含該機制,rabbitmq包含該機制)
- 發送消息前,將消息持久化到資料庫,並記錄消息狀態(可靠消息服務)
- 思考問題(這個問題就留給你們思考啦?沖啊):如何保證消息發送的重複性,如何保證介面的冪等性(同一介面被重複執行,其結果一致)?【提示:加標識 消息的重發要謹慎】