如何使用Python模擬MySQL Slave,可以看看這個開源項目
- 2019 年 10 月 28 日
- 筆記
這是學習筆記的第 2140 篇文章
在MySQL中通過Master向Slave推送binlog數據變化,從而實現主從複製的過程,是一件看似再正常不過的事情了。整個過程可以使用如下的流程圖來表示。

畢竟這是MySQL體系內的實現,如果想要在這個基礎上擴展,比如實現異構數據的流轉,複製,或者情況糟糕一些,多個跨地域的MySQL之間要實現非同步數據複製,這個時候原生的主從場景就會受到限制了。
如果要實現這種特殊的複製,需要具備兩點,第一是可以正常連接到MySQL,並且具有Slave應該擁有的許可權,第二是按照MySQL協議發送相關的數據包,讓MySQL服務能夠識別你是一個「Slave」,這樣如果發生了數據變化,Master就可以源源不斷的推送數據變化的資訊過來。
在技術方向上已經有了很多的產品和組件,比如阿里的canal,Zendesk的Maxwell, Yelp的MySQLStreamer等,都可以模擬MySQL協議,在行業內也有一些實現場景,在特性完善方面各有差異。
最近也做數據多活的一些方案調研,發現mysql-python-replication是一個很不錯的開源項目,它和行業內知名的一些開源項目都有淵源,實現了底層的協議數據解析。

我們接下來看看mysql-python-replication的源碼實現,做一些簡單的解讀。
首先mysql-python-replication的整體實現思路如下,它其實是基於PyMySQL來連接MySQL,然後來模擬協議層的數據包,得到Master推送的數據之後,按照Binlog中的event類型解析為Insert,delete,update(分別對應insert,delete,update事件),當然Binlog中實際的事件要遠遠比這個多。

mysql-python-replication的源碼很容易得到,在GitHub上搜索mysql-python-replication即可。
得到的源碼結果如下,程式碼量其實遠比想像的要少一些。

最後竟然還很貼心的給出了MySQL 5.6,5.7的安裝部署腳本,在examples目錄下提供了幾個案例,我們今天要分析的主要是基於dump_events.py這個文件,它可以實現模擬Slave的整個過程。
整個實常式序很短,內容如下:
from pymysqlreplication import BinLogStreamReader MYSQL_SETTINGS = { "host": "127.0.0.1", "port": 3306, "user": "root", "passwd": "xxxx"} def main(): # server_id is your slave identifier, it should be unique. # set blocking to True if you want to block and wait for the next event at # the end of the stream stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=3, blocking=True) for binlogevent in stream: binlogevent.dump() stream.close()if __name__ == "__main__": main()
示例的Main方法內其實邏輯相對是比較簡單的,初始化BinLogStreamReader對象,然後解析出event的資訊。
這裡的dump主要是基於BinLogEvent和RotateEvent,其他的大都是PyMySQL底層的實現了。

其實按照這個思路我們是很難讀懂程式碼的,只能做一些基本的程式碼熟悉,一方面我們要不斷的調試理解,另一方面我們需要抓住重點。
重點是什麼呢, 其實就是模擬Slave的原理,我來具體解釋一下。
MySQL Master向Slave推送數據,都是按照基礎的協議來完成,就好比我們工作中常用的專業術語,在外行來看就是高深莫測的東西。在這裡MySQL是定義了兩個命令:COM_REGISTER_SLAVE和COM_BINLOG_DUMP,前者用於註冊Slave,後者通知mysqld從指定binlog pos發送event。
如下是一個調試過程中得到的MySQL執行緒情況,可以看到相應的Binlog Dump執行緒,其實這個資料庫是沒有Slave的。

當然python-mysql-replication程式碼後端也會去information_schema中取一些元數據(其實主要是欄位相關的元數據資訊)。
我們在BinLogStreamReader中可以看到很多方法,其實是有關聯調用的,其中目前我最關注的方法是register_slave

