環境篇:數據同步工具DataX

環境篇:數據同步工具DataX

1 概述

//github.com/alibaba/DataX

  • DataX是什麼?

DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各種異構數據源之間高效的數據同步功能。

  • 設計理念

為了解決異構數據源同步問題,DataX將複雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。

  • 當前使用現狀

DataX在阿里巴巴集團內被廣泛使用,承擔了所有大數據的離線同步業務,並已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸數據量超過300TB。

2 支援數據

類型 數據源 Reader(讀) Writer(寫) 文檔
RDBMS 關係型資料庫 MySQL
Oracle
SQLServer
PostgreSQL
DRDS
達夢
通用RDBMS(支援所有關係型資料庫)
阿里雲數倉數據存儲 ODPS
ADS
OSS
OCS
NoSQL數據存儲 OTS
Hbase0.94
Hbase1.1
MongoDB
Hive
無結構化數據存儲 TxtFile
FTP
HDFS
Elasticsearch

3 架構設計

DataX本身作為離線數據同步框架,採用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。

  • Reader:Reader為數據採集模組,負責採集數據源的數據,將數據發送給Framework。
  • Writer: Writer為數據寫入模組,負責不斷向Framework取數據,並將數據寫入到目的端。
  • Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩衝,流控,並發,數據轉換等核心技術問題。

4 核心架構

DataX 3.0 開源版本支援單機多執行緒模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模組相互關係。

4.1 核心模組介紹:

  1. DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個進程來完成整個作業同步過程。DataX Job模組是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
  3. 切分多個Task之後,DataX Job會調用Scheduler模組,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
  4. 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的執行緒來完成任務同步工作。
  5. DataX作業運行起來之後, Job監控並等待多個TaskGroup模組任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,進程退出值非0

4.2 DataX調度流程:

舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps裡面。 DataX的調度決策思路是:

  1. DataXJob根據分庫分表切分成了100個Task。
  2. 根據20個並發,DataX計算共需要分配4個TaskGroup。
  3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。

5 安裝

#解壓即可
tar -zxvf datax.tar.gz

6 案例

6.1 官方案例

./bin/datax.py job/job.json
  • 結果查看

任務啟動時刻 : 2020-07-30 14:36:00
任務結束時刻 : 2020-07-30 14:36:10
任務總計耗時 : 10s
任務平均流量 : 253.91KB/s
記錄寫入速度 : 10000rec/s
讀出記錄總數 : 100000
讀寫失敗總數 : 0

  • 查閱job.json文件
  1 {
  2     "job": {
  3         "setting": {
  4             "speed": {
  5                 "byte":10485760
  6             },
  7             "errorLimit": {
  8                 "record": 0,
  9                 "percentage": 0.02
 10             }
 11         },
 12         "content": [
 13             {
 14                 "reader": {
 15                     "name": "streamreader",
 16                     "parameter": {
 17                         "column" : [
 18                             {
 19                                 "value": "DataX",
 20                                 "type": "string"
 21                             },
 22                             {
 23                                 "value": 19890604,
 24                                 "type": "long"
 25                             },
 26                             {
 27                                 "value": "1989-06-04 00:00:00",
 28                                 "type": "date"
 29                             },
 30                             {
 31                                 "value": true,
 32                                 "type": "bool"
 33                             },
 34                             {
 35                                 "value": "test",
 36                                 "type": "bytes"
 37                             }
 38                         ],
 39                         "sliceRecordCount": 100000
 40                     }
 41                 },
 42                 "writer": {
 43                     "name": "streamwriter",
 44                     "parameter": {
 45                         "print": false,
 46                         "encoding": "UTF-8"
 47                     }
 48                 }
 49             }
 50         ]
 51     }
 52 }

6.2 從Stream流讀取數據並列印到控制台

可以通過命令查看配置模板

python bin/datax.py -r streamreader -w streamwriter

如上命令會輸出如下資訊,這樣就方便我們去編寫了

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     //github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     //github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
  • 配置模板

vim job/HelloWorld.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                        {
                        	"type":"string",
                        	"value":"HelloWorld"
                        },
                        {
                        	"type":"long",
                        	"value":"2020"                        
                        }
                        ], 
                        "sliceRecordCount": "10"
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "UTF-8", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}

上述模板配置了列印資訊為HelloWorld和2020,列印10條,並顯示,格式為UTF-8

注意:這裡的Framework(Speed)為channel,其意思為並發,在流中的意思為啟動了2個流去做這個事情,那麼輸出結果就會變為20條

  • 執行
