HBase 系列(七)——HBase 過濾器詳解

  • 2019 年 10 月 3 日
  • 筆記

一、HBase過濾器簡介

Hbase 提供了種類豐富的過濾器(filter)來提高數據處理的效率,用戶可以通過內置或自定義的過濾器來對數據進行過濾,所有的過濾器都在服務端生效,即謂詞下推(predicate push down)。這樣可以保證過濾掉的數據不會被傳送到客戶端,從而減輕網絡傳輸和客戶端處理的壓力。

二、過濾器基礎

2.1 Filter接口和FilterBase抽象類

Filter 接口中定義了過濾器的基本方法,FilterBase 抽象類實現了 Filter 接口。所有內置的過濾器則直接或者間接繼承自 FilterBase 抽象類。用戶只需要將定義好的過濾器通過 setFilter 方法傳遞給 Scanput 的實例即可。

setFilter(Filter filter)
 // Scan 中定義的 setFilter   @Override    public Scan setFilter(Filter filter) {      super.setFilter(filter);      return this;    }
  // Get 中定義的 setFilter   @Override    public Get setFilter(Filter filter) {      super.setFilter(filter);      return this;    }

FilterBase 的所有子類過濾器如下:

說明:上圖基於當前時間點(2019.4)最新的 Hbase-2.1.4 ,下文所有說明均基於此版本。

2.2 過濾器分類

HBase 內置過濾器可以分為三類:分別是比較過濾器,專用過濾器和包裝過濾器。分別在下面的三個小節中做詳細的介紹。

三、比較過濾器

所有比較過濾器均繼承自 CompareFilter。創建一個比較過濾器需要兩個參數,分別是比較運算符比較器實例

 public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) {      this.compareOp = compareOp;      this.comparator = comparator;    }

3.1 比較運算符

  • LESS (<)
  • LESS_OR_EQUAL (<=)
  • EQUAL (=)
  • NOT_EQUAL (!=)
  • GREATER_OR_EQUAL (>=)
  • GREATER (>)
  • NO_OP (排除所有符合條件的值)

比較運算符均定義在枚舉類 CompareOperator

@InterfaceAudience.Public  public enum CompareOperator {    LESS,    LESS_OR_EQUAL,    EQUAL,    NOT_EQUAL,    GREATER_OR_EQUAL,    GREATER,    NO_OP,  }

注意:在 1.x 版本的 HBase 中,比較運算符定義在 CompareFilter.CompareOp 枚舉類中,但在 2.0 之後這個類就被標識為 @deprecated ,並會在 3.0 移除。所以 2.0 之後版本的 HBase 需要使用 CompareOperator 這個枚舉類。

3.2 比較器

所有比較器均繼承自 ByteArrayComparable 抽象類,常用的有以下幾種:

  • BinaryComparator : 使用 Bytes.compareTo(byte [],byte []) 按字典序比較指定的位元組數組。
  • BinaryPrefixComparator : 按字典序與指定的位元組數組進行比較,但只比較到這個位元組數組的長度。
  • RegexStringComparator : 使用給定的正則表達式與指定的位元組數組進行比較。僅支持 EQUALNOT_EQUAL 操作。
  • SubStringComparator : 測試給定的子字符串是否出現在指定的位元組數組中,比較不區分大小寫。僅支持 EQUALNOT_EQUAL 操作。
  • NullComparator :判斷給定的值是否為空。
  • BitComparator :按位進行比較。

BinaryPrefixComparatorBinaryComparator 的區別不是很好理解,這裡舉例說明一下:

在進行 EQUAL 的比較時,如果比較器傳入的是 abcd 的位元組數組,但是待比較數據是 abcdefgh

  • 如果使用的是 BinaryPrefixComparator 比較器,則比較以 abcd 位元組數組的長度為準,即 efgh 不會參與比較,這時候認為 abcdabcdefgh 是滿足 EQUAL 條件的;
  • 如果使用的是 BinaryComparator 比較器,則認為其是不相等的。

3.3 比較過濾器種類

比較過濾器共有五個(Hbase 1.x 版本和 2.x 版本相同),見下圖:

  • RowFilter :基於行鍵來過濾數據;
  • FamilyFilterr :基於列族來過濾數據;
  • QualifierFilterr :基於列限定符(列名)來過濾數據;
  • ValueFilterr :基於單元格 (cell) 的值來過濾數據;
  • DependentColumnFilter :指定一個參考列來過濾其他列的過濾器,過濾的原則是基於參考列的時間戳來進行篩選 。

前四種過濾器的使用方法相同,均只要傳遞比較運算符和運算器實例即可構建,然後通過 setFilter 方法傳遞給 scan

 Filter filter  = new RowFilter(CompareOperator.LESS_OR_EQUAL,                                  new BinaryComparator(Bytes.toBytes("xxx")));    scan.setFilter(filter);    

DependentColumnFilter 的使用稍微複雜一點,這裡單獨做下說明。

