實戰 | 深入理解 Hive ACID 事務表

  • 2019 年 10 月 6 日
  • 筆記

來源:https://blog.csdn.net/zjerryj/article/details/91470261

作者:薄荷腦

By 暴走大數據

場景描述:Apache Hive 0.13 版本引入了事務特性,能夠在 Hive 表上實現 ACID 語義,包括 INSERT/UPDATE/DELETE/MERGE 語句、增量數據抽取等。Hive 3.0 又對該特性進行了優化,包括改進了底層的文件組織方式,減少了對錶結構的限制,以及支持條件下推和向量化查詢。Hive 事務表的介紹和使用方法可以參考 Hive Wiki 和 各類教程,本文將重點講述 Hive 事務表是如何在 HDFS 上存儲的,及其讀寫過程是怎樣的。

關鍵詞:Hive ACID

文件結構

插入數據

CREATE TABLE employee (id int, name string, salary int)  STORED AS ORC TBLPROPERTIES ('transactional' = 'true');    INSERT INTO employee VALUES  (1, 'Jerry', 5000),  (2, 'Tom',   8000),  (3, 'Kate',  6000);

INSERT 語句會在一個事務中運行。它會創建名為 delta 的目錄,存放事務的信息和表的數據。

/user/hive/warehouse/employee/delta_0000001_0000001_0000  /user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version  /user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

目錄名稱的格式為 delta_minWID_maxWID_stmtID,即 delta 前綴、寫事務的 ID 範圍、以及語句 ID。具體來說:

所有 INSERT 語句都會創建 delta 目錄。UPDATE 語句也會創建 delta 目錄,但會先創建一個 delete 目錄,即先刪除、後插入。delete 目錄的前綴是 delete_delta;

Hive 會為所有的事務生成一個全局唯一的 ID,包括讀操作和寫操作。針對寫事務(INSERT、DELETE 等),Hive 還會創建一個寫事務 ID(Write ID),該 ID 在表範圍內唯一。寫事務 ID 會編碼到 delta 和 delete 目錄的名稱中;

語句 ID(Statement ID)則是當一個事務中有多條寫入語句時使用的,用作唯一標識。

再看文件內容,_orc_acid_version 的內容是 2,即當前 ACID 版本號是 2。它和版本 1 的主要區別是 UPDATE 語句採用了 split-update 特性,即上文提到的先刪除、後插入。這個特性能夠使 ACID 表支持條件下推等功能,具體可以查看 HIVE-14035。bucket_00000 文件則是寫入的數據內容。由於這張表沒有分區和分桶,所以只有這一個文件。事務表都以 ORC 格式存儲的,我們可以使用 orc-tools 來查看文件的內容:

