分佈式消息隊列

消息隊列

什麼是消息隊列

消息隊列(Message Queue),簡稱為MQ,是分佈式應用程序之間的通訊方法

常用的消息隊列 RabbitMQ ActiveMQ RocketMQ Kafka

一 RabbitMQ

  RabbitMQ 2007年發佈,是一個在AMQP(高級消息隊列協議)基礎上完成的,可復用的企業消息系統,是當前最主流的消息中間件之一。

  主要特性:

  1. 可靠性: 提供了多種技術可以讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞確認、發佈者證實和高可用性機制;

  2. 靈活的路由:消息在到達隊列前是通過交換機進行路由的。RabbitMQ為典型的路由邏輯提供了多種內置交換機類型。如果你有更複雜的路由需求,可以將這些交換機組合起來使用,你甚至可以實現自己的交換機類型,並且當做RabbitMQ的插件來使用;

  3. 消息集群:在相同局域網中的多個RabbitMQ服務器可以聚合在一起,作為一個獨立的邏輯代理來使用;

  4. 隊列高可用:隊列可以在集群中的機器上進行鏡像,以確保在硬件問題下還保證消息安全;

  5. 多種協議的支持:支持多種消息隊列協議;

  6. 服務器端用Erlang語言編寫,支持只要是你能想到的所有編程語言;

  7. 管理界面: RabbitMQ有一個易用的用戶界面,使得用戶可以監控和管理消息Broker的許多方面;

  8. 跟蹤機制:如果消息異常,RabbitMQ提供消息跟蹤機制,使用者可以找出發生了什麼;

  9. 插件機制:官方啥都沒有,全靠第三方,但同時也增強了,插件的擴展性(個人以為)!

優點:
    1. 由於erlang語言的特性,mq 性能較好,高並發;

    2. 健壯、穩定、易用、跨平台、支持多種語言、文檔齊全;

    3. 有消息確認機制和持久化機制,可靠性高;

    4. 高度可定製的路由;

    5. 管理界面較豐富,在互聯網公司也有較大規模的應用;

    6. 社區活躍度高;
缺點:
    1. 儘管結合erlang語言本身的並發優勢,性能較好,但是不利於做二次開發和維護;

    2. 實現了代理架構,意味着消息在發送到客戶端之前可以在中央節點上排隊。此特性使得RabbitMQ易於使用和部署,但是使得其運行速度較慢,因為中央節點增加了延遲,消息封裝後也比較大;

    3. 需要學習比較複雜的接口和協議,學習和維護成本較高;

二 ActiveMQ

 ActiveMQ是由Apache出品,ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。它非常快速,支持多種語言的客戶端和協議,而且可以非常容易的嵌入到企業的應用環境中,並有許多高級功能。

  主要特性:

  1. 服從 JMS 規範:JMS 規範提供了良好的標準和保證,包括:同步或異步的消息分發,一次和僅一次的消息分發,消息接收和訂閱等等。遵從 JMS 規範的好處在於,不論使用什麼 JMS 實現提供者,這些基礎特性都是可用的;

  2. 連接性:ActiveMQ 提供了廣泛的連接選項,支持的協議有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。對眾多協議的支持讓 ActiveMQ 擁有了很好的靈活性。

  3. 支持的協議種類多:OpenWire、STOMP、REST、XMPP、AMQP ;

  4. 持久化插件和安全插件:ActiveMQ 提供了多種持久化選擇。而且,ActiveMQ 的安全性也可以完全依據用戶需求進行自定義鑒權和授權;

  5. 支持的客戶端語言種類多:除了 Java 之外,還有:C/C++,.NET,Perl,PHP,Python,Ruby;

  6. 代理集群:多個 ActiveMQ 代理可以組成一個集群來提供服務;

  1. 異常簡單的管理:ActiveMQ 是以開發者思維被設計的。所以,它並不需要專門的管理員,因為它提供了簡單又使用的管理特性。有很多中方法可以監控 ActiveMQ 不同層面的數據,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通過處理 JMX 的告警消息,通過使用命令行腳本,甚至可以通過監控各種類型的日誌。
優點:
        1. 跨平台(JAVA編寫與平台無關有,ActiveMQ幾乎可以運行在任何的JVM上)

        2. 可以用JDBC:可以將數據持久化到數據庫。雖然使用JDBC會降低ActiveMQ的性能,但是數據庫一直都是開發人員最熟悉的存儲介質。將消息存到數據庫,看得見摸得着。而且公司有專門的DBA去對數據庫進行調優,主從分離;

        3. 支持JMS :支持JMS的統一接口;

        4. 支持自動重連;

        5. 有安全機制:支持基於shiro,jaas等多種安全配置機制,可以對Queue/Topic進行認證和授權。

        6. 監控完善:擁有完善的監控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;

        7. 界面友善:提供的Web Console可以滿足大部分情況,還有很多第三方的組件可以使用,如hawtio;
