Sqoop-1.4.4工具import和export使用詳解

Sqoop可以在HDFS/Hive和關係型資料庫之間進行數據的導入導出,其中主要使用了import和export這兩個工具。這兩個工具非常強大,提供了很多選項幫助我們完成數據的遷移和同步。比如,下面兩個潛在的需求:

  1. 業務數據存放在關係資料庫中,如果數據量達到一定規模後需要對其進行分析或同統計,單純使用關係資料庫可能會成為瓶頸,這時可以將數據從業務資料庫數據導入(import)到Hadoop平台進行離線分析。
  2. 對大規模的數據在Hadoop平台上進行分析以後,可能需要將結果同步到關係資料庫中作為業務的輔助數據,這時候需要將Hadoop平台分析後的數據導出(export)到關係資料庫。

這裡,我們介紹Sqoop完成上述基本應用場景所使用的import和export工具,通過一些簡單的例子來說明這兩個工具是如何做到的。

工具通用選項

import和export工具有些通用的選項,如下表所示:

選項

含義說明

–connect

指定JDBC連接字元串

–connection-manager

指定要使用的連接管理器類

–driver

指定要使用的JDBC驅動類

–hadoop-mapred-home

指定$HADOOP_MAPRED_HOME路徑

–help

列印用法幫助資訊

–password-file

設置用於存放認證的密碼資訊文件的路徑

-P

從控制台讀取輸入的密碼

–password

設置認證密碼

–username

設置認證用戶名

–verbose

列印詳細的運行資訊

–connection-param-file

可選,指定存儲資料庫連接參數的屬性文件

數據導入工具import

import工具,是將HDFS平台外部的結構化存儲系統中的數據導入到Hadoop平台,便於後續分析。我們先看一下import工具的基本選項及其含義,如下表所示:

選項

含義說明

–append

將數據追加到HDFS上一個已存在的數據集上

–as-avrodatafile

將數據導入到Avro數據文件

–as-sequencefile

將數據導入到SequenceFile

–as-textfile

將數據導入到普通文本文件(默認)

–boundary-query

邊界查詢,用於創建分片(InputSplit)

–columns

從表中導出指定的一組列的數據

–delete-target-dir

如果指定目錄存在,則先刪除掉

–direct

使用直接導入模式(優化導入速度)

–direct-split-size

分割輸入stream的位元組大小(在直接導入模式下)

–fetch-size

從資料庫中批量讀取記錄數

–inline-lob-limit

設置內聯的LOB對象的大小

-m,–num-mappers

使用n個map任務並行導入數據

-e,–query

導入的查詢語句

–split-by

指定按照哪個列去分割數據

–table

導入的源表表名

–target-dir

導入HDFS的目標路徑

–warehouse-dir

HDFS存放表的根路徑

–where

指定導出時所使用的查詢條件

-z,–compress

啟用壓縮

–compression-codec

指定Hadoop的codec方式(默認gzip)

–null-string

果指定列為字元串類型,使用指定字元串替換值為null的該類列的值

–null-non-string

如果指定列為非字元串類型,使用指定字元串替換值為null的該類列的值

下面,我們通過實例來說明,在實際中如何使用這些選項。

1

bin/sqoop import –connect jdbc:mysql://10.95.3.49:3306/workflow –table project –username shirdrn -P –hive-import  — –default-character-set=utf-8

將MySQL資料庫workflow中project表的數據導入到Hive表中。

  • 將MySQL資料庫中整個表數據導入到Hive表

1

bin/sqoop import –connect jdbc:mysql://10.95.3.49:3306/workflow –username shirdrn -P –query 'SELECT users.*, tags.tag FROM users JOIN tags ON (users.id = tags.user_id) WHERE $CONDITIONS' –split-by users.id –target-dir /hive/tag_db/user_tags  — –default-character-set=utf-8

這裡,使用了--query選項,不能同時與--table選項使用。而且,變數$CONDITIONS必須在WHERE語句之後,供Sqoop進程運行命令過程中使用。上面的--target-dir指向的其實就是Hive表存儲的數據目錄。

  • 將MySQL資料庫中多表JION後的數據導入到HDFS

1

bin/sqoop job –create your-sync-job — import –connect jdbc:mysql://10.95.3.49:3306/workflow –table project –username shirdrn -P –hive-import –incremental append –check-column id –last-value 1 — –default-character-set=utf-8

這裡,每次運行增量導入到Hive表之前,都要修改--last-value的值,否則Hive表中會出現重複記錄。

  • 將MySQL資料庫中某個表的數據增量同步到Hive表

1

