分佈式事務學習筆記 2020-02-25

分佈式事務學習項目:流量充值中心 git地址:https://github.com/barrywangmeng/data-refill-center

以流量充值業務為模型,來模擬分佈式事務的實現。

從最開始的單庫到後面拆分成多庫,從2pc到3pc,從springboot到springcloud,一步步模擬分佈式事務中遇到的各種問題。

分佈式事務參見的集中方案

  • XA分佈式事務:一般用於單系統多庫的場景
  • TCC方案:try-confirm-cancel方案, 微服務鏈式調用場景
  • 可靠消息最終一致性方案:針對於耗時的請求操作,使用可靠性消息異步解耦
  • 最大努力通知方案:適用於耗時且非核心業務,例如下單成功發送短訊、發送通知等操作

技術棧&知識點

  • SpringBoot
  • SpringCloud
  • XA規範
  • 分佈式事務2PC、3PC原理
  • JTA、Atomikos框架
  • TCC事務
  • 可靠消息MQ ……

版本

  • master:springboot版本,單體應用多數據庫版本,使用Atomikos框架支持XA規範分佈式事務
  • cloud:springcloud版本,使用tcc分佈式事務

cloud版本如圖,需要自己導入一個個項目:

image.png

流量充值中心的整體架構設計

流量充值中心的整體架構設計.jpg

流量充值中心運轉流程

02_流量充值中心運轉流程.jpg

SpringBoot實現流量中心拆庫操作

數據庫拆分如下:

image.png

SpringBoot多數據源配置:(詳情見application.yml配置文件)

 1activity:   2  datasource:   3    type: com.alibaba.druid.pool.DruidDataSource   4    url: jdbc:mysql://127.0.0.1:3306/data-refill-center-activity?useUnicode=true&characterEncoding=utf-8   5    username: root   6    password: 123456   7    driverClassName: com.mysql.jdbc.Driver   8    initialSize: 10   9    minIdle: 50  10    maxActive: 500  11    maxWait: 60000   12    timeBetweenEvictionRunsMillis: 60000    13    minEvictableIdleTimeMillis: 300000  14    validationQuery: SELECT 1 FROM DUAL  15    testWhileIdle: true  16    testOnBorrow: false    17    testOnReturn: false    18    poolPreparedStatements: true   19    maxPoolPreparedStatementPerConnectionSize: 20  20    filters: stat,wall,log4j  21    connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000

Spring聲明式事務

Spring通過@Transaction 註解來實現聲明式事務。 原理如下圖:

image.png

分佈式事務初探

分佈式事務是指揮操作多個數據庫的事務,這裡可以先來說下XA規範:

XA是由X/Open組織提出的分佈式事務的規範。 XA規範主要定義了(全局)事務管理器(Transaction Manager)和(局部)資源管理器(Resource Manager)之間的接口。 XA接口是雙向的系統接口,在事務管理器(Transaction Manager)以及一個或多個資源管理器(Resource Manager)之間形成通信橋樑。 XA引入的事務管理器充當上文所述全局事務中的「協調者」角色。事務管理器控制着全局事務,管理事務生命周期,並協調資源。 資源管理器負責控制和管理實際資源(如數據庫或JMS隊列)。

上面概念有些抽象,我們只需要知道X/Open的組織定義了分佈式事務的模型,這裏面有幾個角色:

  1. AP(Application,應用程序):我們自己的系統
  2. TM(Transaction Manager,事務管理器):一個在系統里嵌入的一個專門管理橫跨多個數據庫的事務的一個組件
  3. RM(Resource Manager,資源管理器):數據庫(比如MySQL)
  4. CRM(Communication Resource Manager,通信資源管理器):可以是消息中間件(但是也可以不用這個東西)

XA就是定義好的那個TM與RM之間的接口規範,就是管理分佈式事務的那個組件跟各個數據庫之間通信的一個接口

2PC分佈式事務

Two-Phase-Commitment-Protocol,兩階段提交協議

X/Open組織定義的一套分佈式事務的模型,我們可以按照這個模型去實現分佈式事務。

如果我們明白了XA原理也也就等於明白了2PC原理了。2PC是基於XA規範搞的一套分佈式事務的理論,也可以叫做一套規範,或者是協議。