3.4 DependentColumnFilter

可以把 DependentColumnFilter 理解為一個 valueFilter 和一個時間戳過濾器的組合DependentColumnFilter 有三個帶參構造器,這裡選擇一個參數最全的進行說明:

DependentColumnFilter(final byte [] family, final byte[] qualifier,                                 final boolean dropDependentColumn, final CompareOperator op,                                 final ByteArrayComparable valueComparator)
  • family :列族
  • qualifier :列限定符(列名)
  • dropDependentColumn :決定參考列是否被包含在返回結果內,為 true 時表示參考列被返回,為 false 時表示被丟棄
  • op :比較運算符
  • valueComparator :比較器

這裡舉例進行說明:

DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(      Bytes.toBytes("student"),      Bytes.toBytes("name"),      false,      CompareOperator.EQUAL,      new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
  • 首先會去查找 student:name 中值以 xiaolan 開頭的所有數據獲得 參考數據集,這一步等同於 valueFilter 過濾器;

  • 其次再用參考數據集中所有數據的時間戳去檢索其他列,獲得時間戳相同的其他列的數據作為 結果數據集,這一步等同於時間戳過濾器;

  • 最後如果 dropDependentColumn 為 true,則返回 參考數據集+結果數據集,若為 false,則拋棄參考數據集,只返回 結果數據集

四、專用過濾器

專用過濾器通常直接繼承自 FilterBase,適用於範圍更小的篩選規則。

4.1 單列列值過濾器 (SingleColumnValueFilter)

基於某列(參考列)的值決定某行數據是否被過濾。其實例有以下方法:

  • setFilterIfMissing(boolean filterIfMissing) :默認值為 false,即如果該行數據不包含參考列,其依然被包含在最後的結果中;設置為 true 時,則不包含;
  • setLatestVersionOnly(boolean latestVersionOnly) :默認為 true,即只檢索參考列的最新版本數據;設置為 false,則檢索所有版本數據。
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(                  "student".getBytes(),                  "name".getBytes(),                  CompareOperator.EQUAL,                  new SubstringComparator("xiaolan"));  singleColumnValueFilter.setFilterIfMissing(true);  scan.setFilter(singleColumnValueFilter);

4.2 單列列值排除器 (SingleColumnValueExcludeFilter)

SingleColumnValueExcludeFilter 繼承自上面的 SingleColumnValueFilter,過濾行為與其相反。

4.3 行鍵前綴過濾器 (PrefixFilter)

基於 RowKey 值決定某行數據是否被過濾。

PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx"));  scan.setFilter(prefixFilter);

4.4 列名前綴過濾器 (ColumnPrefixFilter)

基於列限定符(列名)決定某行數據是否被過濾。

ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx"));   scan.setFilter(columnPrefixFilter);

4.5 分頁過濾器 (PageFilter)

可以使用這個過濾器實現對結果按行進行分頁,創建 PageFilter 實例的時候需要傳入每頁的行數。

public PageFilter(final long pageSize) {      Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);      this.pageSize = pageSize;    }

下面的代碼體現了客戶端實現分頁查詢的主要邏輯,這裡對其進行一下解釋說明:

客戶端進行分頁查詢,需要傳遞 startRow(起始 RowKey),知道起始 startRow 後,就可以返回對應的 pageSize 行數據。這裡唯一的問題就是,對於第一次查詢,顯然 startRow 就是表格的第一行數據,但是之後第二次、第三次查詢我們並不知道 startRow,只能知道上一次查詢的最後一條數據的 RowKey(簡單稱之為 lastRow)。

我們不能將 lastRow 作為新一次查詢的 startRow 傳入,因為 scan 的查詢區間是[startRow,endRow) ,即前開後閉區間,這樣 startRow 在新的查詢也會被返回,這條數據就重複了。

同時在不使用第三方數據庫存儲 RowKey 的情況下,我們是無法通過知道 lastRow 的下一個 RowKey 的,因為 RowKey 的設計可能是連續的也有可能是不連續的。

由於 Hbase 的 RowKey 是按照字典序進行排序的。這種情況下,就可以在 lastRow 後面加上 0 ,作為 startRow 傳入,因為按照字典序的規則,某個值加上 0 後的新值,在字典序上一定是這個值的下一個值,對於 HBase 來說下一個 RowKey 在字典序上一定也是等於或者大於這個新值的。

所以最後傳入 lastRow+0,如果等於這個值的 RowKey 存在就從這個值開始 scan,否則從字典序的下一個 RowKey 開始 scan。

25 個字母以及數字字符,字典排序如下:

'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'

分頁查詢主要實現邏輯:

