如何在 Flink 1.9 中使用 Hive?

  • 2019 年 10 月 5 日
  • 筆記

來源:ververica.cn

作者:李銳

Apache Hive PMC,阿里巴巴技術專家

阿里巴巴技術專家,Apache Hive PMC成員,加入阿里巴巴之前曾就職於Intel、IBM等公司,主要參與Hive、HDFS、Spark等開源項目。

Apache Flink 從 1.9.0 版本開始增加了與 Hive 集成的功能,用戶可以通過 Flink 來訪問 Hive 的元數據,以及讀寫 Hive 中的表。本文將主要從項目的設計架構、最新進展、使用說明等方面來介紹這一功能。

Flink on Hive 介紹

SQL 是大數據領域中的重要應用場景,為了完善 Flink 的生態,發掘 Flink 在批處理方面的潛力,我們決定增強 FlinkSQL 的功能,從而讓用戶能夠通過 Flink 完成更多的任務。

Hive 是大數據領域最早出現的 SQL 引擎,發展至今有著豐富的功能和廣泛的用戶基礎。之後出現的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了與 Hive 集成的功能,從而方便用戶使用現有的數據倉庫、進行作業遷移等。因此我們認為提供與 Hive 交互的能力對於 FlinkSQL 也是非常重要的。

設計架構

與 Hive 集成主要包含了元數據和實際表數據的訪問,因此我們會從這兩方面介紹一下該項目的架構。

1. 元數據

為了訪問外部系統的元數據,Flink 提供了 ExternalCatalog 的概念。但是目前 ExternalCatalog 的定義非常不完整,基本處於不可用的狀態。因此,我們提出了一套全新的 Catalog 介面來取代現有的 ExternalCatalog。新的 Catalog 能夠支援資料庫、表、分區等多種元數據對象;允許在一個用戶 Session 中維護多個 Catalog 實例,從而同時訪問多個外部系統;並且 Catalog 以可插拔的方式接入 Flink,允許用戶提供自定義的實現。下圖展示了新的 Catalog API 的總體架構。

創建 TableEnvironment 的時候會同時創建一個 CatalogManager,負責管理不同的 Catalog 實例。TableEnvironment 通過 Catalog 來為 Table API 和 SQL Client 用戶提供元數據服務。

目前 Catalog 有兩個實現,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元數據管理機制,將所有元數據保存在記憶體中。而 HiveCatalog 會與一個 Hive Metastore 的實例連接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶需要配置一個 HiveCatalog,並通過 HiveCatalog 訪問 Hive 中的元數據。另一方面,HiveCatalog 也可以用來處理 Flink 自身的元數據,在這種場景下,HiveCatalog 僅將 Hive Metastore 作為持久化存儲使用,寫入 Hive Metastore 中的元數據並不一定是 Hive 所支援的格式。一個 HiveCatalog 實例可以同時支援這兩種模式,用戶無需為管理 Hive 和 Flink 的元數據創建不同的實例。

另外,我們設計了 HiveShim 來支援不同版本的 Hive Metastore。目前支援的 Hive 版本包括 2.3.4 和 1.2.1。

2. 表數據

我們提供了 Hive Data Connector 來讀寫 Hive 的表數據。Hive Data Connector 儘可能的復用了 Hive 本身的 Input/Output Format 和 SerDe 等類,這樣做的好處一方面是減少了程式碼重複,更重要的是可以最大程度的保持與 Hive 的兼容,即 Flink 寫入的數據 Hive 可以正常讀取,並且反之亦然。

與 HiveCatalog 類似的,Hive Data Connector 目前支援的 Hive 版本也是 2.3.4 和 1.2.1。

項目進展

Flink 與 Hive 集成的功能會在 1.9.0 版本中作為試用功能發布,用戶可以通過 Table API 或者 SQL Client 的模式與 Hive 進行交互。下面列出的是在 1.9.0 中已經支援的功能:

  • 提供簡單的 DDL 來讀取 Hive 元數據,比如 show databases、show tables、describe table 等。
  • 可通過 Catalog API 來修改 Hive 元數據,如 create table、drop table 等。
  • 讀取 Hive 數據,支援分區表和非分區表。
  • 寫 Hive 數據,支援非分區表。
  • 支援 Text、ORC、Parquet、SequenceFile 等文件格式。
  • 支援調用用戶在 Hive 中創建的 UDF。

