Seata 1.5.2 源碼學習

文章有點長,我決定用半個小時來和你分享~😂 廢話不多說,上代碼。。。

基於Seata 1.5.2,項目中用 seata-spring-boot-starter

1. SeataDataSourceAutoConfiguration

SeataDataSourceAutoConfiguration 主要是配置數據源代理,可以看到:

  1. 默認seata.enabled、seata.enableAutoDataSourceProxy、seata.enable-auto-data-source-proxy都是true
  2. 只有當classpath中有DataSource時才會進行此配置
  3. 創建了一個SeataAutoDataSourceProxyCreator,用於自動代理數據源

先記一下,SeataAutoDataSourceProxyCreator是一個BeanPostProcessor

剛才new了一個SeataAutoDataSourceProxyCreator,繼續看構造方法,默認useJdkProxy是false,excludes為空,dataSourceProxyMode是AT

構造方法中最重要的一件事情是構造AOP通知(攔截器),這裡new了一個SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice是一個MethodInterceptor。

MethodInterceptor是aop中的一個接口,當目標方法被調用時就會調用與之關聯的MethodInterceptor的invoke方法

至此,在構造方法中完成了advisors的賦值,advisors[]中有一個DefaultIntroductionAdvisor,DefaultIntroductionAdvisor中引用了SeataAutoDataSourceProxyAdvice

前面說過,SeataAutoDataSourceProxyCreator是一個BeanPostProcessor,而BeanPostProcessor是BeanFactory中的一個鉤子(回調),稱之為後置處理器

AbstractAutoProxyCreator#postProcessBeforeInstantiation()

AbstractAutoProxyCreator#postProcessAfterInitialization()

AbstractAutoProxyCreator#wrapIfNecessary()

AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()

SeataAutoDataSourceProxyCreator#getAdvicesAndAdvisorsForBean()

SeataAutoDataSourceProxyCreator#wrapIfNecessary()

DataSourceProxyHolder維護了數據源對象與數據源代理對象的映射

至此,數據源代理部分就看完了,下面總結一下:

1、啟動的時候自動配置數據源代理,創建了一個SeataAutoDataSourceProxyCreator

2、SeataAutoDataSourceProxyCreator在構造方法中創建AOP通知,並賦值給其屬性

3、AbstractAutoProxyCreator是一個抽象類不能被實例化,能實例化的只有SeataAutoDataSourceProxyCreator

4、SeataAutoDataSourceProxyCreator從AbstractAutoProxyCreator那裡繼承了很多屬性和方法,其中就包括postProcessBeforeInstantiation()、postProcessAfterInitialization()、createProxy()等等

5、SeataAutoDataSourceProxyCreator間接實現了BeanPostProcessor接口,也就是說它也是BeanPostProcessor的一個實現類

6、BeanFactory回調所有的BeanPostProcessor#postProcessAfterInitialization()時,就會調用SeataAutoDataSourceProxyCreator的postProcessAfterInitialization()方法,最終會調用wrapIfNecessary()方法

7、wrapIfNecessary()只關心DataSource對象,它負責為DataSource對象生成代理對象,並且在SeataAutoDataSourceProxyCreator中維護了DataSource對象與SeataDataSourceProxy對象之間的映射關心

8、創建代理對象時,會給DataSource對象應用AOP攔截器。用AOP的話來講,就是給目標對象DataSource織入通知,並創建一個被增強的代理對象

9、通知(攔截器)是SeataAutoDataSourceProxyAdvice,它實現了MethodInterceptor接口

10、SeataAutoDataSourceProxyAdvice#invoke()方法所做的事情就是,拿到原始DataSource的代理對象,並且在代理對象上調用目標方法

綜上所述,以上做的所有工作都是為了將來調用 javax.sql.DataSource 上的任意方法時都會被攔截,然後調用其代理對象上對應的方法。而DataSource中最重要的一個方法就是getConnection()

劃重點:將來,所有調用 javax.sql.DataSource#getConnection() 都會被攔截,然後在代理對象上執行getConnection(),因此可以這樣說

