全量、增量數據在HBase遷移的多種技巧實踐

作者經歷了多次基於HBase實現全量與增量數據的遷移測試,總結了在使用HBase進行數據遷移的多種實踐,本文針對全量與增量數據遷移的場景不同,提供了1+2的技巧分享。

HBase全量與增量數據遷移的方法

1.背景

在HBase使用過程中,使用的HBase集群經常會因為某些原因需要數據遷移。大多數情況下,可以用離線的方式進行遷移,遷移離線數據的方式就比較容易了,將整個hbase的data存儲目錄進行搬遷就行,但是當集群數據量比較多的時候,文件拷貝的時間很長,對業務影響時間也比較長,往往在設計的時間窗口無法完成,本文給出一種遷移思路,可以利用HBase自身的功能,對集群進行遷移,減少集群業務中斷時間

2.簡介

大家都知道HBase有snapshot快照的功能,利用快照可以記錄某個時間點表的數據將其保存快照,在需要的時候可以將表數據恢復到打快照時間時的樣子。我們利用hbase的snapshot可以導出某個時間點的全量數據。

因為實際的業務還在不停的寫入表中,除了遷移快照時間點之前的全量數據,我們還需要將快照時間點後源源不斷的增量數據也遷移走,這裡如果能採用雙寫的方式,將數據寫入兩個集群就好了,但是現實的業務不會這樣做,如果這樣做還得保證雙寫的事務一致性。於是可以利用HBase的replication功能,replication功能本身就是保留了源集群的WAL日誌記錄,去回放寫入到目的集群,這樣一來用戶業務端->原始集群->目的集群便是個串形的數據流,且由HBase來保證數據的正確性。

所以這個遷移的方法就是利用snapshot遷移全量數據,利用replication遷移增量數據。

3.遷移步驟

上圖給出了遷移的整個時間線流程,主要有這麼5個時間點。

T0: 配置好老集群A集群到新集群B的Replication關係,Replication的數據由A集群同步到集群B,將表設置成同步,從此刻開始新寫入A集群表的數據會保留在WAL日誌中;

T1: 生成該時間點的全量數據,通過創建快照,以及導出快照數據的方式將該時間點的數據導出到新集群B;

T2: 新集群B將T1時刻的快照數據導入,此時新集群B中會由快照創建出表,此時老集群A集群上設置的Replication的關係會自動開始將T0時刻保留的WAL日誌回放至新集群B的表中,開始增量數據同步。

T3: 由於從T0-T3之間的操作會花費一段時間,此時會積累很多WAL日誌文件,需要一定的時間來同步至新集群,這裡需要去監控一下數據同步情況,等老集群WAL被逐漸消費完,此時可以將老集群的寫業務停止一下並準備將讀寫業務全部切到新集群B。

T4: T3-T4之間應該是個很短的時間,整個遷移也只有這個時間點會有一定中斷,此時是將業務完全切到新集群B,至此遷移完成。

4.操作涉及的命令

一、設置集群A和集群B的peer關係

在源集群hbase shell中, 設定peer

add_peer ‘peer_name’,’ClusterB:2181:/hbase’

二、在集群A的表中設置replication屬性

假設目標表名為Student,先獲取Family=f

進入hbase shell中,

alter ‘Student’,{NAME => ‘f’,REPLICATION_SCOPE => ‘1’}

三、給集群A的表創建快照

在hbase shell中

snapshot ‘Student’,’Student_table_snapshot’

四、在A集群中導出快照

hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot Student_table_snapshot -copy-to /snapshot-backup/Student

五、將快照數據放置到集群B的對應的目錄下

上面命令會導出2個目錄,一個是快照元數據,一個是原始數據

將元數據放到/hbase/.hbase-snapshot中,將原始數據放到/hbase/archive目錄中

由於hbase的archive目錄會有個定時清理,這裡可以提前將集群B的master的hbase.master.cleaner.interval值設置大點,避免拷貝過程中發生碰巧發生了數據清理。

如果集群B中沒有對應的目錄,可以提前創建

hdfs dfs -mkdir -p /hbase/.hbase-snapshot
hdfs dfs -mkdir -p /hbase/archive/data/default/

移動導出的snapshot文件到snapshot目錄

hdfs dfs -mv /snapshot-backup/Student/.hbase-snapshot/Student_table_snapshot /hbase/.hbase-snapshot/
hdfs dfs -mv /snapshot-backup/Student/archive/data/default/Student /hbase/archive/data/default

六、在新集群B中恢復表的快照

進入hbase shell

restore_snapshot ‘Student_table_snapshot’

恢復完成後,記得將集群B的hmaster中hbase.master.cleaner.interval的值調整回來。

HBase增量數據遷移的方法

1.概覽

本章主要是想談一下如何給HBase做增量數據的遷移,也就是遷移實時數據。上文中提到HBase增量數據遷移可以使用Replication的方式去做,但是在實際搬遷時,要給原集群設置Replication可能需要重啟,這樣會影響業務,我們需要做到不停機遷移才行。

2.WAL原理

正常情況下,HBase新增的數據都是有日誌記錄的,數據在落盤成HFile之前,任何一個Put和Delete操作都是記錄日誌並存放在WALs目錄中,日誌中包含了所有已經寫入Memstore但還未Flush到HFile的更改(edits)。

默認情況下每個RegionServer只會寫一個日誌文件,該RS管理的所有region都在向這一個日誌文件寫入Put和Delete記錄,直到日誌文件大小達到128MB(由hbase.regionserver.hlog.blocksize設置)後roll出一個新的日誌文件,總共可以roll出32個日誌文件(由hbase.regionserver.maxlogs設置)。