byte[] POSTFIX = new byte[] { 0x00 };  Filter filter = new PageFilter(15);    int totalRows = 0;  byte[] lastRow = null;  while (true) {      Scan scan = new Scan();      scan.setFilter(filter);      if (lastRow != null) {          // 如果不是首行 則 lastRow + 0          byte[] startRow = Bytes.add(lastRow, POSTFIX);          System.out.println("start row: " +                             Bytes.toStringBinary(startRow));          scan.withStartRow(startRow);      }      ResultScanner scanner = table.getScanner(scan);      int localRows = 0;      Result result;      while ((result = scanner.next()) != null) {          System.out.println(localRows++ + ": " + result);          totalRows++;          lastRow = result.getRow();      }      scanner.close();      //最後一頁,查詢結束      if (localRows == 0) break;  }  System.out.println("total rows: " + totalRows);

需要注意的是在多台 Regin Services 上執行分頁過濾的時候,由於並行執行的過濾器不能共享它們的狀態和邊界,所以有可能每個過濾器都會在完成掃描前獲取了 PageCount 行的結果,這種情況下會返回比分頁條數更多的數據,分頁過濾器就有失效的可能。

4.6 時間戳過濾器 (TimestampsFilter)

List<Long> list = new ArrayList<>();  list.add(1554975573000L);  TimestampsFilter timestampsFilter = new TimestampsFilter(list);  scan.setFilter(timestampsFilter);

4.7 首次行鍵過濾器 (FirstKeyOnlyFilter)

FirstKeyOnlyFilter 只掃描每行的第一列,掃描完第一列後就結束對當前行的掃描,並跳轉到下一行。相比於全表掃描,其性能更好,通常用於行數統計的場景,因為如果某一行存在,則行中必然至少有一列。

FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();  scan.set(firstKeyOnlyFilter);

五、包裝過濾器

包裝過濾器就是通過包裝其他過濾器以實現某些拓展的功能。

5.1 SkipFilter過濾器

SkipFilter 包裝一個過濾器,當被包裝的過濾器遇到一個需要過濾的 KeyValue 實例時,則拓展過濾整行數據。下面是一個使用示例:

// 定義 ValueFilter 過濾器  Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL,        new BinaryComparator(Bytes.toBytes("xxx")));  // 使用 SkipFilter 進行包裝  Filter filter2 = new SkipFilter(filter1);

5.2 WhileMatchFilter過濾器

WhileMatchFilter 包裝一個過濾器,當被包裝的過濾器遇到一個需要過濾的 KeyValue 實例時,WhileMatchFilter 則結束本次掃描,返回已經掃描到的結果。下面是其使用示例:

Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL,                                 new BinaryComparator(Bytes.toBytes("rowKey4")));    Scan scan = new Scan();  scan.setFilter(filter1);  ResultScanner scanner1 = table.getScanner(scan);  for (Result result : scanner1) {      for (Cell cell : result.listCells()) {          System.out.println(cell);      }  }  scanner1.close();    System.out.println("--------------------");    // 使用 WhileMatchFilter 進行包裝  Filter filter2 = new WhileMatchFilter(filter1);    scan.setFilter(filter2);  ResultScanner scanner2 = table.getScanner(scan);  for (Result result : scanner1) {      for (Cell cell : result.listCells()) {          System.out.println(cell);      }  }  scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0  rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0  rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0  rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0  rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0  rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0  rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0  rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0  rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0  --------------------  rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0  rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0  rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0  rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0

可以看到被包裝後,只返回了 rowKey4 之前的數據。

六、FilterList

以上都是講解單個過濾器的作用,當需要多個過濾器共同作用於一次查詢的時候,就需要使用 FilterListFilterList 支持通過構造器或者 addFilter 方法傳入多個過濾器。

// 構造器傳入  public FilterList(final Operator operator, final List<Filter> filters)  public FilterList(final List<Filter> filters)  public FilterList(final Filter... filters)    // 方法傳入   public void addFilter(List<Filter> filters)   public void addFilter(Filter filter)

多個過濾器組合的結果由 operator 參數定義 ,其可選參數定義在 Operator 枚舉類中。只有 MUST_PASS_ALLMUST_PASS_ONE 兩個可選的值:

  • MUST_PASS_ALL :相當於 AND,必須所有的過濾器都通過才認為通過;
  • MUST_PASS_ONE :相當於 OR,只有要一個過濾器通過則認為通過。
@InterfaceAudience.Public    public enum Operator {      /** !AND */      MUST_PASS_ALL,      /** !OR */      MUST_PASS_ONE    }

使用示例如下:

List<Filter> filters = new ArrayList<Filter>();    Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,                                 new BinaryComparator(Bytes.toBytes("XXX")));  filters.add(filter1);    Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,                                 new BinaryComparator(Bytes.toBytes("YYY")));  filters.add(filter2);    Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,                                       new RegexStringComparator("ZZZ"));  filters.add(filter3);    FilterList filterList = new FilterList(filters);    Scan scan = new Scan();  scan.setFilter(filterList);

參考資料

HBase: The Definitive Guide _> Chapter 4. Client API: Advanced Features

更多大數據系列文章可以參見 GitHub 開源項目大數據入門指南