聊聊Hive數據血緣——從Atlas沒有列級血緣的Bug講起

前幾天,Datahub提供了最新的欄位級別數據血緣功能,很多朋友迫不及待想對比一下Datahub的欄位級血緣與Atlas的區別。

這個時候問題來了,在Atlas收集Hive血緣的時候,由於部分版本問題,沒有顯示出欄位級的數據血緣。這是為什麼呢?其實只要做一個簡單的修復就可以了,但是知其然也要知其所以然。今天我們就來看一下這個問題到底是怎麼引起的,然後從HiveSql的語法樹講起,看看數據血緣到底是如何被檢測到的。

最後提醒,文檔版權為公眾號 大數據流動 所有,請勿商用。相關技術問題以及安裝包可以聯繫筆者獨孤 風加入相關技術交流群討論獲取

另外,為了將我之前寫作的文章,還有積累的資料留下來。去年的時候,我申請了知識星球《大數據流動資料庫》。雖然想進行一些具體的分類,但是精力有限,以後將只維護這一個知識星球。包括我寫作文章的PDF版本和收集的資料都會上傳到這裡,包括未來付出較多時間整理的付費文章,我也會同步到這裡。

星球分享各種相關資料,包括但不限於大數據,實時計算,數據治理,數據可視化,用戶畫像,及實現開源技術如Datahub,Superset,Atlas,Spark,Flink,Minio等等。

星球為學習資料首發地,並將永久存儲這些資料

星球為多個技術微信群問題解答匯總處;

定期組織實戰小組,共同學習進步;

現在加入可以查看之前上傳的所有內容,當然後期隨著內容不斷增加,需要購買一些知識星球的服務,為了更好的服務星友,價格會根據人數上調。

正文開始: 通過本文檔,可以快速的解決Hive在Altas欄位級血緣沒有生成的問題,並了解Hive數據血緣實現原理。更多元數據管理,數據血緣相關文章,可以關注後續的文章更新。 文檔共分為5個部分,層級結構如下圖所示。

一、Hive與Atlas集成全流程

Apache Atlas 為組織提供開放式元數據管理和治理功能,用以構建其數據資產目錄,對 這些資產進行分類和管理,形成數據字典。並為數據分析師和數據治理團隊,提供圍繞這些 數據資產的協作功能。

Atlas的安裝部署可以參考我之前的文章: 數據治理之元數據管理的利器——Atlas入門寶典

這次我們直接來看Atlas與Hive的集成過程。

首先準備,Hive連接Atlas的Hook包。

可以採用源碼打包的方式。

在HDP平台上,通常可以從/usr/hdp/3.1.5.0-152/atlas/hook/hive/atlas-hive-plugin-impl獲取Atlas Hive Hook的所有jar包(包括依賴包)。

  • 將 atlas-application.properties 配置文件,壓縮加入到 atlas-plugin-classloader-2.0.0.jar 中
#必須在此路徑打包,才能打到第一級目錄下
cd /usr/local/src/atlas/apache-atlas-2.1.0/conf

zip -u /usr/local/src/atlas/apache-atlas-2.1.0/hook/hive/atlas-plugin-classloader-2.1.0.jar atlas-application.properties
  • 修改 hive-site.xml

<property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
  • 修改 hive-env.sh 的 Gateway 客戶端環境高級配置程式碼段(安全閥)

HIVE_AUX_JARS_PATH=/usr/local/src/atlas/apache-atlas-2.1.0/hook/hive
  • 修改 HIVE_AUX_JARS_PATH

  • 修改 hive-site.xml 的 HiveServer2 高級配置程式碼段(安全閥)

<property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
<property>
    <name>hive.reloadable.aux.jars.path</name>
    <value>/usr/local/src/atlas/apache-atlas-2.1.0/hook/hive</value>
</property>
  • 修改 HiveServer2 環境高級配置程式碼段

HIVE_AUX_JARS_PATH=/usr/local/src/atlas/apache-atlas-2.1.0/hook/hive

需要將配置好的Atlas包發往各個hive節點後重啟集群。

但是,很多同學在按該步驟操作完以後,欄位級數據血緣並未生成。這是為什麼呢?

