RabbitMQ簡介

RabbitMQ簡介

  官網也講述的清楚而且還有例子,只不過是英文的,很多人看到英文就不明白說什麼了吧,即使有翻譯成中文,總覺得哪裡怪怪的,有些翻譯並不流暢。我還是支援多看官網,官網://www.rabbitmq.com/ 。下面是自己做的一點下筆記,有參考其他文檔。如有什麼不對的地方,希望大家能夠告訴我,通過留言板,像消息隊列一樣,你發送消息,我接收消息。

一、消息隊列(MQ)

  • 什麼是消息隊列:即MQ,Message Queue。是一種應用程式對應用對應用程式的通訊方式。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通訊,而無需專用連接來連接他們。消息隊列是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷從隊列中獲取消息。【因為消息的生產和消費都是非同步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦
  • AMQP和JMS
    • AMQP:即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準(高級消息隊列協議)。【限定了數據傳輸的格式和方式,跨語言跨平台,和http協議類似】。是應用層協議的一個開放標準,為面向消息的中間件設計。
    • JMS:Java MessageService,實際上指JMS API。JMS是SUN公司早期提出的消息標準,旨在為Java應用提供統一的消息操作,包括create、send、recieve等。JMS已經成為java Enterprise Edition的一部分。
    • 兩者間的區別和聯繫:
      • JMS是定義了統一的介面,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式。
      • JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
      • JMS規定了兩種消息模型;而AMQP的消息模型更加豐富。
  • 常見MQ產品:
    • ActiveMQ:基於JMS, Apache旗下的
    • RabbitMQ:基於AMQP協議,erlang(一種通用的面向並發的程式語言)語言開發,穩定性好
    • RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會
    • Kafka:分散式消息系統,高吞吐量