調 javax.sql.DataSource#getConnection() 實際上執行的是 io.seata.rm.datasource.SeataDataSourceProxy#getConnection()

2. SeataAutoConfiguration

SeataAutoConfiguration裏面主要是配置GlobalTransactionScanner(全局事務掃描器)

seata.enabled=true 才會開啟 SeataAutoConfiguration

GlobalTransactionScanner 也繼承自 AbstractAutoProxyCreator,同時還實現了InitializingBean接口。BeanFactory在設置了所有bean屬性之後會調用InitializingBean的afterPropertiesSet()方法

GlobalTransactionScanner#afterPropertiesSet()

io.seata.common.DefaultValues中定義了很多默認值

同樣地,因為實現了BeanPostProcessor接口,所以在啟動時BeanFactory實例化Bean之後,會調用GlobalTransactionScanner的postProcessAfterInitialization(),儘管這個postProcessAfterInitialization()方法時從AbstractAutoProxyCreator那裡繼承來的,但是不影響啊,還是會調用GlobalTransactionScanner這個bean的postProcessAfterInitialization()方法。於是,最終又會調wrapIfNecessary()方法。

GlobalTransactionScanner#wrapIfNecessary()

這裏面有一個很重要的邏輯就是,創建了一個GlobalTransactionalInterceptor對象,並賦值給interceptor

AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()是一個抽象方法,實現在子類GlobalTransactionScanner中

因此,所有在GlobalTransactionScanner#wrapIfNecessary()中被代理的對象,都被應用GlobalTransactionalInterceptor

GlobalTransactionalInterceptor也是一個MethodInterceptor

也就是說,目標方法的調用都會轉到GlobalTransactionalInterceptor#invoke()上

GlobalTransactionalInterceptor#handleGlobalTransaction()

事務執行直接調用TransactionalTemplate的execute()方法

io.seata.tm.api.TransactionalTemplate#execute()

io.seata.tm.api.GlobalTransactionContext#getCurrent() 獲取當前事務

io.seata.tm.api.TransactionalTemplate#beginTransaction()

tx是DefaultGlobalTransaction

io.seata.tm.api.DefaultGlobalTransaction#begin()

DefaultGlobalTransaction中的TransactionManager是DefaultTransactionManager

DefaultTransactionManager中提供了事務相關的底層操作

io.seata.tm.api.DefaultGlobalTransaction#commit()

io.seata.tm.api.DefaultGlobalTransaction#rollback()的邏輯與commit()類似,都是重試調用transactionManager.rollback(xid)

全局事務掃描器部分的代碼就看到這裡,下面總結一下:

1、配置項seata.enabled=true 會觸發 SeataAutoConfiguration 自動配置

2、SeataAutoConfiguration中創建了一個GlobalTransactionScanner

3、GlobalTransactionScanner繼承了AbstractAutoProxyCreator,並實現InitializingBean接口

4、初始化TM、RM

5、由於繼承了AbstractAutoProxyCreator,所以BeanFactory會調用GlobalTransactionScanner#方法postProcessAfterInitialization(),最終會調用GlobalTransactionScanner#wrapIfNecessary()來為目標對象創建代理對象

6、GlobalTransactionScanner#wrapIfNecessary()中創建了一個GlobalTransactionalInterceptor,GlobalTransactionalInterceptor是一個MethodInterceptor

7、在創建代理對象的時候,在AbstractAutoProxyCreator#wrapIfNecessary()方法中,為代理對象應用GlobalTransactionalInterceptor,於是所有目標對象上的方法調用就會轉為調用GlobalTransactionalInterceptor#invoke()

8、GlobalTransactionalInterceptor#invoke()方法中,首先獲取被調用的目標對象的Class和Method對象,然後檢查目標方法或類上是否有@GlobalTransactional或@GlobalLock註解,而且配置項中不能禁用全局事務

9、如果加了@GlobalTransactional註解,則創建一個AspectTransactional,然後開始處理全局事務,默認傳播特性是REQUIRED

10、如果加了@GlobalLock註解,則開始處理全局鎖

