實戰 | 深入理解 Hive ACID 事務表
- 2019 年 10 月 6 日
- 筆記
來源:https://blog.csdn.net/zjerryj/article/details/91470261
作者:薄荷腦
By 暴走大數據
場景描述:Apache Hive 0.13 版本引入了事務特性,能夠在 Hive 表上實現 ACID 語義,包括 INSERT/UPDATE/DELETE/MERGE 語句、增量數據抽取等。Hive 3.0 又對該特性進行了優化,包括改進了底層的文件組織方式,減少了對錶結構的限制,以及支援條件下推和向量化查詢。Hive 事務表的介紹和使用方法可以參考 Hive Wiki 和 各類教程,本文將重點講述 Hive 事務表是如何在 HDFS 上存儲的,及其讀寫過程是怎樣的。
關鍵詞:Hive ACID
文件結構
插入數據
CREATE TABLE employee (id int, name string, salary int) STORED AS ORC TBLPROPERTIES ('transactional' = 'true'); INSERT INTO employee VALUES (1, 'Jerry', 5000), (2, 'Tom', 8000), (3, 'Kate', 6000);
INSERT 語句會在一個事務中運行。它會創建名為 delta
的目錄,存放事務的資訊和表的數據。
/user/hive/warehouse/employee/delta_0000001_0000001_0000 /user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version /user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000
目錄名稱的格式為 delta_minWID_maxWID_stmtID,即 delta 前綴、寫事務的 ID 範圍、以及語句 ID。具體來說:
所有 INSERT 語句都會創建 delta 目錄。UPDATE 語句也會創建 delta 目錄,但會先創建一個 delete 目錄,即先刪除、後插入。delete 目錄的前綴是 delete_delta;
Hive 會為所有的事務生成一個全局唯一的 ID,包括讀操作和寫操作。針對寫事務(INSERT、DELETE 等),Hive 還會創建一個寫事務 ID(Write ID),該 ID 在表範圍內唯一。寫事務 ID 會編碼到 delta 和 delete 目錄的名稱中;
語句 ID(Statement ID)則是當一個事務中有多條寫入語句時使用的,用作唯一標識。
再看文件內容,_orc_acid_version 的內容是 2,即當前 ACID 版本號是 2。它和版本 1 的主要區別是 UPDATE 語句採用了 split-update 特性,即上文提到的先刪除、後插入。這個特性能夠使 ACID 表支援條件下推等功能,具體可以查看 HIVE-14035。bucket_00000 文件則是寫入的數據內容。由於這張表沒有分區和分桶,所以只有這一個文件。事務表都以 ORC 格式存儲的,我們可以使用 orc-tools 來查看文件的內容:
$ orc-tools data bucket_00000 {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","salary":5000}} {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","salary":8000}} {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","salary":6000}}
輸出內容被格式化為了一行行的 JSON 字元串,我們可以看到具體數據是在 row
這個鍵中的,其它鍵則是 Hive 用來實現事務特性所使用的,具體含義為:
- operation 0 表示插入,1 表示更新,2 表示刪除。由於使用了 split-update,UPDATE 是不會出現的;
- originalTransaction 是該條記錄的原始寫事務 ID。對於 INSERT 操作,該值和 currentTransaction 是一致的。對於 DELETE,則是該條記錄第一次插入時的寫事務 ID;
- bucket 是一個 32 位整型,由 BucketCodec 編碼,各個二進位位的含義為:
- 1-3 位:編碼版本,當前是 001;
- 4 位:保留;
- 5-16 位:分桶 ID,由 0 開始。分桶 ID 是由 CLUSTERED BY 子句所指定的欄位、以及分桶的數量決定的。該值和 bucket_N 中的 N 一致;
- 17-20 位:保留;
- 21-32 位:語句 ID;
- 舉例來說,整型 536936448 的二進位格式為 00100000000000010000000000000000,即它是按版本 1 的格式編碼的,分桶 ID 為 1;
rowId
是一個自增的唯一 ID,在寫事務和分桶的組合中唯一;currentTransaction
當前的寫事務 ID;row
具體數據。對於 DELETE 語句,則為null
。
我們可以注意到,文件中的數據會按 (originalTransaction
, bucket
, rowId
) 進行排序,這點對後面的讀取操作非常關鍵。
這些資訊還可以通過 row__id
這個虛擬列進行查看:
SELECT row__id, id, name, salary FROM employee;
輸出結果為:
{"writeid":1,"bucketid":536870912,"rowid":0} 1 Jerry 5000 {"writeid":1,"bucketid":536870912,"rowid":1} 2 Tom 8000 {"writeid":1,"bucketid":536870912,"rowid":2} 3 Kate 6000
增量數據抽取 API V2
Hive 3.0 還改進了先前的 增量抽取 API,通過這個 API,用戶或第三方工具(Flume 等)就可以利用 ACID 特性持續不斷地向 Hive 表寫入數據了。這一操作同樣會生成 delta
目錄,但更新和刪除操作不再支援。
StreamingConnection connection = HiveStreamingConnection.newBuilder().connect(); connection.beginTransaction(); connection.write("11,val11,Asia,China".getBytes()); connection.write("12,val12,Asia,India".getBytes()); connection.commitTransaction(); connection.close();
更新數據
UPDATE employee SET salary = 7000 WHERE id = 2;
這條語句會先查詢出所有符合條件的記錄,獲取它們的 row__id
資訊,然後分別創建 delete
和 delta
目錄:
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000 /user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000 /user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000
delete_delta_0000002_0000002_0000/bucket_00000
包含了刪除的記錄:
{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}
delta_0000002_0000002_0000/bucket_00000
包含更新後的數據:
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":7000}}
DELETE 語句的工作方式類似,同樣是先查詢,後生成 delete
目錄。
合併表
MERGE 語句和 MySQL 的 INSERT ON UPDATE 功能類似,它可以將來源表的數據合併到目標表中:
CREATE TABLE employee_update (id int, name string, salary int); INSERT INTO employee_update VALUES (2, 'Tom', 7000), (4, 'Mary', 9000); MERGE INTO employee AS a USING employee_update AS b ON a.id = b.id WHEN MATCHED THEN UPDATE SET salary = b.salary WHEN NOT MATCHED THEN INSERT VALUES (b.id, b.name, b.salary);
這條語句會更新 Tom 的薪資欄位,並插入一條 Mary 的新記錄。多條 WHEN 子句會被視為不同的語句,有各自的語句 ID(Statement ID)。INSERT 子句會創建 delta_0000002_0000002_0000 文件,內容是 Mary 的數據;UPDATE 語句則會創建 delete_delta_0000002_0000002_0001 和 delta_0000002_0000002_0001 兩個文件,刪除並新增 Tom 的數據。
/user/hive/warehouse/employee/delta_0000001_0000001_0000 /user/hive/warehouse/employee/delta_0000002_0000002_0000 /user/hive/warehouse/employee/delete_delta_0000002_0000002_0001 /user/hive/warehouse/employee/delta_0000002_0000002_0001
壓縮
隨著寫操作的積累,表中的 delta 和 delete 文件會越來越多。事務表的讀取過程中需要合併所有文件,數量一多勢必會影響效率。此外,小文件對 HDFS 這樣的文件系統也是不夠友好的。因此,Hive 引入了壓縮(Compaction)的概念,分為 Minor 和 Major 兩類。
Minor Compaction 會將所有的 delta 文件壓縮為一個文件,delete 也壓縮為一個。壓縮後的結果文件名中會包含寫事務 ID 範圍,同時省略掉語句 ID。壓縮過程是在 Hive Metastore 中運行的,會根據一定閾值自動觸發。我們也可以使用如下語句人工觸發:
ALTER TABLE employee COMPACT 'minor';
以上文中的 MERGE 語句的結果舉例,在運行了一次 Minor Compaction 後,文件目錄結構將變為:
/user/hive/warehouse/employee/delete_delta_0000001_0000002 /user/hive/warehouse/employee/delta_0000001_0000002
在 delta_0000001_0000002/bucket_00000 文件中,數據會被排序和合併起來,因此文件中將包含兩行 Tom 的數據。Minor Compaction 不會刪除任何數據。
Major Compaction 則會將所有文件合併為一個文件,以 base_N 的形式命名,其中 N 表示最新的寫事務 ID。已刪除的數據將在這個過程中被剔除。row__id 則按原樣保留。
/user/hive/warehouse/employee/base_0000002
需要注意的是,在 Minor 或 Major Compaction 執行之後,原來的文件不會被立刻刪除。這是因為刪除的動作是在另一個名為 Cleaner 的執行緒中執行的。因此,表中可能同時存在不同事務 ID 的文件組合,這在讀取過程中需要做特殊處理。
讀取過程
我們可以看到 ACID 事務表中會包含三類文件,分別是 base、delta、以及 delete。文件中的每一行數據都會以 row__id 作為標識並排序。從 ACID 事務表中讀取數據就是對這些文件進行合併,從而得到最新事務的結果。這一過程是在 OrcInputFormat 和 OrcRawRecordMerger 類中實現的,本質上是一個合併排序的演算法。
以下列文件為例,產生這些文件的操作為:插入三條記錄,進行一次 Major Compaction,然後更新兩條記錄。1-0-0-1 是對 originalTransaction – bucketId – rowId – currentTransaction 的縮寫。
+----------+ +----------+ +----------+ | base_1 | | delete_2 | | delta_2 | +----------+ +----------+ +----------+ | 1-0-0-1 | | 1-0-1-2 | | 2-0-0-2 | | 1-0-1-1 | | 1-0-2-2 | | 2-0-1-2 | | 1-0-2-1 | +----------+ +----------+ +----------+
合併過程為:
- 對所有數據行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:
- 1-0-0-1
- 1-0-1-2
- 1-0-1-1
- …
- 2-0-1-2
- 獲取第一條記錄;
- 如果當前記錄的 row__id 和上條數據一樣,則跳過;
- 如果當前記錄的操作類型為 DELETE,也跳過;
- 通過以上兩條規則,對於 1-0-1-2 和 1-0-1-1,這條記錄會被跳過;
- 如果沒有跳過,記錄將被輸出給下游;
- 重複以上過程。
合併過程是流式的,即 Hive 會將所有文件打開,預讀第一條記錄,並將 row__id 資訊存入到 ReaderKey 類型中。該類型實現了 Comparable 介面,因此可以按照上述規則自定義排序:
public class RecordIdentifier implements WritableComparable<RecordIdentifier> { private long writeId; private int bucketId; private long rowId; protected int compareToInternal(RecordIdentifier other) { if (other == null) { return -1; } if (writeId != other.writeId) { return writeId < other.writeId ? -1 : 1; } if (bucketId != other.bucketId) { return bucketId < other.bucketId ? - 1 : 1; } if (rowId != other.rowId) { return rowId < other.rowId ? -1 : 1; } return 0; } } public class ReaderKey extends RecordIdentifier { private long currentWriteId; private boolean isDeleteEvent = false; public int compareTo(RecordIdentifier other) { int sup = compareToInternal(other); if (sup == 0) { if (other.getClass() == ReaderKey.class) { ReaderKey oth = (ReaderKey) other; if (currentWriteId != oth.currentWriteId) { return currentWriteId < oth.currentWriteId ? +1 : -1; } if (isDeleteEvent != oth.isDeleteEvent) { return isDeleteEvent ? -1 : +1; } } else { return -1; } } return sup; } }
然後,ReaderKey
會和文件句柄一起存入到 TreeMap
結構中。根據該結構的特性,我們每次獲取第一個元素時就能得到排序後的結果,並讀取數據了。
public class OrcRawRecordMerger { private TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>(); public boolean next(RecordIdentifier recordIdentifier, OrcStruct prev) { Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry(); } }
選擇文件
上文中提到,事務表目錄中會同時存在多個事務的快照文件,因此 Hive 首先要選擇出反映了最新事務結果的文件集合,然後再進行合併。舉例來說,下列文件是一系列操作後的結果:兩次插入,一次 Minor Compaction,一次 Major Compaction,一次刪除。
delta_0000001_0000001_0000 delta_0000002_0000002_0000 delta_0000001_0000002 base_0000002 delete_delta_0000003_0000003_0000
過濾過程為:
- 從 Hive Metastore 中獲取所有成功提交的寫事務 ID 列表;
- 從文件名中解析出文件類型、寫事務 ID 範圍、以及語句 ID;
- 選取寫事務 ID 最大且合法的那個 base 目錄,如果存在的話;
- 對 delta 和 delete 文件進行排序:
- minWID 較小的優先;
- 如果 minWID 相等,則 maxWID 較大的優先;
- 如果都相等,則按 stmtID 排序;沒有 stmtID 的會排在前面;
- 將 base 文件中的寫事務 ID 作為當前 ID,循環過濾所有 delta 文件:
- 如果 maxWID 大於當前 ID,則保留這個文件,並以此更新當前 ID;
- 如果 ID 範圍相同,也會保留這個文件;
- 重複上述步驟。
過濾過程中還會處理一些特別的情況,如沒有 base 文件,有多條語句,包含原始文件(即不含 row__id 資訊的文件,一般是通過 LOAD DATA 導入的),以及 ACID 版本 1 格式的文件等。具體可以參考 AcidUtils#getAcidState 方法。
並行執行
在 Map-Reduce 模式下運行 Hive 時,多個 Mapper 是並行執行的,這就需要將 delta 文件按一定的規則組織好。簡單來說,base 和 delta 文件會被分配到不同的分片(Split)中,但所有分片都需要能夠讀取所有的 delete 文件,從而根據它們忽略掉已刪除的記錄。

向量化查詢
當 向量化查詢 特性開啟時,Hive 會嘗試將所有的 delete
文件讀入記憶體,並維護一個特定的數據結構,能夠快速地對數據進行過濾。如果記憶體放不下,則會像上文提到的過程一樣,逐步讀取 delete
文件,使用合併排序的演算法進行過濾。
public class VectorizedOrcAcidRowBatchReader { private final DeleteEventRegistry deleteEventRegistry; protected static interface DeleteEventRegistry { public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet); } static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {} static class SortMergedDeleteEventRegistry implements DeleteEventRegistry {} public boolean next(NullWritable key, VectorizedRowBatch value) { BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size); this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector, vectorizedRowBatchBase.size, selectedBitSet); for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0; setBitIndex >= 0; setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) { value.selected[selectedItr] = setBitIndex; } } }
事務管理
為了實現 ACID 事務機制,Hive 還引入了新的事務管理器 DbTxnManager,它能夠在查詢計劃中分辨出 ACID 事務表,聯繫 Hive Metastore 打開新的事務,完成後提交事務。它也同時實現了過去的讀寫鎖機制,用來支援非事務表的情形。
Hive Metastore 負責分配新的事務 ID。這一過程是在一個資料庫事務中完成的,從而避免多個 Metastore 實例衝突的情況。
abstract class TxnHandler { private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) { String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); for (long i = first; i < first + numTxns; i++) { txnIds.add(i); rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue()); } List<String> queries = sqlGenerator.createInsertValuesStmt( "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows); } }
參考資料
- Hive Transactions
- Transactional Operations in Apache Hive
- ORCFile ACID Support