bin/sqoop import –connect jdbc:mysql://10.95.3.49:3306/workflow –username shirdrn –P –table tags –columns 'id,tag' –create-hive-table -target-dir /hive/tag_db/tags -m 1 –hive-table tags –hive-import — –default-character-set=utf-8

我們這裡將MySQL資料庫workflow中tags表的id和tag欄位的值導入到Hive表tag_db.tags。其中--create-hive-table選項會自動創建Hive表,--hive-import選項會將選擇的指定列的數據導入到Hive表。如果在Hive中通過SHOW TABLES無法看到導入的表,可以在conf/hive-site.xml中顯式修改如下配置選項:

1

<property>

然後再重新運行,就能看到了。

  • 將MySQL資料庫中某個表的幾個欄位的數據導入到Hive表

1

sqoop import –connect jdbc:mysql://db.foo.com/corp –table EMPLOYEES –validate –validator org.apache.sqoop.validation.RowCountValidator –validation-threshold org.apache.sqoop.validation.AbsoluteValidationThreshold –validation-failurehandler org.apache.sqoop.validation.AbortOnFailureHandler

上面這個是官方用戶手冊上給出的用法,我們在實際中還沒用過這個,有感興趣的可以驗證嘗試一下。

  • 使用驗證配置選項

數據導出工具export

export工具,是將HDFS平台的數據,導出到外部的結構化存儲系統中,可能會為一些應用系統提供數據支援。我們看一下export工具的基本選項及其含義,如下表所示:

選項

含義說明

–validate

啟用數據副本驗證功能,僅支援單表拷貝,可以指定驗證使用的實現類

–validation-threshold

指定驗證門限所使用的類

–direct

使用直接導出模式(優化速度)

–export-dir

導出過程中HDFS源路徑

-m,–num-mappers

使用n個map任務並行導出

–table

導出的目的表名稱

–call

導出數據調用的指定存儲過程名

–update-key

更新參考的列名稱,多個列名使用逗號分隔

–update-mode

指定更新策略,包括:updateonly(默認)、allowinsert

–input-null-string

使用指定字元串,替換字元串類型值為null的列

–input-null-non-string

使用指定字元串,替換非字元串類型值為null的列

–staging-table

在數據導出到資料庫之前,數據臨時存放的表名稱

–clear-staging-table

清除工作區中臨時存放的數據

–batch

使用批量模式導出

下面,我們通過實例來說明,在實際中如何使用這些選項。這裡,我們主要結合一個實例,講解如何將Hive中的數據導入到MySQL資料庫。 首先,我們準備幾個表,MySQL資料庫為tag_db,裡面有兩個表,定義如下所示:

01

CREATE TABLE tag_db.users (

這兩個表中存儲的是基礎數據,同時對應著Hive中如下兩個表:

01

CREATE TABLE users (

我們首先在上述MySQL的兩個表中插入一些測試數據:

1

INSERT INTO tag_db.users(name) VALUES('jeffery');

然後,使用Sqoop的import工具,將MySQL兩個表中的數據導入到Hive表,執行如下命令行:

1

bin/sqoop import –connect jdbc:mysql://10.95.3.49:3306/tag_db –table users –username shirdrn -P –hive-import — –default-character-set=utf-8

導入成功以後,再在Hive中創建一個用來存儲users和tags關聯後數據的表:

1

CREATE TABLE user_tags (

執行如下HQL語句,將關聯數據插入user_tags表:

1

FROM users u JOIN tags t ON u.id=t.user_id INSERT INTO TABLE user_tags SELECT CONCAT(CAST(u.id AS STRING), CAST(t.id AS STRING)), u.name, t.tag;

將users.id與tags.id拼接的字元串,作為新表的唯一欄位id,name是用戶名,tag是標籤名稱。 再在MySQL中創建一個對應的user_tags表,如下所示:

1

CREATE TABLE tag_db.user_tags (

使用Sqoop的export工具,將Hive表user_tags的數據同步到MySQL表tag_db.user_tags中,執行如下命令行:

1

bin/sqoop export –connect jdbc:mysql://10.95.3.49:3306/tag_db –username shirdrn –P –table user_tags –export-dir /hive/user_tags –input-fields-terminated-by '01' — –default-character-set=utf-8

執行導出成功後,可以在MySQL的tag_db.user_tags表中看到對應的數據。 如果在導出的時候出現類似如下的錯誤:

01

14/02/27 17:59:06 INFO mapred.JobClient: Task Id : attempt_201402260008_0057_m_000001_0, Status : FAILED

通過指定欄位分隔符選項--input-fields-terminated-by,指定Hive中表欄位之間使用的分隔符,供Sqoop讀取解析,就不會報錯了。