11、處理全局事務就是直接調用事務模板中的execute方法,TransactionalTemplate#execute()是一個模板方法,其中定義了事務處理的流程。首先開啟事務,然後執行業務邏輯,最後提交事務,異常回滾事務。

12、事務操作是在DefaultGlobalTransaction中處理的,最終處理在DefaultTransactionManager。DefaultTransactionManager負責同步遠程調用,向TC發請求來開啟、提交、回滾事務等操作

3. 數據庫操作執行SQL語句

通過Java自帶的JDBC操作數據庫通常是這樣的:

Class.forName(driverClass);
// 獲取Connection
Connection connection = DriverManager.getConnection(url,user,password);
// 創建Statement或者PreparedStatement
Statement stmt = connection.createStatement();
stmt.execute(sql);

// PreparedStatement ps = connection.prepareStatement(sql);
// ps.execute();

MyBatis底層也是這一套

接下來看Seata是如何做的

首先是獲取數據庫連接Connection,前面已經說過了,調用DataSource的getConnection()方法底層是在代理對象SeataDataSourceProxy上調用getConnection()。SeataDataSourceProxy是接口,如果是AT模式,則這個數據源代理對象是DataSourceProxy

DataSourceProxy#getConnection()獲取數據庫連接

ConnectionProxy#createStatement()

ConnectionProxy#prepareStatement()

PreparedStatementProxy 繼承自 StatementProxy,因此下面就直接看PreparedStatementProxy如何執行SQL

PreparedStatementProxy#execute()

ExecuteTemplate#execute()  是一個模板方法

挑一個看看吧,就挑UpdateExecutor

UpdateExecutor構造方法中一直調父類的構造法,既然如此,那麼直接看BaseTransactionalExecutor

UpdateExecutor#execute()

這個方法時從BaseTransactionalExecutor那裡繼承來的,又是一個模板方法,可見設計模式是多麼重要

AbstractDMLBaseExecutor#doExecute()

AbstractDMLBaseExecutor#executeAutoCommitTrue()

ConnectionProxy#changeAutoCommit()

現在事務自動提交已經被Seata改成false了

UpdateExecutor#beforeImage()

BaseTransactionalExecutor#prepareUndoLog()

接下來,提交事務

ConnectionProxy#commit()

ConnectionProxy#processGlobalTransactionCommit() 處理全局事務提交

分支事務提交以後,業務數據更改和undo_log就都提交了

回想一下,為什麼在執行業務修改前要先將默認的自動提交改成手動提交,最後再改成自動提交呢?

因為,要將業務數據修改和插入undo_log放在同一個事務里,一起提交

這一切都歸功於代理

回顧一下整個調用鏈

結合之前的案例,AT模式TC、TM、RM三者的交互應該是這樣的:

問題一:為什麼在執行的時候,先將數據庫自動提交autoCommit設為false,最後再改成true呢?

答:因為,需要將undo_log和業務數據修改放到同一個事務中,這樣可以保證業務數據修改成功後undo_log必然插入成功,所以Seata要將其改為手動提交。最後再改成true是因為默認autoCommit就是true,這樣可以不影響其它業務。

問題二:什麼情況下ConnectionContext中xid=null,且isGlobalLockRequire=true呢?或者換一種問法,什麼情況下不在全局事務中,當仍然需要全局鎖呢?

答:當業務方法上不加@GlobalTransactional,而是只加了@GlobalLock註解的情況下,就會出現上述情況,也就會執行 ConnectionProxy#processLocalCommitWithGlobalLocks()方法,在事務提交前檢查全局鎖,這樣設計的目的是在AT模式下,不出現臟讀、臟寫。由於數據源被代理了,當一個加了@GlobalTransactional的全局事務,與另一個加了@GlobalTransactional或@GlobalLock註解的事務在本地事務提交前就會檢查全局鎖,要先獲得全局鎖才能提交本地事務,這樣就避免了臟讀臟寫,從而相當於實現了全局事務的讀已提交隔離級別。參見://seata.io/zh-cn/blog/seata-at-lock.html

關於Seata 1.5.2 Client端的源碼學習就先到這裡,歡迎交流~

如果你都已經看到了這裡,不妨給我點個贊吧😄

 

Tags: