通過Maxwell解析MySQL Binlog,打好業務多活的基礎
- 2019 年 11 月 11 日
- 筆記
這是學習筆記的第 2153 篇文章

在Binlog解析方向和數據流轉方向上,經常會提到比較有名的幾類工具,阿里的Canal,Zendesk的Maxwell和Yelp的mysql_streamer,他們整體的情況如下:

主要設計思想是偽裝MySQL Slave,通過與MySQL服務端協議通訊,建立複製執行緒,從而獲得主庫推送的實時數據變化。
在功能完善性和生態建設上,Canal和Zendesk整體的表現要好一些,它們都是基於Java開發,支援多種模式的數據上下游集成,如果是想快速上手,Maxwell是一個不錯的選擇,而mysql_streamer的維護時間在2017年左右,在行業里看到的案例相對要少。
Maxwell相對比較精巧,它能實時讀取MySQL二進位日誌binlog,並生成 JSON 格式的消息,這一點是我優先考慮Maxwell的首要原因,當然它也可以作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程式。如果說使用場景,它的常見應用場景有ETL、維護快取、收集表級別的DML指標、增量到搜索引擎、數據分區遷移等。
bin/maxwell –user='maxwell' –password='XXXXXX' –port=33071 –host=127.0.0.1 –gtid_mode=true –output_server_id=true –output_thread_id=true –output_schema_id=true –output_primary_keys=true –output_primary_key_columns=true –output_binlog_position=true –output_gtid_position=true –output_null_zerodates=true –output_ddl=true –producer=stdout
開啟了全量的指標,通過全量的指標來權衡各種語句中必須的選項和解析邏輯.
我們先按照兩個大的維度來梳理和總結。
- DML語句梳理
- 事務語句梳理
.DML語句調研梳理
主要覆蓋Insert,Update,Delete,對返回的JSON數據進行梳理分析。
1) Insert語句
JSON返回數據
{ "database": "test", "table": "test_data", "type": "insert", "ts": 1573024626, "xid": 49482, "commit": true, "position": "binlog.000009:2466059", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10310", "server_id": 33091, "thread_id": 147, "schema_id": 5, "primary_key": [3], "primary_key_columns": ["id"], "data": { "id": 3, "name": "cc" } }
語句解析設計
可以直接解析data中的數據,拼裝為insert語句
欄位列表需要根據data中的第1行數據進行拼裝
需要解析的屬性:
{ "database": "test", "table": "test_data", "type": "insert", "ts": 1573024626, "xid": 49482, "commit": true, "position": "binlog.000009:2466059", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10310", "server_id": 33091, "data": { "id": 3, "name": "cc" } }
冪等偽SQL
Insert into [table]([id],[name]) values(?,?);
2) delete語句
JSON返回數據
{ "database": "test", "table": "test_data", "type": "delete", "ts": 1573014236, "xid": 39918, "commit": true, "position": "binlog.000009:1948897", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:8856", "server_id": 33091, "thread_id": 122, "schema_id": 5, "primary_key": [3], "primary_key_columns": ["id"], "data": { "id": 3, "name": "fff" } }
語句解析設計
{ "database": "test", "table": "test_data", "type": "delete", "ts": 1573014236, "xid": 39918, "commit": true, "position": "binlog.000009:1948897", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:8856", "primary_key": [3], "primary_key_columns": ["id"], "data": { "id": 3, "name": "fff" } }
如果刪除多行,假設SQL語句如下,刪除兩行數據:
delete from test_data where id>2;
Query OK, 2 rows affected (0.06 sec)
返回的JSON為:
{ "database": "test", "table": "test_data", "type": "delete", "ts": 1573028638, "xid": 54808, "xoffset": 0, "position": "binlog.000009:2754895", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:11120", "primary_key": [3], "primary_key_columns": ["id"], } { "database": "test", "table": "test_data", "type": "delete", "ts": 1573028638, "xid": 54808, "commit": true, "position": "binlog.000009:2754895", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:11120", "primary_key": [4], "primary_key_columns": ["id"], }
通過以上的分析和測試,可以看出delete操作可以關注於primary_key和primary_key_columns,得到相關的SQL語句,實現邏輯冪等性,
冪等偽SQL
Delete from [table] where [id]=?
3) update語句
JSON返回數據
{ "database": "test", "table": "test_data", "type": "update", "ts": 1573024676, "xid": 49552, "commit": true, "position": "binlog.000009:2470294", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10322", "server_id": 33091, "thread_id": 147, "schema_id": 5, "primary_key": [3], "primary_key_columns": ["id"], "data": { "id": 3, "name": "ccc" }, "old": { "name": "cc" } }
語句解析設計
{ "database": "test", "table": "test_data", "type": "update", "ts": 1573024676, "xid": 49552, "commit": true, "position": "binlog.000009:2470294", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10322", "primary_key": [3], "primary_key_columns": ["id"], "data": { "id": 3, --去除主鍵列 "name": "ccc" }, "old": { "name": "cc" } }
需要儘可能得到完整的Update語句。
冪等偽SQL
Update [table] set [name]=? Where [id]=? and [name]=?
4) 複雜SQL語句
表關聯修改場景1:
mysql> update test_data set name='bb' where id in (select id from test_data2);
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
會轉換為冪等的update語句。
{"database":"test","table":"test_data","type":"update","ts":1573096677,"xid":64394,"commit":true,"position":"binlog.000009:3276416","gtid":"f73d7025-f25b-11e9-9824-52540058c70f:12583","server_id":33091,"thread_id":170,"schema_id":6,"primary_key":[1],"primary_key_columns":["id"],"data":{"id":1,"name":"bb"},"old":{"name":"aa"}}
表關聯修改場景2:
mysql> update test_data,test_data2 set test_data.name='cc' where test_data.id=test_data2.id and test_data2.name='aa';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
{"database":"test","table":"test_data","type":"update","ts":1573097195,"xid":65078,"commit":true,"position":"binlog.000009:3314180","gtid":"f73d7025-f25b-11e9-9824-52540058c70f:12689","server_id":33091,"thread_id":170,"schema_id":6,"primary_key":[1],"primary_key_columns":["id"],"data":{"id":1,"name":"cc"},"old":{"name":"bb"}}
5) DML語句冪等小結
整體是基於行模式的解析,可以邏輯冪等的設計原則來進行完善。
語句類型 |
冪等SQL |
---|---|
insert |
Insert into [table]([id],[name]) values(?,?); |
delete |
Delete from [table] where [id]=? |
update |
Update [table] set [name]=? Where [id]=? and [name]=? |
通過以上的小結,其實我們可以明確對於分散式ID的強烈需求,這會是我們構築業務多活的基礎實現。
二。事務調研和梳理
1) SQL操作分析
mysql> begin;
Query OK, 0 rows affected (0.00 sec)
mysql> update test_data set name='cc' where id=3;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> insert into test_data values(4,'dd');
Query OK, 1 row affected (0.00 sec)
mysql> delete from test_data where id=2 and name='bb';
Query OK, 1 row affected (0.00 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
2) JSON返回數據
{ "database": "test", "table": "test_data", "type": "update", "ts": 1573024725, "xid": 49621, "xoffset": 0, "position": "binlog.000009:2476678", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10340", "server_id": 33091, "thread_id": 147, "schema_id": 5, "primary_key": [3], "primary_key_columns": ["id"], "data": { "id": 3, "name": "cc" }, "old": { "name": "ccc" } } { "database": "test", "table": "test_data", "type": "insert", "ts": 1573024735, "xid": 49621, "xoffset": 1, "position": "binlog.000009:2476778", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10340", "server_id": 33091, "thread_id": 147, "schema_id": 5, "primary_key": [4], "primary_key_columns": ["id"], "data": { "id": 4, "name": "dd" } } { "database": "test", "table": "test_data", "type": "delete", "ts": 1573024754, "xid": 49621, "commit": true, "position": "binlog.000009:2476868", "gtid": "f73d7025-f25b-11e9-9824-52540058c70f:10340", "server_id": 33091, "thread_id": 147, "schema_id": 5, "primary_key": [2], "primary_key_columns": ["id"], "data": { "id": 2, "name": "bb" } }
3) 語句邏輯解析設計
按照xoffset來遞增,下標為0,最後一個事務沒有xoffset,commit為true
對於insert,delete,update的解析邏輯可以復用DML處理的部分
SQL語句/命令 |
type |
xid |
timestamp |
xid |
xoffset |
commit |
---|---|---|---|---|---|---|
begin; |
|
|
|
|
|
|
update test_data set name='cc' where id=3; |
update |
49621 |
1573024725 |
147 |
0 |
|
insert into test_data values(4,'dd'); |
insert |
49621 |
1573024735 |
147 |
1 |
|
delete from test_data where id=2 and name='bb'; |
delete |
49621 |
1573024754 |
147 |
|
true |
commit; |
|
|
|
|
|
|
4) 大事務binlog
如果瞬間產生了大量的binlog,為了控制記憶體使用,會將處理延遲的binlog下沉到文件系統。
xxxxx INFO BinlogConnectorLifecycleListener – Binlog connected.
xxxxx INFO ListWithDiskBuffer – Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
後續補充Maxwell解析DDL和設計中的一些潛在問題和補救措施。