二、 CDH6Hive2.1無欄位數據血緣問題修復

原來是Hive是生成元數據日誌的一個bug,此bug描述的問題是,用如下語句操作Hive時:

create table t1(id int, name string);
create table t2 as select * from t1;

欄位血緣關係無法生成,也就是說源碼中這段程式碼不能生效。

lInfo = hookContext.getLinfo()
> for(Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : 
> lInfo.entrySet()) {
>     System.out.println("Col Lineage Key : " + e.getKey());
>     System.out.println("Col Lineage Value: " + e.getValue());
> }

隨後Hive也對該問題進行了修復,不過修復的版本是後續版本,所以前面的版本受到了一些影響。

該修補程式為:HIVE-14706,如需要獲取修補程式,可以關注大數據流動,回復「HIVE-14706」獲取。

影響的版本主要是 2.1.0和2.1.1,這個問題在2.2.0中進行了修復。

修補程式修復後,列級別數據血緣就能正常顯示了。

此外還有一些Atlas與Hive存在兼容性問題,本文基於Atlas2.1.0兼容CDH6.3.2部署。Hive版本為2.1.1.其他版本的問題不在此文檔討論。

為兼容Hive2.1.1,需要修改源碼重新編譯。

  • 所需修改的項目位置:apache-atlas-sources-2.1.0\addons\hive-bridge

①.org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java 577行

String catalogName = hiveDB.getCatalogName() != null ? hiveDB.getCatalogName().toLowerCase() : null;

改為:

String catalogName = null;

②.org/apache/atlas/hive/hook/AtlasHiveHookContext.java 81行

this.metastoreHandler = (listenerEvent != null) ? metastoreEvent.getIHMSHandler() : null;

改為:C:\Users\Desktop\apache-atlas-2.1.0-sources\apache-atlas-sources-2.1.0\addons

this.metastoreHandler = null;

三、Hive血緣的核心—鉤子函數

QQ拼音截圖未命名123

Hive的鉤子非常重要,首先來了解下Hive的執行過程。

如果Hive通過MapReduce作為計算引擎為例,具體處理流程如下:

  1. HQL解析生成AST語法樹

Antlr定義SQL的語法規則,完成SQL詞法和語法解析,將SQL轉化為抽象語法樹AST Tree

  1. 語法分析得到QueryBlock

遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock

  1. 生成邏輯執行計劃

遍歷QueryBlock,翻譯為執行操作樹Operator Tree

  1. Logical Optimizer Operator進行邏輯優化

邏輯層優化器進行OperatorTree變換,合併不必要的ReduceSinkOperator,減少shuffle數據量

  1. 生成物理執行計劃Task Plan

遍歷Operator Tree,翻譯為MapReduce任務

  1. 物理優化Task Tree,構建執行計劃QueryPlan

物理層優化器進行MapReduce任務的變換,生成最終的執行計劃

  1. 表以及其他操作鑒權

  2. 執行引擎執行

在Hive Query整個生命周期中,會有如下鉤子函數被執行:

HiveDriverRunHook的preDriverRun

該鉤子函數由參數hive.exec.driver.run.hooks控制,決定要運行的pre hooks,多個鉤子實現類以逗號間隔,鉤子需實現 org.apache.hadoop.hive.ql.HiveDriverRunHook介面。

HiveSemanticAnalyzerHook的preAnalyze

在Driver開始run之前,HQL經過解析會進入編譯階段的語法分析,而在語法分析前會經過鉤子HiveSemanticAnalyzerHook的preAnalyze方法處理。該鉤子函數由hive.semantic.analyzer.hook配置,鉤子需實現org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook介面。

HiveSemanticAnalyzerHook的postAnalyze

與preAnalyze同屬於一個鉤子類,配置參數相同,會執行所有配置的語義分析hooks,但它位於Hive的語法分析之後,可以獲取HQL的輸入和輸出表及分區資訊,以及語法分析得到的task資訊,由此可以判斷是否是需要分散式執行的任務,以及執行引擎是什麼。

生成執行計劃之前的redactor鉤子