二、RabbitMQ:

  • 下載安裝:先安裝erlang,在安裝rabbitmq。(這裡不詳細說明,如果有不是很清楚的,可以去搜索,相信很多夥伴已經分享過很多這樣的部落格啦)
  • 啟動步驟【window系統下的cmd命令行】:cd \RabbitMQ Server\rabbitmq_server-3.7.15\sbin(進入到你安裝的rabbitmq目錄的sbin目錄) –> rabbitmq-plugins enable rabbitmq_management –> rabbitmq-server
  • 五種消息模型:RabbitMQ提供了6中消息模型,但是第6種RPC,並不是MQ。其中3/4/5這三種屬於訂閱模式,只不過進行路由的方式不同。
    • 基本消息模型:RabbitMQ是一個消息的代理者(Message broker):它接收消息並且傳遞消息。你可以認為它是一個郵局,當你投遞郵件到一個郵箱,你很肯定郵遞員終究會將郵件遞交給你的收件人。與此類似,RabbitMQ可以是一個郵箱、郵局、同時還是郵遞員。不同之處在於:RabbitMQ不是傳遞紙質郵件,而是二進位的數據。
      •   問題:那麼RabbitMQ怎麼知道消息被接收了呢?(轉換思維,即如何避免消息的丟失?)【答:消費者的消息確認機制Acknowlege,當消費者獲取消息後,會向RabbitMQ發送回執ACK,告知消息已經被接收。不過這種回執ACK分兩種情況:① 自動ACK:消息一旦被接收,消費者自動發送ACK;② 手動ACK:消息接收後,不會發送ACK,需要手動調用。很好理解,望名知意嘛,嘿嘿嘿!!!】。
      • public class ConnectionUtil {
        
            /**
             * 建立與RabbitMQ的鏈接
             */
            public static Connection getConnection() throws IOException, TimeoutException {
                // 定義連接工廠
                ConnectionFactory factory = new ConnectionFactory();
                // 設置服務地址
                factory.setHost("127.0.0.1");
                //
                factory.setPort(5672);
                // 設置帳號資訊,用戶名、密碼、vhost
                factory.setVirtualHost("/demo");
                factory.setUsername("guest");
                factory.setPassword("guest");
                // 通過工廠獲取連接
                Connection connection = factory.newConnection();
                return connection;
            }
        }
        public class Send {
        
            private final static String QUEUE_NAME = "simple_queue";
        
            public static void main(String[] args) throws IOException, TimeoutException {
                // 獲取到連接
                Connection connection = ConnectionUtil.getConnection();
                // 從連接中創建通道,使用通道才能完成消息相關的操作
                Channel channel = connection.createChannel();
                // 聲明(創建)隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // 消息內容
                String message = "Hello word!";
                // 向指定的隊列中發送消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        
                System.out.println(" [x] Sent '" + message + "'");
        
                // 關閉通道和連接
                channel.close();
                connection.close();
            }
        }
        public class Recv {
        
            private final static String QUEUE_NAME = "simple_queue";
        
            public static void main(String[] args) throws Exception {
                // 獲取連接
                Connection connection = ConnectionUtil.getConnection();
                // 創建通道
                Channel channel = connection.createChannel();
                // 聲明隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // 定義隊列的消費者
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        // body 即消息體
                        String msg = new String(body);
                        System.out.println(" [x] received: " + msg + "!");
                    }
                };
                // 監聽隊列,第二個參數:是否自動進行消息確認
                channel.basicConsume(QUEUE_NAME, true, consumer);
            }
        }
        public class Recv2 {
        
            private final static String QUEUE_NAME = "simple_queue";
        
            public static void main(String[] args) throws Exception {
                // 獲取連接
                Connection connection = ConnectionUtil.getConnection();
                // 創建通道
                Channel channel = connection.createChannel();
                // 聲明隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // 定義隊列的消費者
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        // body 即消息體
                        String msg = new String(body);
                        System.out.println(" [x] received: " + msg + "!");
                        // 手動進行ACK
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // 監聽隊列,第二個參數false,手動進行ACK
                channel.basicConsume(QUEUE_NAME, false, consumer);
            }
        }
    • work消息模型:也稱為Task queue任務模型。當消息處理比較耗時的時候,可能產生消息的速度會遠遠大於消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用該模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重複執行的。
      •   問題:又會存在疑惑,任務是如何分配的,可以和顯示生活中任務的分配聯想起來呦?【答案:①  默認是任務平分,一次分配完全(即公平分配);② channel.basicQos(int num); 設置每個消費者同時只能處理num條數據(即能者多勞,耗時小的多處理些,你懂滴)】。
      • public class Send {
        
            private final static String QUEUE_NAME = "task_work_queue";
        
            public static void main(String[] args) throws Exception {
                // 獲取到連接
                Connection connection = ConnectionUtil.getConnection();
                // 從連接中創建通道,使用通道才能完成消息相關的操作
                final Channel channel = connection.createChannel();
                // 聲明(創建)隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // 循環發布任務
                for (int i=0; i<50; i++) {
                    // 消息內容
                    String message = "task ... " + i;
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                    System.out.println(" [x] Sent '" + message + "'");
                    Thread.sleep(i * 2);
                }
                // 關閉通道和連接
                channel.close();
                connection.close();
            }
        }
        public class Recv {
        
            private final static String QUEUE_NAME = "task_work_queue";
        
            public static void main(String[] args) throws Exception {
                // 獲取連接
                Connection connection = ConnectionUtil.getConnection();
                // 創建通道
                Channel channel = connection.createChannel();
                // 聲明隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // 設置每個消費同時只能處理一條消息
                channel.basicQos(1);
                // 定義隊列的消費者
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        // body 即消息體
                        String msg = new String(body);
                        System.out.println(" [x] received: " + msg + "!");
        
                        try {
                            // 模擬完成任務的耗時:1000ms
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // 監聽隊列,第二個參數:是否自動進行消息確認
                channel.basicConsume(QUEUE_NAME, false, consumer);
            }
        }
        /*
        * 對比上個消費者:耗時小,完成任務多些
        */
        public class Recv2 {
        
            private final static String QUEUE_NAME = "task_work_queue";
        
            public static void main(String[] args) throws Exception {
                // 獲取連接
                Connection connection = ConnectionUtil.getConnection();
                // 創建通道
                final Channel channel = connection.createChannel();
                // 聲明隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // 設置每個消費同時只能處理一條消息
                channel.basicQos(1);
                // 定義隊列的消費者
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動被調用
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        // body 即消息體
                        String msg = new String(body);
                        System.out.println(" [x] received: " + msg + "!");
                        // 手動進行ACK
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // 監聽隊列,第二個參數false,手動進行ACK
                channel.basicConsume(QUEUE_NAME, false, consumer);
            }
        }
    • 訂閱模型分類:
      • 訂閱模型 – Fanout:廣播。一條消息,會被所有訂閱的隊列消費。
        • public class Send {
          
              private final static String EXCHANGE_NAME = "fanout_exchange_test";
          
              public static void main(String[] args) throws Exception {
                  // 獲取連接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明exchange,指定類型為fanout
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                  // 消息內容
                  String message = "Hello everyone";
                  // 發布消息到Exchange
                  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                  System.out.println(" [生產者] Sent '" + message + "'");
                  channel.close();
                  connection.close();
              }
          }
          public class Recv {
          
              private final static String QUEUE_NAME = "fanout_exchange_queue_1";
              private final static String EXCHANGE_NAME = "fanout_exchange_test";
          
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取到鏈接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊列
                  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                  // 綁定隊列到交換機
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                  // 定義隊列的消費者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String msg = new String(body);
                          System.out.println(" [消費者1] received: " + msg + "!");
                      }
                  };
                  // 監聽隊列,自動返回完成
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              }
          }
          public class Recv2 {
          
              private final static String QUEUE_NAME = "fanout_exchange_queue_2";
              private final static String EXCHANGE_NAME = "fanout_exchange_test";
          
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取到鏈接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊列
                  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                  // 綁定隊列到交換機
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                  // 定義隊列的消費者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String msg = new String(body);
                          System.out.println(" [消費者2] received: " + msg + "!");
                      }
                  };
                  // 監聽隊列,自動返回完成
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              }
          }
      • 訂閱模型 – Direct:不同的消息被不同的隊列消費。在Direct模型下:
        •   隊列與交換機的綁定,不能是任意綁定,而是要指定至少一個 RoutingKey(路由key);
        •   消息的發送方向 Exchange 發送消息時,也必須指定消息的 RoutingKey;
        •   Exchange 不再把消息交給每一個綁定的隊列,而是根據消息的 RoutingKey 進行判斷,只有隊列的RoutingKey與消息的RoutingKey完全一致,才會接收到消息。
        • public class Send {
          
              private final static String EXCHANGE_NAME = "direct_exchange_test";
          
              public static void main(String[] args) throws Exception {
                  // 獲取連接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明exchange,指定類型為direct
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                  // 消息內容
                  String message = "商品增加了,id = 1002";
                  // 發布消息到Exchange,並且指定routing key為:delete,代表刪除商品
                  channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
                  System.out.println(" [商品服務] Sent '" + message + "'");
                  channel.close();
                  connection.close();
              }
          }
          public class Recv {
          
              private final static String QUEUE_NAME = "direct_exchange_queue_1";
              private final static String EXCHANGE_NAME = "direct_exchange_test";
          
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取到鏈接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊列
                  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                  // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
                  // 定義隊列的消費者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String msg = new String(body);
                          System.out.println(" [消費者1] received: " + msg + "!");
                      }
                  };
                  // 監聽隊列,自動返回完成
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              }
          }
          public class Recv2 {
          
              private final static String QUEUE_NAME = "direct_exchange_queue_2";
              private final static String EXCHANGE_NAME = "direct_exchange_test";
          
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取到鏈接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊列
                  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                  // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
                  // 定義隊列的消費者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String msg = new String(body);
                          System.out.println(" [消費者2] received: " + msg + "!");
                      }
                  };
                  // 監聽隊列,自動返回完成
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              }
          }
      • 訂閱模型 – Topic:可以根據RoutingKey把消息路由到不同的隊列,Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符
        • RoutingKey 一般都是有一個或多個單片語成,多個單詞之間以「 . 」分割,例:item.insert。
        • 通配符規則:
          • #:匹配一個或多個詞
          • *:匹配不多不少恰好一個詞
        • public class Send {
          
              private final static String EXCHANGE_NAME = "topic_durable_exchange_test";
          
              public static void main(String[] args) throws Exception {
                  // 獲取連接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
          
                  // 開啟生產者確認
          //        channel.confirmSelect();
          
                  // 聲明exchange,指定類型為topic, 並且設置durable為true,持久化
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
                  // 消息內容
                  String message = "商品新增了,id = 1002";
                  // 發布消息到Exchange,並且指定routing key,消息持久化
                  channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                  System.out.println(" [商品服務] Sent '" + message + "'");
          
                  // 等待rabbitmq的確認消息,true為確認收到,false為發出有誤
          //        channel.waitForConfirms();
          
                  channel.close();
                  connection.close();
              }
          }
          public class Recv {
          
              private final static String QUEUE_NAME = "topic_durable_exchange_queue_1";
              private final static String EXCHANGE_NAME = "topic_durable_exchange_test";
          
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取到鏈接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊列, 第二個參數:true代表聲明為持久化
                  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                  // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
                  // 定義隊列的消費者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String msg = new String(body);
                          System.out.println(" [消費者1] received: " + msg + "!");
                      }
                  };
                  // 監聽隊列,自動返回完成
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              }
          }
          public class Recv2 {
          
              private final static String QUEUE_NAME = "topic_durable_exchange_queue_2";
              private final static String EXCHANGE_NAME = "topic_durable_exchange_test";
          
              public static void main(String[] args) throws IOException, TimeoutException {
                  // 獲取到鏈接
                  Connection connection = ConnectionUtil.getConnection();
                  // 獲取通道
                  Channel channel = connection.createChannel();
                  // 聲明隊列
                  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                  // 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息
                  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
                  // 定義隊列的消費者
                  DefaultConsumer consumer = new DefaultConsumer(channel) {
                      // 獲取消息,並且處理,這個方法類似事件監聽,如果有消息時,會自動調用
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          String msg = new String(body);
                          System.out.println(" [消費者2] received: " + msg + "!");
                      }
                  };
                  // 監聽隊列,自動返回完成
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              }
          }
  • 當然啦,中國這麼hot的rabbitMQ自然也是有集成到了springboot中滴,開心。
    • maven坐標依賴:
      <parent>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-parent</artifactId>
              <version>2.0.4.RELEASE</version>
      </parent>
      
      <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-amqp</artifactId>
              </dependency>
      </dependencies>
    • application.yml:
      spring:
        rabbitmq:
          host: 127.0.0.1
          username: guest
          password: guest
          virtual-host: /demo 
    • @Component
      public class Listener {
      
          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue(value = "spring.test.queue", durable = "true"),
                  exchange = @Exchange(
                          value = "spring.test.exchange",
                          type = ExchangeTypes.TOPIC),
                  key = {"#.#"}))
          public void listen(String msg) {
              System.out.println("接收到的消息: " + msg);
          }
      }
  • 持久化:要將消息持久化,前提是:隊列、Exchange都持久化
    • 交換機持久化:channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);  // 參數三:設置durable為true。
    • 隊列持久化:channel.queueDeclare(QUEUE_NAME, true, false, false, null);  // 參數二:設置為true,表示設置隊列持久化。
    • 消息持久化:channel.basicPublish(EXCHANGE_NAME, “item.insert”, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 解決消息丟失?
    • ACK(消費者確認,由消費者向mq發送,防止消息丟失於消費者)
    • 持久化(防止rabbitmq把消息丟失)
    • 生產者確認機制publisher confirm(由mq向生產者發送,有些mq包含,有些不包含,比如:activeMQ不包含該機制,rabbitmq包含該機制)
    • 發送消息前,將消息持久化到資料庫,並記錄消息狀態(可靠消息服務)
  • 思考問題(這個問題就留給你們思考啦?沖啊):如何保證消息發送的重複性,如何保證介面的冪等性(同一介面被重複執行,其結果一致)?【提示:加標識 消息的重發要謹慎】