­

使用 Apache Flink 開發實時ETL

  • 2019 年 10 月 6 日
  • 筆記

來源:薄荷腦的部落格

作者:薄荷腦

大數據開發領域最強公眾號!
暴走大數據!
By 大數據技術與架構

場景描述:本文將介紹如何使用 Flink 開發實時 ETL 程式,並介紹 Flink 是如何保證其 Exactly-once 語義的。

關鍵詞:Flink ETL

版權聲明:本文作者為薄荷腦,經授權轉載。

https://blog.csdn.net/zjerryj/article/details/85381098

未經作者同意不得二次轉載。

Apache Flink 是大數據領域又一新興框架。它與 Spark 的不同之處在於,它是使用流式處理來模擬批量處理的,因此能夠提供亞秒級的、符合 Exactly-once 語義的實時處理能力。Flink 的使用場景之一是構建實時的數據通道,在不同的存儲之間搬運和轉換數據。本文將介紹如何使用 Flink 開發實時 ETL 程式,並介紹 Flink 是如何保證其 Exactly-once 語義的。

案例

讓我們來編寫一個從 Kafka 抽取數據到 HDFS 的程式。數據源是一組事件日誌,其中包含了事件發生的時間,以時間戳的方式存儲。我們需要將這些日誌按事件時間分別存放到不同的目錄中,即按日分桶。時間日誌示例如下:

{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}  {"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}  {"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

產生的目錄結構為:

/user/flink/event_log/dt=20181219/part-0-1  /user/flink/event_log/dt=20181220/part-1-9

創建項目

Flink 應用程式需要使用 Java 8 編寫,我們可以使用 Maven 模板創建項目:

mvn archetype:generate     -DarchetypeGroupId=org.apache.flink     -DarchetypeArtifactId=flink-quickstart-java     -DarchetypeVersion=1.7.0

將生成好的程式碼導入到 IDE 中,可以看到名為 StreamingJob 的文件,我們由此開始編寫程式。

Kafka 數據源

Flink 對 Kafka 數據源提供了原生支援,我們需要選擇正確的 Kafka 依賴版本,將其添加到 POM 文件中:

<dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>    <version>${flink.version}</version>  </dependency>

測試過程中,我們需要一個能夠運行的 Kafka 服務,讀者可以參照官方文檔 搭建本地服務。在 Flink 中初始化 Kafka 數據源時,傳入伺服器名和主題名就可以了:

Properties props = new Properties();  props.setProperty("bootstrap.servers", "localhost:9092");  FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(      "flink_test", new SimpleStringSchema(), props);  DataStream<String> stream = env.addSource(consumer);

Flink 會連接本地的 Kafka 服務,讀取 flink_test 主題中的數據,轉換成字元串後返回。除了 SimpleStringSchema,Flink 還提供了其他內置的反序列化方式,如 JSON、Avro 等,我們也可以編寫自定義邏輯。

流式文件存儲

StreamingFileSink 替代了先前的 BucketingSink,用來將上游數據存儲到 HDFS 的不同目錄中。它的核心邏輯是分桶,默認的分桶方式是 DateTimeBucketAssigner,即按照處理時間分桶。處理時間指的是消息到達 Flink 程式的時間,這點並不符合我們的需求。因此,我們需要自己編寫程式碼將事件時間從消息體中解析出來,按規則生成分桶的名稱:

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {    @Override    public String getBucketId(String element, Context context) {      JsonNode node = mapper.readTree(element);      long date = (long) (node.path("timestamp").floatValue() * 1000);      String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));      return "dt=" + partitionValue;    }  }

上述程式碼會使用 Jackson 庫對消息體進行解析,將時間戳轉換成日期字元串,添加前綴後返回。如此一來,StreamingFileSink 就能知道應該將當前記錄放置到哪個目錄中了。

StreamingFileSink<String> sink = StreamingFileSink      .forRowFormat(new Path("/tmp/kafka-loader"), new SimpleStringEncoder<String>())      .withBucketAssigner(new EventTimeBucketAssigner())      .build();  stream.addSink(sink);

forRowFormat 表示輸出的文件是按行存儲的,對應的有 forBulkFormat,可以將輸出結果用 Parquet 等格式進行壓縮存儲。

關於 StreamingFileSink 還有一點要注意,它只支援 Hadoop 2.7 以上的版本,因為需要用到高版本文件系統提供的 truncate 方法來實現故障恢復,這點下文會詳述。

開啟檢查點

程式碼編寫到這裡,其實已經可以通過 env.execute() 來運行了。但是,它只能保證 At-least-once 語義,即消息有可能會被重複處理。要做到 Exactly-once,我們還需要開啟 Flink 的檢查點功能:

