(WebFlux)003、多數據源R2dbc事務失效分析
- 2022 年 8 月 27 日
- 筆記
- JAVA, R2DBC, spring, SpringWebFlux, spring源碼分析, SrpingWebflux, 多數據源, 源碼分析
一、背景
最近項目持續改造,然後把SpringMVC換成了SpringWebflux,然後把Mybatis換成了R2dbc。中間沒有遇到什麼問題,一切都那麼的美滋滋,直到最近一個新需求的出現,打破了往日的寧靜。
在對需求分析了一番後,需要引入新的數據源,那就是MongoDb。然後出現了MongoDb、Mysql兩種數據源,然後原來好好的事物操作就芭比Q(完蛋)了。細細來分析一下原因與解決方法。
題外話:在本地測試的時候強烈建議用虛擬機+Docker來安裝MySql與MongoDb,不然Mac直連docker真的麻煩啊!!~
SpringBoot 版本號:
2.6.10 , (本文基於已經會在項目中使用R2DBC與MongoDb)
二、武松打虎
2.1 單獨solo Mysql
我們創建了一個測試庫r2dbc_test,裡面有一個user表。
# 創建測試庫
create database r2dbc_test;
# 創建表
create table r2dbc_test.user(
id int auto_increment primary key ,
name varchar(12)
);
2.1.1 項目引入R2dbc
略..給出鏈接,如果感興趣可以看看,Spring Data R2DBC,(實在太多,這個時間點懶得寫了,後面有時間再補一下),
2.1.2 測試程式碼
創建表對結構對應實體類:user
@Data
@Table("user")
@NoArgsConstructor
@AllArgsConstructor
public class User implements Persistable<Integer> {
@Id
private Integer id;
private String name;
@Override
public boolean isNew() {
return true;
}
}
這裡面有個坑點,那就是為什麼實現org.springframework.data.domain.Persistable
這個介面呢,先賣個關子,看完Repository後在描述哈。
Repository如下程式碼所示。
/**
* <br>User Repository</br>
*
* @author [email protected]
* @since 2022/8/26
*/
@Repository
public interface UserR2dbcRepository extends R2dbcRepository<User, Integer> {
}
我們直接使用了Spring提供好的org.springframework.data.r2dbc.repository.R2dbcRepository
,裡面有一些基礎的實現類。我們在測試的時候使用了org.springframework.data.repository.reactive.ReactiveCrudRepository#save()
方法,這個方法會去判斷這個實體對象是不是new object,如果不是,則會去Update。而判斷的方法則是org.springframework.data.domain.Persistable#isNew()
方法。所以這就是我們為啥要實現這個介面。
接著寫一個簡單測試的Controller,程式碼如下所示。
@RestController
@EnableR2dbcRepositories
public class TransactionController {
@Autowired
private UserR2dbcRepository repository;
@Autowired
private TransactionalOperator operator;
// 根據seed當做初始ID,初始化資料庫對象, 便於測試
@RequestMapping("/r2dbc/init")
public Flux<User> init(Integer seed) {
Flux<User> userFlux = Flux.range(seed, 5).map(id -> new User(id,"name" + id))
.flatMap(repository::save);
return userFlux;
}
// 先刪除一條記錄, 然後在添加一條記錄
@RequestMapping("/r2dbc/delete")
public Mono<User> delete(Integer id1, Integer id2) {
Mono<Void> id1Mono = repository.deleteById(id1);
Mono<User> id2Mono = repository.save(new User(id2, "name" + id2));
return id1Mono.then(id2Mono).as(operator::transactional);
}
}
不要糾結沒有service啥的哈,我們僅僅為了測試哈。兩個方法
- 方法一:init, 用seed當做起始Id, 然後在資料庫生成數據存儲起來
- 方法二:delete, 先刪處一條數據,然後在插入一條已存在的數據,通過資料庫異常來回滾數據。
我們調用init方法,生成數據id=1和id=100以後的數據,如下圖所示。
為了查看我們是不是插入成功,我們查一下資料庫看看。結果如下圖。
數據看起來是沒問題的哈,是我們想要的,從1-5, 100-105
2.1.3 測試事務
數據已經準備好了,我們來進行事務測試,看看現在只有R2DBC的時候,事務是否生效。
我們來刪除id=1,然後保存id=100的情況試一下看看。結果如圖所示。
通過日誌,我們看到結果的確是我們想要的,當id2=100的時候,拋出了Dulicate entry異常, 那我們在查詢一下資料庫,看看資料庫的數據是否有刪除掉。
結果還是用圖展示。
我們通過查看資料庫的查詢記錄,發現id=1數據沒有刪除。那也說明了事務是生效的,在正常情況下,發生異常不會提交事務。
2.2 引入MongoDb
略…感興趣的老哥參考Spring Data MongoDb引入MongoDB
2.2.1 開啟MongoDb事務
官方文檔中有這樣一句話:
Unless you specify a
MongoTransactionManager
within your application context, transaction support is DISABLED. You can usesetSessionSynchronization(ALWAYS)
to participate in ongoing non-native MongoDB transactions.
需要手動指定MongoTransactionManager
,否則不可用。 引入事務,參考文檔,需要如下程式碼。
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {
return new MongoTransactionManager(dbFactory);
}
我們按照文檔指示,在項目中添加了如下程式碼。因為我們用的是Webflux,所以我們創建的是Reactive的。
@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
return new ReactiveMongoTransactionManager(dbFactory);
}
}
這樣,我們MongoDB的事物也搞定了,直接美滋滋,上手開干CRUD。
2.2.2 再來一次—-測試數據刪除
我們引入了新的數據源,本該美滋滋的,但是,問題也來了。我們在來進行一次數據刪除操作。這次刪除,我們修改一下Id,刪除id=2和添加id=102的。測試如下圖所示。
我們再一次看到了同樣的情況,拋出了異常Duplicate entry,是我們預期的結果。那我們接著看看資料庫的數據。如下圖所示。
這個時候我們在查詢數據,發現id=2的數據已經被刪除了。這次事務沒有回滾! 真是F了個K,啥情況呢?我們得一探究竟。
三、智取謎底
我們帶著問題來找原因,現在事務失效了,項目能起來,沒有報錯。那麼最有的可能那就是TransactionalOperator
失效了,TransactionalOperator
是Spring幫我們初始化的,我們要找問題,那就得要看看這個TransactionalOperator
是如何初始化的了
3.1 看源碼找原因
3.1.1 從根本入手
我們直接從TransactionalOperator
程式碼進入,發現其需要傳入ReactiveTransactionManager
,部分程式碼如下。
final class TransactionalOperatorImpl implements TransactionalOperator {
private final ReactiveTransactionManager transactionManager;
private final TransactionDefinition transactionDefinition;
/**
* Construct a new TransactionTemplate using the given transaction manager,
* taking its default settings from the given transaction definition.
* @param transactionManager the transaction management strategy to be used
* @param transactionDefinition the transaction definition to copy the
* default settings from. Local properties can still be set to change values.
*/
TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
this.transactionManager = transactionManager;
this.transactionDefinition = transactionDefinition;
}
}
按照一般邏輯來說,事務是放在TransactionManager中來管理的,這個符合我們的預期,我們接著看看TransactionManager的實現類有哪些。經過查看,發現有R2dbcTransactionManager實現。如下圖所示。
![TransactionManager實現類]](//img2022.cnblogs.com/blog/1495071/202208/1495071-20220827211141944-495282422.jpg)
3.1.2 按照猜想繼續
我們找到了R2dbcTransactionManager,那我們就有兩個思路。
1、查看其實現方式,有哪些需要我們關注的,哪些因素是可能造成事務不生效。
2、啟動方式。因為R2dbcTransactionManager初始化是交由SpringBoot實現,那會不會有什麼特別之處。
3.1.2.1 思路1
我們打開R2dbcTransactionManager程式碼,發現其實現沒有特別之處。部分程式碼如下。
public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
@Nullable
private ConnectionFactory connectionFactory;
/**
* Create a new {@code R2dbcTransactionManager} instance.
* A ConnectionFactory has to be set to be able to use it.
* @see #setConnectionFactory
*/
public R2dbcTransactionManager() {}
/**
* Create a new {@code R2dbcTransactionManager} instance.
* @param connectionFactory the R2DBC ConnectionFactory to manage transactions for
*/
public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
this();
setConnectionFactory(connectionFactory);
afterPropertiesSet();
}
}
可以看到,無參初始化可以不需要ConnectionFactory,也可以傳入ConnectionFactory進行初始化。 也沒有什麼特別之處。
3.1.2.2 思路2
我們看完其實現,並沒有特別之處,那就看它初始化有什麼特別的地方。Double Shift 來一波,我們看到了有AutoConfiguration,來讓我們瞧一瞧。
我們點進去瞧一瞧,便發現了端倪,嘴上一句 原來如此 蹦了出來。部分程式碼如下。
public class R2dbcTransactionManagerAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ReactiveTransactionManager.class)
public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
我們看到,其初始化的時候,採用了ConditionalOnMissingBean,只有在沒有ReactiveTransactionManager的時候才會初始化。但是我們在初始化MongoDB事務的時候,已經初始化過ReactiveTransactionManager了啊!趕緊看看ReactiveMongoTransactionManager。
打開ReactiveMongoTransactionManager程式碼,果然如此。程式碼如下。
public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
// ...略
}
AbstractReactiveTransactionManager這個不就是ReactiveTransactionManager嘛, 已經初始化過一次了,所以導致R2dbcTransactionManager無法進行初始化,所以TransactionalOperatorImpl裡面傳入的不是R2dbcTransactionManager,那肯定對mysql無法失誤操作了啊。
3.1.3 怎麼辦?
至此,我們已經找到原因了,但是,這也緊緊是猜想。我們還是得分2步驟來啊!!
- 1、針對問題,提出具體的解決方案,並實現
- 2、針對實現的方案進行驗證
3.1.3.1 解決方案
我們知道事務沒有實現的原因是R2dbcTransactionManager沒有初始化,然後再TransactionalOperatorImpl種注入的不是R2dbcTransactionManager,那麼我們就自己動手初始化Bean。
我們創建2個對象,分別為MongoConfig和R2dbcConfig,程式碼如下所示。
R2dbcConfig:
/**
* <br>r2dbc 配置</br>
*
* @author [email protected]
* @since 2022/8/27
*/
@EnableR2dbcRepositories
@Configuration
public class R2dbcConfig {
@Bean("r2dbcTransactionManager")
public R2dbcTransactionManager transactionManager(ConnectionFactory pool) {
return new R2dbcTransactionManager(pool);
}
@Bean("r2dbcTransactionalOperator")
public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager){
return TransactionalOperator.create(transactionManager);
}
}
MongoConfig:
/**
* <br>mongo transaction manager</br>
*
* @author [email protected]
* @since 2022/8/27
*/
@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
@Bean("mongoTransactionManager")
public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
return new ReactiveMongoTransactionManager(dbFactory);
}
@Bean("mongoTransactionalOperator")
public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager){
return TransactionalOperator.create(transactionManager);
}
}
我們通過別名的方式,創建兩個TransactionalOperator,這樣就可以解決R2bdc無法自動創建TransactionManager的問題。
3.1.3.2 驗證
我們在Controller中的TransactionalOperator指定名稱。程式碼如下所示。
@RestController
public class TransactionController {
@Autowired
private UserR2dbcRepository repository;
@Autowired
@Qualifier("r2dbcTransactionalOperator") // 在這指定使用哪個operator
private TransactionalOperator operator;
// ... 略
}
指定了具體的名稱,我們就可以接著在來測試一次。這次我們刪除Id=3,然後添加id=103的數據試試看。測試過程如下圖。
還是和我們剛一下,出現了Duplicate entry的問題。我們要關注事物是否回滾。
接下來就是激動人心的時刻,我們直接查庫,看看事務是否回滾了。結果如下圖所示。
哇喔!棒!我們看到,資料庫查詢出來的結果中還是包含了Id=3的數據,那完全說明了事務回滾了!
至此我們的問題算是完全解決了,舒坦!(心裡長舒一口氣,解決問題就這麼簡單?)
3.2 偷雞
看了這麼多,我們都是手動,一步步驗證結果的,哪有沒有快捷的方式呢?說到這,那肯定是有的。
在使用R2dbc的時候,我們其實是沒有添加日誌的。我們可以打開日誌。可以看到操作是記錄了完整的日誌。我們添加日誌配置(log配置文件自己添加一下)。
logging.level.org.springframework.r2dbc=debug
3.2.1 再次驗證
添加完日誌,我們在執行一下刪除id=3,添加id=104的操作,看看日誌記錄了什麼。貼出來測試結果。
我們可以看到,日誌中清晰的記錄著,創建事務,回滾事務!完全驗證了我們的操作方案是對的,NO爬不浪~!
上述的所有操作,都可以通過日誌驗證,我就不一步步驗證,大家可以自己試驗一下~
四、總結
在使用新東西的時候,還是要多實驗,驗證結果!
遇到問題,不要慌,一步步來,就是干!
如有問題,歡迎指正,交流。