由於是試用功能,因此還有一些方面不夠完善,下面列出的是在 1.9.0 中缺失的功能:

  • 不支援INSERT OVERWRITE。
  • 不支援寫分區表。
  • 不支援ACID表。
  • 不支援Bucket表。
  • 不支援View。
  • 部分數據類型不支援,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。

如何應用

1. 添加依賴

使用 Flink 與 Hive 集成的功能,用戶首先需要添加相應的依賴。如果是使用 SQL Client,則需要將依賴的 jar 添加到 Flink 的 lib 目錄中;如果使用 Table API,則需要將相應的依賴添加到項目中(如pom.xml)。

如上文所述,目前支援的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是針對不同版本所需的依賴。

Hive版本

所需依賴

2.3.4

flink-connector-hive_2.11flink-hadoop-compatibilityflink-shaded-hadoop-2-uber-2.7.5hive-exec

1.2.1

flink-connector-hive_2.11flink-hadoop-compatibilityflink-shaded-hadoop-2-uber-2.6.5hive-metastorehive-execlibfb303-0.9.3

其中 flink-shaded-hadoop-2-uber 包含了 Hive 對於 Hadoop 的依賴。如果不用 Flink 提供的包,用戶也可以將集群中使用的 Hadoop 包添加進來,不過需要保證添加的 Hadoop 版本與 Hive 所依賴的版本是兼容的(Hive 2.3.4 依賴的 Hadoop 版本是 2.7.2;Hive 1.2.1 依賴的 Hadoop 版本是 2.6.0)。

依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用戶集群中 Hive 所提供的 jar 包,詳情請見支援不同的 Hive 版本。

2. 配置 HiveCatalog

要與 Hive 交互,必須使用 HiveCatalog,下面介紹一下如何配置 HiveCatalog。

3. SQL Client

使用 SQL Client 時,用戶需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的「catalogs」列表中可以指定一個或多個 Catalog 實例。以下的示例展示了如何指定一個 HiveCatalog:

catalogs:  # A typical catalog definition looks like:    - name: myhive      type: hive  hive-conf-dir: /path/to/hive_conf_dir  hive-version: 2.3.4

其中 name 是用戶給每個 Catalog 實例指定的名字, Catalog 名字和 DB 名字構成了 FlinkSQL 中元數據的命名空間,因此需要保證每個 Catalog 的名字是唯一的。type 表示 Catalog 的類型,對於 HiveCatalog 而言,type 應該指定為 hive。hive-conf-dir 用於讀取 Hive 的配置文件,用戶可以將其設定為集群中 Hive 的配置文件目錄。hive-version 用於指定所使用的 Hive 版本,可以設定為 2.3.4 或者 1.2.1。

指定了 HiveCatalog 以後,用戶就可以啟動 sql-client,並通過以下命令驗證 HiveCatalog 已經正確載入。

Flink SQL> show catalogs;  default_catalog  myhive    Flink SQL> use catalog myhive;

其中 show catalogs 會列出載入的所有 Catalog 實例。需要注意的是,除了用戶在sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 還會自動載入一個 GenericInMemoryCatalog 實例作為內置的 Catalog,該內置 Catalog 默認名字為 default_catalog。

使用 use catalog 可以設定用戶 Session 當前的 Catalog。用戶在 SQL 語句中訪問元數據對象(如 DB、Table 等)時,如果不指定 Catalog 名字,則 FlinkSQL 會在當前 Catalog 中進行查找。

4. Table API

下面的程式碼展示了如何通過 TableAPI 來創建 HiveCatalog,並註冊到 TableEnvironment。

