Flink Source Code Walkthrough (2): End-to-End Exactly-Once Semantics with Flink + Kafka
- February 14, 2020
- Notes
I. Introduction
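Through its checkpoint mechanism, Flink provides exactly-once semantics for the effect of messages on state: each message affects Flink's internal state exactly once. It does not, however, guarantee that the data written to the sink is free of duplicates. Take Figure 1 as an example: the Flink app reads message A from the source, transforms it into message B, and writes B to the sink. The app takes a checkpoint after processing A1. If the app fails and restarts while processing A4, it resumes consuming from A2, so B2 and B3 are written to the sink twice.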

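In this article, end-to-end exactly-once means: every record from the source is processed exactly once, and the results written to the sink are neither duplicated nor lost.
Achieving end-to-end exactly-once requires the sink to support transactional messages. Fortunately, Kafka added transaction support in version 0.11, and Flink uses this feature to process data with end-to-end exactly-once semantics. This article first gives a brief introduction to Kafka transactions and then walks through the source code to show how Flink ensures that its output is neither duplicated nor lost.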
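II. Kafka Transactional Messaging
The goal of Kafka transactions is simple: let a producer publish a group of messages atomically, so that the group is either entirely visible or entirely invisible to consumers, with no intermediate state.
First, a few concepts used by Kafka transactions: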
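- Transaction Coordinator: the coordinator of Kafka transactions, acting as the coordinator of the two-phase commit. It tracks the transactions currently in progress, writes the transaction log, and so on.
- producer id (PID): identifies the producer executing a transaction. It is assigned by the Transaction Coordinator and is transparent to users of the Kafka client. In Kafka, a transaction can only be operated on by a single producer, just as every SQL statement in a MySQL transaction must come from the same client connection. However, "the same producer" here does not mean the same process running a producer; it means a producer holding the same PID. For example, process P1 runs a Kafka producer that is executing a transaction with PID x. If P1 terminates unexpectedly and another process P2 is started as a Kafka producer, P2 can take over that transaction as long as it obtains x as its own PID (by sending the same transactional id to the Transaction Coordinator); strictly speaking, when P2 initializes, the coordinator first completes or aborts whatever P1 left unfinished. Conversely, even the same process P1 can no longer operate on its earlier transaction once it changes its PID (by sending a different transactional id to the Transaction Coordinator). Each producer id also has an epoch, which prevents two processes from operating on the same transaction at the same time.
- transactional id: identifies a transactional producer, and therefore the transactions it runs; it must be specified by the client user. The client (through KafkaProducer#initTransactions()) sends InitPidRequest(TransactionalId, TransactionTimeoutMs) to the Transaction Coordinator to initialize its PID. The same transactional id always yields the same PID and increments that PID's epoch by one; Kafka only accepts messages from the producer with the highest epoch and rejects any other producer with the same PID (a zombie instance). Starting a new transaction therefore only requires a transactional id that is not currently in use; there are no other special requirements. We will see later how the Flink Kafka sink generates transactional ids.
- Transaction Marker: a special control message written into the log to mark the end of a transaction (commit or abort).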
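To make PID/epoch fencing concrete, here is a toy model in plain Java. It is a simplified sketch under stated assumptions, not Kafka's implementation: a single hypothetical `ToyCluster` class plays both the Transaction Coordinator and the broker, and the names `ToyCluster`, `PidEpoch`, `initPid`, and `append` are made up for illustration. In this toy the write check reads the coordinator's latest epoch directly, whereas real brokers learn about the bumped epoch through the Kafka protocol itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of PID/epoch fencing; not Kafka's API.
class EpochFencingDemo {
    /** Hypothetical (PID, epoch) pair handed to a producer on initialization. */
    static final class PidEpoch {
        final long pid;
        final int epoch;

        PidEpoch(long pid, int epoch) {
            this.pid = pid;
            this.epoch = epoch;
        }
    }

    /** Toy cluster playing both the Transaction Coordinator and the broker. */
    static final class ToyCluster {
        private final Map<String, PidEpoch> latest = new HashMap<>();
        final List<String> log = new ArrayList<>();
        private long nextPid = 1000;

        /** Same transactional id -> same PID; every call bumps the epoch, fencing older holders. */
        PidEpoch initPid(String transactionalId) {
            PidEpoch old = latest.get(transactionalId);
            PidEpoch fresh = (old == null)
                    ? new PidEpoch(nextPid++, 0)
                    : new PidEpoch(old.pid, old.epoch + 1);
            latest.put(transactionalId, fresh);
            return fresh;
        }

        /** Accepts a write only from the holder of the newest epoch for this transactional id. */
        boolean append(String transactionalId, PidEpoch producer, String message) {
            PidEpoch current = latest.get(transactionalId);
            boolean accepted = current != null
                    && current.pid == producer.pid
                    && current.epoch == producer.epoch;
            if (accepted) {
                log.add(message);
            }
            return accepted; // false means a zombie producer with a stale epoch
        }
    }

    public static void main(String[] args) {
        ToyCluster cluster = new ToyCluster();
        PidEpoch p1 = cluster.initPid("sink-0"); // process P1 starts
        PidEpoch p2 = cluster.initPid("sink-0"); // P1 presumed dead; P2 takes over the same id
        System.out.println("same PID: " + (p1.pid == p2.pid) + ", epoch " + p1.epoch + " -> " + p2.epoch);
        System.out.println("zombie P1 accepted: " + cluster.append("sink-0", p1, "late write"));
        System.out.println("P2 accepted: " + cluster.append("sink-0", p2, "new write"));
    }
}
```

Running `main` shows the same PID with the epoch going from 0 to 1, the stale P1 write rejected, and the P2 write accepted.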

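Figure 2 shows how Kafka stores transactional messages when two producers write transactional messages to the same partition of the same topic. After Producer 1 calls BeginTransaction, it starts producing transactional messages to the topic. Transaction 1 begins when its first message, m1, arrives at the broker; m1 carries a PID field identifying it as part of Transaction 1, and the same applies to the subsequent messages. Producer 1 and Producer 2 both write transactional messages to the topic over a period of time, and the messages are laid out in the log in arrival order. When Producer 1 commits its transaction, the broker appends a control message Commit T1 (a transaction marker) to the log; likewise, when Producer 2 aborts its transaction, the broker appends an Abort T2 control message. Thanks to these control messages, a consumer reading the log in order knows whether each message should be visible.
Continuing with Figure 2, assume m1 is the first message in the partition and only Producer 1 and Producer 2 are writing to it. Up to the point where m11 is written, none of the messages are visible to consumers reading with isolation level read_committed, because it is not yet known whether T1 and T2 will commit or abort. After Producer 1 commits, m1 becomes visible, because the state of every message up to it is settled (m1 is the only one); since m2's state is still undetermined, everything from m2 onward remains invisible. After Producer 2 aborts, m1, m3, m4, and m11 become visible (because the state of every message before m12 is now settled), while m2, m10, and m12 are filtered out during consumption because T2 aborted. In this case the offsets of the messages the consumer receives are not contiguous.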
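The visibility rule above can be modeled as a "last stable offset" (LSO) check: a read_committed consumer only reads up to the first offset of the earliest transaction that is still open, and within that range it skips records of aborted transactions. The sketch below is a toy single-partition log, not Kafka code. The message layout is a hypothetical one loosely following the description of Figure 2 (the figure itself is not reproduced here), and transactions are identified by a label rather than by PID and epoch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of read_committed visibility; not Kafka's implementation.
class ReadCommittedDemo {
    /** One log entry: a data record of a transaction, or a COMMIT/ABORT control marker. */
    static final class Entry {
        final String txn;     // label of the transaction the entry belongs to
        final String payload; // message name, or null for a control marker
        final boolean commit; // only meaningful for markers

        private Entry(String txn, String payload, boolean commit) {
            this.txn = txn;
            this.payload = payload;
            this.commit = commit;
        }

        static Entry data(String txn, String payload) { return new Entry(txn, payload, false); }
        static Entry marker(String txn, boolean commit) { return new Entry(txn, null, commit); }
        boolean isMarker() { return payload == null; }
    }

    static final class PartitionLog {
        private final List<Entry> entries = new ArrayList<>();

        void append(Entry e) { entries.add(e); }

        /** First offset of the earliest still-open transaction, or the log end if none is open. */
        int lastStableOffset() {
            Set<String> finished = new HashSet<>();
            for (Entry e : entries) if (e.isMarker()) finished.add(e.txn);
            for (int offset = 0; offset < entries.size(); offset++) {
                Entry e = entries.get(offset);
                if (!e.isMarker() && !finished.contains(e.txn)) return offset;
            }
            return entries.size();
        }

        /** Records a read_committed consumer would receive: non-aborted data below the LSO. */
        List<String> readCommitted() {
            Set<String> aborted = new HashSet<>();
            for (Entry e : entries) if (e.isMarker() && !e.commit) aborted.add(e.txn);
            List<String> visible = new ArrayList<>();
            int lso = lastStableOffset();
            for (int offset = 0; offset < lso; offset++) {
                Entry e = entries.get(offset);
                if (!e.isMarker() && !aborted.contains(e.txn)) visible.add(e.payload);
            }
            return visible;
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        String[][] writes = {{"T1", "m1"}, {"T2", "m2"}, {"T1", "m3"}, {"T1", "m4"}, {"T2", "m10"}, {"T1", "m11"}};
        for (String[] w : writes) log.append(Entry.data(w[0], w[1]));
        System.out.println(log.readCommitted()); // T1 is still open at offset 0
        log.append(Entry.marker("T1", true));
        System.out.println(log.readCommitted()); // the open T2 starts at offset 1
        log.append(Entry.data("T2", "m12"));
        log.append(Entry.marker("T2", false));
        System.out.println(log.readCommitted());
    }
}
```

Running `main` prints `[]`, `[m1]`, and `[m1, m3, m4, m11]` in turn; the visible offsets (0, 2, 3, 5) are not contiguous, matching the observation above.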
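Kafka's way of writing transactional messages extends to writes spanning multiple topics and partitions: on commit (or abort), a control message is simply written to every partition involved. Writing multiple control messages atomically, however, is itself a distributed transaction problem, so Kafka implements transactions using two-phase commit.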
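The two-phase commit across partitions can be sketched as follows. This is a toy model, not Kafka's coordinator: the decision is first recorded in the transaction log as PREPARE_COMMIT, then a COMMIT marker is written to every partition that took part, and finally COMPLETE_COMMIT is logged. If the coordinator crashes between the two phases, a restarted coordinator that finds PREPARE_COMMIT without COMPLETE_COMMIT rolls the commit forward. `ToyCoordinator` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of Kafka-style two-phase commit across partitions.
class TwoPhaseMarkerDemo {
    /** Hypothetical coordinator for one transaction spanning several partitions. */
    static final class ToyCoordinator {
        final List<String> transactionLog = new ArrayList<>();              // the coordinator's durable record
        final Map<String, List<String>> partitions = new LinkedHashMap<>(); // partition -> log entries
        private final Set<String> involved = new LinkedHashSet<>();

        /** Producer write; the first write to a partition registers it with the transaction. */
        void send(String partition, String message) {
            if (involved.add(partition)) {
                transactionLog.add("ONGOING " + involved);
            }
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(message);
        }

        /** Phase 1: persist the decision. Once PREPARE_COMMIT is logged, the outcome is fixed. */
        void commit(boolean crashBeforeMarkers) {
            transactionLog.add("PREPARE_COMMIT");
            if (crashBeforeMarkers) {
                return; // simulated coordinator crash between the two phases
            }
            writeMarkersAndComplete();
        }

        /** A restarted coordinator that finds an unfinished PREPARE_COMMIT rolls it forward. */
        void recover() {
            if (!transactionLog.isEmpty()
                    && transactionLog.get(transactionLog.size() - 1).equals("PREPARE_COMMIT")) {
                writeMarkersAndComplete();
            }
        }

        /** Phase 2: a COMMIT marker into every involved partition, then mark the transaction done. */
        private void writeMarkersAndComplete() {
            for (String p : involved) {
                partitions.get(p).add("COMMIT-MARKER");
            }
            transactionLog.add("COMPLETE_COMMIT");
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.send("orders-0", "m1");
        coordinator.send("orders-1", "m2");
        coordinator.send("orders-0", "m3");
        coordinator.commit(true); // crash right after PREPARE_COMMIT
        System.out.println(coordinator.partitions + " " + coordinator.transactionLog);
        coordinator.recover();    // markers are written on recovery
        System.out.println(coordinator.partitions + " " + coordinator.transactionLog);
    }
}
```

The design point is that the single PREPARE_COMMIT entry is the commit point: once it is durable, the markers in every partition are only a matter of time, which is what makes the multi-partition write atomic.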
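III. Using Kafka Transactions in Flink for End-to-End Exactly-Once Semantics
The basic principle behind Flink's exactly-once guarantee for internal state is this: periodically take a checkpoint that durably records how far the upstream source has been processed (for example, the Kafka offset) together with the local state at that moment. If the process later crashes, Flink reads the persisted checkpoint, restores that state, and resumes processing from the recorded position, so each message affects the state exactly once. This approach cannot, however, keep the output written to the downstream sink free of duplicates. To avoid duplicate output, the downstream sink must support transactional messages: the messages emitted between two checkpoints are treated as one transaction, which is committed if the next checkpoint succeeds and aborted otherwise. This solves the problem of B2 and B3 being written twice in Figure 1: when the job fails at A4 and restarts, no new checkpoint has been completed yet, so the transaction containing the red B2 and B3 is never committed and remains invisible to downstream consumers.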
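The difference between the two behaviors can be reproduced with a small simulation of the Figure 1 scenario (checkpoint after A1, crash while processing A4). This is an illustrative sketch, not Flink code: `ReplayDemo.run` is a hypothetical helper, the A to B transformation is just a rename, and the whole checkpoint, including the commit, is assumed to complete instantly after A1 and again at the end of the input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of replay after a failure; not Flink code.
class ReplayDemo {
    /**
     * A checkpoint completes right after A1; the job crashes (once) when it reaches A4,
     * then restarts from the checkpointed offset.
     *
     * transactional == false: every emitted record is visible downstream immediately.
     * transactional == true:  records go into the transaction of the current checkpoint
     *                         interval and become visible only when that checkpoint completes.
     */
    static List<String> run(boolean transactional) {
        String[] source = {"A1", "A2", "A3", "A4", "A5"};
        List<String> visible = new ArrayList<>(); // what downstream consumers can read
        List<String> openTxn = new ArrayList<>(); // records of the open transaction
        int checkpointedOffset = 0;               // source position stored in the last checkpoint
        boolean crashed = false;

        int offset = 0;
        while (offset < source.length) {
            String in = source[offset];
            if (in.equals("A4") && !crashed) {
                crashed = true;
                openTxn.clear();             // the uncommitted transaction is aborted
                offset = checkpointedOffset; // restart from the last checkpoint
                continue;
            }
            String out = "B" + in.substring(1); // the A -> B transformation
            if (transactional) {
                openTxn.add(out);
            } else {
                visible.add(out);
            }
            offset++;
            if (in.equals("A1")) {           // the checkpoint after A1 completes
                checkpointedOffset = offset;
                visible.addAll(openTxn);     // commit the interval's transaction
                openTxn.clear();
            }
        }
        visible.addAll(openTxn); // assume a final checkpoint at end of input commits the rest
        return visible;
    }

    public static void main(String[] args) {
        System.out.println("plain sink:         " + run(false));
        System.out.println("transactional sink: " + run(true));
    }
}
```

`run(false)` returns `[B1, B2, B3, B2, B3, B4, B5]`, with B2 and B3 duplicated, while `run(true)` returns `[B1, B2, B3, B4, B5]`, because the transaction holding the first B2 and B3 is discarded on restart.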
1. TwoPhaseCommitSinkFunction
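First, let's briefly review how Flink takes a checkpoint. When a checkpoint starts, the JobManager triggers it at the sources, which inject a checkpoint barrier into the data stream; each downstream operator snapshots its own state when the barrier reaches it, which keeps the states of all operators in the checkpoint consistent with one another. Each operator reports to the JobManager when its part of the checkpoint is done, and once all operators have finished, the JobManager sends a NotifyCheckpointComplete message to every operator.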
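The acknowledgement handshake can be sketched as a toy coordinator that declares a checkpoint complete only after every task has acknowledged it, and only then sends the completion notification. Everything here (`ToyJobManager`, `Task`, `onBarrier`) is hypothetical and heavily simplified: there is no barrier alignment, no state backend, and no failure handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of checkpoint acknowledgement; not Flink's CheckpointCoordinator.
class CheckpointAckDemo {
    /** Hypothetical operator task: snapshots on barrier, records completion notifications. */
    static final class Task {
        final String name;
        final List<Long> snapshots = new ArrayList<>();
        final List<Long> completedNotifications = new ArrayList<>();

        Task(String name) { this.name = name; }

        void onBarrier(long checkpointId, ToyJobManager jm) {
            snapshots.add(checkpointId);        // snapshot local state
            jm.acknowledge(checkpointId, name); // report to the JobManager
        }

        void notifyCheckpointComplete(long checkpointId) { completedNotifications.add(checkpointId); }
    }

    /** Completes a checkpoint only when every registered task has acknowledged it. */
    static final class ToyJobManager {
        private final Map<String, Task> tasks = new HashMap<>();
        private final Map<Long, Set<String>> acks = new HashMap<>();
        final List<Long> completed = new ArrayList<>();

        void register(Task t) { tasks.put(t.name, t); }

        void acknowledge(long checkpointId, String taskName) {
            Set<String> received = acks.computeIfAbsent(checkpointId, id -> new HashSet<>());
            // Only a new ack can complete the checkpoint, so duplicates never re-notify.
            if (received.add(taskName) && received.size() == tasks.size()) {
                completed.add(checkpointId);
                for (Task t : tasks.values()) t.notifyCheckpointComplete(checkpointId);
            }
        }
    }

    public static void main(String[] args) {
        ToyJobManager jm = new ToyJobManager();
        List<Task> pipeline = new ArrayList<>();
        for (String name : new String[] {"source", "map", "sink"}) {
            Task t = new Task(name);
            jm.register(t);
            pipeline.add(t);
        }
        // The barrier for checkpoint 1 flows source -> map -> sink.
        for (Task t : pipeline) {
            System.out.println(t.name + " snapshots; completed so far: " + jm.completed);
            t.onBarrier(1L, jm);
        }
        System.out.println("completed: " + jm.completed + ", sink notified: " + pipeline.get(2).completedNotifications);
    }
}
```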
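Flink relies on the downstream sink's transaction support to achieve end-to-end exactly-once semantics, and since two-phase commit is a fairly general solution to distributed transactions, Flink provides the abstract class TwoPhaseCommitSinkFunction to carry out two-phase commit against the downstream sink.
Now let's look in detail at how TwoPhaseCommitSinkFunction works.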

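When TwoPhaseCommitSinkFunction receives a checkpoint barrier, it pre-commits the sink before snapshotting its own state, so that the sink performs its pre-commit while the rest of the system is taking the checkpoint. At the same time it begins a new transaction on the sink: Flink keeps processing subsequent messages while the checkpoint is in progress, and this ensures those messages belong to the next transaction. After its own snapshot is done and the NotifyCheckpointComplete message from the JobManager arrives, it commits the sink transaction, completing the two-phase commit; only then does the data sent during that interval become visible to downstream consumers.
Let's walk through the process again with the code: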
```java
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction

    checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

    long checkpointId = context.getCheckpointId();
    LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

    state.clear();
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}
```
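snapshotState is declared by the CheckpointedFunction interface; it is the function that snapshots the operator's own state once the checkpoint barrier has been received. As the code shows, it first pre-commits the current transaction (currentTransactionHolder), puts it into pendingCommitTransactions keyed by the checkpoint id, and begins a new transaction as the current one. Finally it stores both the current transaction and pendingCommitTransactions in the checkpoint as part of its own state (keeping the transaction information in state lets the sink resume those transactions when restoring from the checkpoint).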
```java
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    //  (1) there is exactly one transaction from the latest checkpoint that
    //      was triggered and completed. That should be the common case.
    //      Simply commit that transaction in that case.
    //
    //  (2) there are multiple pending transactions because one previous
    //      checkpoint was skipped. That is a rare case, but can happen
    //      for example when:
    //
    //        - the master cannot persist the metadata of the last
    //          checkpoint (temporary outage in the storage system) but
    //          could persist a successive checkpoint (the one notified here)
    //
    //        - other tasks could not persist their status during
    //          the previous checkpoint, but did not trigger a failure because they
    //          could hold onto their state and could successfully persist it in
    //          a successive checkpoint (the one notified here)
    //
    //      In both cases, the prior checkpoint never reach a committed state, but
    //      this checkpoint is always expected to subsume the prior one and cover all
    //      changes since the last successful one. As a consequence, we need to commit
    //      all pending transactions.
    //
    //  (3) Multiple transactions are pending, but the checkpoint complete notification
    //      relates not to the latest. That is possible, because notification messages
    //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
    //      was triggered) and because there can be concurrent overlapping checkpoints
    //      (a new one is started before the previous fully finished).
    //
    // ==> There should never be a case where we have no pending transaction here
    //

    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
    checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
    Throwable firstError = null;

    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }

        LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
            name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

        logWarningIfTimeoutAlmostReached(pendingTransaction);
        try {
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            if (firstError == null) {
                firstError = t;
            }
        }

        LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

        pendingTransactionIterator.remove();
    }

    if (firstError != null) {
        throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
            firstError);
    }
}
```
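notifyCheckpointComplete is declared by the CheckpointListener interface and runs when the NotifyCheckpointComplete message from the JobManager arrives. It simply commits the pending transactions, more precisely every pending transaction whose checkpoint id is not greater than the completed one (transactions from later checkpoints are skipped and stay pending). As the comment explains, normally there should be only one pending transaction, the one pre-committed in the snapshotState call that was just triggered, but there can be several, for example when the NotifyCheckpointComplete message for an earlier checkpoint arrives late.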
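The commit bookkeeping can be illustrated with a stripped-down model of the two methods above. It is a simplified mimic, not Flink's class: transactions are plain strings, preCommit is reduced to a comment, and `MiniTwoPhaseSink` is a hypothetical name. The main method walks through case (3) (the notification for checkpoint 1 arrives after checkpoint 2 was triggered) and a case (2)-like situation (one notification commits two transactions).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mimic of TwoPhaseCommitSinkFunction's bookkeeping.
class PendingCommitDemo {
    static final class MiniTwoPhaseSink {
        private final Map<Long, String> pendingCommitTransactions = new LinkedHashMap<>();
        final List<String> committed = new ArrayList<>();
        private int txnCounter = 0;
        private String currentTransaction = beginTransaction();

        private String beginTransaction() { return "txn-" + (txnCounter++); }

        /** On the barrier: pre-commit the current transaction, park it, start a new one. */
        void snapshotState(long checkpointId) {
            // preCommit(currentTransaction) would flush here
            pendingCommitTransactions.put(checkpointId, currentTransaction);
            currentTransaction = beginTransaction();
        }

        /** Commit every pending transaction whose checkpoint id is <= the completed one. */
        void notifyCheckpointComplete(long checkpointId) {
            Iterator<Map.Entry<Long, String>> it = pendingCommitTransactions.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, String> entry = it.next();
                if (entry.getKey() > checkpointId) {
                    continue; // belongs to a later checkpoint; keep it pending
                }
                committed.add(entry.getValue());
                it.remove();
            }
        }

        int pendingCount() { return pendingCommitTransactions.size(); }
    }

    public static void main(String[] args) {
        MiniTwoPhaseSink sink = new MiniTwoPhaseSink();
        sink.snapshotState(1);
        sink.snapshotState(2);            // checkpoint 2 triggered before 1 is confirmed
        sink.notifyCheckpointComplete(1); // case (3): only txn-0 may be committed
        System.out.println(sink.committed + ", pending: " + sink.pendingCount());
        sink.snapshotState(3);
        sink.notifyCheckpointComplete(3); // notification for 2 never came: commit txn-1 and txn-2
        System.out.println(sink.committed + ", pending: " + sink.pendingCount());
    }
}
```

This prints `[txn-0], pending: 1` followed by `[txn-0, txn-1, txn-2], pending: 0`.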
```java
// ------ methods that should be implemented in child class to support two phase commit algorithm ------

/**
 * Write value within a transaction.
 */
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

/**
 * Method that starts a new transaction.
 *
 * @return newly created transaction.
 */
protected abstract TXN beginTransaction() throws Exception;

/**
 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
 * transaction for a commit that might happen in the future. After this point the transaction might still be
 * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
 * will always succeed.
 *
 * <p>Usually implementation involves flushing the data.
 */
protected abstract void preCommit(TXN transaction) throws Exception;

/**
 * Commit a pre-committed transaction. If this method fail, Flink application will be
 * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
 * same transaction.
 */
protected abstract void commit(TXN transaction);

/**
 * Abort a transaction.
 */
protected abstract void abort(TXN transaction);
```
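TwoPhaseCommitSinkFunction leaves five methods for subclasses to implement:
- invoke: defines how the sink writes data to the external system. Every sink has to define invoke, which is called once for every record the sink operator receives; compared with an ordinary sink's invoke, this one simply takes an extra transaction parameter.
- beginTransaction, preCommit, commit, abort: the steps of the two-phase commit protocol. If the external system supports two-phase commit itself (as Kafka does), these methods just call the corresponding functions of the external system's protocol.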
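For a target system without native transactions, one way to implement these hooks is to stage writes per transaction and publish them atomically on commit (for example, writing to a temporary file and renaming it). The class below is a hypothetical in-memory sketch of that idea, not Flink's API: the method names mirror the five hooks, but it does not extend TwoPhaseCommitSinkFunction, and the Context parameter is omitted. Commit is made idempotent because, per the Javadoc above, a failed commit leads to a restart and the same transaction being committed again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical staging sink mirroring the five hooks; not a Flink class.
class StagingSinkDemo {
    static final class StagingSink {
        private final Map<String, List<String>> staging = new HashMap<>();
        private final Map<String, List<String>> published = new LinkedHashMap<>();
        private final Set<String> preCommitted = new HashSet<>();
        private int counter = 0;

        String beginTransaction() {
            String txn = "stage-" + (counter++);
            staging.put(txn, new ArrayList<>());
            return txn;
        }

        void invoke(String txn, String value) {
            if (preCommitted.contains(txn)) {
                throw new IllegalStateException("transaction already pre-committed: " + txn);
            }
            staging.get(txn).add(value);
        }

        /** Seal the staged data; a real sink would flush and close its temp file here. */
        void preCommit(String txn) {
            preCommitted.add(txn);
        }

        /** Idempotent: committing a transaction that was already published is a no-op. */
        void commit(String txn) {
            List<String> staged = staging.remove(txn);
            if (staged != null) {
                published.put(txn, staged);
            }
            preCommitted.remove(txn);
        }

        void abort(String txn) {
            staging.remove(txn);
            preCommitted.remove(txn);
        }

        /** What readers of the target system can see: published batches in commit order. */
        List<String> visible() {
            List<String> all = new ArrayList<>();
            for (List<String> batch : published.values()) {
                all.addAll(batch);
            }
            return all;
        }
    }

    public static void main(String[] args) {
        StagingSink sink = new StagingSink();
        String t1 = sink.beginTransaction();
        sink.invoke(t1, "a");
        sink.invoke(t1, "b");
        String t2 = sink.beginTransaction(); // next checkpoint interval
        sink.invoke(t2, "c");
        sink.preCommit(t1);
        sink.commit(t1);
        sink.commit(t1);                     // retried commit after a restart: no duplicates
        sink.abort(t2);                      // e.g. failure before the next checkpoint
        System.out.println(sink.visible());
    }
}
```

Running it prints `[a, b]`: the retried commit adds nothing, and the aborted transaction never becomes visible.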
2. FlinkKafkaProducer011
With TwoPhaseCommitSinkFunction understood, FlinkKafkaProducer011 is much easier to follow.
```java
@Override
public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
    checkErroneous();

    byte[] serializedKey = schema.serializeKey(next);
    byte[] serializedValue = schema.serializeValue(next);
    String targetTopic = schema.getTargetTopic(next);
    if (targetTopic == null) {
        targetTopic = defaultTopicId;
    }

    Long timestamp = null;
    if (this.writeTimestampToKafka) {
        timestamp = context.timestamp();
    }

    ProducerRecord<byte[], byte[]> record;
    int[] partitions = topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        topicPartitionsMap.put(targetTopic, partitions);
    }
    if (flinkKafkaPartitioner != null) {
        record = new ProducerRecord<>(
            targetTopic,
            flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
            timestamp,
            serializedKey,
            serializedValue);
    } else {
        record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    }
    pendingRecords.incrementAndGet();
    transaction.producer.send(record, callback);
}
```
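The invoke implemented in FlinkKafkaProducer011 turns the incoming message (next) into a Kafka record and sends it with the Kafka client's send method. If a FlinkKafkaPartitioner is configured, it chooses the target partition; otherwise the partition is left null and the Kafka producer's own partitioner decides. send is asynchronous, so pendingRecords counts records that have been handed to the client but whose sends have not yet completed.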
```java
@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new KafkaTransactionState(currentTransaction.producer);
            }
            return new KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}

@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
    switch (semantic) {
        case EXACTLY_ONCE:
        case AT_LEAST_ONCE:
            flush(transaction);
            break;
        case NONE:
            break;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
    checkErroneous();
}

@Override
protected void commit(KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}

@Override
protected void abort(KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        transaction.producer.abortTransaction();
        recycleTransactionalProducer(transaction.producer);
    }
}
```
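The beginTransaction, preCommit, commit, and abort implemented in FlinkKafkaProducer011 mostly just call the corresponding two-phase commit functions of the Kafka producer client. Two points are worth noting:
- preCommit calls flush. From the analysis of TwoPhaseCommitSinkFunction, preCommit is called from snapshotState, which is triggered when the operator takes a checkpoint. This guarantees that, when the operator checkpoints, all data received before that checkpoint has actually been sent downstream rather than sitting in a buffer. Taking Figure 3 as an example, the sink operator starts its checkpoint when it receives the first checkpoint barrier, and before that checkpoint completes it must make sure that all five messages m1-m5 have been sent downstream; otherwise, if the checkpoint completed while some of m1-m5 had not been delivered, those messages would be lost. Making sure buffered data has been sent out in snapshotState is a very common practice, and you should do the same when writing a custom SinkFunction. The flush here ultimately calls the Kafka producer client's flush method, which blocks until every buffered record has completed sending (acknowledged by Kafka or failed), so spikes in checkpoint duration may also be caused by this flush.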
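The flush-before-checkpoint rule can be sketched with an asynchronous sender whose flush blocks until every in-flight record has been delivered. This is a toy stand-in for the Kafka producer's internal buffer, not its real implementation: `BufferedSender` is hypothetical, delivery is simulated with a thread pool and a short sleep, and `pendingRecords` plays the role of the counter incremented in invoke above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical async sender illustrating flush(); not the Kafka client.
class BufferedSenderDemo {
    static final class BufferedSender implements AutoCloseable {
        private final ExecutorService io = Executors.newFixedThreadPool(2);
        private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        final AtomicInteger pendingRecords = new AtomicInteger();

        /** Asynchronous send: returns immediately, delivery happens on a background thread. */
        void send(String record) {
            pendingRecords.incrementAndGet();
            inFlight.add(CompletableFuture.runAsync(() -> {
                sleepQuietly(10);                   // simulated network latency
                delivered.add(record);
                pendingRecords.decrementAndGet();   // like the send callback
            }, io));
        }

        /** Blocks until every record sent so far has been delivered (the preCommit step). */
        void flush() {
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            inFlight.clear();
        }

        @Override
        public void close() {
            io.shutdown();
        }
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        try (BufferedSender sender = new BufferedSender()) {
            for (int i = 1; i <= 5; i++) {
                sender.send("m" + i);
            }
            sender.flush(); // only after this may the checkpoint be acknowledged
            System.out.println(sender.pendingRecords.get() + " pending, " + sender.delivered.size() + " delivered");
        }
    }
}
```

After `flush()` returns, the program prints `0 pending, 5 delivered`; without the flush, a checkpoint could complete while some of m1-m5 were still in flight.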
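- beginTransaction obtains its transactional id through createTransactionalProducer (and reads it back with getTransactionalId), while commit and abort call recycleTransactionalProducer. Recall the question raised in Section II of how Kafka transactional ids are generated; here is how Flink produces them. As the code below shows, Flink uses a queue as a pool of transactional ids: when a new transaction begins, it takes a transactional id from the head of the queue, and when the transaction ends, it puts the id back at the tail. Each transaction gets its own new Kafka producer created with one of these ids, and the pool initially holds as many ids as the configured Kafka producer pool size (5 by default). That size therefore caps how many transactions can be open at once; when the pool is empty, createTransactionalProducer fails with the "Too many ongoing snapshots" error.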
```java
/**
 * Pool of available transactional ids.
 */
private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

/**
 * For each checkpoint we create new {@link FlinkKafkaProducer} so that new transactions will not clash
 * with transactions created during previous checkpoints ({@code producer.initTransactions()} assures that we
 * obtain new producerId and epoch counters).
 */
private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
    String transactionalId = availableTransactionalIds.poll();
    if (transactionalId == null) {
        throw new FlinkKafka011Exception(
            FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
            "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
    }
    FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
    producer.initTransactions();
    return producer;
}

private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
    availableTransactionalIds.add(producer.getTransactionalId());
    producer.close();
}
```
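The queue discipline can be checked with a small standalone model of the pool. It mimics `availableTransactionalIds` with the same `LinkedBlockingDeque` calls (`poll` from the head, `add` to the tail), but `IdPool`, `acquire`, `recycle`, and the `prefix-N` id format are hypothetical; the actual id format Flink uses is not shown here, and an `IllegalStateException` stands in for the `PRODUCERS_POOL_EMPTY` error.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical model of the transactional id pool; not Flink's class.
class TransactionalIdPoolDemo {
    static final class IdPool {
        private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

        IdPool(String prefix, int poolSize) {
            // The "prefix-N" format is made up for illustration.
            for (int i = 0; i < poolSize; i++) {
                availableTransactionalIds.add(prefix + "-" + i);
            }
        }

        /** Take an id from the head; poll() returns null instead of blocking when the pool is empty. */
        String acquire() {
            String id = availableTransactionalIds.poll();
            if (id == null) {
                throw new IllegalStateException(
                        "Too many ongoing snapshots: pool exhausted (stand-in for PRODUCERS_POOL_EMPTY)");
            }
            return id;
        }

        /** Return an id to the tail once its transaction is committed or aborted. */
        void recycle(String id) {
            availableTransactionalIds.add(id);
        }
    }

    public static void main(String[] args) {
        IdPool pool = new IdPool("sink", 3);
        String first = pool.acquire(); // sink-0
        pool.recycle(first);           // queue is now sink-1, sink-2, sink-0
        System.out.println(pool.acquire() + " " + pool.acquire() + " " + pool.acquire());
        try {
            pool.acquire();
        } catch (IllegalStateException e) {
            System.out.println("exhausted: " + e.getMessage());
        }
    }
}
```

This prints `sink-1 sink-2 sink-0` and then the exhaustion message: a recycled id goes to the back of the queue, and with every id in use, no further transaction can begin.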
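IV. Summary
The way Flink uses Kafka transactions to deliver messages end to end exactly once is a fairly general approach; once you understand the principle, you can quickly apply it to other external stores or message queues that support transactions.
Flink's use of Kafka transactions is also a good demonstration of how to use Kafka correctly in application development; if you need strongly consistent messaging with Kafka in other projects, Flink's code is a useful reference.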