Hive 終於等來了 Flink
- 2020 年 2 月 10 日
- 筆記
等疫情過去了,我們一起看春暖花開。

Apache Spark 什麼時候開始支援集成 Hive 功能?
筆者相信只要使用過 Spark 的讀者,應該都會說這是很久以前的事情了。
那 Apache Flink 什麼時候支援與 Hive 的集成呢?
讀者可能有些疑惑,還沒有支援吧,沒用過?或者說最近版本才支援,但是功能還比較弱。
其實比較也沒啥意義,不同社區發展的目標總是會有差異,而且 Flink 在真正的實時流計算方面投入的精力很多。不過筆者想表達的是,Apache Hive 已經成為數據倉庫生態系統的焦點,它不僅是一個用於大數據分析和 ETL 的 SQL 引擎,也是一個數據管理平台,所以無論是 Spark,還是 Flink,或是 Impala、Presto 等,都會積極地支援集成 Hive 的功能。
的確,對真正需要使用 Flink 訪問 Hive 進行數據讀寫的讀者會發現,Apache Flink 1.9.0 版本才開始提供與 Hive 集成的功能。不過,值得欣慰的是,Flink 社區在集成 Hive 功能方面付出很多,目前進展也比較順利,最近 Flink 1.10.0 RC1 版本已經發布,感興趣的讀者可以進行調研和驗證功能。
架構設計
首先,筆者基於社區公開的資料以及部落格,概括性地講解 Flink 集成 Hive 的架構設計。
Apache Flink 與 Hive 集成的目的,主要包含了元數據和實際表數據的訪問。
1. 元數據
為了訪問外部系統的元數據,Flink 剛開始提供了 ExternalCatalog 的概念。但是 ExternalCatalog 的定義非常不完整,基本處於不可用的狀態。Flink 1.10 版本正式刪除了 ExternalCatalog API (FLINK-13697),這包括:
- ExternalCatalog(以及所有依賴的類,比如 ExternalTable)
- SchematicDescriptor、MetadataDescriptor 和 StatisticsDescriptor
針對 ExternalCatalog 的問題,Flink 社區提出了一套全新的 Catalog 介面(new Catalog API)來取代現有的 ExternalCatalog。新的 Catalog 實現的功能包括:
- 能夠支援資料庫、表、分區等多種元數據對象
- 允許在一個用戶 Session 中維護多個 Catalog 實例,從而支援同時訪問多個外部系統
- Catalog 以可插拔的方式接入 Flink,允許用戶提供自定義的實現
下圖展示了新的 Catalog API 的總體架構:

