RocketMQ知識(及開發實戰)

  • 2020 年 1 月 16 日
  • 筆記

MQ基礎概念:

  • MQ: 消息匯流排(Message Queue),是一種跨進程的通訊機制,用於上下游傳遞消息。在互聯網架構中,MQ是一種非常常見的上下游「邏輯解耦+物理解耦」的消息通訊服務。使用MQ之後,消息發送上游只需要依賴MQ,邏輯上和物理上都不用依賴其他服務。 MQ的不足 (1)系統更加複雜,多了一個MQ組件 (2)消息傳遞路徑更長,延時會增加 (3)消息可能會被重複消費 (4)上游無法知道下游的執行結果(因此,調用方實時依賴執行結果的業務場景,請使用調用,而不是MQ) 使用場景 (1)上游不關注執行結果 (2)上游關注結果,但執行時間比較長。舉個例子,微信支付,跨公網調用微信的介面,執行時間會比較長,但調用方又非常關注執行結果,此時一般怎麼玩呢?

image.png 一般採用「回調網關+MQ」方案來解耦: a、調用方直接跨公網調用微信介面 b、微信返回調用成功,此時並不代表返回成功 c、微信執行完成後,回調統一網關 d、網關將返回結果通知MQ e、請求方收到結果通知

  • rocketMQ: RocketMQ 是什麼? Github 上關於 RocketMQ 的介紹: RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件。具有以下特性: 支援發布/訂閱(Pub/Sub)和點對點(P2P)消息模型 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞 支援拉(pull)和推(push)兩種消息模式 單一隊列百萬消息的堆積能力 支援多種消息協議,如 JMS、MQTT 等 分散式高可用的部署架構,滿足至少一次消息傳遞語義 提供 docker 鏡像用於隔離測試和雲集群部署 提供配置、指標和監控等功能豐富的 Dashboard
  • consumer group: 1、概念:消費者分組,多個消費者在一個消費者分組中。 2、注意點:一個consumer group中的機器相當於一個集群,consumer group中只有一台機器會接收到消息,並進行消費。每一個consumer group都會接收到消息。這樣子的設計要求消費端需要保證冪等性。
  • topic: 1、概念:Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那麼就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。 2、生產方發出的消息綁定某個topic,然後消費方監聽某個topic,消費方(各個group)接收到消息,進行消費 3、topic應用級別:整個應用最好都使用一個topic,而更加細的區分,使用tags來區分。
  • tag: 1、概念:標籤,用於對消息分類,在topic的基礎上進行更細的劃分。
  • nameServer: 1、概念:Name Server 為 producer 和 consumer 提供路由資訊。類似rpc中的註冊中心。當producer需要發送消息首先去詢問nameServer需要請求哪個broker。而當consumer需要拉取消息,也會先詢問nameServer需要請求哪個broker。
  • broker: 1、概念:rocketMQ中負責接收生產者消息、給消費者發送消息的組件。
  • Message: 1、概念:Message 是消息的載體。一個 Message 必須指定 topic。Message 還有一個可選的 tag 設置,以便消費端可以基於 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,方便在開發過程中診斷問題。

MQ生產者者實例:

public class Producer {      public static void main(String[] args) throws MQClientException, InterruptedException {            //聲明並初始化一個producer          //需要一個producer group名字作為構造方法的參數,這裡為producer1          DefaultMQProducer producer = new DefaultMQProducer("producer1");            //設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔          //NameServer的地址必須有,但是也可以通過環境變數的方式設置,不一定非得寫死在程式碼里          producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");            //調用start()方法啟動一個producer實例          producer.start();            //發送10條消息到Topic為TopicTest,tag為TagA,消息內容為「Hello RocketMQ」拼接上i的值          for (int i = 0; i < 10; i++) {              try {                  Message msg = new Message("TopicTest",// topic                          "TagA",// tag                          ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body                  );                    //調用producer的send()方法發送消息                  //這裡調用的是同步的方式,所以會有返回結果                  SendResult sendResult = producer.send(msg);                    //列印返回結果,可以看到消息發送的狀態以及一些相關資訊                  System.out.println(sendResult);              } catch (Exception e) {                  e.printStackTrace();                  Thread.sleep(1000);              }          }            //發送完消息之後,調用shutdown()方法關閉producer          producer.shutdown();      }  }

MQ消費者實例:

在開發過程中,如果想測試生產者是否發出了mq,可以編寫一個消費者進行測試

@Test  public void testMqConsumer() throws Exception {      String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876";        int threadNum = 5;      String topics = "WechatUnionCoreTemplateNotifyTopic";      String instanceName = "TemplateComsumer";      String groupName = "wechatUnionTemplateNotifyConsumer";      DefaultMQPushConsumer consumer = null;        consumer = new DefaultMQPushConsumer(groupName);      consumer.setNamesrvAddr(rocketmqAddress);//MQ地址      consumer.setClientCallbackExecutorThreads(threadNum);//消費現場數量      consumer.setInstanceName(instanceName);//實例名稱      consumer.subscribe(topics, "*");          //註冊監聽      consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override          public ConsumeConcurrentlyStatus consumeMessage(                  List<MessageExt> msgs,                  ConsumeConcurrentlyContext context) {              for (int i = 0; i < msgs.size(); i++) {                  MessageExt msgExt =  msgs.get(i);                  String msgId = msgExt.getMsgId();                  Integer flag = msgExt.getFlag();                  TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class);                  logger.info("receive new Msg:    " + "  msgId=" + msgId + "   flag=" + flag + "  templateNotifyItem=" + templateNotifyItem);              }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;          }      });      consumer.start();      logger.info("監聽執行中");            Thread.sleep(1000000);  }

參考: http://blog.csdn.net/manzhizhen/article/details/52606733 https://www.jianshu.com/p/824066d70da8 架構師之路-mq系列