Syncing PG data to Kafka with tunnel

  • October 26, 2019
  • Notes


tunnel is an open-source component from Hellobike. It can sync PostgreSQL data to Kafka or to Elasticsearch (ES).

https://github.com/hellobike/tunnel

Deploying tunnel is fairly straightforward overall.

zk and Kafka must be deployed first (the demo below uses a single-node zk and a single-node Kafka).

Node layout:

192.168.2.4     zk, Kafka, and PG 10 (listening on port 1921)

192.168.2.189   tunnel

Make sure logical replication is enabled in PG (in postgresql.conf):

```
wal_level = 'logical'
max_replication_slots = 20
```

Note: these settings only take effect after the PG server is restarted; a reload is not enough.
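Because a restart is needed, it can help to sanity-check the file before restarting. The following is a minimal Python sketch, not part of tunnel or PostgreSQL: the hypothetical helpers parse_conf and logical_replication_problems read simple name = value lines, ignore include directives and escaped quotes, and fall back to the PG 10 defaults (wal_level = replica, max_replication_slots = 10, max_wal_senders = 10) for unset parameters. The max_wal_senders check assumes tunnel streams changes over the replication protocol. On a running server, SHOW wal_level; remains the authoritative answer.

```python
import re

# Hypothetical helper, not part of tunnel or PostgreSQL: a simplified reader for
# "name = value" lines in postgresql.conf (no include directives, no '' escapes).
_SETTING = re.compile(r"^([A-Za-z_][A-Za-z0-9_.]*)\s*=?\s*(.*)$")

# PostgreSQL 10 defaults, used when a parameter is not set in the file.
PG10_DEFAULTS = {"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}


def parse_conf(text):
    """Return {name: value} for uncommented settings; a later line overrides an earlier one."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # blank line or commented-out setting
        match = _SETTING.match(line)
        if not match:
            continue
        name, rest = match.group(1).lower(), match.group(2).strip()
        if rest.startswith("'"):
            end = rest.find("'", 1)  # quoted value: keep text up to the closing quote
            value = rest[1:end] if end != -1 else rest[1:]
        else:
            tokens = rest.split("#", 1)[0].split()  # unquoted value: drop a trailing comment
            value = tokens[0] if tokens else ""
        settings[name] = value
    return settings


def logical_replication_problems(text):
    """List settings that would block logical decoding (assumes replication-protocol streaming)."""
    s = {**PG10_DEFAULTS, **parse_conf(text)}
    problems = []
    if s["wal_level"].lower() != "logical":
        problems.append("wal_level must be 'logical'")
    if int(s["max_replication_slots"]) < 1:
        problems.append("max_replication_slots must be at least 1")
    if int(s["max_wal_senders"]) < 1:
        problems.append("max_wal_senders must be at least 1")
    return problems


if __name__ == "__main__":
    conf = "wal_level = 'logical'\nmax_replication_slots = 20\n"
    print(logical_replication_problems(conf))  # []
```

An empty file reports only the wal_level problem, since the PG 10 defaults for the two counters are already non-zero.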

Next, create the test database and tables, plus the account tunnel will use for replication:

```sql
CREATE DATABASE test_database;
\c test_database
create table test_1 (id int primary key, name char(40));
create table test_2 (id int primary key, name char(40));

CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;
GRANT CONNECT ON DATABASE test_database to test_rep;
```

Edit pg_hba.conf (vim pg_hba.conf) and add these two lines:

```
host    all             test_rep        192.168.2.0/24         md5
host    replication     test_rep        192.168.2.0/24         md5
```

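Then reload PG (for example with SELECT pg_reload_conf(); or pg_ctl reload). pg_hba.conf changes do not need a restart.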

On 192.168.2.189, build tunnel:

Note: running tunnel requires Oracle JDK 1.8 to be installed beforehand (and the build below needs Maven).

```
git clone https://github.com/hellobike/tunnel
cd tunnel
mvn clean package -Dmaven.test.skip=true
cd target
unzip AppTunnelService.zip
cd AppTunnelService
```

Edit conf/test.yml (vim conf/test.yml) with the following content:

```yaml
tunnel_subscribe_config:
  pg_dump_path: '/usr/local/pgsql-10.10/bin/pg_dump'
  subscribes:
  - slotName: slot_for_test
    pgConnConf:
      host: 192.168.2.4
      port: 1921
      database: test_database
      user: test_rep
      password: xxxx
    rules:
    - {table: test_1, pks: ['id'], topic: test_1_logs}
    - {table: test_2, pks: ['id'], topic: test_2_logs}
    kafkaConf:
      addrs:
      - 192.168.2.4:9092
tunnel_zookeeper_address: 192.168.2.4:2181
```

Start it in the foreground:

```
# -p 7788 exposes Prometheus metrics on port 7788 (monitoring setup is not the focus here
# and is simple, so it is skipped for now)
java -server -classpath 'conf/*:lib/*' com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788
```

Next, write some test data into the two tables of test_database on PG 10, and the changes show up in Kafka (the figure below shows the results in Kafka Manager and Kafka Eagle).

Pretty-printed, the messages look like this.

An UPDATE record:

```json
{
    "dataList": [{
        "dataType": "integer",
        "name": "id",
        "value": "1111"
    }, {
        "dataType": "character",
        "name": "name",
        "value": "大狗蛋 "
    }],
    "eventType": "UPDATE",
    "lsn": 10503246616,
    "schema": "public",
    "table": "test_1"
}
```
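The lsn field appears to be PostgreSQL's 64-bit WAL position printed as a plain integer. This is an assumption, not something the output above states. If it holds, converting it to PG's X/Y text form makes a message comparable with server-side positions such as confirmed_flush_lsn in pg_replication_slots. A minimal Python sketch; lsn_to_text and text_to_lsn are hypothetical helper names:

```python
# Assumption: tunnel's "lsn" is PostgreSQL's 64-bit WAL position as a plain integer.


def lsn_to_text(lsn):
    """Format a 64-bit WAL position in PostgreSQL's pg_lsn text form, e.g. '2/720AD318'."""
    if not 0 <= lsn < 1 << 64:
        raise ValueError("an LSN must fit in 64 bits")
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"  # high 32 bits / low 32 bits, in hex


def text_to_lsn(text):
    """Parse pg_lsn text such as '2/720AD318' back into an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


if __name__ == "__main__":
    print(lsn_to_text(10503246616))  # 2/720AD318 (the UPDATE record above)
    print(lsn_to_text(10503247064))  # 2/720AD4D8 (the DELETE record)
```

Under that assumption, the DELETE record's lsn is 448 bytes further along the WAL than the UPDATE's.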

A DELETE record (with the table's default replica identity, only the primary-key column is included):

```json
{
    "dataList": [{
        "dataType": "integer",
        "name": "id",
        "value": "3"
    }],
    "eventType": "DELETE",
    "lsn": 10503247064,
    "schema": "public",
    "table": "test_1"
}
```
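The two samples show what a consumer has to handle: every value arrives as a string whatever its dataType, an UPDATE carries the whole new row, and a DELETE carries only the key. Below is a minimal Python sketch of applying such messages to an in-memory copy keyed by primary key. The apply_event helper and the PRIMARY_KEYS map are hypothetical (PRIMARY_KEYS mirrors the pks rules in conf/test.yml). Reading from Kafka is left out, and the "INSERT" eventType name is assumed because no INSERT sample is shown. The demo's DELETE targets id 1111 rather than the sample's id 3, so that it removes the row the UPDATE created.

```python
import json

# Hypothetical mirror of the "pks" entries in conf/test.yml (table -> key columns).
PRIMARY_KEYS = {"test_1": ["id"], "test_2": ["id"]}


def apply_event(replica, message):
    """Apply one tunnel JSON message to an in-memory replica.

    replica maps (schema, table) -> {primary-key tuple: {column: value}}.
    All values stay strings, exactly as tunnel emits them.
    """
    event = json.loads(message)
    rows = replica.setdefault((event["schema"], event["table"]), {})
    row = {col["name"]: col["value"] for col in event["dataList"]}
    pk = tuple(row[name] for name in PRIMARY_KEYS[event["table"]])
    kind = event["eventType"]
    if kind in ("INSERT", "UPDATE"):  # "INSERT" is an assumed name; no sample shows it
        rows[pk] = row  # upsert: the message carries the full new row
    elif kind == "DELETE":
        rows.pop(pk, None)  # a DELETE carries only the key columns
    else:
        raise ValueError(f"unexpected eventType: {kind!r}")
    return replica


if __name__ == "__main__":
    update = ('{"dataList": [{"dataType": "integer", "name": "id", "value": "1111"},'
              ' {"dataType": "character", "name": "name", "value": "大狗蛋 "}],'
              ' "eventType": "UPDATE", "lsn": 10503246616, "schema": "public", "table": "test_1"}')
    delete = ('{"dataList": [{"dataType": "integer", "name": "id", "value": "1111"}],'
              ' "eventType": "DELETE", "lsn": 10503247064, "schema": "public", "table": "test_1"}')
    replica = {}
    apply_event(replica, update)
    print(replica)  # {('public', 'test_1'): {('1111',): {'id': '1111', 'name': '大狗蛋 '}}}
    apply_event(replica, delete)
    print(replica)  # {('public', 'test_1'): {}}
```

Treating INSERT and UPDATE alike as an upsert keeps the copy consistent when an UPDATE arrives for a row it has not seen yet.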