kafka實現無消息丟失與精確一次語義(exactly once)處理

  • 2020 年 3 月 16 日
  • 筆記

在很多的流處理框架的介紹中,都會說kafka是一個可靠的數據源,並且推薦使用Kafka當作數據源來進行使用。這是因為與其他消息引擎系統相比,kafka提供了可靠的數據保存及備份機制。並且通過消費者位移這一概念,可以讓消費者在因某些原因宕機而重啟後,可以輕易得回到宕機前的位置。

但其實kafka的可靠性也只能說是相對的,在整條數據鏈條中,總有可以讓數據出現丟失的情況,今天就來討論如何避免kafka數據丟失,以及實現精確一致處理的語義。

kafka無消息丟失處理

在討論如何實現kafka無消息丟失的時候,首先要先清楚大部分情況下消息丟失是在什麼情況下發生的。為什麼是大部分,因為總有一些非常特殊的情況會被人忽略,而我們只需要關注普遍的情況就足夠了。接下來我們來討論如何較為普遍的數據丟失情況。

1.1 生產者丟失

前面介紹Kafka分區和副本的時候,有提到過一個producer客戶端有一個acks的配置,這個配置為0的時候,producer是發送之後不管的,這個時候就很有可能因為網路等原因造成數據丟失,所以應該盡量避免。但是將ack設置為1就沒問題了嗎,那也不一定,因為有可能在leader副本接收到數據,但還沒同步給其他副本的時候就掛掉了,這時候數據也是丟失了。並且這種時候是客戶端以為消息發送成功,但kafka丟失了數據。

要達到最嚴格的無消息丟失配置,應該是要將acks的參數設置為-1(也就是all),並且將min.insync.replicas配置項調高到大於1,這部分內容在上一篇副本機制有介紹詳細解析kafka之kafka分區和副本

同時還需要使用帶有回調的producer api,來發送數據。注意這裡討論的都是非同步發送消息,同步發送不在討論範圍。

public class send{      ......      public static void main(){          ...          /*          *  第一個參數是 ProducerRecord 類型的對象,封裝了目標 Topic,消息的 kv          *  第二個參數是一個 CallBack 對象,當生產者接收到 Kafka 發來的 ACK 確認消息的時候,          *  會調用此 CallBack 對象的 onCompletion() 方法,實現回調功能          */           producer.send(new ProducerRecord<>(topic, messageNo, messageStr),                          new DemoCallBack(startTime, messageNo, messageStr));          ...      }      ......  }    class DemoCallBack implements Callback {      /* 開始發送消息的時間戳 */      private final long startTime;      private final int key;      private final String message;        public DemoCallBack(long startTime, int key, String message) {          this.startTime = startTime;          this.key = key;          this.message = message;      }        /**       * 生產者成功發送消息,收到 Kafka 服務端發來的 ACK 確認消息後,會調用此回調函數       * @param metadata 生產者發送的消息的元數據,如果發送過程中出現異常,此參數為 null       * @param exception 發送過程中出現的異常,如果發送成功為 null       */      @Override      public void onCompletion(RecordMetadata metadata, Exception exception) {          long elapsedTime = System.currentTimeMillis() - startTime;          if (metadata != null) {              System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %dn",                      key, message, metadata.partition(), metadata.offset(), elapsedTime);          } else {              exception.printStackTrace();          }      }  }

更詳細的程式碼可以參考這裡:Kafka生產者分析——KafkaProducer

我們之前提到過,producer發送到kafka broker的時候,是有多種可能會失敗的,而回調函數能準確告訴你是否確認發送成功,當然這依託於acks和min.insync.replicas的配置。而當數據發送丟失的時候,就可以進行手動重發或其他操作,從而確保生產者發送成功。

1.2 kafka內部丟失

有些時候,kafka內部因為一些不大好的配置,可能會出現一些極為隱蔽的數據丟失情況,那麼我們分別討論下大致都有哪幾種情況。

首先是replication.factor配置參數,這個配置決定了副本的數量,默認是1。注意這個參數不能超過broker的數量。說這個參數其實是因為如果使用默認的1,或者不在創建topic的時候指定副本數量(也就是副本數為1),那麼當一台機器出現磁碟損壞等情況,那麼數據也就從kafka裡面丟失了。所以replication.factor這個參數最好是配置大於1,比如說3

接下來要說的還是和副本相關的,也是上一篇副本中提到的unclean.leader.election.enable 參數,這個參數是在主副本掛掉,然後在ISR集合中沒有副本可以成為leader的時候,要不要讓進度比較慢的副本成為leader的。不用多說,讓進度比較慢的副本成為leader,肯定是要丟數據的。雖然可能會提高一些可用性,但如果你的業務場景丟失數據更加不能忍受,那還是將unclean.leader.election.enable設置為false吧

1.3 消費者丟失

消費者丟失的情況,其實跟消費者位移處理不當有關。消費者位移提交有一個參數,enable.auto.commit,默認是true,決定是否要讓消費者自動提交位移。如果開啟,那麼consumer每次都是先提交位移,再進行消費,比如先跟broker說這5個數據我消費好了,然後才開始慢慢消費這5個數據。

這樣處理的話,好處是簡單,壞處就是漏消費數據,比如你說要消費5個數據,消費了2個自己就掛了。那下次該consumer重啟後,在broker的記錄中這個consumer是已經消費了5個的。

所以最好的做法就是將enable.auto.commit設置為false,改為手動提交位移,在每次消費完之後再手動提交位移資訊。當然這樣又有可能會重複消費數據,畢竟exactly once處理一直是一個問題呀(/攤手)。遺憾的是kafka目前沒有保證consumer冪等消費的措施,如果確實需要保證consumer的冪等,可以對每條消息維持一個全局的id,每次消費進行去重,當然耗費這麼多的資源來實現exactly once的消費到底值不值,那就得看具體業務了。

1.4 無消息丟失小結

那麼到這裡先來總結下無消息丟失的主要配置吧:

  • producer的acks設置位-1,同時min.insync.replicas設置大於1。並且使用帶有回調的producer api發生消息。
  • 默認副本數replication.factor設置為大於1,或者創建topic的時候指定大於1的副本數。
  • unclean.leader.election.enable 設置為false,防止定期副本leader重選舉
  • 消費者端,自動提交位移enable.auto.commit設置為false。在消費完後手動提交位移。

那麼接下來就來說說kafka實現精確一次(exactly once)處理的方法吧。

實現精確一次(exactly once)處理

在分散式環境下,要實現消息一致與精確一次(exactly once)語義處理是很難的。精確一次處理意味著一個消息只處理一次,造成一次的效果,不能多也不能少。

那麼kafka如何能夠實現這樣的效果呢?在介紹之前,我們先來介紹其他兩個語義,至多一次(at most once)和至少一次(at least once)。

最多一次和至少一次

最多一次就是保證一條消息只發送一次,這個其實最簡單,非同步發送一次然後不管就可以,缺點是容易丟數據,所以一般不採用。

至少一次語義是kafka默認提供的語義,它保證每條消息都能至少接收並處理一次,缺點是可能有重複數據。

前面有介紹過acks機制,當設置producer客戶端的acks是1的時候,broker接收到消息就會跟producer確認。但producer發送一條消息後,可能因為網路原因消息超時未達,這時候producer客戶端會選擇重發,broker回應接收到消息,但很可能最開始發送的消息延遲到達,就會造成消息重複接收。

那麼針對這些情況,要如何實現精確一次處理的語義呢?

冪等的producer

要介紹冪等的producer之前,得先了解一下冪等這個詞是什麼意思。冪等這個詞最早起源於函數式編程,意思是一個函數無論執行多少次都會返回一樣的結果。比如說讓一個數加1就不是冪等的,而讓一個數取整就是冪等的。因為這個特性所以冪等的函數適用於並發的場景下。

但冪等在分散式系統中含義又做了進一步的延申,比如在kafka中,冪等性意味著一個消息無論重複多少次,都會被當作一個消息來持久化處理。

kafka的producer默認是支援最少一次語義,也就是說不是冪等的,這樣在一些比如支付等要求精確數據的場景會出現問題,在0.11.0後,kafka提供了讓producer支援冪等的配置操作。即:

props.put("enable.idempotence", ture)

在創建producer客戶端的時候,添加這一行配置,producer就變成冪等的了。注意開啟冪等性的時候,acks就自動是「all」了,如果這時候手動將ackss設置為0,那麼會報錯。

而底層實現其實也很簡單,就是對每條消息生成一個id值,broker會根據這個id值進行去重,從而實現冪等,這樣一來就能夠實現精確一次的語義了。

但是!冪等的producery也並非萬能。有兩個主要是缺陷:

  • 冪等性的producer僅做到單分區上的冪等性,即單分區消息不重複,多分區無法保證冪等性。
  • 只能保持單會話的冪等性,無法實現跨會話的冪等性,也就是說如果producer掛掉再重啟,無法保證兩個會話間的冪等(新會話可能會重發)。因為broker端無法獲取之前的狀態資訊,所以無法實現跨會話的冪等。

事務的producer

當遇到上述冪等性的缺陷無法解決的時候,可以考慮使用事務了。事務可以支援多分區的數據完整性,原子性。並且支援跨會話的exactly once處理語義,也就是說如果producer宕機重啟,依舊能保證數據只處理一次。

開啟事務也很簡單,首先需要開啟冪等性,即設置enable.idempotence為true。然後對producer發送程式碼做一些小小的修改。

//初始化事務  producer.initTransactions();  try {      //開啟一個事務      producer.beginTransaction();      producer.send(record1);      producer.send(record2);      //提交      producer.commitTransaction();  } catch (KafkaException e) {      //出現異常的時候,終止事務      producer.abortTransaction();  }  

但無論開啟冪等還是事務的特性,都會對性能有一定影響,這是必然的。所以kafka默認也並沒有開啟這兩個特性,而是交由開發者根據自身業務特點進行處理。

以上~


推薦閱讀:
分散式系統一致性問題與Raft演算法(上)
Scala函數式編程(五) 函數式的錯誤處理
大數據存儲的進化史 –從 RAID 到 Hadoop Hdfs