一個完整的Mysql到Hbase數據同步項目思想與實戰
- 2019 年 10 月 6 日
- 筆記
一個完整的Mysql到Hbase數據同步項目思想與實戰
0.導語
對於上次文章預告,這次則以項目實戰從後往前進行,先給大家一個直觀的應用,從應用中學習,實踐中學習。
歡迎大家留言,轉發,多多支援!
本次可以學習如下知識:
- MySQL binlog啟用配置與使用
- binlog查看提取方案maxwell
- Kafka基本使用
- Hbase基本使用
- Python操縱Hbase
- binlog->maxwell->kafka->hbase方案
這次的實踐名字為:數據增量同步!
採用方案為:binlog->maxwell->Kafka->Hbase!
1.Mysql binlog
binlog是sever層維護的一種二進位日誌,與innodb引擎中的redo/undo log是完全不同的日誌。
可以簡單的理解該log記錄了sql標中的更新刪除插入等操作記錄。通常應用在數據恢復、備份等場景。
1.1 如何開啟?
開啟binlog
對於我的mysql的配置文件在下面這個文件夾,當然直接編輯my.cnf
也是可以的。
vi /etc/mysql/mysql.conf.d/mysqld.cnf
對配置文件設置如下:

1.2 查看是否啟用
進入mysql客戶端輸入:
show variables like '%log_bin%';

1.3 binlog介紹
我的log存放在var下面的log的mysql下面:

在mysql-bin.index中包含了所有的log文件,比如上述圖就是包含了1與2文件,文件長度超過相應大小就會新開一個log文件,索引遞增,如上面的000001,000002。
1.4 binlog實戰
首先創建一個表:
create table house(id int not null primary key,house int,price int);
向表中插入數據:
insert into loaddb.house(id,house,price) values(1,2,3);
上面提到插入數據後,binlog會更新,那麼我們去查看上面log文件,應該會看到插入操作。
Mysql binlog日誌有ROW,Statement,MiXED三種格式;
set global binlog_format='ROW/STATEMENT/MIXED'
命令行:
show variables like 'binlog_format'

對於mysql5.7的,binlog格式默認為ROW,所以不用修改。
那麼為何要了解binlog格式呢,原因很簡單,我要查看我的binlog日誌,而該日誌為二進位文件,打開後是亂碼的。對於不同的格式,查看方式不一樣!
對於ROW模式生成的sql編碼需要解碼,不能用常規的辦法去生成,需要加上相應的參數,如下程式碼:
sudo /usr/bin/mysqlbinlog mysql-bin.000002 --base64-output=decode-rows -v
使用mysqlbinlog工具查看日誌文件:

2.Kafka
Kafka
是使用Java
開發的應用程式,Kafka
需要運行Zookeeper
,兩者都需要Java
,所以在需要安裝Zookeeper
和Kafka
之前,先安裝Java
環境。
Kafka 是一種分散式的,基於發布 / 訂閱的消息系統。在這裡可以把Kafka理解為生產消費者模式。
2.1 Zookeeper安裝及配置
Zookeeper下載: https://www.apache.org/dyn/closer.cgi/zookeeper/
下載相應的tar.gz文件,然後解壓後移動到/usr/local
下面即可。
配置:
cp zoo_sample.cfg zoo.cfg
重要配置:
# 數據目錄 dataDir=/usr/local/zookeeper/data # 日誌 dataLogDir=/usr/local/zookeeper/logs # 客戶端訪問Zookeeper的埠號 clientPort=2181
如果日誌文件夾logs不存在,記得mkdir創建一下即可。data目錄也是這樣。
最後配置到用戶PATH裡面:
vi ~/.bashrc
系統環境變數
export ZOOKEEPER_HOME=/usr/local/zookeeper export PATH=$ZOOKEEPER_HOME/bin:$PATH
環境變數生效:
source ~/.bashrc
2.2 啟動Zookeeper
直接輸入zkServer.sh start
即可!

2.3 Kafka安裝及配置
Kafka下載地址: http://kafka.apache.org/downloads
同上述安裝,這裡下載.tgz文件,也是解壓後移動到/usr/local即可!
關於配置文件可以直接採用默認的即可。
2.4 啟動Kafka
./bin/kafka-server-start.sh ./config/server.properties
2.5 封裝上述兩個啟動
將Zookeeper與Kafka啟動封裝成一個腳本:
啟動腳本:
#!/bin/bash ./zookeeper/bin/zkServer.sh start ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
關閉腳本:
#!/bin/bash ./kafka/bin/kafka-server-stop.sh # first stop kafka ./zookeeper/bin/zkServer.sh stop # then stop zookeeper
驗證啟動結果:

驗證關閉結果:

2.6 Topic創建
當使用下面一節maxwell提取出來的binlog資訊的時候,默認使用kafka進行消費。
./kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.7 發布與訂閱
向Topic上發布消息,按Ctrl+D
結束:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