如果日誌文件未寫滿128MB,RegionServer間隔1小時也會roll出新一個新日誌文件(由hbase.regionserver.logroll.period設置)。

當日誌文件中涉及的所有region的記錄都flush成HFile後,這個日誌文件就會轉移至oldWals目錄下歸檔, Master沒間隔10分鐘(hbase.master.cleaner.interval)會檢查oldWALs目錄下的過期日誌文件,當文件過期時會被Master清理掉,(日誌過期時間由hbase.master.logcleaner.ttl控制)。

RegionServer默認間隔1小時(由hbase.regionserver.optionalcacheflushinterval設置)會對它管理的region做一次flush動作,所以WALs目錄中一直會有新的日誌文件生成,並伴隨着老的日誌文件移動到oldWALs目錄中。

3.遷移方式

一、遷移oldWALs目錄中的文件,使用WALPlayer回放

由於日誌文件文件最終移動到oldWALs目錄下,只需要寫個腳本,定時檢查oldWALs目錄下是否有新文件生成,如果有文件,則move至其他目錄,並使用WALPlayer工具對這個目錄進行回放。

優點:無代碼開發量,僅需腳本實現

缺點:無法做到實時,因為從數據寫入到最後到達oldWAL目錄會間隔很長時間。

二、開發獨立工具,解析日誌文件,寫入目的集群

在網上查找遷移方法的時候了解到了阿里開發了一個專門的HBase遷移工具,可以實現不停機。通過閱讀其設計BDS – HBase數據遷移同步方案的設計與實踐了解到阿里開發了應用去讀取HBase的WAL日誌文件並回放數據至目的集群。

優點:可以做到實時;

缺點:需要一定的代碼開發量;

要做出這樣一個工具,需要了解上面說的WAL文件歸檔的原理以及日誌回放工具WALPlayer,下面簡單說一下可以怎麼去實現。

獨立工具實現

這裡簡單說明下如何去做這樣一個工具,只介紹讀取WAL方面,任務編排就不描述了:

1、定時掃描WALs目錄獲取所有的日誌文件,這裡按ServerName去分組獲取,每個分組內根據WAL文件上的時間戳排序:

● 獲取所有RS的ServerName

ClusterStatus clusterStatus = admin.getClusterStatus();
Collection<ServerName> serverNames = clusterStatus.getServers();

● 根據ServerName去組成Path獲取日誌

Path rsWalPath = new Path(walPath, serverName.getServerName());
List<FileStatus> hlogs = getFiles(fs, rsWalPath, Long.MIN_VALUE, Long.MAX_VALUE);

● getFiles()參考HBase源碼中WALInputFormat.java中的實現,可以指定時間範圍去取日誌文件

private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
    throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  LOG.debug("Scanning " + dir.toString() + " for WAL files");

  FileStatus[] files = fs.listStatus(dir);
  if (files == null) return Collections.emptyList();
  for (FileStatus file : files) {
    if (file.isDirectory()) {
      // recurse into sub directories
      result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
    } else {
      String name = file.getPath().toString();
      int idx = name.lastIndexOf('.');
      if (idx > 0) {
        try {
          long fileStartTime = Long.parseLong(name.substring(idx+1));
          if (fileStartTime <= endTime) {
            LOG.info("Found: " + name);
            result.add(file);
          }
        } catch (NumberFormatException x) {
          idx = 0;
        }
      }
      if (idx == 0) {
        LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
      }
    }
  }
  return result;
}

2、對於取到的每一個WAL文件,當做一個任務Task執行遷移,這個task主要有下面一些步驟:

● 使用WALFactory為每個日誌文件創建一個Reader

WAL.Reader walReader = WALFactory.createReader(fileSystem, curWalPath, conf);

● 通過Reader去讀取key和edit,並記錄下position,為了加快寫入速度,這裡可以優化為讀取多個entry

WAL.Entry entry = walReader.next();
WALKey walKey = entry.getKey();
WALEdit walEdit = entry.getEdit();
long curPos = reader.getPosition();

● 記錄position的目的是為了Reader的reset以及seek,因為這個原始WAL文件還正在寫入的時候,我們的Reader速度很可能大於原WAL的寫入速度,當Reader讀到底的時候,需要等待一段時間reset然後再重新讀取entry

WAL.Entry nextEntry = reader.next();
if (nextEntry == null) {
    LOG.info("Next entry is null, sleep 10000ms.");
    Thread.sleep(5000);
    curPos = reader.getPosition();
    reader.reset();
    reader.seek(curPos);
    continue;
}

● 在讀取WAL的過程中很可能會遇到日誌轉移到oldWALs目錄下,這個時候捕獲到FileNotFoundException時,需要重新生成一個oldWALs目錄下Reader,然後設置curPos繼續讀取文件,這個時候如果再次讀取到文件最後的時候,就可以關閉Reader了,因為oldWALs中的日誌文件是固定大小的,不會再有追加數據。

這裡需要注意的是這個參數hbase.master.logcleaner.ttl不能設置過小,否則會出現這個在oldWALs目錄下的日誌文件還沒讀取完被清理掉了。

Path oldWALPath = new Path(oldWalPath, walFileName);
WAL.Reader reader = WALFactory.createReader(fileSystem, oldWALPath, conf);
reader.seek(curPos)

● 根據通過WAL.Reader可以讀取到walKey,walEdit進而解析出Cell並寫入目的集群,這個可以參考WALPlay的map()方法。