原理如下:

(1)準備階段 TM先發送個prepare消息給各個數據庫,讓各個庫先把分佈式事務里要執行的各種操作,先準備執行,其實此時各個庫會差不多先執行好,不提交事務

prepare消息發送後,各個庫先在本地開個事務,然後執行好SQL,這裡不提交事務,但是會有對應的日誌記錄

然後各個數據庫都返回一個響應消息給事務管理器,如果成功了就發送一個成功的消息,如果失敗了就發送一個失敗的消息

(2)提交階段

第一種情況,要是TM發現某個數據庫返回失敗,此時TM直接判定這個分佈式事務失敗,然後TM通知所有數據庫,全部回滾。反之,則通知所有數據庫提交事務。

原理圖如下:(結合流量充值項目代碼)

04_2PC協議.jpg

2PC缺陷

  1. 同步阻塞:在階段一里執行prepare操作會佔用資源,一直到整個分佈式事務完成,才會釋放資源,這個過程中,如果有其他人要訪問這個資源,就會被阻塞住
  2. 單點故障:TM是個單點,一旦掛掉就完蛋了
  3. 事務狀態丟失:即使把TM做成一個雙機熱備的,一個TM掛了自動選舉其他的TM出來,但是如果TM掛掉的同時,接收到commit消息的某個庫也掛了,此時即使重新選舉了其他的TM ,壓根兒不知道這個分佈式事務當前的狀態,因為不知道哪個庫接收過commit消息,哪個接收過commit消息的庫也掛了
  4. 腦裂問題:在階段二中,如果發生了腦裂問題,那麼就會導致某些數據庫沒有接收到commit消息,有些庫收到了commit消息,結果有些庫沒有收到

具體如圖:

2PC存在的問題.jpg

3PC 分佈式事務

Three-Phase-Commitment 三階段提交協議

主要是針對2PC的一些問題做了改進,改變成下面的過程:

1、 CanCommit階段 這個階段就是TM發送一個CanCommit消息給各個數據庫,各個數據庫返回個結果,這裡並不直接執行SQL語句,只是告訴TM我的網絡是通暢的。

2、PreCommit階段 如果各個數據庫對CanCommit返回都是成功,那麼就進入PreCommit階段,這個時候各個數據庫會執行SQL語句,但是並不提交事務

3、DoCommit階段 如果PreCommit都返回成功,那麼各個數據庫進入DoCommit階段,TM給各個數據庫發送提交事務指令。反之則發送abort消息給各個數據庫。

跟2PC相比,主要是有兩點改進:

  1. 引入CanCommit階段
  2. DoCommit階段引入超時機制,如果PreCommit階段返回都是成功,如果超時時間到了,數據庫還沒有收到DoCommit或者abort指令則會自己提交事務

3PC存在的問題: 1、腦裂問題:如果TM發送abort消息,但是某個數據庫由於網絡原因沒有接收到,那麼這個數據庫過了一定時間後就會自己提交事務

執行原理圖:

3PC執行原理.jpg

Mysql 對分佈式事務的支持

Mysql是支持XA分佈式事務的,而且支持2PC的協議。具體代碼可參見com.barrywang.data.refill.center.MySQLXATest.java

原理圖如下:

Mysql XA分佈式事務實現.jpg

JTA事務原理

Java Transaction API 一套分佈式事務的編程API

如果是跨多個庫的事務,是通過JTA API來支持的,通過JTA API可以協調和管理橫跨多個數據庫的分佈式事務,一般來說會結合JNDI。

JTA&Atomikos

Atomikos:客戶端的TM第三方庫,基於Mysql的XA API來實現分佈式事務

基於TCC分佈式事務

TCC:Try-Cancle-Commit 三個階段

TCC方案細節.png