缺點:
        1. 社區活躍度不及RabbitMQ高;

        2. 根據其他用戶反饋,會出莫名其妙的問題,會丟失消息;

        3. 目前重心放到activemq6.0產品-apollo,對5.x的維護較少;

   4. 不適合用於上千個隊列的應用場景;

三 RocketMQ

      RocketMQ出自 阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並做出了自己的一些改進,消息可靠性上比 Kafka 更好。RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等場景。

        主要特性:

        1. 是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分佈式特點;

        2. Producer、Consumer、隊列都可以分佈式;

        3. Producer向一些隊列輪流發送消息,隊列集合稱為Topic,Consumer如果做廣播消費,則一個consumer實例消費這個Topic對應的所有隊列,如果做集群消費,則多個Consumer實例平均消費這個topic對應的隊列集合;

        4. 能夠保證嚴格的消息順序;

        5. 提供豐富的消息拉取模式;

        6. 高效的訂閱者水平擴展能力;

        7. 實時的消息訂閱機制;

        8. 億級消息堆積能力;

        9. 較少的依賴;
優點:
        1. 單機支持 1 萬以上持久化隊列

        2. RocketMQ 的所有消息都是持久化的,先寫入系統 PAGECACHE,然後刷盤,可以保證內存與磁盤都有一份數據,訪問時,直接從內存讀取。

        3. 模型簡單,接口易用(JMS 的接口很多場合併不太實用);

        4. 性能非常好,可以大量堆積消息在broker中;

        5. 支持多種消費,包括集群消費、廣播消費等。

        6. 各個環節分佈式擴展設計,主從HA;

        7. 開發度較活躍,版本更新很快。
缺點:
        1. 支持的客戶端語言不多,目前是java及c++,其中c++不成熟;

        2. RocketMQ社區關注度及成熟度也不及前兩者;

        3. 沒有web管理界面,提供了一個CLI(命令行界面)管理工具帶來查詢、管理和診斷各種問題;

        4. 沒有在 mq 核心中去實現JMS等接口;

四 Kafka

Apache Kafka是一個分佈式消息發佈訂閱系統。它最初由LinkedIn公司基於獨特的設計實現為一個分佈式的提交日誌系統( adistributed commit log),,之後成為Apache項目的一部分。Kafka系統快速、可擴展並且可持久化。它的分區特性,可複製和可容錯都是其不錯的特性。

  主要特性:

  1. 快速持久化,可以在O(1)的系統開銷下進行消息持久化;

  2. 高吞吐,在一台普通的服務器上既可以達到10W/s的吞吐速率;

  3. 完全的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;

  4. 支持同步和異步複製兩種HA;

  5. 支持數據批量發送和拉取;

  6. zero-copy:減少IO操作步驟;

  7. 數據遷移、擴容對用戶透明;

  8. 無需停機即可擴展機器;

  9. 其他特性:嚴格的消息順序、豐富的消息拉取模型、高效訂閱者水平擴展、實時的消息訂閱、億級的消息堆積能力、定期刪除機制;

優點
1. 客戶端語言豐富,支持java、.net、php、ruby、python、go等多種語言;

  2. 性能卓越,單機寫入TPS約在百萬條/秒,消息大小10個位元組;

  3. 提供完全分佈式架構, 並有replica機制, 擁有較高的可用性和可靠性, 理論上支持消息無限堆積;

  4. 支持批量操作;

  5. 消費者採用Pull方式獲取消息, 消息有序, 通過控制能夠保證所有消息被消費且僅被消費一次;

  6. 有優秀的第三方Kafka Web管理界面Kafka-Manager;

  7. 在日誌領域比較成熟,被多家公司和多個開源項目使用;
缺點
 1. Kafka單機超過64個隊列/分區,Load會發生明顯的飆高現象,隊列越多,load越高,發送消息響應時間變長

  2. 使用短輪詢方式,實時性取決於輪詢間隔時間;

  3. 消費失敗不支持重試;

  4. 支持消息順序,但是一台代理宕機後,就會產生消息亂序;

  5. 社區更新較慢;

  2. RabbitMQ/ActiveMQ/RocketMQ/Kafka對比

  這裡列舉了上述四種消息隊列的差異對比:
四種消息隊列差異對比圖

為什麼使用消息隊列,消息隊列的好處

消息隊列的好處就6個字,異步、程序解耦、銷峰

一 解耦
傳統模式:

缺點:耦合性,太強,每一個消費者接入,都要修改代碼

說到這裡要說一下生產者消費者這兩個概念,我一直沒有注意過這連個詞,因為消費者和生產者,出現往往就伴隨着分佈式,但是今天畫這張圖的時候,我才有了這想法,其實假如有兩個線程,其中線程a生產了一條數據,然後線程b,那走了這條數據,那麼其實線程a就是生產者,線程b就是消費者,消費者和和生產者,只是一種供求方式,並不是專屬於,分佈式!