env.enableCheckpointing(60_000);  env.setStateBackend((StateBackend) new FsStateBackend("/tmp/flink/checkpoints"));  env.getCheckpointConfig().enableExternalizedCheckpoints(      ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

檢查點(Checkpoint)是 Flink 的故障恢復機制,同樣會在下文詳述。程式碼中,我們將狀態存儲方式由 MemoryStateBackend 修改為了 FsStateBackend,即使用外部文件系統,如 HDFS,來保存應用程式的中間狀態,這樣當 Flink JobManager 宕機時,也可以恢復過來。Flink 還支援 RocksDBStateBackend,用來存放較大的中間狀態,並能支援增量的狀態更新。

提交與管理腳本

Flink 程式可以直接在 IDE 中調試。我們也可以搭建一個本地的 Flink 集群,並通過 Flink CLI 命令行工具來提交腳本:

bin/flink run -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar

腳本的運行狀態可以在 Flink 儀錶盤中查看:

使用暫存點來停止和恢復腳本

當需要暫停腳本、或對程式邏輯進行修改時,我們需要用到 Flink 的暫存點機制(Savepoint)。暫存點和檢查點類似,同樣保存的是 Flink 各個運算元的狀態數據(Operator State)。不同的是,暫存點主要用於人為的腳本更替,而檢查點則主要由 Flink 控制,用來實現故障恢復。flink cancel -s 命令可以在停止腳本的同時創建一個暫存點:

$ bin/flink cancel -s /tmp/flink/savepoints 1253cc85e5c702dbe963dd7d8d279038  Cancelled job 1253cc85e5c702dbe963dd7d8d279038. Savepoint stored in file:/tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee.

在 YARN 上運行

要將腳本提交到 YARN 集群上運行,同樣是使用 flink run 命令。首先將程式碼中指定文件目錄的部分添加上 HDFS 前綴,如 hdfs://localhost:9000/,重新打包後執行下列命令:

$ export HADOOP_CONF_DIR=/path/to/hadoop/conf  $ bin/flink run -m yarn-cluster -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar  Submitted application application_1545534487726_0001

Flink 儀錶盤會在 YARN Application Master 中運行,我們可以通過 ResourceManager 介面進入。返回的應用 ID 可以用來管理腳本,添加 -yid 參數即可:

bin/flink cancel -s hdfs://localhost:9000/tmp/flink/savepoints -yid application_1545534487726_0001 84de00a5e193f26c937f72a9dc97f386

Flink 如何保證 Exactly-once 語義

Flink 實時處理程式可以分為三個部分,數據源、處理流程、以及輸出。不同的數據源和輸出提供了不同的語義保證,Flink 統稱為 連接器。處理流程則能提供 Exactly-once 或 At-least-once 語義,需要看檢查點是否開啟。

實時處理與檢查點

Flink 的檢查點機制是基於 Chandy-Lamport 演算法的:Flink 會定時在數據流中安插輕量的標記資訊(Barrier),將消息流切割成一組組記錄;當某個運算元處理完一組記錄後,就將當前狀態保存為一個檢查點,提交給 JobManager,該組的標記資訊也會傳遞給下游;當末端的運算元(通常是 Sink)處理完這組記錄並提交檢查點後,這個檢查點將被標記為「已完成」;當腳本出現問題時,就會從最後一個「已完成」的檢查點開始重放記錄。

如果運算元有多個上游,Flink 會使用一種稱為「消息對齊」的機制:如果某個上游出現延遲,當前運算元會停止從其它上游消費消息,直到延遲的上游趕上進度,這樣就保證了運算元中的狀態不會包含下一批次的記錄。顯然,這種方式會引入額外的延遲,因此除了這種 EXACTLY_ONCE 模式,我們也可將檢查點配置為 AT_LEAST_ONCE,以獲得更高的吞吐量。具體方式請參考 官方文檔。

可重放的數據源

當出錯的腳本需要從上一個檢查點恢復時,Flink 必須對數據進行重放,這就要求數據源支援這一功能。Kafka 是目前使用得較多的消息隊列,且支援從特定位點進行消費。具體來說,FlinkKafkaConsumer 類實現了 CheckpointedFunction 介面,會在檢查點中存放主題名、分區名、以及偏移量:

abstract class FlinkKafkaConsumerBase implements CheckpointedFunction {    public void initializeState(FunctionInitializationContext context) {      OperatorStateStore stateStore = context.getOperatorStateStore();      this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(          OFFSETS_STATE_NAME,          TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));        if (context.isRestored()) {        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {          restoredState.put(kafkaOffset.f0, kafkaOffset.f1);        }      }    }      public void snapshotState(FunctionSnapshotContext context) {      unionOffsetStates.clear();      for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {        unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),            kafkaTopicPartitionLongEntry.getValue()));      }    }  }

當數據源運算元從檢查點或暫存點恢復時,我們可以在 TaskManager 的日誌中看到以下資訊,表明當前消費的偏移量是從運算元狀態中恢復出來的:

2018-12-23 10:56:47,380 INFO FlinkKafkaConsumerBase  Consumer subtask 0 will start reading 2 partitions with offsets in restored state:    {KafkaTopicPartition{topic='flink_test', partition=1}=725,     KafkaTopicPartition{topic='flink_test', partition=0}=721}

恢複寫入中的文件

程式運行過程中,StreamingFileSink 首先會將結果寫入中間文件,以 . 開頭、in-progress 結尾。這些中間文件會在符合一定條件後更名為正式文件,取決於用戶配置的 RollingPolicy,默認策略是基於時間(60 秒)和基於大小(128 MB)。當腳本出錯或重啟時,中間文件會被直接關閉;在恢復時,由於檢查點中保存了中間文件名和成功寫入的長度,程式會重新打開這些文件,切割到指定長度(Truncate),然後繼續寫入。這樣一來,文件中就不會包含檢查點之後的記錄了,從而實現 Exactly-once。

以 Hadoop 文件系統舉例,恢復的過程是在 HadoopRecoverableFsDataOutputStream 類的構造函數中進行的。它會接收一個 HadoopFsRecoverable 類型的結構,裡面包含了中間文件的路徑和長度。這個對象是 BucketState 的成員,會被保存在檢查點中。

HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) {    this.tempFile = checkNotNull(recoverable.tempFile());    truncate(fs, tempFile, recoverable.offset());    out = fs.append(tempFile);  }

結論

Apache Flink 構建在實時處理之上,從設計之初就充分考慮了中間狀態的保存,而且能夠很好地與現有 Hadoop 生態環境結合,因而在大數據領域非常有競爭力。它還在高速發展之中,近期也引入了 Table API、流式 SQL、機器學習等功能,像阿里巴巴這樣的公司也在大量使用和貢獻程式碼。Flink 的應用場景眾多,有很大的發展潛力,值得一試。