從Topic上接收消息,按Ctrl+C
結束:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3.binlog提取工具Maxwell
3.1 Maxwell安裝及配置
Maxwell是將mysql binlog中的insert、update等操作提取出來,並以json數據返回的一個工具。
當然自己也可以用編程實現!
下載地址:http://maxwells-daemon.io/
安裝方式同上。
3.2 mysql配置Maxwell
Maxwell配置文件中默認用戶名密碼均為maxwell,所以需要在mysql中做相應的授權。
mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'maxwell'; mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%'; mysql> flush privileges;
3.3 配置Maxwell
cp config.properties.example config.properties vi config.properties
maxwell配置:
log_level=info # 默認生產者 producer=kafka kafka.bootstrap.servers=localhost:9092 # mysql login info host=localhost user=maxwell password=maxwell # kafka配置 kafka_topic=test kafka.compression.type=snappy kafka.acks=all kinesis_stream=test
3.4 啟動maxwell
./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test
當然也可以把上述封裝成一個啟動腳本:
#!/bin/bash ./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test
直接啟動:
./start_maxwell.sh
4.Hbase
4.1 安裝及配置
下載Hbase: https://mirrors.cnnic.cn/apache/hbase/
wget http://mirrors.cnnic.cn/apache/hbase/xxx/hbase-xxx-bin.tar.gz tar zxvf hbase-xxx-bin.tar.gz sudo mv zxvf hbase-xxx-bin /usr/local/hbase
然後把Hadoop中的conf/hbase-site.xml
配置如下:
<configuration> <property> <name>hbase.rootdir</name> <value>file:///home/hadoop/hbase</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/home/hadoop/zookeeper</value> </property> </configuration>
並將該配置文件拷貝到hbase/conf
下面即可。
啟動Hbase:
bin/hbase shell
環境變數設置:
修改bashrc文件,添加如下:
export HBASE_HOME=/usr/local/hbase export PATH=$PATH:$HBASE_HOME/bin
後面啟動只需要:
hbase shell
4.2 基本使用
HBase 是一種列式的分散式資料庫,不支援多表連接查詢,可以按照ROW查詢,當中列欄位在簇裡面可以設置。
查詢所有表
list
創建表
info就是簇
create 'test','info'
添加數據
a,b,c是info簇下的三列,value1,value2,value3就是值。
put 'test', 'row1', 'info:a', 'value1' put 'test', 'row2', 'info:b', 'value2' put 'test', 'row3', 'info:c', 'value3'
查詢所有數據
scan 'test'
關於更多資料庫操作及介紹可以去官網學習,掌握上述知識對於本節的實戰就夠了!
4.3 Python操作Hbase
pip install thrift pip install happybase
python連接Hbase需要啟用thrift介面,啟用方式:
./hbase/bin/hbase-daemon.sh start thrift
4.4 Python程式碼實現
import happybase class hbase(): def __init__(self): self.conn = happybase.Connection("127.0.0.1", 9090) print("===========HBASE資料庫表=============n") print(self.conn.tables()) self.conn.open() def createTable(self,table_name,families): self.conn.create_table(table_name,families) def insertData(self,table_name,row,data): table = self.conn.table(table_name) table.put(row=row,data=data) def deletTable(self,table_name,flag): self.conn.delete_table(table_name,flag) def getRow(self,table_name): table = self.conn.table(table_name) print(table.scan()) i=0 for key, value in table.scan(): print(key) print(value) i+=1 print(i) def closeTable(self): self.conn.close() htb = hbase() table_name = 'test1' families = {'info':{}} htb.createTable(table_name,families) htb.insertData(table_name,'row',{'info:content':'光城','info:price':'299'}) htb.deletTable(table_name,True) htb.getRow(table_name)
5.實戰
上述介紹了所有的安裝與使用,下面來實戰兩個例子。
5.1 Kafka消費
流程如下:
往Mysql中實時更新,插入數據等操作,會記錄到binlog中,然後使用maxwell解析binlog,用Kafka進行消費。
依次啟動maxwell,Kafka以及消費Kafka。
./start_maxwell.sh ./start_kafka.sh ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

5.2 Hbase消費
Hbase消費則是在Kafka消費基礎上做的一個調用,通過pykafka
進行消費生產者的數據到Hbase中。流程為:
binlog->maxwell->python操作Kafka->python操作hbase->Hbase。
完整實現如下:
from pykafka import KafkaClient import happybase import json class mysqlToHbase(): def __init__(self): self.client = KafkaClient(hosts="localhost:9092") self.topic = self.client.topics['test'] self.consumer = self.topic.get_balanced_consumer(consumer_group='sqbase', auto_commit_enable=True, zookeeper_connect='localhost:2181') self.conn = happybase.Connection("127.0.0.1", 9090) print("===========HBASE資料庫表=============n") print(self.conn.tables()) self.conn.open() def putTohbase(self,table_name): for m in self.consumer: database = json.loads(m.value.decode('utf-8'))["database"] name = json.loads(m.value.decode('utf-8'))["table"] row_data = json.loads(m.value.decode('utf-8'))["data"] if database == 'mydb' and name == 'house': print(json.loads(m.value.decode('utf-8'))) row_id = row_data["id"] row = str(row_id) del row_data["id"] data = {} for each in row_data: neweach='info:'+each data[neweach] = row_data[each] data['info:price'] = str(data['info:price']) self.insertData(table_name,row,data) def createTable(self,table_name,families): self.conn.create_table(table_name,families) # htb.insertData(table_name, 'row', {'info:content': 'asdas', 'info:price': '299'}) def insertData(self,table_name,row,data): table = self.conn.table(table_name) table.put(row=row,data=data) def deletTable(self,table_name,flag): self.conn.delete_table(table_name,flag) def getRow(self,table_name): table = self.conn.table(table_name) print(table.scan()) i=0 for key, value in table.scan(): print(key) print(value) i+=1 print(i) def closeTable(self): self.conn.close() htb = mysqlToHbase() table_name = 'sql_hbase' families = {'info':{}} # htb.createTable(table_name,families) htb.putTohbase(table_name) htb.closeTable() htb.deletTable(table_name,True) # htb.getRow(table_name)

圖中為當mysql中進行相應操作,hbase便會同步!