Using tunnel to sync PG data to Kafka
- October 26, 2019
- Notes
tunnel is an open-source component from Hellobike. It can sync PostgreSQL data to Kafka or to Elasticsearch (ES).
https://github.com/hellobike/tunnel
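Deploying tunnel is fairly simple overall.
ZooKeeper (zk) and Kafka must be deployed beforehand (the demo below uses a single-node zk and a single-node Kafka).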
Node layout:
- 192.168.2.4: zk, Kafka, and PostgreSQL 10 listening on port 1921
- 192.168.2.189: tunnel
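Make sure logical replication is enabled in PostgreSQL (postgresql.conf):
```
wal_level = 'logical'
max_replication_slots = 20
```
Note that changing these settings requires restarting the PostgreSQL server; a reload is not enough.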
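Before restarting, the two parameters can be sanity-checked in the config text. The Python below is a minimal sketch with hypothetical helper names (`parse_conf`, `logical_replication_problems`); it only understands simple `name = value` lines, does not follow `include` directives or `postgresql.auto.conf`, and assumes the PG 10 defaults (`wal_level = replica`, `max_replication_slots = 10`) when a setting is absent. On a running server, `SHOW wal_level;` remains the authoritative check.

```python
import re


# Hypothetical helper: parse simple "name = value" lines from
# postgresql.conf-style text. Naive: a '#' inside a quoted value is
# treated as a comment, and include directives are not followed.
def parse_conf(text: str) -> dict:
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if not line:
            continue
        m = re.match(r"^(\w+)\s*(?:=\s*|\s+)(.+)$", line)
        if m:
            # Later lines override earlier ones, as PostgreSQL does.
            settings[m.group(1).lower()] = m.group(2).strip().strip("'")
    return settings


# Hypothetical check for the two settings logical replication relies on.
def logical_replication_problems(text: str, min_slots: int = 1) -> list:
    s = parse_conf(text)
    problems = []
    wal_level = s.get("wal_level", "replica")  # PG 10 default is replica
    if wal_level.lower() != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")
    slots = int(s.get("max_replication_slots", "10"))  # PG 10 default is 10
    if slots < min_slots:
        problems.append(f"max_replication_slots is {slots}, need at least {min_slots}")
    return problems


conf = "wal_level = 'logical'\nmax_replication_slots = 20\n"
print(logical_replication_problems(conf))  # prints []
```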
Next, create the test database and tables, plus the account used for syncing:
```sql
CREATE DATABASE test_database;
\c test_database
create table test_1 (id int primary key, name char(40));
create table test_2 (id int primary key, name char(40));
CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;
GRANT CONNECT ON DATABASE test_database TO test_rep;
```
Edit pg_hba.conf (`vim pg_hba.conf`) and add these two lines:
```
host    all            test_rep    192.168.2.0/24    md5
host    replication    test_rep    192.168.2.0/24    md5
```
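Then reload PostgreSQL so the pg_hba.conf change takes effect (a reload is enough here, for example `pg_ctl reload` or `SELECT pg_reload_conf();`).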
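Both records only admit clients whose address falls inside `192.168.2.0/24`, which must include the tunnel host. A minimal Python sketch of just that address check, using the standard `ipaddress` module; the helper name `hba_address_matches` is hypothetical, and it deliberately ignores the database, user, and method columns as well as PostgreSQL's first-match ordering of records.

```python
import ipaddress


# Hypothetical helper: does the ADDRESS column of a pg_hba.conf "host"
# record cover a client IP? Only the CIDR is checked; the DATABASE,
# USER and METHOD columns, and first-match ordering, are ignored here.
def hba_address_matches(hba_line: str, client_ip: str) -> bool:
    fields = hba_line.split()
    # Expected layout: host <database> <user> <address> <method>
    if len(fields) < 5 or fields[0] != "host":
        return False
    network = ipaddress.ip_network(fields[3], strict=False)
    return ipaddress.ip_address(client_ip) in network


rule = "host replication test_rep 192.168.2.0/24 md5"
print(hba_address_matches(rule, "192.168.2.189"))  # True: the tunnel host
print(hba_address_matches(rule, "192.168.3.10"))   # False: outside the /24
```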
On 192.168.2.189, build tunnel.
Note: Oracle JDK 1.8 must be installed before tunnel can be started.
```shell
git clone https://github.com/hellobike/tunnel
cd tunnel
mvn clean package -Dmaven.test.skip=true
cd target
unzip AppTunnelService.zip
cd AppTunnelService
```
Edit conf/test.yml (`vim conf/test.yml`) with the following content:
```yaml
tunnel_subscribe_config:
  pg_dump_path: '/usr/local/pgsql-10.10/bin/pg_dump'
  subscribes:
  - slotName: slot_for_test
    pgConnConf:
      host: 192.168.2.4
      port: 1921
      database: test_database
      user: test_rep
      password: xxxx
    rules:
    - {table: test_1, pks: ['id'], topic: test_1_logs}
    - {table: test_2, pks: ['id'], topic: test_2_logs}
    kafkaConf:
      addrs:
      - 192.168.2.4:9092
tunnel_zookeeper_address: 192.168.2.4:2181
```
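Each entry under `rules` pairs a table with its primary-key columns and a destination topic. As a reading aid only (not tunnel's code), here is a Python sketch of that mapping as a lookup table; the names `RULES`, `build_routes`, and `topic_for` are hypothetical, and the duplicate-rule check and the "no rule means not forwarded" behavior are assumptions of the sketch.

```python
# Hypothetical sketch: the "rules" list from conf/test.yml written as
# Python data, plus a table -> (topic, primary-key columns) lookup.
RULES = [
    {"table": "test_1", "pks": ["id"], "topic": "test_1_logs"},
    {"table": "test_2", "pks": ["id"], "topic": "test_2_logs"},
]


def build_routes(rules: list) -> dict:
    routes = {}
    for rule in rules:
        table = rule["table"]
        if table in routes:
            raise ValueError(f"duplicate rule for table {table!r}")
        routes[table] = (rule["topic"], tuple(rule["pks"]))
    return routes


def topic_for(routes: dict, table: str):
    # Assumption: a table without a rule is simply not forwarded.
    entry = routes.get(table)
    return entry[0] if entry else None


routes = build_routes(RULES)
print(topic_for(routes, "test_1"))  # test_1_logs
print(topic_for(routes, "orders"))  # None
```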
Start it in the foreground:
```shell
# Exposes Prometheus metrics on port 7788 (monitoring setup is simple and not the focus here, so it is skipped for now)
java -server -classpath 'conf/*:lib/*' com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788
```
Next, insert some arbitrary data into the two tables in test_database on PG 10; the data then shows up in Kafka (the screenshots below were taken with Kafka Manager and Kafka Eagle).


Formatted, the messages look like this:
An UPDATE record:
```json
{
  "dataList": [
    { "dataType": "integer", "name": "id", "value": "1111" },
    { "dataType": "character", "name": "name", "value": "大狗蛋 " }
  ],
  "eventType": "UPDATE",
  "lsn": 10503246616,
  "schema": "public",
  "table": "test_1"
}
```
A DELETE record:
```json
{
  "dataList": [
    { "dataType": "integer", "name": "id", "value": "3" }
  ],
  "eventType": "DELETE",
  "lsn": 10503247064,
  "schema": "public",
  "table": "test_1"
}
```
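Each message carries the schema, table, event type, LSN, and the column values as strings. A minimal consumer-side Python sketch (hypothetical names `row_from_event` and `apply_event`) that keeps an in-memory copy of a table, under these assumptions: the primary key is `id` as in the `pks` rule, UPDATE events carry the full new row, DELETE events carry at least the key columns (as in the sample above), and inserts arrive with `eventType` `INSERT` (not shown in the samples). It does not talk to Kafka; each message string would come from a consumer of the `test_1_logs` topic.

```python
import json


# Hypothetical: turn a tunnel event's dataList into {column: value}.
def row_from_event(event: dict) -> dict:
    row = {}
    for col in event["dataList"]:
        value = col["value"]
        if col["dataType"] == "character" and value is not None:
            value = value.rstrip()  # char(n) values are blank-padded
        row[col["name"]] = value
    return row


# Hypothetical: apply one JSON message to an in-memory replica dict keyed
# by (schema, table, *primary-key values).
def apply_event(replica: dict, raw: str, pks=("id",)) -> None:
    event = json.loads(raw)
    row = row_from_event(event)
    key = (event["schema"], event["table"]) + tuple(row[pk] for pk in pks)
    if event["eventType"] in ("INSERT", "UPDATE"):  # INSERT name is assumed
        replica[key] = row
    elif event["eventType"] == "DELETE":
        replica.pop(key, None)
    else:
        raise ValueError(f"unexpected eventType {event['eventType']!r}")


update = ('{"dataList": [{"dataType": "integer", "name": "id", "value": "1111"}, '
          '{"dataType": "character", "name": "name", "value": "大狗蛋 "}], '
          '"eventType": "UPDATE", "lsn": 10503246616, "schema": "public", "table": "test_1"}')
replica = {}
apply_event(replica, update)
print(replica)  # {('public', 'test_1', '1111'): {'id': '1111', 'name': '大狗蛋'}}
```

The `lsn` field is ignored here; a real consumer could use it to skip events it has already applied after a restart.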