Flink Source Code Walkthrough (2): End-to-End Exactly-Once Semantics with Flink + Kafka

  • February 14, 2020
  • Notes

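I. Introduction

Flink's checkpoint mechanism gives exactly-once semantics for the effect of messages on state: each message affects Flink's internal state exactly once. It does not, however, guarantee that the data written to the sink is free of duplicates. Take Figure 1 as an example. The Flink application receives message A from the source, transforms it into message B, and writes B to the sink. The application takes a checkpoint after processing A1. If it then fails and restarts while processing A4, it resumes consuming from A2, so B2 and B3 are written to the sink twice.

Figure 1: Duplicate output messages in Flink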
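The scenario in Figure 1 can be replayed in a few lines of plain Java. This is only a toy model, not Flink code: the class name, the five inputs A1..A5, and the single failure at A4 are assumptions chosen to match the description above.

```java
import java.util.ArrayList;
import java.util.List;

class ReplayDuplicatesDemo {
    /** Runs the A -> B job over inputs A1..A5 and returns everything written to the sink. */
    static List<String> run() {
        List<String> sink = new ArrayList<>(); // non-transactional sink: every write is visible at once
        int checkpointedOffset = 0;            // last input covered by a completed checkpoint
        boolean failedOnce = false;

        int next = 1;
        while (next <= 5) {
            if (next == 4 && !failedOnce) {
                // Failure while processing A4: restart from the checkpoint taken after A1.
                failedOnce = true;
                next = checkpointedOffset + 1;
                continue;
            }
            sink.add("B" + next);
            if (next == 1) {
                checkpointedOffset = 1;        // the only checkpoint completes after A1
            }
            next++;
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The state (here just the offset) is restored correctly, yet B2 and B3 show up twice in the sink, which is exactly the gap that end-to-end exactly-once has to close.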

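In this article, end-to-end exactly-once means that every record from the source is processed exactly once, and that the results written to the sink contain neither duplicates nor gaps.

End-to-end exactly-once semantics require transactional support from the sink. Fortunately, Kafka added transactions in version 0.11, and Flink builds on that feature to provide end-to-end exactly-once processing. This article first gives a brief introduction to Kafka transactions, then walks through the source code to show how Flink ensures that output messages are neither duplicated nor lost.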

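II. Kafka Transactional Messages

The motivation behind Kafka transactions is simple: let a producer publish a group of messages atomically, so that the group is either entirely visible or entirely invisible to consumers, with no intermediate state.

First, a few concepts used in Kafka transactions: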

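  • Transaction Coordinator: the coordinator of Kafka transactions and of the two-phase commit. It keeps track of the transactions currently in progress, writes the transaction log, and so on.
  • producer id (PID): identifies the producer that executes a transaction. It is assigned by the Transaction Coordinator and is transparent to users of the Kafka client. In Kafka, a transaction can be driven by only one producer, just as all SQL statements in a MySQL transaction must come from the same client connection. "The same producer" here does not mean the same process running a producer, but any producer holding the same PID. For example, suppose process P1 runs a Kafka producer with PID x and crashes in the middle of a transaction, and another process P2 is started as a Kafka producer. As long as P2 obtains x as its PID (by presenting the same transactional id to the Transaction Coordinator), it can take over the earlier transaction. Conversely, if P1 changes its own PID while running (by presenting a different transactional id to the Transaction Coordinator), it can no longer operate on its earlier transaction. Each producer id also has an epoch, which prevents two processes from operating on the same transaction at the same time.
  • transactional id: identifies a transaction and must be supplied by the client user. The client sends an InitPidRequest(TransactionalId, TransactionTimeoutMs) to the Transaction Coordinator to initialize its PID (in the Java client this happens inside initTransactions()). The same transactional id always yields the same PID, with the PID's epoch incremented by one. Kafka accepts messages only from the producer with the highest epoch and rejects other producers holding the same PID (zombie instances). Starting a new transaction only requires a transactional id that is not currently in use; there are no other special requirements. We will see later how the Flink Kafka sink generates its transactional ids.
  • Transaction Marker: a special control message in the log that marks the end of a transaction, that is, whether it was committed or aborted.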
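The relationship between transactional id, PID, and epoch can be sketched with a toy coordinator. This is a simplified model under our own assumptions, not Kafka's implementation: ToyCoordinator, ProducerIdentity, initProducerId, and accepts are hypothetical names, and the starting PID of 1000 is arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

class EpochFencingDemo {
    /** Identity handed to a producer: a PID plus the epoch that was current when it initialized. */
    static final class ProducerIdentity {
        final long pid;
        final int epoch;

        ProducerIdentity(long pid, int epoch) {
            this.pid = pid;
            this.epoch = epoch;
        }
    }

    /** Toy coordinator: maps transactional id -> PID and PID -> latest epoch. */
    static final class ToyCoordinator {
        private final Map<String, Long> pidByTransactionalId = new HashMap<>();
        private final Map<Long, Integer> latestEpochByPid = new HashMap<>();
        private long nextPid = 1000; // arbitrary starting value for the sketch

        /** Same transactional id -> same PID, with the epoch bumped by one. */
        ProducerIdentity initProducerId(String transactionalId) {
            Long pid = pidByTransactionalId.get(transactionalId);
            if (pid == null) {
                pid = nextPid++;
                pidByTransactionalId.put(transactionalId, pid);
            }
            Integer previous = latestEpochByPid.get(pid);
            int epoch = (previous == null) ? 0 : previous + 1;
            latestEpochByPid.put(pid, epoch);
            return new ProducerIdentity(pid, epoch);
        }

        /** Only the holder of the latest epoch may write; older holders are zombies. */
        boolean accepts(ProducerIdentity producer) {
            Integer latest = latestEpochByPid.get(producer.pid);
            return latest != null && latest == producer.epoch;
        }
    }
}
```

If two producers initialize with the same transactional id, the second one receives the same PID with a higher epoch, and accepts returns false for the first one from then on.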
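Figure 2: Message storage in Kafka

Figure 2 shows how Kafka stores transactional messages when two producers write to the same partition of the same topic. After Producer 1 calls beginTransaction, it starts producing transactional messages to the topic. Transaction 1 begins when its first message, m1, reaches the broker; m1 carries a PID field that marks it as part of Transaction 1, and the same holds for the messages that follow. For a while both Producer 1 and Producer 2 write transactional messages to the topic, and the messages are laid out in the log in arrival order. When Producer 1 commits its transaction, the broker appends a control message, Commit T1 (a Transaction Marker), to the log; likewise, when Producer 2 aborts its transaction, the broker appends the control message Abort T2. From these control messages, a consumer reading the log in order can tell whether each message should be visible.

Continuing with Figure 2, assume m1 is the first message in the partition and only Producer 1 and Producer 2 are writing. By the time m11 has been written, none of the messages are visible to consumers (here and below, consumers reading with isolation.level=read_committed), because it is not yet known whether T1 and T2 will commit or abort. Once Producer 1 commits, m1 becomes visible, because the state of every message up to and including m1 is now settled (there is only m1). Since the state of m2 is still undetermined, everything from m2 onward remains invisible. Once Producer 2 aborts, m1, m3, m4, and m11 become visible (the state of every message before m12 is now settled), while m2, m10, and m12 are filtered out during consumption because T2 aborted. In that case the offsets of the messages the consumer receives are not contiguous.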
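The visibility rule can be modeled on a single partition log. The sketch below is our own simplification, not broker or consumer code: the three-message log used in the example is hypothetical rather than the exact layout of Figure 2, and each producer is assumed to have at most one open transaction at a time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReadCommittedDemo {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final int producer;
        final String payload;

        Entry(Kind kind, int producer, String payload) {
            this.kind = kind;
            this.producer = producer;
            this.payload = payload;
        }
    }

    static Entry data(int producer, String payload) {
        return new Entry(Kind.DATA, producer, payload);
    }

    static Entry marker(Kind kind, int producer) {
        return new Entry(kind, producer, null);
    }

    /** Payloads a committed-only reader would see for the given log prefix. */
    static List<String> visible(List<Entry> log) {
        // Pass 1: resolve the outcome of every data entry from the control markers.
        Kind[] outcome = new Kind[log.size()];
        Map<Integer, List<Integer>> openByProducer = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind == Kind.DATA) {
                openByProducer.computeIfAbsent(e.producer, p -> new ArrayList<>()).add(i);
            } else {
                List<Integer> decided = openByProducer.remove(e.producer);
                if (decided != null) {
                    for (int index : decided) {
                        outcome[index] = e.kind;
                    }
                }
            }
        }
        // Pass 2: deliver in log order and stop at the first entry whose outcome is unknown.
        List<String> result = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind != Kind.DATA) {
                continue;
            }
            if (outcome[i] == null) {
                break;                    // still-open transaction: nothing beyond this point is visible
            }
            if (outcome[i] == Kind.COMMIT) {
                result.add(e.payload);    // aborted messages are silently skipped
            }
        }
        return result;
    }
}
```

With the log m1 (P1), m2 (P2), m3 (P1), nothing is visible at first; after a commit marker for P1 only m1 is visible, because m2 is still undecided and blocks m3; after an abort marker for P2 the reader sees m1 and m3.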

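This way of writing transactional messages extends to multiple topics and partitions: on commit (or abort), control messages are written to every partition involved. Writing several control messages atomically is itself a distributed transaction problem, which is why Kafka implements transactions with two-phase commit.

III. How Flink Uses Kafka Transactions to Achieve End-to-End Exactly-Once Semantics

The basic principle behind Flink's exactly-once semantics for internal state is this: at regular intervals Flink takes a checkpoint, durably recording how far the upstream source has been consumed (for example, the Kafka offsets) together with the values of local state at that moment. If the process dies some time later, Flink reads the persisted checkpoint back, restores the state, and resumes processing from the recorded position, so each message affects state exactly once. This alone cannot prevent duplicate output to the downstream sink. To avoid duplicates, the sink must support transactions: the messages emitted between two checkpoints are treated as one transaction, which is committed if the new checkpoint succeeds and aborted otherwise. This solves the duplicated B2 and B3 from Figure 1: when the job fails at A4 and restarts, no new checkpoint has completed yet, so the transaction containing the red B2 and B3 is never committed and stays invisible to downstream consumers.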
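To make the idea concrete, here is a toy model in plain Java of the Figure 1 scenario with a transactional sink. It is a sketch under our own assumptions (inputs A1..A5, one failure at A4, checkpoints completing after A1 and A5) and uses no Flink or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

class CheckpointAlignedSinkDemo {
    /** Returns what downstream consumers can read once the job has processed A1..A5. */
    static List<String> run() {
        List<String> committed = new ArrayList<>();       // visible to downstream consumers
        List<String> openTransaction = new ArrayList<>(); // written, but not yet visible
        int checkpointedOffset = 0;
        boolean failedOnce = false;

        int next = 1;
        while (next <= 5) {
            if (next == 4 && !failedOnce) {
                // Failure at A4: the open transaction is aborted and the job
                // restarts from the last completed checkpoint (after A1).
                failedOnce = true;
                openTransaction.clear();
                next = checkpointedOffset + 1;
                continue;
            }
            openTransaction.add("B" + next);
            if (next == 1 || next == 5) {
                // A checkpoint completes here: commit everything written since the previous one.
                committed.addAll(openTransaction);
                openTransaction.clear();
                checkpointedOffset = next;
            }
            next++;
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first attempts at B2 and B3 are discarded together with their transaction, so consumers see each of B1..B5 once.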

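1. TwoPhaseCommitSinkFunction

First, a quick recap of Flink's checkpoint procedure. When a checkpoint begins, the JobManager has the sources inject a checkpoint barrier into the data stream. Each downstream operator checkpoints its own state when it receives the barrier, which keeps the state of all operators in the checkpoint consistent. Each operator reports to the JobManager once its checkpoint is done, and after all operators have finished, the JobManager sends a NotifyCheckpointComplete message to every operator.

Flink relies on the sink's transactional support to achieve end-to-end exactly-once semantics, and two-phase commit is a fairly general solution to distributed transaction problems. Flink therefore abstracts the work of performing a two-phase commit against the sink into the TwoPhaseCommitSinkFunction class.

Let's look at how TwoPhaseCommitSinkFunction works.

Figure 3: How TwoPhaseCommitSinkFunction works

When TwoPhaseCommitSinkFunction receives a checkpoint barrier, and before it takes its own checkpoint, it pre-commits to the sink, so the sink performs its pre-commit while the rest of the job is checkpointing. At the same time it begins a new transaction on the sink. Flink keeps processing subsequent messages while the checkpoint is in progress, so this ensures that those messages fall into the next transaction. After its own checkpoint is done and the NotifyCheckpointComplete message arrives from the JobManager, it commits to the sink, completing the two-phase commit. Only then does the data sent during that period become visible to downstream consumers.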
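The order of these calls can be traced with a small stand-in class. The sketch below only records events; it is not the Flink class, and the method names merely mirror the hooks described above. The transaction names (txn-1, txn-2) and the event strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseLifecycleDemo {
    final List<String> events = new ArrayList<>();
    private final Map<Long, String> pendingCommitTransactions = new LinkedHashMap<>();
    private int transactionCounter = 0;
    private String currentTransaction;

    private String beginTransaction() {
        String txn = "txn-" + (++transactionCounter);
        events.add("begin " + txn);
        return txn;
    }

    /** Called once at startup: there is always an open transaction to write into. */
    void open() {
        currentTransaction = beginTransaction();
    }

    void invoke(String value) {
        events.add("write " + value + " -> " + currentTransaction);
    }

    /** Barrier arrived: pre-commit the current transaction and immediately open the next one. */
    void snapshotState(long checkpointId) {
        events.add("preCommit " + currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    /** Checkpoint confirmed: the second phase makes the data visible. */
    void notifyCheckpointComplete(long checkpointId) {
        String txn = pendingCommitTransactions.remove(checkpointId);
        if (txn != null) {
            events.add("commit " + txn);
        }
    }

    static List<String> run() {
        TwoPhaseLifecycleDemo sink = new TwoPhaseLifecycleDemo();
        sink.open();
        sink.invoke("m1");
        sink.snapshotState(1L);
        sink.invoke("m2");                 // arrives while checkpoint 1 is still in flight
        sink.notifyCheckpointComplete(1L);
        return sink.events;
    }
}
```

Note that m2 is written between the pre-commit and the commit of txn-1, yet it lands in txn-2, which is the point of opening the next transaction inside snapshotState.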

Let's go through the process again with the code in front of us:

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

        preCommit(currentTransactionHolder.handle);
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

        state.clear();
        state.add(new State<>(
            this.currentTransactionHolder,
            new ArrayList<>(pendingCommitTransactions.values()),
            userContext));
    }

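snapshotState is declared by the CheckpointedFunction interface; it is the function that checkpoints the operator's own state after a checkpoint barrier arrives. It first pre-commits currentTransactionHolder and puts it into pendingCommitTransactions, then opens a new transaction as currentTransactionHolder, and finally stores both currentTransactionHolder and pendingCommitTransactions in the checkpoint as operator state. Keeping the transaction information in state makes it possible to continue the earlier transactions after recovering from a checkpoint.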

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the following scenarios are possible here
        //
        //  (1) there is exactly one transaction from the latest checkpoint that
        //      was triggered and completed. That should be the common case.
        //      Simply commit that transaction in that case.
        //
        //  (2) there are multiple pending transactions because one previous
        //      checkpoint was skipped. That is a rare case, but can happen
        //      for example when:
        //
        //        - the master cannot persist the metadata of the last
        //          checkpoint (temporary outage in the storage system) but
        //          could persist a successive checkpoint (the one notified here)
        //
        //        - other tasks could not persist their status during
        //          the previous checkpoint, but did not trigger a failure because they
        //          could hold onto their state and could successfully persist it in
        //          a successive checkpoint (the one notified here)
        //
        //      In both cases, the prior checkpoint never reach a committed state, but
        //      this checkpoint is always expected to subsume the prior one and cover all
        //      changes since the last successful one. As a consequence, we need to commit
        //      all pending transactions.
        //
        //  (3) Multiple transactions are pending, but the checkpoint complete notification
        //      relates not to the latest. That is possible, because notification messages
        //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
        //      was triggered) and because there can be concurrent overlapping checkpoints
        //      (a new one is started before the previous fully finished).
        //
        // ==> There should never be a case where we have no pending transaction here
        //
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }

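notifyCheckpointComplete is declared by CheckpointListener and runs when the NotifyCheckpointComplete message from the JobManager arrives. It commits every pending transaction whose checkpoint ID is not greater than the notified one. As the comment explains, there is normally exactly one pending transaction, the one pre-committed by the snapshotState call that was just triggered. Several can be pending, though, for example when an earlier checkpoint was skipped or when the NotifyCheckpointComplete message for an earlier checkpoint arrives late.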
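The selection rule inside the loop is easy to isolate. The following sketch is a stripped-down model with transactions represented as plain strings; the class and method names are ours, and only the checkpoint ID comparison is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PendingCommitDemo {
    // checkpoint ID -> pre-committed transaction, in the order the snapshots were taken
    private final Map<Long, String> pendingCommitTransactions = new LinkedHashMap<>();

    /** Records a transaction that was pre-committed as part of the given checkpoint. */
    void preCommitted(long checkpointId, String transaction) {
        pendingCommitTransactions.put(checkpointId, transaction);
    }

    /** Commits every pending transaction that belongs to the notified checkpoint or an earlier one. */
    List<String> notifyCheckpointComplete(long checkpointId) {
        List<String> committed = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> iterator = pendingCommitTransactions.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, String> entry = iterator.next();
            if (entry.getKey() > checkpointId) {
                continue;                     // belongs to a later checkpoint: not confirmed yet
            }
            committed.add(entry.getValue());  // stands in for commit(pendingTransaction.handle)
            iterator.remove();
        }
        return committed;
    }

    int pendingCount() {
        return pendingCommitTransactions.size();
    }
}
```

If transactions t1, t2, and t3 were pre-committed for checkpoints 1, 2, and 3, and the notification for checkpoint 1 is lost while the one for checkpoint 2 arrives, t1 and t2 are committed together and t3 stays pending.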

    // ------ methods that should be implemented in child class to support two phase commit algorithm ------

    /**
     * Write value within a transaction.
     */
    protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

    /**
     * Method that starts a new transaction.
     *
     * @return newly created transaction.
     */
    protected abstract TXN beginTransaction() throws Exception;

    /**
     * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
     * transaction for a commit that might happen in the future. After this point the transaction might still be
     * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
     * will always succeed.
     *
     * <p>Usually implementation involves flushing the data.
     */
    protected abstract void preCommit(TXN transaction) throws Exception;

    /**
     * Commit a pre-committed transaction. If this method fail, Flink application will be
     * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
     * same transaction.
     */
    protected abstract void commit(TXN transaction);

    /**
     * Abort a transaction.
     */
    protected abstract void abort(TXN transaction);

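TwoPhaseCommitSinkFunction leaves five methods for subclasses to implement:

  • invoke: defines how the sink writes data to the external system. Every sink has to define invoke, and the sink operator calls it once for each record it receives. The only difference here is the additional transaction parameter.
  • beginTransaction, preCommit, commit, abort: the steps of the two-phase commit protocol. If the external system supports two-phase commit itself (as Kafka does), these methods simply call the corresponding operations of that system.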
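For an external system without native transactions, the same five hooks can still be implemented. The sketch below does it for a local directory, staging each transaction in a temporary file and publishing it with an atomic rename. ToyTwoPhaseCommitSink is a hypothetical stand-in that only mirrors the shape of the abstract methods above (without the Context parameter); it is not the Flink class, and the directory layout is an assumption of this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical stand-in for the five hooks; not the Flink class itself. */
abstract class ToyTwoPhaseCommitSink<TXN, IN> {
    protected abstract void invoke(TXN transaction, IN value) throws Exception;
    protected abstract TXN beginTransaction() throws Exception;
    protected abstract void preCommit(TXN transaction) throws Exception;
    protected abstract void commit(TXN transaction);
    protected abstract void abort(TXN transaction);
}

/** The transaction handle is the path of a staging file. */
class FileTwoPhaseCommitSink extends ToyTwoPhaseCommitSink<Path, String> {
    private final Path stagingDir;
    private final Path targetDir;

    FileTwoPhaseCommitSink(Path stagingDir, Path targetDir) {
        this.stagingDir = stagingDir;
        this.targetDir = targetDir;
    }

    @Override
    protected Path beginTransaction() throws IOException {
        return Files.createTempFile(stagingDir, "txn-", ".part");
    }

    @Override
    protected void invoke(Path transaction, String value) throws IOException {
        Files.write(transaction, (value + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(Path transaction) {
        // Files.write opens, writes, and closes on every call, so nothing is buffered here.
        // A buffering implementation would have to flush at this point.
    }

    @Override
    protected void commit(Path transaction) {
        try {
            if (Files.exists(transaction)) { // a retried commit finds nothing left to move
                Files.move(transaction, targetDir.resolve(transaction.getFileName()), StandardCopyOption.ATOMIC_MOVE);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    protected void abort(Path transaction) {
        try {
            Files.deleteIfExists(transaction);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class FileSinkDemo {
    static List<String> run() {
        try {
            Path root = Files.createTempDirectory("two-phase-commit-demo");
            Path staging = Files.createDirectory(root.resolve("staging"));
            Path target = Files.createDirectory(root.resolve("target"));
            FileTwoPhaseCommitSink sink = new FileTwoPhaseCommitSink(staging, target);

            Path txn = sink.beginTransaction();
            sink.invoke(txn, "m1");
            sink.invoke(txn, "m2");
            sink.preCommit(txn);

            List<String> observed = new ArrayList<>();
            observed.add("visible before commit: " + countFiles(target));
            sink.commit(txn);
            observed.add("visible after commit: " + countFiles(target));
            observed.add("content: " + Files.readAllLines(target.resolve(txn.getFileName())));
            return observed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static long countFiles(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.count();
        }
    }
}
```

Readers of the target directory never see a half-written file: the data only appears there after commit, and the existence check keeps commit safe to call again after a restart, in the spirit of the requirement that commits of pre-committed transactions must succeed.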

2. FlinkKafkaProducer011

With TwoPhaseCommitSinkFunction understood, FlinkKafkaProducer011 becomes much easier to read.

    @Override
    public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
        checkErroneous();

        byte[] serializedKey = schema.serializeKey(next);
        byte[] serializedValue = schema.serializeValue(next);
        String targetTopic = schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = defaultTopicId;
        }

        Long timestamp = null;
        if (this.writeTimestampToKafka) {
            timestamp = context.timestamp();
        }

        ProducerRecord<byte[], byte[]> record;
        int[] partitions = topicPartitionsMap.get(targetTopic);
        if (null == partitions) {
            partitions = getPartitionsByTopic(targetTopic, transaction.producer);
            topicPartitionsMap.put(targetTopic, partitions);
        }
        if (flinkKafkaPartitioner != null) {
            record = new ProducerRecord<>(
                targetTopic,
                flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
                timestamp,
                serializedKey,
                serializedValue);
        } else {
            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
        }
        pendingRecords.incrementAndGet();
        transaction.producer.send(record, callback);
    }

The invoke method in FlinkKafkaProducer011 builds a Kafka record from the incoming message (next) and sends it with the Kafka client's send method.

    @Override
    protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
        switch (semantic) {
            case EXACTLY_ONCE:
                FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
                producer.beginTransaction();
                return new KafkaTransactionState(producer.getTransactionalId(), producer);
            case AT_LEAST_ONCE:
            case NONE:
                // Do not create new producer on each beginTransaction() if it is not necessary
                final KafkaTransactionState currentTransaction = currentTransaction();
                if (currentTransaction != null && currentTransaction.producer != null) {
                    return new KafkaTransactionState(currentTransaction.producer);
                }
                return new KafkaTransactionState(initNonTransactionalProducer(true));
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    @Override
    protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
        switch (semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                flush(transaction);
                break;
            case NONE:
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        checkErroneous();
    }

    @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                transaction.producer.commitTransaction();
            } finally {
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

    @Override
    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            transaction.producer.abortTransaction();
            recycleTransactionalProducer(transaction.producer);
        }
    }

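The beginTransaction, preCommit, commit, and abort methods in FlinkKafkaProducer011 mostly call the corresponding two-phase commit operations of the Kafka producer client. Two further points are worth noting:

  • preCommit calls flush. As the analysis of TwoPhaseCommitSinkFunction showed, preCommit is called from snapshotState, which is triggered when the operator takes its checkpoint. This guarantees that by the time the operator checkpoints, all data before that checkpoint has safely reached the downstream system rather than sitting in a buffer. In Figure 3, for example, the sink operator starts its checkpoint when it receives the first checkpoint barrier, and before that checkpoint completes, messages m1 through m5 must all have been delivered downstream. Otherwise, if the checkpoint completes while some of m1 through m5 have not been delivered, messages are lost. Making sure in snapshotState that all buffered data has been sent is a very common practice, and it deserves attention when you implement a custom SinkFunction. The flush here ends up calling the Kafka producer client's flush method, which blocks until every buffered message has actually been sent to Kafka. Occasional spikes in checkpoint duration may therefore be caused by this flush.
  • beginTransaction calls getTransactionalId, and commit and abort call recycleTransactionalProducer. Recall the question from Section II of how Kafka transactional ids are generated, and let's see how Flink produces them. The code below shows that Flink uses a queue as a pool of transactional ids: a new transaction takes a transactional id from the head of the queue, and when the transaction ends the id is put back at the tail. Because every new transaction constructs a new Kafka producer, the initial size of availableTransactionalIds equals the configured Kafka producer pool size (5 by default).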
    /**
     * Pool of available transactional ids.
     */
    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

    /**
     * For each checkpoint we create new {@link FlinkKafkaProducer} so that new transactions will not clash
     * with transactions created during previous checkpoints ({@code producer.initTransactions()} assures that we
     * obtain new producerId and epoch counters).
     */
    private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
        String transactionalId = availableTransactionalIds.poll();
        if (transactionalId == null) {
            throw new FlinkKafka011Exception(
                FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
                "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
        }
        FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
        producer.initTransactions();
        return producer;
    }

    private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
        availableTransactionalIds.add(producer.getTransactionalId());
        producer.close();
    }
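The pool behavior can be tried out in isolation. The sketch below keeps only the deque logic from the code above; the class name, the "my-sink" prefix, the id format, and the use of IllegalStateException are assumptions for illustration and do not reflect how Flink actually names its transactional ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class TransactionalIdPoolDemo {
    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

    /** Fills the pool with poolSize ids; the naming scheme is hypothetical. */
    TransactionalIdPoolDemo(String prefix, int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            availableTransactionalIds.add(prefix + "-" + i);
        }
    }

    /** Takes an id from the head of the queue; fails if every id is tied to an unfinished transaction. */
    String acquire() {
        String transactionalId = availableTransactionalIds.poll();
        if (transactionalId == null) {
            throw new IllegalStateException("Too many ongoing snapshots: the transactional id pool is empty");
        }
        return transactionalId;
    }

    /** Puts the id back at the tail once its transaction was committed or aborted. */
    void recycle(String transactionalId) {
        availableTransactionalIds.add(transactionalId);
    }

    static List<String> run() {
        TransactionalIdPoolDemo pool = new TransactionalIdPoolDemo("my-sink", 2);
        List<String> observed = new ArrayList<>();
        String first = pool.acquire();
        String second = pool.acquire();
        observed.add(first);
        observed.add(second);
        try {
            pool.acquire();               // a third concurrent transaction does not fit
            observed.add("unexpected");
        } catch (IllegalStateException e) {
            observed.add("pool empty");
        }
        pool.recycle(first);              // the first transaction finished
        observed.add(pool.acquire());
        return observed;
    }
}
```

This also shows why the pool size bounds the number of checkpoints that can be in flight at once: each checkpoint that has been pre-committed but not yet confirmed holds on to one id.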

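IV. Summary

Flink's use of Kafka transactions to deliver messages with end-to-end exactly-once semantics is in fact a fairly general solution. Once the principle is understood, the same approach can quickly be applied to other external storage systems or message queues that support transactions.

The way Flink uses Kafka transactions is also a good demo of how to use Kafka correctly in application development. Other projects that need strong consistency for messages on Kafka can borrow from Flink's code.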

References

1 Kafka Design Analysis (8): Kafka Transactions and the Implementation of Exactly-Once Semantics. https://www.infoq.cn/article/kafka-analysis-part-8

2 Exactly Once Delivery and Transactional Messaging in Kafka. https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#

3 An Overview of End-to-End Exactly-Once Processing in Apache Flink® (with Apache Kafka, too!). https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka