RocketMQ 詳解系列
- 2022 年 8 月 8 日
- 筆記
什麼是RocketMQ
RocketMQ作為一款純java、分散式、隊列模型的開源消息中間件,支援事務消息、順序消息、批量消息、定時消息、消息回溯等。主要功能是非同步解耦和流量削峰:。
常見的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ
四種消息中間件的基本介紹:
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
單機吞吐量 | 萬級,比RocketMQ和Kafka第一個級別 | 同ActiveMQ | 10萬級,支撐高吞吐 | 10萬級,高吞吐,一般配合大數據類的系統進行實時數據計算、日誌採集等場景 |
topic數量對吞吐量的影響 | topic可以達到幾百/幾千級別,吞吐量會有較小幅度的下降,這是RocketMQ的一大優勢,在同等機器下,可以支撐大量的topic | topic從幾十到幾百時,吞吐量會大幅度下降,在同等機器下,kafka盡量保證topic數量不要過多,如果要支撐大規模的topic,需要增加更多的機器資源 | ||
時效性 | ms級 | 微秒級別,RabbitMQ的特性,延遲最低 | ms級別 | 延遲在ms級別以內 |
可用性 | 高,基於主從架構實現高可用 | 同ActiveMQ | 非常高,分散式架構 | 非常高,分散式一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用 |
消息可靠性 | 有較低的概率丟失數據 | 基本不丟 | 經過參數優化配置,可以做到0丟失 | 經過參數優化配置,可以做到0丟失 |
功能支援 | MQ領域的功能機器完備 | 基於erlang開發,並發能力很強,性能極好,延時很低 | MQ功能較為完善,基本分散式,擴展性好 | 功能較簡單,主要支援簡單的MQ功能,在大數據領域的實時計算以及日誌採集被大規模使用 |
其他 | Apache開發,起步早,沒有經過高吞吐場景驗證,社區不活躍 | 開源、穩定、社區活躍度高 | 阿里開源,交給Apache,社區活躍度低 | Apache開發,開源、高吞吐量、社區活躍度高 |
消息中間件的使用場景:
非同步與解耦:
當我們下了一個訂單之後,訂單系統會進行RPC同步調用 支付系統、庫存系統、物流系統等,那麼系統之間就會有耦合性,耦合性越高的話,容錯性就越低,比如我們的支付系統如果宕機了,就會導致我們整個交易的異常,從而影響用戶的體驗。
如果我們中間加入了消息中間件,不管是支付還是庫存等系統,都是通過非同步的方式進行調用的,如果其中一個系統宕機了,不會影響我們用戶下單的使用。
本質上MQ第一步完成了 非同步 ,第二步完成了 解耦 。那麼系統的容錯性就越高。
流量削峰:
流量削峰也可以叫削峰填谷,比如一些互聯網公司大促場景,雙十一、店慶或者秒殺活動,都會使用到消息中間件。
如果在不使用消息中間件或者沒有流量削峰,每秒是很高的並發,這個時候如果我們的A系統,如果要將數據寫入到我們的MYSQL中,受限於MYSQL本身服務的上限,最大我們只能每秒處理200請求,這個時候會有大量的消息進行堆積,從而導致A系統的奔潰。
這個時候我們可以將用戶的請求消息通過MQ進行寫入,因為消息中間件本身是對數據量處理比較高的一個系統,所以對於每秒2000請求,消息中間件可以處理,然後A系統作為消息中間件的一個消費者,以固定的速度從MQ中拉取200個消息,完成我們的業務操作,用時間換空間 從而確保我們A系統的穩定性。
數據分發:
如果S系統,在對系統進行開發的時候,需要對接多個(A、B、C、D)系統,使用傳統的介面調用,中間有改動就需要修改我們的程式碼,當新增了A系統,我們需要去修改程式碼去調用A系統來完成對應的業務邏輯,如果我們當中的D系統需要移除, 同樣也需要修改程式碼刪除對應的介面調用。
如果S系統使用了消息中間件,我們S系統只要將消息交給MQ,剩下的不論是新增還是移除,還是原有的,他們都只是消息中間件的一個消費者,這個時候我們就便於數據的分發。
比如我們新增一個系統,我們只需要新增一個MQ的消費者,直接從MQ裡面拿消息就可以,當我們需要移除一個系統的時候,只需要取消對MQ消息的監聽即可。對於我們原有的S系統不需要進行額外的修改。如果使用MQ作為數據分發,減少數據的修改,提高開發的效率。
RocketMQ 基本概念
RocketMQ主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分
。這些角色通常以集群的方式存在,RocketMQ 基於純Java開發,具有高吞吐量、高可用性、適合大規模分散式系統應用的特點。
對於 RockerMQ
而言,我們想要啟動,必須首先啟動 NameServer
,在啟動 Brober
主機, Brober
會向 NameServer 註冊對應的路由和服務(Broker 地址、主體和),Producer會進行路由的發現,向NameServer請求Broker路由資訊,進行消息的發送。
作為Consumer要連通NameServer,獲取到相關的路由資訊,方便我們進行消息的訂閱。
Broker 也是一個很重要的角色,主要負責消息的存儲,不管是生產消息還是訂閱消息,消息的來源都是 Broker,一般來說消息的發送(Producer)只會發到主節點,然後Broker會進行消息的同步,同步到從節點,作為消費者(Consumer)也只會優先從Master節點,獲取消息,進行消費,除非主節點不可用或者非常繁忙,才會從從節點進行消費,Broker除了消息的中轉,還負責消息的持久化以及主從數據之間的複製
NameServer:
NameServer
是一個服務與註冊的發現中心。也是整個 RocketMQ 的「大腦」,所以 RocketMQ 需要先啟動 NameServer
再啟動 RocketMQ 中的 Broker
NameServer
是一個幾乎無狀態節點,可集群部署,節點之間無任何資訊同步。NameServer
底層由 Netty 實現,是記憶體式存儲,所以 NameServer
中的 broker、topic不會持久化。
NameServer
其角色類似Dubbo和zookeeper,主要負責Broker的動態註冊與發現。為什麼不使用zookeeper?rocketmq主要是在分散式情況下使用追求性能,因為zookeeper最求最終一致性,所以在性能上會有所折扣。
Broker:
消息伺服器(Broker
)是消息存儲中心,主要作用是接收來自 Producer
的消息並存儲,Consumer
從這裡取得消息。存儲與消息相關的元數據,包括用戶組、消費進度偏移量、隊列資訊等。從部署結構圖中可以看出 Broker
有 Master
和 Slave
兩種類型, Master
既可以寫又可以讀,Slave
不可以寫只可以讀。
Producer:
Producer
也稱為消息發布者(生產者),負責生產並發送消息至 Topic
。生產者向 broker
發送由業務應用程式系統生成的消息。RocketMQ
提供了發送:同步、非同步和單向(one-way)的多種範例。
Consumer:
也稱為消息訂閱者,負責從 Topic 接收並消費消息。消費者從 brokers
那裡拉取資訊並將其輸入應用程式。從Master拿到消息,執行完成後,會發送一個消息給Broker進行確認,這個就是ACK確認
RocketMQ 基本概念
分組(Group)
Group 分為兩個部分 生產者和消費者
-
生產者: 表示發送同一類消息的 Producer,通常情況下發送邏輯是一致的。發送普通消息時,用於標識使用,沒有特別的用處。
主要用來作用於事務消息,當事務消息中某條消息一直處於等待狀態並超時,Broker會回查同一個Group下的其他producer,確定該消息是 commit 還是 rollback
-
消費者: 消費者的分組就非常有意義了,消費者是標識一類
Consumer
的集合名稱,這類Consumer
通常消費一類消息,且消費邏輯一致。同一個Consumer Group
下的各個實例將共同消費 topic 的消息,起到負載均衡的作用。消費進度以
Consumer Group
為粒度管理,不同Consumer Group
之間消費進度彼此不受影響,即消息 A 被Consumer Group1
消費過,也會再給Consumer Group2
消費。
主體(Topic)
用來區分消息的種類,表示一類消息的邏輯名字,消息的邏輯管理單位,無論生產還是消費消息,都需要執行Topic。
一個發送者可以發送消息給一個或者多個Topic;
一個消息接受者可以訂閱一個或多個Topic消息;
消息隊列(Message Queue)
消息隊列 簡稱 Queue ,消息物理管理單位。用來並行發送和接收消息,相當於是Topic的分區。
一個Topic會有若干個Queue,消息的生產一般會比消息消費的速度要快,消息進行消費的時會有對應的業務邏輯進行處理,這個時候就會降低消息消費的速度。所有一般Topic會有若干個Queue。主要用來解決生產很快,消費很慢。
如果同一個Topic創建在不同的Broker,那麼不同的Broker有不同的Queue,將物理存儲在不同的Broker節點之上,具有水平擴展的能力。無論是生產者還是消費者,實際的操作都是針對Queue級別。
標籤(Tag)
RocketMQ 支援在發送時給 topic 的消息設置 tag,用於同一主題下區分不同類型的消息。
來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標籤。比如有一個 Topic 消息為水果,那麼水果可以有其他的標籤 可以是 香蕉、西瓜、草莓等等,我們可以把對應的消息,打上對應的標籤(Tag),這個就是方便我們在消費的時候做對應的篩選。
標籤能夠有效地保持程式碼的清晰度和連貫性,並優化 RocketMQ 提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
偏移量(Offset)
在 RocketMQ 中,有很多 offset 的概念。一般我們只關心暴露到客戶端的 offset。不指定的話,一般指的是消費者消息的偏移量(ConsumerOffset)
Message queue
是無限長的數組。一條消息進來下標就會漲 1,而這個數組的下標就是 offset。
Message queue
中的 max offset 表示消息的最大 offset,Consumer offset
可以理解為標記 Consumer Group 在一條邏輯 Message Queue
上,消息消費到哪裡即消費進度。
RocketMQ 下載安裝
下載地址://rocketmq.apache.org/dowloading/releases/
環境要求:
- Windows/Linux 64位系統
- JDK1.8(64位)
- 源碼安裝需要安裝Maven 3.2.x
這裡我們用 rocketmq-4.9.2
來做演示案例。
設置環境變數:
變數名: ROCKETMQ_HOME
變數值: MQ解壓路徑\MQ文件夾名
啟動
在rocketmq-4.9.2\bin
目錄下,打開cmd窗口
先啟動 nameServer,啟動命令:start mqnamesrv.cmd
然後在啟動 Broker,啟動命令:start mqbroker.cmd -n 127.0.0.1:7906 autoCreateTopicEnable=true
管理端插件安裝:
老版本地址下載://codeload.github.com/apache/rocketmq-externals/zip/master
新版本地址://github.com/apache/rocketmq-dashboard
啟動完成之後,瀏覽器中輸入『127.0.0.1:8089』,成功後即可進行管理端查看。
消息發送
RocketMQ提供的原生客戶端的API,當然除了原生客戶端外,SpringBoot、SpringCloudStream也進行了集成,但本質上這些也是基於原生API的封裝,所以只需掌握原生API,其他的也會水到渠成。
導入MQ客戶端依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
消息發送:
/**
* 同步發送
*/
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 設置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//producer.setSendLatencyFaultEnable(true);
// 啟動Producer實例
producer.start();
for (int i = 0; i < 10; i++) {
// 創建消息,並指定Topic,Tag和消息體
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發送消息到一個Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
}
總結
這篇主要是帶大家了解RocketMQ的基本原理和介紹,在後面的章節中,會帶大家深入了解和使用RocketMQ,如果覺得文章有幫助的,記得點贊關注,您的支援是我創作的最大動力。
怕什麼真理無窮,進一步有進一步的歡喜,大家加油!