RabbitMQ的高級特性概念理解

  • 2019 年 11 月 26 日
  • 筆記

1、RabbitMQ中的消息如何保障百分之百的投遞成功?

  答:百分之百的投遞成功,方案可以參考下面的2、3。

2、什麼是生產者端的可靠性投遞?

  答:第一步,生產者保障消息的成功發出。第二步,保障RabbitMQ的節點成功接收到生產者發送的消息。第三步,發送端收到RabbitMQ節點(即Broker)確認應答。第四步,完善的消息進行補償機制。

3、如何實現生產端的可靠性投遞,解決方案,如下所示。

  答:第一種,消息落庫,對消息狀態進行打標。意思是發送消息的時候,將消息持久化到數據庫中,將消息設置一個狀態,比如,剛發送出去,消息狀態叫做發送中,當消息到達Broker端,Broker端返回給你一個響應,當你收到這個響應,代表了Broker端已經收到了該條消息(即應答確認),此時將消息的狀態設置為發送成功,做一個打標。此時,針對沒有進行響應的消息狀態,可以做輪詢操作,將未返回給你響應的消息進行重新發送(最大努力嘗試值,可以手動設置)。直到所有消息發送成功並返回給你響應。

第二種,消息的延遲投遞,做二次確認,回調檢查。此種方式和減少和數據庫的操作。第一種方式在高並發的場景下可能不是很適合。

4、RabbitMQ的冪等性概念。冪等性是什麼,冪等性的概念。

  答:可能對一件事情進行一個操作,這個操作可能執行一百次或者一千次,最終執行結果都是相同的,即執行一百次或者一千次結果都是相同的。

5、消費端,冪等性保障,比如,在海量訂單產生的業務高峰期,如何避免消息的重複消費問題?比如在高並發的情況下,會有好多消息到達RabbitMQ的Broker,消費端監聽大量的消息,難免出現消息的重複投遞,網絡閃斷導致的消息重複投遞。如果不做消費端,冪等就會出現消息重複消費。

  答:消費端實現冪等性,就意味着,我們的消息永遠不會消費多次,只能消費一次,即使我們收到了多條一樣的消息。

6、主流的冪等性操作。

  方案一:唯一ID(全局生成的id) + 指紋碼機制,利用數據庫主鍵去重方案。好處是實現比較簡單。壞處是高並發下有數據庫寫入的性能瓶頸。解決方案跟進ID進行分庫分表進行算法路由。

  方案二:利用Redis的原子性去實現,實現去重,實現冪等性操作。需要考慮的問題,第一個問題是我們是否要進行數據落庫,如果落庫的話,關鍵解決的問題是數據庫和緩存如何做到原子性。第二個問題,如果不進行落庫,那麼都存儲到緩存中,如何設置定時同步的策略。

7、Confirm確認消息詳解,理解Confirm消息確認機制。

  答:消息的確認,是指生產者投遞消息後,如果Broker收到消息,則會給我們生產者一個應答。生產者進行接收應答,用來確定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞的核心保障。

Confirm確認消息實現,如何實現Confirm確認消息?   第一步,在channel上開啟確認模式,即channel.confirmSelect()。調用方法進行確認模式的選擇。   第二部,在channel上添加監聽,addConfirmListener,當broker回送應答的時候,根據addConfirmListener添加的監聽進行接收broker回送的結果,根據回送的結果。監聽成功和失敗的返回結果,根據具體的結果對消息進行重新發送,或者記錄日誌等後續處理。

8、Return消息機制,Return Listener用於處理一些不可路由的消息。也是生產段添加的一個監聽。

  我們的消息生產者,通過指定一個Exchange和Routingkey,把消息送達到某一個隊列中去,然後我們的消費者監聽隊列,進行消息處理操作。但是在某些情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的消息,就要使用Return Listener。

  Return消息機制,在基礎api中有一個關鍵的配置項。Mandatory,如果為true,則監聽器會接收到路由不可達的消息,然後進行後續處理,如果為false,那麼broker端自動刪除該消息。

