使用go-mysql-postgresql實現 MySQL實時同步數據到PG

  • 2019 年 10 月 4 日
  • 筆記

MySQL to PG 的數據同步,可以通過canal 或者 bireme 來做,但是操作起來步驟都比較費事。

之前公司的同事,在go-mysql-elasticsearch的基礎上,改了一下,將target從es改為了pg,工具名稱叫做go-mysql-postgresql 。這個工具最大的好處就是一鍵部署使用,不依賴其它組件。

項目地址:https://github.com/frainmeng/go-mysql-elasticsearch

我實驗的時候,看到當前最新版本為 : go-mysql-postgresql3.0.0-linux-amd64.tar.gz

下面是我的配置操作筆記:

1、 在源MySQL上開設同步專用的帳號

grant replication slave, replication client,process ,select on *.* to dts@'%' identified by 'dts';    MySQL上面的表情況:  use testdb;  testdb >show create table t_order G  *************************** 1. row ***************************         Table: t_order  Create Table: CREATE TABLE `t_order` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,    `aid` int(10) unsigned NOT NULL,    `uid` int(10) unsigned NOT NULL,    `type` tinyint(3) unsigned NOT NULL,    `status` tinyint(4) unsigned NOT NULL,    `price` int(10) unsigned NOT NULL COMMENT '',    `num` int(10) unsigned NOT NULL,    `city` varchar(64) NOT NULL,    `category` varchar(64) NOT NULL,    PRIMARY KEY (`id`),    KEY `uid` (`uid`)  ) ENGINE=InnoDB AUTO_INCREMENT=1000 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPRESSED COMMENT=''  1 row in set (0.00 sec)

2、在PG上創建相同的表

create database testdb ;    c testdb     CREATE TABLE t_order (    id bigint  NOT NULL,    aid bigint  NOT NULL,    uid bigint  NOT NULL,    type bigint  NOT NULL,    status bigint  NOT NULL,    price bigint  NOT NULL ,    num bigint  NOT NULL,    city varchar(64) NOT NULL,    category varchar(64) NOT NULL,    PRIMARY KEY (id)  ) ;    CREATE USER dts REPLICATION LOGIN CONNECTION LIMIT 10 ENCRYPTED PASSWORD 'dts';   grant connect on database testdb to dts;  grant usage on schema public to dts;  grant select on all tables in schema public to dts;    grant all on table t_order to dts;

go-mysql-postgresql 的部署:

將文件解壓到 /var/lib/pgsql/go-mysql-postgresql 目錄裡面。

vim /var/lib/pgsql/go-mysql-postgresql/master.info  將準備同步的binlog資訊寫入文件中

bin_name = "mysql-bin.000167"

bin_pos = 13389413

cat /var/lib/pgsql/go-mysql-postgresql/river.toml

# 源端MySQL連接配置  my_addr = "172.31.10.100:3306"  my_user = "dts"  my_pass = "dts"  my_charset = "utf8"    # 目的端PG連接配置  pg_host = "192.168.2.4"  pg_port = 5434  pg_user = "dts"  pg_pass = "dts"  pg_dbname = "testdb"    # 存放同步到的位移點的文件目錄  data_dir = "./var"  # Inner Http status address  stat_addr = "192.168.2.4:12800"    # statsd monitor  statsd_host = "127.0.0.1"  statsd_port = 8125  statsd_prefix = "dbsync"    # 偽裝成slave時候,配置的server-id  server_id = 1001  flavor = "mysql"    # minimal items to be inserted in one bulk  bulk_size = 1    # force flush the pending requests if we don't have enough items >= bulk_size  flush_bulk_time = "500ms"    # Ignore table without primary key  skip_no_pk_table = false  # concurrency conf  concurrent_size = 6  concurrent_ack_win = 2048    # MySQL data source  [[source]]  schema = "testdb"  tables = ["t_order"]    # 目標PG的連接配置  [[target]]  pg_name = "172.31.10.100_testdb_t_order"  pg_host = "192.168.2.4"  pg_port = 5434  pg_user = "dts"  pg_pass = "dts"  pg_dbname = "testdb"    # MySQL 數據到 PG 後的分發規則  [[rule]]  #mysql 庫表的配置  schema = "testdb"  table = "t_order"  # pg 庫表的配置  pg_schema = "public"  pg_table = "t_order"  # 下面這行很重要,標識了rule和target的綁定關係  pg_name = "172.31.10.100_testdb_t_order"

啟動:

sh start.sh 即可

日誌大致類似這樣的:

[2019/08/21 13:02:36] [info] pgclient.go:199 pg delete event execute success! Schema[public] Table[t_order], Id[166773984],result[{0xc000182b00 1}],reqId[503]

測試:

5k條記錄, 走專線  從傳輸到寫入到pg 用了33s  2019-08-20 23:33:29.289 CST [112184] LOG:  duration: 0.321 ms  2019-08-20 23:34:02.769 CST [112184] LOG:  duration: 0.085 ms      2w記錄, 走專線  從傳輸到寫入到pg 用了 140s  2019-08-20 23:35:20.216 CST [112189] LOG:  duration: 0.347 ms  2019-08-20 23:37:39.848 CST [85173] LOG:  duration: 6.648 ms

最後補充:

我們在做異構數據同步的時候,使用go-mysql-postgresql之前,通常情況下還需要將mysql老的數據全量同步過來,然後才能使用 go-mysql-postgresql來消費binlog達到同步數據的目的。 全量同步數據的方法,可以參考上一篇blog,地址: https://blog.51cto.com/lee90/2436325