該鉤子由hive.exec.query.redactor.hooks配置,多個實現類以逗號間隔,鉤子需繼承org.apache.hadoop.hive.ql.hooks.Redactor抽象類,並替換redactQuery方法。

這個鉤子函數是在語法分析之後,生成QueryPlan之前,所以執行它的時候語法分析已完成,具體要跑的任務已定,這個鉤子的目的在於完成QueryString的替換,比如QueryString中包含敏感的表或欄位資訊,在這裡都可以完成替換,從而在Yarn的RM介面或其他方式查詢該任務的時候,會顯示經過替換後的HQL。

task執行前的preExecutionHook

在執行計劃QueryPlan生成完,並通過鑒權後,就會執行具體的task,而task執行之前會經過一個鉤子函數,鉤子函數由hive.exec.pre.hooks配置,多個鉤子實現類以逗號間隔。實現方式:

1)實現org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext

通過實現該介面的run方法,執行所有的pre-execution hooks

// Pre/Post Execute Hook can run with the HookContext
public interface ExecuteWithHookContext extends Hook {

/** hookContext: The hook context passed to each hooks.
   *  HookContext帶有執行計劃、Hive的配置資訊、Lineage、UGI、提交的用戶以及輸入輸出表等資訊
   */
void run(HookContext hookContext) throws Exception;
}

2)實現org.apache.hadoop.hive.ql.hooks.PreExecute

該介面的run方法已經標註為過時,並且相對於ExecuteWithHookContext,PreExecute提供的資訊可能不能完全滿足我們的業務需求。

public interface PreExecute extends Hook {

/**
   * The run command that is called just before the execution of the query.
   * SessionState、UGI、HQL輸入表及分區資訊,HQL輸出表、分區以及本地和hdfs文件目錄資訊
   */
@Deprecated
public void run(SessionState sess, Set<ReadEntity> inputs,Set<WriteEntity> outputs, UserGroupInformation ugi) throws Exception;
}

task執行失敗時的ON_FAILURE_HOOKS

task執行失敗時,Hive會調用這個hook執行一些處理措施。該鉤子由參數hive.exec.failure.hooks配置,多個鉤子實現類以逗號間隔。需實實現org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext介面。

task執行完成時的postExecutionHook

在task任務執行完成後執行。如果task失敗,會先執行ON_FAILURE_HOOKS,之後執行postExecutionHook,該鉤子由參數hive.exec.post.hooks指定的hooks(多個鉤子實現類以逗號間隔)執行post execution hooks。實現方式:

1)實現org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext

2)實現org.apache.hadoop.hive.ql.hooks.PostExecute

ExecuteWithHookContext和PostExecute跟分別與上述task執行前的preExecutionHook、PreExecute對應,這裡不再贅述。

HiveDriverRunHook的postDriverRun

在查詢完成運行之後以及將結果返回給客戶端之前執行,與preDriverRun對應。

此外,Hive中已經有一些內置實現的hook,下面舉一些例子以及它們的主要作用:

ATSHook:實現了ExecuteWithHookContext,將查詢和計劃資訊推送到Yarn App Timeline Server。

DriverTestHook:實現了HiveDriverRunHook的preDriverRun方法(對postDriverRun是空實現),用於列印輸出的命令

EnforceReadOnlyTables:pre execute hook,實現了ExecuteWithHookContext,用於阻止修改只讀表。

LineageLogger:實現了ExecuteWithHookContext,它將查詢的血統資訊記錄到日誌文件中。LineageInfo包含有關query血統的所有資訊。

PreExecutePrinter和PostExecutePrinter:pre和post hook的示例,它將參數列印輸出。

PostExecTezSummaryPrinter:post execution hook,實現了ExecuteWithHookContext,可以列印Hive Tez計數器的相關資訊。

PostExecOrcFileDump:post execution hook,實現了ExecuteWithHookContext,用於列印ORC文件資訊。

UpdateInputAccessTimeHook:pre execution hook,可在運行查詢之前更新所有輸入表的訪問時間。

特彆強調一下LineageLogger和LineageInfo,對於做Hive血緣關係分析很有參考價值,這個下文會說。