創建 TableEnvironment 的時候會同時創建一個 CatalogManager,負責管理不同的 Catalog 實例。TableEnvironment 通過 Catalog 來為 Table API 和 SQL Client 用戶提供元數據服務。
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val tableEnv = TableEnvironment.create(settings) val name = "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf" // a local path val version = "2.3.4" val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version) tableEnv.registerCatalog("myhive", hive) // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive")
目前 Catalog 有兩個實現,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元數據管理機制,將所有元數據保存在記憶體中。而 HiveCatalog 會與一個 Hive Metastore 的實例連接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶需要配置一個 HiveCatalog,並通過 HiveCatalog 訪問 Hive 中的元數據。另一方面,HiveCatalog 也可以用來處理 Flink 自身的元數據,在這種場景下,HiveCatalog 僅將 Hive Metastore 作為持久化存儲使用,寫入 Hive Metastore 中的元數據並不一定是 Hive 所支援的格式。一個 HiveCatalog 實例可以同時支援這兩種模式,用戶無需為管理 Hive 和 Flink 的元數據創建不同的實例。
另外,通過設計 HiveShim 來支援不同版本的 Hive Metastore,具體支援的 Hive 版本列表,請參考官方文檔。
2. 表數據
Flink 提供了 Hive Data Connector 來讀寫 Hive 的表數據。Hive Data Connector 儘可能的復用了 Hive 本身的 Input/Output Format 和 SerDe 等類,這樣做的好處一方面是減少了程式碼重複,更重要的是可以最大程度的保持與 Hive 的兼容,即 Flink 寫入的數據 Hive 可以正常讀取,並且反之亦然。
集成 Hive 功能
Flink 與 Hive 集成的功能在 1.9.0 版本中作為試用功能發布,存在不少使用的局限性,但是不久將發布的 Flink 1.10 穩定版本會更加完善集成 Hive 的功能並應用到企業場景中。
為了讓讀者提前體驗 Flink 1.10 集成 Hive 的功能,筆者會基於 Cloudera CDH 編譯 Flink 1.10.0 RC1 版本並進行較為完整的測試。
1. 環境資訊
CDH 版本:cdh5.16.2
Flink 版本:release-1.10.0-rc1
Flink 使用了 RC 版本,僅供測試,不建議用於生產環境。 目前 Cloudera Data Platform 正式集成了 Flink 作為其流計算產品,非常方便用戶使用。
CDH 環境開啟了 Sentry 和 Kerberos。
2. 下載並編譯 Flink
$ wget https://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz $ tar zxvf release-1.10.0-rc1.tar.gz $ cd flink-release-1.10.0-rc1/ $ mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
不出意外的話,編譯到 flink-hadoop-fs 模組時,會報如下錯誤:
[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases (https://repo.hortonworks.com/content/repositories/releases/): Remote host closed connection during handshake: SSL peer shut down incorrectly
編譯中遇到 flink-shaded-hadoop-2 找不到的問題,其實查看 Maven 倉庫會發現,根本原因是 CDH 的 flink-shaded-hadoop-2 的 jar 包在 Maven 中央倉庫是沒有對應的編譯版本,所以需要先對 Flink 依賴的 flink-shaded-hadoop-2 進行打包,再進行編譯。
解決 flink-shaded-hadoop-2 問題
(1). 獲取 flink-shaded 源碼
git clone https://github.com/apache/flink-shaded.git
(2). 切換依賴的版本分支
根據上面報錯時提示缺少的版本切換對應的程式碼分支,即缺少的是 9.0 版本的 flink-shaded-hadoop-2:
git checkout release-9.0
(3). 配置 CDH Repo 倉庫
修改 flink-shaded 項目中的 pom.xml,添加 CDH maven 倉庫,否則編譯時找不到 CDH 相關的包。
在 <profiles>...</profiles>
中添加如下內容:
<profile> <id>vendor-repos</id> <activation> <property> <name>vendor-repos</name> </property> </activation> <!-- Add vendor maven repositories --> <repositories> <!-- Cloudera --> <repository> <id>cloudera-releases</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> </profile>
(4). 編譯 flink-shaded
開始執行編譯:
mvn clean install -DskipTests -Drat.skip=true -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
建議通過科學上網方式編譯,如果讀者遇到一些網路連接的問題,可以試著重試或者更換依賴組件的倉庫地址。
編譯成功後,就會把 flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar 安裝在本地 maven 倉庫,如下為編譯的最後日誌:
Installing /Users/…/source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/…/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar Installing /Users/…/source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/…/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom
3. 重新編譯 Flink
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
漫長的等待過程,讀者可以並行做其他事情。
編譯過程中,如果不出意外的話,會看到類似下面的錯誤資訊:
[INFO] Running 'npm ci –cache-max=0 –no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard [WARNING] npm WARN prepare removing existing node_modules/ before installation [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/mime/-/mime-2.4.0.tgz failed, reason: read ECONNRESET [ERROR] WARN registry Using stale package data from https://registry.npmjs.org/ due to a request error during revalidation. [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/optimist/-/optimist-0.6.1.tgz failed, reason: read ECONNRESET
可以看到, flink-runtime-web 模組引入了對 frontend-maven-plugin 的依賴,需要安裝 node、npm 和依賴組件。
如果沒有通過科學上網,可以修改 flink-runtime-web/pom.xml 文件,添加 nodeDownloadRoot 和 npmDownloadRoot 的資訊:
<plugin> <groupId>com.github.eirslett</groupId> <artifactId>frontend-maven-plugin</artifactId> <version>1.6</version> <executions> <execution> <id>install node and npm</id> <goals> <goal>install-node-and-npm</goal> </goals> <configuration> <nodeDownloadRoot>https://registry.npm.taobao.org/dist/</nodeDownloadRoot> <npmDownloadRoot>https://registry.npmjs.org/npm/-/</npmDownloadRoot> <nodeVersion>v10.9.0</nodeVersion> </configuration> </execution> <execution> <id>npm install</id> <goals> <goal>npm</goal> </goals> <configuration> <arguments>ci --cache-max=0 --no-save</arguments> <environmentVariables> <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL> </environmentVariables> </configuration> </execution> <execution> <id>npm run build</id> <goals> <goal>npm</goal> </goals> <configuration> <arguments>run build</arguments> </configuration> </execution> </executions> <configuration> <workingDirectory>web-dashboard</workingDirectory> </configuration> </plugin>
編譯成功後,Flink 安裝文件位於 flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin 目錄下,打包並上傳到部署到節點:
$ cd flink-dist/target/flink-1.10.0-bin $ tar zcvf flink-1.10.0.tar.gz flink-1.10.0
4. 部署和配置
Flink 部署比較簡單,解壓縮包即可。另外可以設置軟鏈接、環境變數等,筆者不再介紹。
Flink 的核心配置文件是 flink-conf.yaml,一個典型的配置如下:
jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 2048m taskmanager.heap.size: 1024m taskmanager.numberOfTaskSlots: 4 parallelism.default: 1 high-availability: zookeeper high-availability.storageDir: hdfs:///user/flink110/recovery high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181 state.backend: filesystem state.checkpoints.dir: hdfs:///user/flink110/checkpoints state.savepoints.dir: hdfs:///user/flink110/savepoints jobmanager.execution.failover-strategy: region rest.port: 8081 taskmanager.memory.preallocate: false classloader.resolve-order: parent-first security.kerberos.login.use-ticket-cache: true security.kerberos.login.keytab: /home/flink_user/flink_user.keytab security.kerberos.login.principal: flink_user jobmanager.archive.fs.dir: hdfs:///user/flink110/completed-jobs historyserver.web.address: 0.0.0.0 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs:///user/flink110/completed-jobs historyserver.archive.fs.refresh-interval: 10000
筆者只羅列了一些常見的配置參數,讀者根據實際情況修改。配置參數其實還是比較容易理解的,以後結合實戰的文章再進行詳細講解。
4.1 集成 Hive 配置的依賴
如果要使用 Flink 與 Hive 集成的功能,除了上面的配置外,用戶還需要添加相應的依賴:
- 如果需要使用 SQL Client,則需要將依賴的 jar 拷貝到 Flink 的 lib 目錄中
- 如果需要使用 Table API,則需要將相應的依賴添加到項目中(如 pom.xml)
<!-- Flink Dependency --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.11</artifactId> <version>1.11-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.11-SNAPSHOT</version> <scope>provided</scope> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <scope>provided</scope> </dependency>
筆者主要介紹使用 SQL Client 的方式,由於使用的 CDH 版本為 5.16.2,其中 Hadoop 版本為 2.6.0,Hive 版本為 1.1.0,所以需要將如下 jar 包拷貝到 flink 部署家目錄中的 lib 目錄下:
- Flink 的 Hive connector flink-connector-hive2.11-1.10.0.jar flink-hadoop-compatibility2.11-1.10.0.jar flink-orc_2.11-1.10.0.jar
flink-release-1.10.0-rc1/flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.0.jar flink-release-1.10.0-rc1/flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.0.jar flink-release-1.10.0-rc1/flink-formats/flink-orc/target/flink-orc_2.11-1.10.0.jar
- Hadoop 依賴 flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
- Hive 依賴 hive-exec-1.1.0-cdh5.16.2.jar hive-metastore-1.1.0-cdh5.16.2.jar libfb303-0.9.3.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.16.2.jar /opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.16.2.jar /opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar
其中 flink-shaded-hadoop-2-uber 包含了 Hive 對於 Hadoop 的依賴。如果不用 Flink 提供的包,用戶也可以將集群中使用的 Hadoop 包添加進來,不過需要保證添加的 Hadoop 版本與 Hive 所依賴的版本是兼容的。
依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用戶集群中 Hive 所提供的 jar 包,詳情請見支援不同的 Hive 版本。
Flink 部署的節點要添加 Hadoop、Yarn 以及 Hive 的客戶端。
4.2 配置 HiveCatalog
多年來,Hive Metastore 在 Hadoop 生態系統中已發展成為事實上的元數據中心。許多公司在其生產中有一個單獨的 Hive Metastore 服務實例,以管理其所有元數據(Hive 元數據或非 Hive 元數據)。 如果同時部署了 Hive 和 Flink,那麼通過 HiveCatalog 能夠使用 Hive Metastore 來管理 Flink 的元數據。 如果僅部署 Flink,HiveCatalog 就是 Flink 開箱即用提供的唯一持久化的 Catalog。如果沒有持久化的 Catalog,那麼使用 Flink SQL CREATE DDL 時必須在每個會話中重複創建像 Kafka 表這樣的元對象,這會浪費大量時間。HiveCatalog 通過授權用戶只需要創建一次表和其他元對象,並在以後的跨會話中非常方便地進行引用和管理。
如果要使用 SQL Client 時,用戶需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的 catalogs 列表中可以指定一個或多個 Catalog 實例。
以下的示例展示了如何指定一個 HiveCatalog:
execution: planner: blink type: streaming ... current-catalog: myhive # set the HiveCatalog as the current catalog of the session current-database: mydatabase catalogs: - name: myhive type: hive hive-conf-dir: /opt/hive-conf # contains hive-site.xml hive-version: 2.3.4
其中:
- name 是用戶給每個 Catalog 實例指定的名字,Catalog 名字和 DB 名字構成了 FlinkSQL 中元數據的命名空間,因此需要保證每個 Catalog 的名字是唯一的。
- type 表示 Catalog 的類型,對於 HiveCatalog 而言,type 應該指定為 hive。
- hive-conf-dir 用於讀取 Hive 的配置文件,用戶可以將其設定為集群中 Hive 的配置文件目錄。
- hive-version 用於指定所使用的 Hive 版本。
指定了 HiveCatalog 以後,用戶就可以啟動 sql-client,並通過以下命令驗證 HiveCatalog 已經正確載入。
Flink SQL> show catalogs; default_catalog myhive Flink SQL> use catalog myhive;
其中 show catalogs 會列出載入的所有 Catalog 實例。需要注意的是,除了用戶在 sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 還會自動載入一個 GenericInMemoryCatalog 實例作為內置的 Catalog,該內置 Catalog 默認名字為 default_catalog。
5. 讀寫 Hive 表
設置好 HiveCatalog 以後就可以通過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。
假設 Hive 中已經有一張名為 mytable 的表,我們可以用以下的 SQL 語句來讀寫這張表。
5.1 讀數據
Flink SQL> show catalogs; myhive default_catalog Flink SQL> use catalog myhive; Flink SQL> show databases; default Flink SQL> show tables; mytable Flink SQL> describe mytable; root |-- name: name |-- type: STRING |-- name: value |-- type: DOUBLE Flink SQL> SELECT * FROM mytable; name value __________ __________ Tom 4.72 John 8.0 Tom 24.2 Bob 3.14 Bob 4.72 Tom 34.9 Mary 4.79 Tiff 2.72 Bill 4.33 Mary 77.7
5.2 寫數據
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25; Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25; # 靜態分區 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25; # 動態分區 Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08'; # 靜態分區和動態分區 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
總結
在本文中,筆者首先介紹了 Flink 與 Hive 集成功能的架構設計,然後從源碼開始編譯,解決遇到的一些問題,接著部署和配置 Flink 環境以及集成 Hive 的具體操作過程,最後參考官方的案例,對 Hive 表進行讀寫操作。
後續,筆者會結合生產環境的實際使用情況,講解通過 Flink SQL 來操作 Hive。
參考
- https://ci.apache.org/projects/flink/flink-docs-release-1.10/flinkDev/building.html
- https://ververica.cn/developers/flink1-9-hive/