springcloud、springboot或者dubbo項目都可以使用開源框架bytetcc(https://github.com/liuyangming/ByteTCC) 來實現分佈式事務 推薦一篇文章: bytetcc原理(https://www.cnblogs.com/jajian/p/10014145.html)

可靠消息最終一致性方案

實現原理如圖:

可靠消息最終一致性方案.jpg

可靠消息最終一致性方案涉及到4個組件:

  1. 上游服務:發送MQ消息通知下游服務執行某個操作
  2. 可靠消息服務:協調上下游服務的消息傳遞,確保數據一致性,可以認為這個所謂的可靠消息服務是我們自己開發的,也是一個spring cloud的服務,只不過這個服務是通用的,是所有服務所有系統都基於這個可靠消息服務來實現可靠消息最終一致性的方案。「可靠消息」四個字,這一切都是基於可靠消息服務來做的,方案設計,消息如何保持可靠性
  3. MQ消息中間件:這個一般是RocketMQ、RabbitMQ或者Kafka
  4. 下游服務:就是那個要被調用的服務

所謂的分佈式事務,上游服務他要執行一個本地的數據庫操作,下游服務也要執行一個本地的數據庫操作,現在盡量就是希望是說上游服務和下游服務的數據庫操作要麼同時完成,要麼同時不完成。

具體的執行流程如下所示:

(1)上游服務發送一個待確認消息給可靠消息服務

(2)可靠消息服務將這個待確認的消息保存到自己本地數據庫里,保存起來,但是不發給MQ,這個時候消息的狀態是「待確認」

(3)上游服務操作本地數據庫

(4)上游服務根據自己操作本地數據庫的結果,來通知可靠消息服務,可以確認發送消息了,或者是刪除消息。 操作完本地數據庫之後,會有兩個結果,第一個結果是操作失敗了,第二個結果是操作成功了,如果本地數據庫操作失敗了,本地操作會回滾,回滾之後,上游服務就要通知可靠消息服務刪除消息;如果本地數據庫操作成功了,那麼此時本地事務就提交了,接着就可以通知可靠消息服務發送消息

(5)可靠消息服務將這個消息的狀態修改為「已發送」,並且將消息發送到MQ中間件里去 這個環節是必須包裹在一個事務里的,如果發送MQ失敗報錯,那麼可靠消息服務更新本地數據庫里的消息狀態為「已發送」的操作也必須回滾,反之如果本地數據庫里的消息狀態為「已發送」,那麼必須成功投遞消息到MQ里去

@Transactional public void confirmMessage(Long messageId) { messageDAO.updateStatus(messageId, MessageStatus.SENT); rabbitmqProducer.send(message); }

如果更新數據庫里的消息狀態報錯了,那麼消息根本不會投遞到MQ里去;如果更新數據庫里的消息狀態成功了,但是事務還沒提交,然後將消息投遞到MQ里去報錯了,此時事務管理器會感知到這個異常,然後會直接回滾掉整個事務,更新數據庫里消息狀態的操作也會回滾掉的

就可以保證說,更新數據庫里的消息狀態和投遞消息到MQ,要麼一起成功,要麼一起失敗,這裡這第5個步驟,必須是一起成功或者是一起失敗

MQ,rabbitmq,都有事務消息的一個實現,你可以先去投遞一個prepare的消息,接着如果說數據庫操作成功過了,那麼就commit那個消息發送給rabbitmq,然後如果數據庫操作失敗了,就通知mq去rollback一條消息

但是MQ的事務消息最好別輕易用,因為那個性能實在是太低了,吞吐量太差

所以說我這裡給大家介紹的是上面的那種方案

(6)下游服務從MQ里監聽到一條消息

(7)下游服務根據消息,在自己本地操作數據庫

(8)下游服務對本地數據庫操作完成之後,對MQ進行ack操作,確認這個消息處理成功

現在的MQ中間件,無論是rabbitmq、rocketmq、kafka,都是支持手動ack。如果你是使用的默認自動ack的模式,那麼就會導致消息的丟失;現在一般都會用手動ack,當本地操作執行成功之後,再對MQ執行手動的ack確認

只有當我手動ack確認之後,mq才會刪除消息

如果我還沒ack,本地數據庫比如操作失敗報錯了,此時MQ一直沒收到ack消息,會怎麼樣呢?此時MQ會保證重新投遞一次消息,可以給其他的消費者實例去消費

(9)下游服務對MQ進行ack之後,再給可靠消息服務發送個請求,通知該服務說,ok,我這裡處理完畢了,可靠消息服務收到通知之後,將消息的狀態修改為「已完成」