./bin/datax.py job/HelloWorld.json


6.3 Mysql資料庫同步任務到本地

DataX 內部類型 Mysql 數據類型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary
  • mysql測試表創建
CREATE TABLE `user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
  `login_name` varchar(200) DEFAULT NULL COMMENT '用戶名稱',
  `nick_name` varchar(200) DEFAULT NULL COMMENT '用戶昵稱',
  `passwd` varchar(200) DEFAULT NULL COMMENT '用戶密碼',
  `name` varchar(200) DEFAULT NULL COMMENT '用戶姓名',
  `phone_num` varchar(200) DEFAULT NULL COMMENT '手機號',
  `email` varchar(200) DEFAULT NULL COMMENT '郵箱',
  `head_img` varchar(200) DEFAULT NULL COMMENT '頭像',
  `user_level` varchar(200) DEFAULT NULL COMMENT '用戶級別',
  `birthday` date DEFAULT NULL COMMENT '用戶生日',
  `gender` varchar(1) DEFAULT NULL COMMENT '性別 M男,F女',
  `create_time` datetime DEFAULT NULL COMMENT '創建時間',
  `operate_time` datetime DEFAULT NULL COMMENT '修改時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=41 DEFAULT CHARSET=utf8 COMMENT='用戶表';


INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (1, 'rz3bor', '香香', NULL, '戴香', '13446155565', '[email protected]', NULL, '1', '1990-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (2, 'n332si1jfe45', '玲芬', NULL, '柏鶯', '13531861983', '[email protected]', NULL, '1', '1976-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (3, 'hflupfwh9acg', '詠詠', NULL, '伍寒伊', '13146668529', '[email protected]', NULL, '2', '1997-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (4, 'o8o8cq4c', '昭昭', NULL, '軒轅婉', '13542317252', '[email protected]', NULL, '3', '1965-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (5, 'lqk2g4', '可可', NULL, '安靄', '13899248493', '[email protected]', NULL, '1', '1978-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (6, '866ym3j4b', '蕊薇', NULL, '司馬蕊薇', '13662131286', '[email protected]', NULL, '2', '2000-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (7, 'u8duuusr', '黛青', NULL, '百里涵予', '13742717589', '[email protected]', NULL, '1', '1996-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (8, 'hg9fkdthkv7', '娣娣', NULL, '戴莎錦', '13953633795', '[email protected]', NULL, '3', '2000-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (9, 'goatp7d', '月月', NULL, '戚珊莎', '13513119569', '[email protected]', NULL, '1', '1981-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (10, '6ernv47', '阿謙', NULL, '西門利清', '13176428692', '[email protected]', NULL, '1', '1983-03-15', 'M', '2020-03-15 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (11, 'zpo8f6swym8', '嫻瑾', NULL, '臧詠', '13782895459', '[email protected]', NULL, '1', '1979-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (12, '6nnowpp', '凡凡', NULL, '紀姣婉', '13425112725', '[email protected]', NULL, '2', '1978-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (13, 'hhzxmgoo8no', '聰聰', NULL, '淳于妹霞', '13147486822', '[email protected]', NULL, '1', '1990-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (14, 'l2p366773l', '馨馨', NULL, '葛馨', '13159754795', '[email protected]', NULL, '1', '1995-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (15, '1cyd3eop', '凡凡', NULL, '司馬瑤', '13731934554', '[email protected]', NULL, '1', '1980-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (16, 'cn5wnk4utxom', '蓓蓓', NULL, '卞蓓', '13377379335', '[email protected]', NULL, '1', '1976-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (17, 'r7l1wj', '燕彩', NULL, '謝素雲', '13624975852', '[email protected]', NULL, '1', '1989-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (18, '3e8hts7n9do1', '瑞凡', NULL, '計楓', '13417352247', '[email protected]', NULL, '1', '1995-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (19, 'b34feue0x', '萍萍', NULL, '伍英華', '13628754425', '[email protected]', NULL, '3', '2005-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');
INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (20, 'sdyk2u501qis', '蕊蕊', NULL, '周冰爽', '13161154156', '[email protected]', NULL, '1', '1985-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00');

  • 模板查看
python bin/datax.py -r mysqlreader -w streamwriter
  • vim job/mysql-local.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
  • 執行
./bin/datax.py job/mysql-local.json


寫在最後,官網例子相當的全,請參考各數據源讀寫的doc