def _register_slave(self): if not self.report_slave: return packet = self.report_slave.encoded(self.__server_id) if pymysql.__version__ < "0.6": self._stream_connection.wfile.write(packet) self._stream_connection.wfile.flush() self._stream_connection.read_packet() else: self._stream_connection._write_bytes(packet) self._stream_connection._next_seq_id = 1 self._stream_connection._read_packet()
return (struct.pack('<i', packet_len) + int2byte(COM_REGISTER_SLAVE) + struct.pack('<L', server_id) + struct.pack('<%dp' % min(MAX_STRING_LEN, lhostname + 1), self.hostname.encode()) + struct.pack('<%dp' % min(MAX_STRING_LEN, lusername + 1), self.username.encode()) + struct.pack('<%dp' % min(MAX_STRING_LEN, lpassword + 1), self.password.encode()) + struct.pack('<H', self.port) + struct.pack('<l', 0) + struct.pack('<l', master_id))
如果條件為:if not self.auto_position:即為偏移量同步模式,
後續發送COM_BINLOG_DUMP的實現邏輯為:
prelude = struct.pack('<i', len(self.log_file) + 11) + int2byte(COM_BINLOG_DUMP) if self.__resume_stream: prelude += struct.pack('<I', self.log_pos) else: prelude += struct.pack('<I', 4) flags = 0 if not self.__blocking: flags |= 0x01 # BINLOG_DUMP_NON_BLOCK prelude += struct.pack('<H', flags) prelude += struct.pack('<I', self.__server_id) prelude += self.log_file.encode()
否則註冊的是GTID相關的binlog_dump
gtid_set = GtidSet(self.auto_position) encoded_data_size = gtid_set.encoded_length header_size = (2 + # binlog_flags 4 + # server_id 4 + # binlog_name_info_size 4 + # empty binlog name 8 + # binlog_pos_info_size 4) # encoded_data_size prelude = b'' + struct.pack('<i', header_size + encoded_data_size) + int2byte(COM_BINLOG_DUMP_GTID)
而這個過程中也會通過PyMySQL連接到資料庫中,得到欄位相關的一些資訊。
SQL為:
SELECT COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION FROM information_schema.columns WHERE table_schema = %s AND table_name = %sORDER BY ORDINAL_POSITION
通過程式碼調試可以看到這些明細的資訊。

當然看到這裡我們可能還是一知半解,不知道這個程式運行結果是怎麼樣的。我們來跑一個demo看看。
首先刷新下日誌,對錶test寫入一行數據,看看解析的效果怎麼樣。
mysql> flush logs;mysql> insert into test values(3,'cc');Query OK, 1 row affected (0.07 sec)mysql> show binary logs;| mysql-bin.000064 | 675 || mysql-bin.000065 | 391 |+------------------+-----------+65 rows in set (0.00 sec)
在後端輸出的日誌內容為:
=== RotateEvent ===Position: 4Next binlog file: mysql-bin.000065()=== RotateEvent ===Position: 4Next binlog file: mysql-bin.000065()=== FormatDescriptionEvent ===Date: 2019-10-23T10:51:41Log position: 123Event size: 100Read bytes: 0()=== QueryEvent ===Date: 2019-10-23T10:51:55Log position: 279Event size: 49Read bytes: 49Schema: testExecution time: 0Query: BEGIN()=== TableMapEvent ===Date: 2019-10-23T10:51:55Log position: 325Event size: 27Read bytes: 26Table id: 599Schema: testTable: testColumns: 2()=== WriteRowsEvent ===Date: 2019-10-23T10:51:55Log position: 364Event size: 20Read bytes: 12Table: test.testAffected columns: 2Changed rows: 1Values:--('*', u'id', ':', 3)('*', u'name', ':', u'cc')()=== XidEvent ===Date: 2019-10-23T10:51:55Log position: 391Event size: 8Read bytes: 8Transaction ID: 82() mysql> show slave hosts;Empty set (0.00 sec)
可以看到解析出了多個event,我們把binlog裡面的事件和輸出比對,會發現這些事件是匹配的。因為flush logs對最後的binlog做了切換,所以有Rotate相關事件。

後續解析的內容是比較常規的部分。在程式碼中增刪改的相關event主要放在了row_event.py中,而範圍更大的一些事件在event.py裡面。

近期熱文:
MySQL中的主鍵和rowid,看似簡單,其實有一些使用陷阱需要注意