消息隊列作為中間件

優點:生產者將消息發送到消息隊列中,消費者去消息隊列中取,實現了解耦,生產者無需修改代碼!

二 異步
傳統方式註冊

缺點:耦合性高及一些非必要的業務邏輯以同步的方式運行,太耗費時間。

消息隊列作為中間件

優點「:即實現了解耦同時異步處理一些非必要的邏輯,加快了頁面響應速度!

三 消峰
傳統模式

消息隊列作為中間件模式

使用消息隊列的缺點:

一 系統可用性降低

原來的系統如果沒問題那就一直沒問題,但是當你加入了消息隊列,那麼當消息隊列掛了呢?那系統就蹦了!

二 提高系統複雜性

要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重複消費,如何保證保證消息可靠傳輸。因此,需要考慮的東西更多,系統複雜性增大。

如何保證消息不被重複

消息重複是如何引起的

其實無論是那種消息隊列,造成重複消費原因其實都是類似的。正常情況下,消費者在消費消息時候,消費完畢後,會發送一個確認信息給消息隊列,消息隊列就知道該消息被消費了,就會將該消息從消息隊列中刪除。只是不同的消息隊列發送的確認信息形式不同,例如RabbitMQ是發送一個ACK確認消息,當消息消費後需要提交ack但是當RabbitMQ確認收到,並將相應 message 從 RabbitMQ 的消息緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那麼該消息就會丟失此時就需要其中手動確認方式則需要在業務處理成功後,調用channel.basicAck(),手動簽收,如果手動簽收後因為網絡傳輸等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。

冪等性指一次和多次請求某一個資源,對於資源本身應該具有同樣的結果。也就是說,其任意多次執行對資源本身所產生的影響均與一次執行的影響相同。在MQ中指,消費多條相同的消息,得到與消費該消息一次相同的結果。

以轉賬為例:
1.發送消息 
2.消息內容包含了id 和 版本和 金額
3.消費者接收到消息,則根據ID 和版本執行sql語句,
update account set money=money-?,version=version+1 where id=? and version=?
4.如果消費第二次,那麼同一個消息內容是修改不成功的。

以上大意就是在,支付數據提交到消息中間件後,由消息中間件將支付成功發送給消費放,當消息方因為網絡故障或者其他情況收不到消息,就有中間件多次發送,當消費方接受到消息後,在插入數據庫之前進行判斷如果數據庫顯示支付成功那麼則不修改,如果數據庫顯示為支付則修改!

如何保證消息隊列高可用

前文又說,消息隊列的缺點 系統可用性降低,一旦只有一台消息隊列,一旦他掛了那麼整個系統就崩了!

所以說一台消息隊列就是demo級別的,自己拿來玩玩就行!