通過對上面Hive中hook的執行”位置”和作用,以及Hive本身實現的一些Hook,分析可知:自定義hook,比如實現一個pre execution hook。

首先在maven的pom中引入hive-exec的依賴,如:

<dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
</dependency>

此外,還需創建一個實現ExecuteWithHookContext的類,實現其中的run方法,並設置相應的參數,使自定義的hook類生效。

四、Hive表數據血緣實現

表的實現就比較簡單了。

Hive提供了org.apache.hadoop.hive.ql.tools.LineageInfo類,可以用來分析HiveQL中的表級別血緣關係。

public static void query(String[] args) throws IOException, ParseException,
      SemanticException {
    String query = args[0];
    LineageInfo lep = new LineageInfo();
    lep.getLineageInfo(query);
    for (String tab : lep.getInputTableList()) {
      System.out.println("InputTable=" + tab);
    }
    for (String tab : lep.getOutputTableList()) {
      System.out.println("OutputTable=" + tab);
    }
  }

如果我們調用該方法

import` `org.apache.hadoop.hive.ql.tools.LineageInfo;
public` `class` `LineageInfoTest {
public` `static` `void` `main(String[] args) ``throws` `Exception {
String query = ``"INSERT OVERWRITE TABLE ccc PARTITION (dt='20221109') SELECT z.id AS id,z.name AS name FROM aaa z LEFT JOIN bbb c ON z.id = c.id AND z.dt='20221109' AND c.dt='20221109' ``;
LineageInfo.query(``new` `String[] { query });
}
}

將輸出如下的結果:

InputTable=aaa
InputTable=bbb
OutputTable=ccc

五、Hive欄位數據血緣實現原理

LineageLogger Hook 是Hive2.0版本之後存在的,如果HIVE版本不夠需要升級HIVE版本。

Hive提供了org.apache.hadoop.hive.ql.hooks.LineageLogger類,可以用來分析HiveQL中的欄位級別血緣關係

具體設置如下

<property>
  ``<name>hive.``exec``.post.hooks<``/name``>
  ``<value>org.apache.hadoop.hive.ql.hooks.LineageLogger<``/value``>
<``/property``>

hive.exec.post.hooks參數介紹

執行後置條件。一個用逗號分隔開的實現了org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext介面的java class列表,配置了該參數後,每個hiveQL語句執行後都要執行這個鉤子,默認是空;

hive支援以下四種語句的血緣分析

  • HiveOperation.QUERY
  • HiveOperation.CREATETABLE_AS_SELECT
  • HiveOperation.ALTERVIEW_AS
  • HiveOperation.CREATEVIEW
配置hook輸出
vim ${HIVE_HOME}/conf/hive-log4j2.properties
og4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO

輸出位置在 hive-log4j2.properties 的 property.hive.log.dir 參數

測試輸出

hive> desc test_table;
OK
c1                  	string
c2                  	bigint
c3                  	int

hive> select c1, max(c2) as max_c2 from test_table group by c1;

輸出結果為:

  • vertices:頂點。代表參與DAG的節點元素,vertexType有COLUMN和TABLE兩個值
  • edges:邊。代表DAG的流向,由sources指向targets,edgeType有PROJECTION(投影)和PREDICATE(謂語)兩個值
{
	"edges": [{
		"sources": [2],
		"targets": [0],
		"edgeType": "PROJECTION"
	}, {
		"sources": [3],
		"targets": [1],
		"expression": "max(ods.test_table.c2)",
		"edgeType": "PROJECTION"
	}],
	"vertices": [{
		"id": 0,
		"vertexType": "COLUMN",
		"vertexId": "c1"
	}, {
		"id": 1,
		"vertexType": "COLUMN",
		"vertexId": "max_c2"
	}, {
		"id": 2,
		"vertexType": "COLUMN",
		"vertexId": "ods.test_table.c1"
	}, {
		"id": 3,
		"vertexType": "COLUMN",
		"vertexId": "ods.test_table.c2"
	}]
}

接下來就是將數據血緣存起來,然後進行展示了。

未完待續~

更多數據治理實踐落地相關技術與資料,歡迎關注大數據流動