9、RabbitMQ消費端的限流策略,什麼是消費端的限流。假設一個場景,首先,我們RabbitMQ服務器有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現下面的情況。巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多數據。

  1)、消費端限流,RabbitMQ提供了一種Qos(quality of service服務質量保證)功能,即在非自動確認消息的前提下,如果有一定數據的消息堆積,可以設置一個限制,我們不去更新消費,不去做ack,不去做確認機制。如果有一定數目的消息(通過基於consume或者channel設置Qos的值),消息未被確認前,不進行消費新的消息。Rabbitmq有兩種簽收模式,一種是自動簽收,一種是手動簽收。如果做消費端限流的話,不能設置自動簽收模式,即autoack=false。

  2)、消費端的方法void BasicQos(unit prefetchSize,ushort prefetchCount,bool global);

    a、參數1是prefetchSize消息大小的限制,設置為0,不做大小限制。

    b、參數2是prefetchCount一次最多可以處理的消息,會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息還沒有ack,則該consume將block掉,直到有消息ack。

    c、參數3是global是什麼方式應用的,true表示channel的級別,false表示consume進行限制,true或者false是否將上面設置應用於channel簡單點說,就是上面限制是channel級別的還是consumer級別。

  3)、注意:prefetchSize和global這兩項,rabbitmq沒有實現,暫且不研究prefetch_count在no_ask=false的情況下生效,即在自動應答(自動簽收)的情況下這兩個值是不生效的。

  4)、消費端的限流策略,開發步驟如下所示:

第一步,限流方式,第一件事就是autoAck設置為false。     第二步,channel.basicQos(0,1,false);     第三步,手動確認,channel.basicAck(envelope.getDeliveryTag(),flase);參數1是deliveryTag,表示這一條消息已經處理完了,可以給一下條了,參數2是不批量簽收,我們一條一條進行簽收。

10、Rabbitmq的消費端ACK與重回隊列。

  答:消費端可以進行手動的ACK和NACK。區分與ack自動確認簽收,手動的ACK是代表消息確認了,消息已經收到了,確認了會給Broker端發送一個請求,說我已經收到了。手動NACK是代表了消息未進行確認,消息未收到或者處理失敗了,Broker端將未收到的消息重新發送一遍。消費端進行消費的時候,如果由於業務異常我們可以進行日誌的記錄,然後進行補償。如果由於服務器宕機等嚴重問題,那我們就需要手工進行ACK保障消費端消息成功。

  消費端的重回隊列,消息未被處理成功,將該消息重新發送給Broker,消費端重回隊列是為了對沒有處理成功的消息,把消息重新會遞給Broker。一般我們在實際應用中,都會關閉重回隊列,也就是設置為False。

11、RabbitMQ的TTL隊列或者消息,TTL是Time To Live的縮寫,也就是生存時間。

  RabbitMQ支持消息的過期時間,在消息發送的時候可以進行指定。RabbitMQ支持隊列的過期時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那麼消息會自動的清除。

12、死信隊列,DLX,Dead-Letter-Exchange。

  1)、RabbitMQ的死信隊列是路由到交換機上面的,RabbitMQ的死信隊列是和Exchange、隊列息息相關的。利用DLX,當消息在一個隊列中變成死信(dead message,即這個消息沒有任何消費者消費)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX(死信隊列,Dead-Letter-Exchange)。   2)、消息變成死信有以下幾種情況。

    a、當消息被拒絕(basic.reject/basic.nack)並且requeue=false。     b、消息TTL(TTL是Time To Live的縮寫,也就是生存時間)過期。     c、當隊列達到了最大長度。

  3)、DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列中有死信的時候,RabbitMQ就會自動的將這個消息重新發佈到設置的Exchange上去,進而被路由到另一個隊列。可以監聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ3.0以前支持的immediate參數的功能。

  4)、死信隊列設置,首先需要設置死信隊列的exchange和queue,然後進行綁定。

    第一步、Exchange:dlx.exchange。死信隊列就是一個正常的交換機Exchange。     第二步、Queue:dlx.queue。需要將隊列和這個交換機Exchange進行綁定。     第三步、RoutingKey:#。任何的路由key都可以進行綁定。然後我們進行正常聲明交換機,隊列,綁定,只不過我們需要在隊列加上一個參數即可:arguments.put("x-dead-letter-exchange","dlx.exchange");最後需要設置一個監聽來監聽這個隊列。這樣消息在過期、requeue、隊列在達到最大長度的時候,消息就可以直接路由到死信隊列了。