![](/attachment/20201202/耿直的微笑.jpg

說下第二種

普通集群模式

普通集群模式就是在多台機器上啟動多個rabbitmq實例,每個機器啟動一個。但是創建的queue只會放在一個rabbitmq實例上面,但是其他的實例都同步了這個queue的元數據。在你消費的時候,如果連接到了另一個實例,他會從擁有queue的那個實例獲取消息然後再返回給你。

這種呢和所謂的分佈式,高可用毫無關係!

直接說第三種

鏡像隊列

什麼是鏡像隊列

默認情況下,rabbitmq集群中的隊列內容位於單個節點(聲明隊列的節點)上。這與交換器和綁定形成了對比,交換器和綁定始終可視為位於所有節點上。隊列可以選擇性地跨多個節點進行鏡像。

每個鏡像隊列由一個master和一個或多個mirrors組成。主節點位於一個通常稱為master的節點上。每個隊列都有自己的主節點。給定隊列的所有操作首先應用於隊列的主節點,然後傳播到鏡像。這包括隊列發佈(enqueueing publishes)、向消費者傳遞消息、跟蹤消費者的確認等等。

隊列鏡像意味着是一個集群內的節點。因此,不建議跨廣域網使用它(當然,客戶機仍然可以根據需要儘可能近地連接)。

發佈到隊列的消息將複製到所有鏡像。不管消費者連接到哪個節點,都會連接到master,鏡像會刪除在master上已確認的消息。因此,隊列鏡像提高了可用性,但不會在節點之間分配負載(所有參與節點都完成所有工作)。

如果承載隊列master的節點出現故障,則最舊的鏡像將升級為新的master,只要它已同步。根據隊列鏡像參數,也可以升級未同步的鏡像。

在分佈式系統中,通常使用多個術語來標識主副本和輔助副本。本指南通常使用「master」來指代隊列的主副本,使用「mirror」指代輔助副本。但是,你會發現很多地方都使用了”slave”。這是因為rabbitmq CLI工具在歷史上一直使用術語「slave」來指代輔助設備。因此,這兩個術語目前都可以互換使用,但我們希望最終擺脫遺留術語。

在任意的節點(A或者B)中執行如下命令:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

解釋
rabbitmqctl set_policy 
	用於設置策略
ha-all 
	表示設置為鏡像隊列並策略為所有節點可用 ,意味着 隊列會被(同步)到所有的節點,當一個節點被加入到集群中時,也會同步到新的節點中,此策略比較保守,性能相對低,對接使用半數原則方式設置(N/2+1),例如:有3個結點 此時可以設置為:ha-two 表示同步到2個結點即可。
"^"  表示針對的隊列的名稱的正則表達式,此處表示匹配所有的隊列名稱
'{"ha-mode":"all"}' 設置一組key/value的JSON 設置為高可用模式 匹配所有exchange

此時查看web管理界面:添加一個隊列itcast111,如下圖已經可以出現結果為有一個結點,並且是ha-all模式(鏡像隊列模式)

如何處理消息丟失的問題

其實無論是那種消息隊列,造成重複消費原因其實都是類似的。正常情況下,消費者在消費消息時候,消費完畢後,會發送一個確認信息給消息隊列,消息隊列就知道該消息被消費了,就會將該消息從消息隊列中刪除。只是不同的消息隊列發送的確認信息形式不同,例如RabbitMQ是發送一個ACK確認消息,當消息消費後需要提交ack但是當RabbitMQ確認收到,並將相應 message 從 RabbitMQ 的消息緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那麼該消息就會丟失此時就需要其中手動確認方式則需要在業務處理成功後,調用channel.basicAck(),手動簽收如果出現異常,則調用channel.basicNack()等方法,讓其按照業務功能進行處理,比如:重新發送,比如拒絕簽收進入死信隊列等等。!這個方式可以消息丟失但是不能解決消息重複

何保證消息的順序性

針對這個問題,通過某種算法,將需要保持先後順序的消息放到同一個消息隊列中比如rabbitMq中queue然後只用一個消費者去消費該隊列。

那如果為了吞吐量,有多個消費者去消費怎麼辦?

下面的就有點抽象了

RabbitMQ:一個 queue,多個 consumer。比如,生產者向 RabbitMQ 里發送了三條數據,順序依次是 data1/data2/data3,壓入的是 RabbitMQ 的一個內存隊列。有三個消費者分別從 MQ 中消費這三條數據中的一條,結果消費者2先執行完操作,把 data2 存入數據庫,然後是 data1/data3。這不明顯亂了。

RabbitMQ解決方案

拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點;或者就一個 queue 但是對應一個 consumer,然後這個 consumer 內部用內存隊列做排隊,然後分發給底層不同的 worker 來處理。

比如說發朋友圈,評論,刪除,三個異步操作,你朋友圈還沒有發出去,評論顯然是執行不了的,所以等,等朋友圈發完了,再去評論!

總而言之呢保證入隊有序,出隊讓消費者自行決定!

如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以後該怎麼處理

首先分析一下什麼情況下回出現這個問題

可能你的消費端出了問題,不消費了,或者消費的極其極其慢。可能你的消息隊列集群的磁盤都快寫滿了,都沒人消費,這個時候怎麼辦?或者是整個這就積壓了幾個小時,你這個時候怎麼辦?或者是你積壓的時間太長了,導致比如rabbitmq設置了消息過期時間後就沒了怎麼辦?

所以就這事兒,其實線上挺常見的,一般不出,一出就是大case,一般常見於,舉個例子,消費端每次消費之後要寫mysql,結果mysql掛了,消費端hang那兒了,不動了。或者是消費端出了個什麼叉子,導致消費速度極其慢。

一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:

第一種情況

先修復consumer的問題,確保其恢復消費速度,然後將現有cnosumer都停掉

新建一個topic,partition是原來的10倍,臨時建立好原先10倍或者20倍的queue數量

然後寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue

接着臨時徵用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據

這種做法相當於是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據

等快速消費完積壓數據之後,得恢復原先部署架構,重新用原先的consumer機器來消費消息

第二種情況

如果是rabbitmq,rabbitmq是可以設置過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq里,而是大量的數據會直接搞丟。

這個情況下,就不是說要增加consumer消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以採取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數據了,然後等過了高峰期以後,比如打局絕地求生英雄聯盟啥都很快就到凌晨了,用戶都睡覺了。

這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然後重新灌入mq裏面去,把白天丟的數據給他補回來。也只能是這樣了。

假設1萬個訂單積壓在mq裏面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發到mq里去再補一次

第三種情況

如果走的方式是消息積壓在mq里,那麼如果你很長時間都沒處理掉,此時導致mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然後走第二個方案,到了晚上再補數據吧。

圖解