String name = "myhive";  String defaultDatabase = "default";  String hiveConfDir = "/path/to/hive_conf_dir";  String version = "2.3.4";    TableEnvironment tableEnv = …; // create TableEnvironment  HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,  hiveConfDir, version);  tableEnv.registerCatalog(name, hiveCatalog);  tableEnv.useCatalog(name);

將 HiveCatalog 註冊到 TableEnvironment 以後,就可以在通過 TableEnvironment 提交 SQL 的時候訪問 HiveCatalog 中的元數據了。與 SQL Client 類似, TableEnvironment 也提供了 useCatalog 介面讓用戶設定當前 Catalog。

5. 讀寫 Hive 表

設置好 HiveCatalog 以後就可以通過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。

6. SQL Client

假設 Hive 中已經有一張名為 src 的表,我們可以用以下的 SQL 語句來讀寫這張表。

Flink SQL> describe src;  root   |-- key: STRING   |-- value: STRING      Flink SQL> select * from src;                          key                     value                         100                   val_100                         298                   val_298                           9                     val_9                         341                   val_341                         498                   val_498                         146                   val_146                         458                   val_458                         362                   val_362                         186                   val_186                         ……                   ……    Flink SQL> insert into src values ('newKey','newVal');

7. Table API

類似的,也可以通過 Table API 來讀寫上面提到的這張表。下面的程式碼展示了如何實現這一操作。

TableEnvironment tableEnv = …; // create TableEnvironment  tableEnv.registerCatalog("myhive", hiveCatalog);  // set myhive as current catalog  tableEnv.useCatalog("myhive");    Table src = tableEnv.sqlQuery("select * from src");  // write src into a sink or do further analysis  ……    tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");  tableEnv.execute("insert into src");

8. 支援不同的 Hive 版本

Flink 1.9.0 中支援的 Hive 版本是 2.3.4 和 1.2.1,目前我們只針對這兩個版本進行了測試。使用 SQL Client 時,如果用戶沒有在 sql-client-defaults.yaml 文件中指定 Hive 版本,我們會自動檢測 classpath 中的 Hive 版本。如果檢測到的 Hive 版本不是 2.3.4 或 1.2.1 就會報錯。

藉助 Hive 兼容性的保證,其它不同的小版本也比較可能是可以正常工作的。因此,如果用戶使用的 Hive 小版本與我們所支援的不同,可以指定一個支援的版本來試用與 Hive 集成的功能。比如用戶使用的 Hive 版本是 2.3.3,可以在 sql-client-defaults.yaml 文件或者程式碼中將 Hive 版本指定為 2.3.4。

9. 執行模式與 Planner 的選擇

Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工作,因此如果用戶想要使用 Hive 的 TableSink,需要將執行模式設置為 batch。

Flink 1.9.0 增加了新的 blink planner,由於 blink planner 相比於原來的 planner 功能更加全面,因此我們建議在使用 FlinkSQL 與 Hive 集成時使用 blink planner。後續新的功能也可能會只支援 blink planner。

使用 SQL Client 時可以像這樣在 sql-client-defaults.yaml 中指定執行模式和 planner:

execution:    # select the implementation responsible for planning table programs    # possible values are 'old' (used by default) or 'blink'    planner: blink    # 'batch' or 'streaming' execution    type: batch

對應的 Table API 的寫法如下:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();  TableEnvironment tableEnv = TableEnvironment.create(settings);

後期規劃

我們會在 Flink 後續版本中進一步完善與 Hive 集成的功能,預計會在 1.10.0 版本中實現 Production-Ready。我們在後續版本中計劃開展的工作包括:

  • 更完整的數據類型支援
  • 支援寫分區表,包括靜態和動態分區
  • 支援 INSERT OVERWRITE
  • 支援 View
  • 更完整的 DDL、DML 的支援
  • 支援 Hive 的 TableSink 在 streaming 模式下工作,以便用戶將流式數據寫入到 Hive 中
  • 測試並支援更多的 Hive 版本
  • 支援 Bucket 表
  • 性能測試與優化

歡迎大家試用 Flink 1.9 中的 Hive 功能,如果遇到任何問題也歡迎大家通過釘釘、郵件列表等方式與我們聯繫。