$ orc-tools data bucket_00000  {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","salary":5000}}  {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","salary":8000}}  {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","salary":6000}}

輸出內容被格式化為了一行行的 JSON 字符串,我們可以看到具體數據是在 row 這個鍵中的,其它鍵則是 Hive 用來實現事務特性所使用的,具體含義為:

  • operation 0 表示插入,1 表示更新,2 表示刪除。由於使用了 split-update,UPDATE 是不會出現的;
  • originalTransaction 是該條記錄的原始寫事務 ID。對於 INSERT 操作,該值和 currentTransaction 是一致的。對於 DELETE,則是該條記錄第一次插入時的寫事務 ID;
  • bucket 是一個 32 位整型,由 BucketCodec 編碼,各個二進制位的含義為:
    • 1-3 位:編碼版本,當前是 001;
    • 4 位:保留;
    • 5-16 位:分桶 ID,由 0 開始。分桶 ID 是由 CLUSTERED BY 子句所指定的字段、以及分桶的數量決定的。該值和 bucket_N 中的 N 一致;
    • 17-20 位:保留;
    • 21-32 位:語句 ID;
    • 舉例來說,整型 536936448 的二進制格式為 00100000000000010000000000000000,即它是按版本 1 的格式編碼的,分桶 ID 為 1;
  • rowId 是一個自增的唯一 ID,在寫事務和分桶的組合中唯一;
  • currentTransaction 當前的寫事務 ID;
  • row 具體數據。對於 DELETE 語句,則為 null

我們可以注意到,文件中的數據會按 (originalTransaction, bucket, rowId) 進行排序,這點對後面的讀取操作非常關鍵。

這些信息還可以通過 row__id 這個虛擬列進行查看:

SELECT row__id, id, name, salary FROM employee;

輸出結果為:

{"writeid":1,"bucketid":536870912,"rowid":0}    1       Jerry   5000  {"writeid":1,"bucketid":536870912,"rowid":1}    2       Tom     8000  {"writeid":1,"bucketid":536870912,"rowid":2}    3       Kate    6000

增量數據抽取 API V2

Hive 3.0 還改進了先前的 增量抽取 API,通過這個 API,用戶或第三方工具(Flume 等)就可以利用 ACID 特性持續不斷地向 Hive 表寫入數據了。這一操作同樣會生成 delta 目錄,但更新和刪除操作不再支持。

StreamingConnection connection = HiveStreamingConnection.newBuilder().connect();  connection.beginTransaction();  connection.write("11,val11,Asia,China".getBytes());  connection.write("12,val12,Asia,India".getBytes());  connection.commitTransaction();  connection.close();

更新數據

UPDATE employee SET salary = 7000 WHERE id = 2;

這條語句會先查詢出所有符合條件的記錄,獲取它們的 row__id 信息,然後分別創建 deletedelta 目錄:

/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000  /user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000  /user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000

delete_delta_0000002_0000002_0000/bucket_00000 包含了刪除的記錄:

{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}

delta_0000002_0000002_0000/bucket_00000 包含更新後的數據:

{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":7000}}

DELETE 語句的工作方式類似,同樣是先查詢,後生成 delete 目錄。

合併表

MERGE 語句和 MySQL 的 INSERT ON UPDATE 功能類似,它可以將來源表的數據合併到目標表中:

CREATE TABLE employee_update (id int, name string, salary int);  INSERT INTO employee_update VALUES  (2, 'Tom',  7000),  (4, 'Mary', 9000);    MERGE INTO employee AS a  USING employee_update AS b ON a.id = b.id  WHEN MATCHED THEN UPDATE SET salary = b.salary  WHEN NOT MATCHED THEN INSERT VALUES (b.id, b.name, b.salary);

這條語句會更新 Tom 的薪資字段,並插入一條 Mary 的新記錄。多條 WHEN 子句會被視為不同的語句,有各自的語句 ID(Statement ID)。INSERT 子句會創建 delta_0000002_0000002_0000 文件,內容是 Mary 的數據;UPDATE 語句則會創建 delete_delta_0000002_0000002_0001 和 delta_0000002_0000002_0001 兩個文件,刪除並新增 Tom 的數據。

/user/hive/warehouse/employee/delta_0000001_0000001_0000  /user/hive/warehouse/employee/delta_0000002_0000002_0000  /user/hive/warehouse/employee/delete_delta_0000002_0000002_0001  /user/hive/warehouse/employee/delta_0000002_0000002_0001

壓縮

隨着寫操作的積累,表中的 delta 和 delete 文件會越來越多。事務表的讀取過程中需要合併所有文件,數量一多勢必會影響效率。此外,小文件對 HDFS 這樣的文件系統也是不夠友好的。因此,Hive 引入了壓縮(Compaction)的概念,分為 Minor 和 Major 兩類。

Minor Compaction 會將所有的 delta 文件壓縮為一個文件,delete 也壓縮為一個。壓縮後的結果文件名中會包含寫事務 ID 範圍,同時省略掉語句 ID。壓縮過程是在 Hive Metastore 中運行的,會根據一定閾值自動觸發。我們也可以使用如下語句人工觸發:

ALTER TABLE employee COMPACT 'minor';

以上文中的 MERGE 語句的結果舉例,在運行了一次 Minor Compaction 後,文件目錄結構將變為:

/user/hive/warehouse/employee/delete_delta_0000001_0000002  /user/hive/warehouse/employee/delta_0000001_0000002

在 delta_0000001_0000002/bucket_00000 文件中,數據會被排序和合併起來,因此文件中將包含兩行 Tom 的數據。Minor Compaction 不會刪除任何數據。

Major Compaction 則會將所有文件合併為一個文件,以 base_N 的形式命名,其中 N 表示最新的寫事務 ID。已刪除的數據將在這個過程中被剔除。row__id 則按原樣保留。

/user/hive/warehouse/employee/base_0000002

需要注意的是,在 Minor 或 Major Compaction 執行之後,原來的文件不會被立刻刪除。這是因為刪除的動作是在另一個名為 Cleaner 的線程中執行的。因此,表中可能同時存在不同事務 ID 的文件組合,這在讀取過程中需要做特殊處理。

讀取過程

我們可以看到 ACID 事務表中會包含三類文件,分別是 base、delta、以及 delete。文件中的每一行數據都會以 row__id 作為標識並排序。從 ACID 事務表中讀取數據就是對這些文件進行合併,從而得到最新事務的結果。這一過程是在 OrcInputFormat 和 OrcRawRecordMerger 類中實現的,本質上是一個合併排序的算法。

以下列文件為例,產生這些文件的操作為:插入三條記錄,進行一次 Major Compaction,然後更新兩條記錄。1-0-0-1 是對 originalTransaction – bucketId – rowId – currentTransaction 的縮寫。

+----------+    +----------+    +----------+  | base_1   |    | delete_2 |    | delta_2  |  +----------+    +----------+    +----------+  | 1-0-0-1  |    | 1-0-1-2  |    | 2-0-0-2  |  | 1-0-1-1  |    | 1-0-2-2  |    | 2-0-1-2  |  | 1-0-2-1  |    +----------+    +----------+  +----------+

合併過程為:

  • 對所有數據行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:
    • 1-0-0-1
    • 1-0-1-2
    • 1-0-1-1
    • 2-0-1-2
  • 獲取第一條記錄;
  • 如果當前記錄的 row__id 和上條數據一樣,則跳過;
  • 如果當前記錄的操作類型為 DELETE,也跳過;
    • 通過以上兩條規則,對於 1-0-1-2 和 1-0-1-1,這條記錄會被跳過;
  • 如果沒有跳過,記錄將被輸出給下游;
  • 重複以上過程。

合併過程是流式的,即 Hive 會將所有文件打開,預讀第一條記錄,並將 row__id 信息存入到 ReaderKey 類型中。該類型實現了 Comparable 接口,因此可以按照上述規則自定義排序:

public class RecordIdentifier implements WritableComparable<RecordIdentifier> {    private long writeId;    private int bucketId;    private long rowId;    protected int compareToInternal(RecordIdentifier other) {      if (other == null) { return -1; }      if (writeId != other.writeId) { return writeId < other.writeId ? -1 : 1; }      if (bucketId != other.bucketId) { return bucketId < other.bucketId ? - 1 : 1; }      if (rowId != other.rowId) { return rowId < other.rowId ? -1 : 1; }      return 0;    }  }    public class ReaderKey extends RecordIdentifier {    private long currentWriteId;    private boolean isDeleteEvent = false;    public int compareTo(RecordIdentifier other) {      int sup = compareToInternal(other);      if (sup == 0) {        if (other.getClass() == ReaderKey.class) {          ReaderKey oth = (ReaderKey) other;          if (currentWriteId != oth.currentWriteId) { return currentWriteId < oth.currentWriteId ? +1 : -1; }          if (isDeleteEvent != oth.isDeleteEvent) { return isDeleteEvent ? -1 : +1; }        } else {          return -1;        }      }      return sup;    }  }

然後,ReaderKey 會和文件句柄一起存入到 TreeMap 結構中。根據該結構的特性,我們每次獲取第一個元素時就能得到排序後的結果,並讀取數據了。

public class OrcRawRecordMerger {    private TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();    public boolean next(RecordIdentifier recordIdentifier, OrcStruct prev) {      Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();    }  }

選擇文件

上文中提到,事務表目錄中會同時存在多個事務的快照文件,因此 Hive 首先要選擇出反映了最新事務結果的文件集合,然後再進行合併。舉例來說,下列文件是一系列操作後的結果:兩次插入,一次 Minor Compaction,一次 Major Compaction,一次刪除。

delta_0000001_0000001_0000  delta_0000002_0000002_0000  delta_0000001_0000002  base_0000002  delete_delta_0000003_0000003_0000

過濾過程為:

  • 從 Hive Metastore 中獲取所有成功提交的寫事務 ID 列表;
  • 從文件名中解析出文件類型、寫事務 ID 範圍、以及語句 ID;
  • 選取寫事務 ID 最大且合法的那個 base 目錄,如果存在的話;
  • 對 delta 和 delete 文件進行排序:
    • minWID 較小的優先;
    • 如果 minWID 相等,則 maxWID 較大的優先;
    • 如果都相等,則按 stmtID 排序;沒有 stmtID 的會排在前面;
  • 將 base 文件中的寫事務 ID 作為當前 ID,循環過濾所有 delta 文件:
    • 如果 maxWID 大於當前 ID,則保留這個文件,並以此更新當前 ID;
    • 如果 ID 範圍相同,也會保留這個文件;
    • 重複上述步驟。

過濾過程中還會處理一些特別的情況,如沒有 base 文件,有多條語句,包含原始文件(即不含 row__id 信息的文件,一般是通過 LOAD DATA 導入的),以及 ACID 版本 1 格式的文件等。具體可以參考 AcidUtils#getAcidState 方法。

並行執行

在 Map-Reduce 模式下運行 Hive 時,多個 Mapper 是並行執行的,這就需要將 delta 文件按一定的規則組織好。簡單來說,base 和 delta 文件會被分配到不同的分片(Split)中,但所有分片都需要能夠讀取所有的 delete 文件,從而根據它們忽略掉已刪除的記錄。

向量化查詢

當 向量化查詢 特性開啟時,Hive 會嘗試將所有的 delete 文件讀入內存,並維護一個特定的數據結構,能夠快速地對數據進行過濾。如果內存放不下,則會像上文提到的過程一樣,逐步讀取 delete 文件,使用合併排序的算法進行過濾。

public class VectorizedOrcAcidRowBatchReader {    private final DeleteEventRegistry deleteEventRegistry;      protected static interface DeleteEventRegistry {      public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet);    }    static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {}    static class SortMergedDeleteEventRegistry implements DeleteEventRegistry {}      public boolean next(NullWritable key, VectorizedRowBatch value) {      BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size);      this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector,          vectorizedRowBatchBase.size, selectedBitSet);      for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;          setBitIndex >= 0;          setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) {        value.selected[selectedItr] = setBitIndex;      }    }  }

事務管理

為了實現 ACID 事務機制,Hive 還引入了新的事務管理器 DbTxnManager,它能夠在查詢計劃中分辨出 ACID 事務表,聯繫 Hive Metastore 打開新的事務,完成後提交事務。它也同時實現了過去的讀寫鎖機制,用來支持非事務表的情形。

Hive Metastore 負責分配新的事務 ID。這一過程是在一個數據庫事務中完成的,從而避免多個 Metastore 實例衝突的情況。

abstract class TxnHandler {    private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) {      String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");      s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);      for (long i = first; i < first + numTxns; i++) {        txnIds.add(i);        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","            + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue());      }      List<String> queries = sqlGenerator.createInsertValuesStmt(          "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);    }  }

參考資料

  • Hive Transactions
  • Transactional Operations in Apache Hive
  • ORCFile ACID Support