RabbitMQ的應用場景以及基本原理介紹
RabbitMQ 是一個由 erlang 開發的 AMQP(Advanced Message Queuing Protocol)的開源實現。
AMQP:高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
一、應用場景
- 非同步處理
- 應用解耦
- 流量削峰
二、RabbitMQ 特性
RabbitMQ 最初起源於金融系統,用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:
# 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。 # 靈活的路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更複雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。 # 消息集群(Clustering) 多個 RabbitMQ 伺服器可以組成一個集群,形成一個邏輯 Broker 。 # 高可用(Highly Available Queues) 隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。 # 多種協議(Multi-protocol) RabbitMQ 支援多種消息隊列協議,比如 STOMP、MQTT 等等。 # 多語言客戶端(Many Clients) RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。 # 管理介面(Management UI) RabbitMQ 提供了一個易用的用戶介面,使得用戶可以監控和管理消息 Broker 的許多方面。 # 跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什麼。 # 插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
三、RabbitMQ 基本概念
# Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。 # Publisher 消息的生產者,也是一個向交換器發布消息的客戶端應用程式。 # Exchange 交換器,用來接收生產者發送的消息並將這些消息路由給伺服器中的隊列。 # Routing Key 路由關鍵字,exchange根據這個關鍵字進行消息投遞。 # Binding 綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。 # Queue 消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接到這個隊列將其取走。 # Connection 網路連接,比如一個TCP連接。 # Channel 信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。 # Consumer 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程式。 # Virtual Host 虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和許可權機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。 # Broker
四、Exchange 類型
Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:
- direct
消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為「dog」,則只轉發 routing key 標記為「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是完全匹配、單播的模式。
- fanout
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每檯子網內的主機都獲得了一份複製的消息。fanout 類型轉發消息是最快的。
- topic
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號「#」和符號「」。#匹配0個或多個單詞,匹配不多不少一個單詞。
五、ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
- Connection 是 RabbitMQ 的 socket 鏈接,它封裝了socket 協議相關部分邏輯。
- ConnectionFactory 為 Connection 的製造工廠。
- Channel 是我們與 RabbitMQ 打交道的最重要的一個介面,我們大部分的業務操作是在 Channel 這個介面中完成的,包括定義Queue、定義Exchange、綁定 Queue 與 Exchange、發布消息等。
六、任務分發機制
1、Round-robin dispathching循環分發
RabbbitMQ的分發機制非常適合擴展,而且它是專門為並發程式設計的,如果現在load加重,那麼只需要創建更多的Consumer來進行任務處理
2、Message acknowledgment 消息 確認
在實際應用中,可能會發生消費者收到 Queue 中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息後發送一個回執給 RabbitMQ,RabbitMQ 收到消息回執(Message acknowledgment)後才將該消息從Queue中移除;如果 RabbitMQ 沒有收到回執並檢測到消費者的 RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這裡不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。 這裡會產生另外一個問題,如果我們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多;消費者重啟後會重複消費這些消息並重複執行業務邏輯…
另外pub message是沒有ack的。
3、Message durability 消息持久化
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ伺服器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ伺服器就斷電了),如果我們需要對這種小概率事件也要管理起來,那麼我們要用到事務。由於這裡僅為RabbitMQ的簡單介紹,所以這裡將不講解RabbitMQ相關的事務。
要持久化隊列queue的持久化需要在聲明時指定durable=True;
這裡要注意,隊列的名字一定要是Broker中不存在的,不然不能改變此隊列的任何屬性.
隊列和交換機有一個創建時候指定的標誌durable,durable的唯一含義就是具有這個標誌的隊列和交換機會在重啟之後重新建立,它不表示說在隊列中的消息會在重啟後恢復
消息持久化包括3部分
# 1.exchange持久化,在聲明時指定durable => true hannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//聲明消息隊列,且為可持久化的 # 2.queue持久化,在聲明時指定durable => true channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//聲明消息隊列,且為可持久化的 # 3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化). channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
如果 exchange 和 queue 都是持久化的,那麼它們之間的binding 也是持久化的,如果 exchange 和 queue 兩者之間有一個持久化,一個非持久化,則不允許建立綁定.
注意:一旦創建了隊列和交換機,就不能修改其標誌了,例如,創建了一個non-durable的隊列,然後想把它改變成durable的,唯一的辦法就是刪除這個隊列然後重現創建。
關於持久化的進一步討論:
為了數據不丟失,我們採用了:
在數據處理結束後發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。
持久化queue,可以防止RabbitMQ Server 重啟或者crash引起的數據丟失。
持久化Message,理由同上。
但是這樣能保證數據100%不丟失嗎?答案是否定的。問題就在與RabbitMQ 需要時間去把這些資訊存到磁碟上,這個time window 雖然短,但是它的確還是有。在這個時間窗口內如果數據沒有保存,數據還會丟失。還有另一個原因就是 RabbitMQ 並不是為每個 Message 都做 fsync:它可能僅僅是把它保存到Cache 里,還沒來得及保存到物理磁碟上。因此這個持久化還是有問題。但是對於大多數應用來說,這已經足夠了。當然為了保持一致性,你可以把每次的publish放到一個transaction中。這個transaction的實現需要user defined codes。那麼商業系統會做什麼呢?一種可能的方案是在系統異常重啟時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能 exit gracefully。
4、Fair dispath 公平分發
你可能也注意到了,分發機制不是那麼優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取余後的,它不管Consumer是否還有unacked Message,只是按照這個默認的機制進行分發.
那麼如果有個Consumer工作比較重,那麼就會導致有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那麼,Rabbit是如何處理這種問題呢?
- 4.1 Prefetch count
前面我們講到如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作並一直空閑的情況。我們可以通過設置prefetchCount來限制Queue每次發送給每個消費者的消息數,比如我們設置prefetchCount=1,則Queue每次給每個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。
通過basic.qos方法設置prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它
channel.basic_qos(prefetch_count=1)
注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtual Host來細化你的設計。
七、消息序列化
RabbitMQ使用ProtoBuf序列化消息,它可作為RabbitMQ的Message的數據格式進行傳輸,由於是結構化的數據,這樣就極大的方便了Consumer的數據高效處理,當然也可以使用XML,與XML相比, ProtoBuf有以下優勢:
1.簡單
2.size小了3-10倍
3.速度快了20-100倍
4.易於編程
6.減少了語義的歧義.
,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛
八、RPC
MQ 本身是基於非同步的消息處理,前面的示例中所有的生產者(P)將消息發送到 RabbitMQ 後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成後再進行下一步處理。這相當於RPC(Remote Procedure Call,遠程過程調用)。
RabbitMQ 中也支援 RPC,RabbitMQ 中實現 RPC 的機制是:
客戶端發送請求(消息)時,在消息的屬性(MessageProperties ,在 AMQP 協議中定義了14種 properties ,這些屬性會隨著消息一起發送)中設置兩個值 replyTo (一個 Queue 名稱,用於告訴伺服器處理完成後將通知我的消息發送到這個 Queue 中)和 correlationId (此次請求的標識號,伺服器處理完成後需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
伺服器端收到消息並處理,處理完消息後,將生成一條應答消息到replyTo 指定的 Queue ,同時帶上 correlationId 屬性
客戶端之前已訂閱 replyTo 指定的 Queue ,從中收到伺服器的應答消息後,根據其中的correlationId 屬性分析哪條請求被執行了,根據執行結果進行後續業務處理
九、RabbitMQ 選型和對比
1.從社區活躍度
按照目前網路上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選。
2.持久化消息比較
ZeroMq 不支援,ActiveMq 和RabbitMq 都支援。持久化消息主要是指我們機器在不可抗力因素等情況下掛掉了,消息不會丟失的機制。
3.綜合技術實現
可靠性、靈活的路由、集群、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。當然ZeroMq 也可以做到,不過自己必須手動寫程式碼實現,程式碼量不小。尤其是可靠性中的:持久性、投遞確認、發布者證實和高可用性。
4.高並發
毋庸置疑,RabbitMQ 最高,原因是它的實現語言是天生具備高並發高可用的erlang 語言。
5.比較關注的比較, RabbitMQ 和 Kafka
RabbitMq 比 Kafka 成熟,在可用性上,穩定性上,可靠性上,RabbitMq 勝於 Kafka(理論上)。
另外,Kafka 的定位主要在日誌等方面, 因為Kafka 設計的初衷就是處理日誌的,可以看做是一個日誌(消息)系統一個重要組件,針對性很強,所以 如果業務方面還是建議選擇 RabbitMq 。
還有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出來很多。
選型最後總結:
如果我們系統中已經有選擇 Kafka ,或者 RabbitMq ,並且完全可以滿足現在的業務,建議就不用重複去增加和造輪子。
可以在 Kafka 和 RabbitMq 中選擇一個適合自己團隊和業務的,這個才是最重要的。但是毋庸置疑現階段,綜合考慮沒有第三選擇。