(WebFlux)003、多數據源R2dbc事務失效分析

一、背景

最近項目持續改造,然後把SpringMVC換成了SpringWebflux,然後把Mybatis換成了R2dbc。中間沒有遇到什麼問題,一切都那麼的美滋滋,直到最近一個新需求的出現,打破了往日的寧靜。

在對需求分析了一番後,需要引入新的數據源,那就是MongoDb。然後出現了MongoDb、Mysql兩種數據源,然後原來好好的事物操作就芭比Q(完蛋)了。細細來分析一下原因與解決方法。

題外話:在本地測試的時候強烈建議用虛擬機+Docker來安裝MySql與MongoDb,不然Mac直連docker真的麻煩啊!!~

SpringBoot 版本號: 2.6.10, (本文基於已經會在項目中使用R2DBC與MongoDb)

二、武松打虎

2.1 單獨solo Mysql

我們創建了一個測試庫r2dbc_test,裡面有一個user表。

# 創建測試庫
create database r2dbc_test;

# 創建表
create table r2dbc_test.user(
    id int auto_increment primary key ,
    name varchar(12)
);

2.1.1 項目引入R2dbc

略..給出鏈接,如果感興趣可以看看,Spring Data R2DBC,(實在太多,這個時間點懶得寫了,後面有時間再補一下)

2.1.2 測試程式碼

創建表對結構對應實體類:user

@Data
@Table("user")
@NoArgsConstructor
@AllArgsConstructor
public class User implements Persistable<Integer> {
    @Id
    private Integer id;
    private String name;

    @Override
    public boolean isNew() {
        return true;
    }
}

這裡面有個坑點,那就是為什麼實現org.springframework.data.domain.Persistable這個介面呢,先賣個關子,看完Repository後在描述哈。

Repository如下程式碼所示。

/**
 * <br>User Repository</br>
 *
 * @author [email protected]
 * @since 2022/8/26
 */
@Repository
public interface UserR2dbcRepository extends R2dbcRepository<User, Integer> {

}

我們直接使用了Spring提供好的org.springframework.data.r2dbc.repository.R2dbcRepository,裡面有一些基礎的實現類。我們在測試的時候使用了org.springframework.data.repository.reactive.ReactiveCrudRepository#save()方法,這個方法會去判斷這個實體對象是不是new object,如果不是,則會去Update。而判斷的方法則是org.springframework.data.domain.Persistable#isNew()方法。所以這就是我們為啥要實現這個介面。

接著寫一個簡單測試的Controller,程式碼如下所示。

@RestController
@EnableR2dbcRepositories
public class TransactionController {
    @Autowired
    private UserR2dbcRepository repository;
    @Autowired
    private TransactionalOperator operator;

    // 根據seed當做初始ID,初始化資料庫對象, 便於測試
    @RequestMapping("/r2dbc/init")
    public Flux<User> init(Integer seed) {
        Flux<User> userFlux = Flux.range(seed, 5).map(id -> new User(id,"name" + id))
                .flatMap(repository::save);
        return userFlux;
    }

    // 先刪除一條記錄, 然後在添加一條記錄
    @RequestMapping("/r2dbc/delete")
    public Mono<User> delete(Integer id1, Integer id2) {
        Mono<Void> id1Mono = repository.deleteById(id1);
        Mono<User> id2Mono = repository.save(new User(id2, "name" + id2));
        return id1Mono.then(id2Mono).as(operator::transactional);
    }
}

不要糾結沒有service啥的哈,我們僅僅為了測試哈。兩個方法

  • 方法一:init, 用seed當做起始Id, 然後在資料庫生成數據存儲起來
  • 方法二:delete, 先刪處一條數據,然後在插入一條已存在的數據,通過資料庫異常來回滾數據。

我們調用init方法,生成數據id=1和id=100以後的數據,如下圖所示。

生成測試數據

為了查看我們是不是插入成功,我們查一下資料庫看看。結果如下圖。

查詢資料庫測試數據

數據看起來是沒問題的哈,是我們想要的,從1-5, 100-105

2.1.3 測試事務

數據已經準備好了,我們來進行事務測試,看看現在只有R2DBC的時候,事務是否生效。

我們來刪除id=1,然後保存id=100的情況試一下看看。結果如圖所示。

刪除事物操作

通過日誌,我們看到結果的確是我們想要的,當id2=100的時候,拋出了Dulicate entry異常, 那我們在查詢一下資料庫,看看資料庫的數據是否有刪除掉。

結果還是用圖展示。

發生刪除異常

我們通過查看資料庫的查詢記錄,發現id=1數據沒有刪除。那也說明了事務是生效的,在正常情況下,發生異常不會提交事務。

2.2 引入MongoDb

略…感興趣的老哥參考Spring Data MongoDb引入MongoDB

2.2.1 開啟MongoDb事務

官方文檔中有這樣一句話:

Unless you specify a MongoTransactionManager within your application context, transaction support is DISABLED. You can use setSessionSynchronization(ALWAYS) to participate in ongoing non-native MongoDB transactions.

需要手動指定MongoTransactionManager,否則不可用。 引入事務,參考文檔,需要如下程式碼。

@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  
  return new MongoTransactionManager(dbFactory);
}

我們按照文檔指示,在項目中添加了如下程式碼。因為我們用的是Webflux,所以我們創建的是Reactive的。

@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {
    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }
}

這樣,我們MongoDB的事物也搞定了,直接美滋滋,上手開干CRUD。

2.2.2 再來一次—-測試數據刪除

我們引入了新的數據源,本該美滋滋的,但是,問題也來了。我們在來進行一次數據刪除操作。這次刪除,我們修改一下Id,刪除id=2和添加id=102的。測試如下圖所示。

刪除Id=2和添加Id=102的數據

我們再一次看到了同樣的情況,拋出了異常Duplicate entry,是我們預期的結果。那我們接著看看資料庫的數據。如下圖所示。

添加Mongo後刪除數據

這個時候我們在查詢數據,發現id=2的數據已經被刪除了。這次事務沒有回滾! 真是F了個K,啥情況呢?我們得一探究竟。

三、智取謎底

我們帶著問題來找原因,現在事務失效了,項目能起來,沒有報錯。那麼最有的可能那就是TransactionalOperator失效了,TransactionalOperator是Spring幫我們初始化的,我們要找問題,那就得要看看這個TransactionalOperator是如何初始化的了

3.1 看源碼找原因

3.1.1 從根本入手

我們直接從TransactionalOperator程式碼進入,發現其需要傳入ReactiveTransactionManager,部分程式碼如下。

final class TransactionalOperatorImpl implements TransactionalOperator {

	private final ReactiveTransactionManager transactionManager;
	private final TransactionDefinition transactionDefinition;

	/**
	 * Construct a new TransactionTemplate using the given transaction manager,
	 * taking its default settings from the given transaction definition.
	 * @param transactionManager the transaction management strategy to be used
	 * @param transactionDefinition the transaction definition to copy the
	 * default settings from. Local properties can still be set to change values.
	 */
	TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
		
		this.transactionManager = transactionManager;
		this.transactionDefinition = transactionDefinition;
	}
}

按照一般邏輯來說,事務是放在TransactionManager中來管理的,這個符合我們的預期,我們接著看看TransactionManager的實現類有哪些。經過查看,發現有R2dbcTransactionManager實現。如下圖所示。

![TransactionManager實現類]](//img2022.cnblogs.com/blog/1495071/202208/1495071-20220827211141944-495282422.jpg)

3.1.2 按照猜想繼續

我們找到了R2dbcTransactionManager,那我們就有兩個思路。

1、查看其實現方式,有哪些需要我們關注的,哪些因素是可能造成事務不生效。

2、啟動方式。因為R2dbcTransactionManager初始化是交由SpringBoot實現,那會不會有什麼特別之處。

3.1.2.1 思路1

我們打開R2dbcTransactionManager程式碼,發現其實現沒有特別之處。部分程式碼如下。

public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {

	@Nullable
	private ConnectionFactory connectionFactory;
	/**
	 * Create a new {@code R2dbcTransactionManager} instance.
	 * A ConnectionFactory has to be set to be able to use it.
	 * @see #setConnectionFactory
	 */
	public R2dbcTransactionManager() {}
	/**
	 * Create a new {@code R2dbcTransactionManager} instance.
	 * @param connectionFactory the R2DBC ConnectionFactory to manage transactions for
	 */
	public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
		this();
		setConnectionFactory(connectionFactory);
		afterPropertiesSet();
	}
}

可以看到,無參初始化可以不需要ConnectionFactory,也可以傳入ConnectionFactory進行初始化。 也沒有什麼特別之處。

3.1.2.2 思路2

我們看完其實現,並沒有特別之處,那就看它初始化有什麼特別的地方。Double Shift 來一波,我們看到了有AutoConfiguration,來讓我們瞧一瞧。

R2dbcTransactionManagerAutoConfiguration

我們點進去瞧一瞧,便發現了端倪,嘴上一句 原來如此 蹦了出來。部分程式碼如下。

public class R2dbcTransactionManagerAutoConfiguration {
	@Bean
	@ConditionalOnMissingBean(ReactiveTransactionManager.class)
	public R2dbcTransactionManager connectionFactoryTransactionManager(ConnectionFactory connectionFactory) {
		return new R2dbcTransactionManager(connectionFactory);
	}
}

我們看到,其初始化的時候,採用了ConditionalOnMissingBean,只有在沒有ReactiveTransactionManager的時候才會初始化。但是我們在初始化MongoDB事務的時候,已經初始化過ReactiveTransactionManager了啊!趕緊看看ReactiveMongoTransactionManager。

打開ReactiveMongoTransactionManager程式碼,果然如此。程式碼如下。

public class ReactiveMongoTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
	// ...略
}

AbstractReactiveTransactionManager這個不就是ReactiveTransactionManager嘛, 已經初始化過一次了,所以導致R2dbcTransactionManager無法進行初始化,所以TransactionalOperatorImpl裡面傳入的不是R2dbcTransactionManager,那肯定對mysql無法失誤操作了啊。

3.1.3 怎麼辦?

至此,我們已經找到原因了,但是,這也緊緊是猜想。我們還是得分2步驟來啊!!

  • 1、針對問題,提出具體的解決方案,並實現
  • 2、針對實現的方案進行驗證
3.1.3.1 解決方案

我們知道事務沒有實現的原因是R2dbcTransactionManager沒有初始化,然後再TransactionalOperatorImpl種注入的不是R2dbcTransactionManager,那麼我們就自己動手初始化Bean。

我們創建2個對象,分別為MongoConfig和R2dbcConfig,程式碼如下所示。

R2dbcConfig:

/**
 * <br>r2dbc 配置</br>
 *
 * @author [email protected]
 * @since 2022/8/27
 */
@EnableR2dbcRepositories
@Configuration
public class R2dbcConfig {
  
    @Bean("r2dbcTransactionManager")
    public R2dbcTransactionManager transactionManager(ConnectionFactory pool) {
        return new R2dbcTransactionManager(pool);
    }

    @Bean("r2dbcTransactionalOperator")
    public TransactionalOperator transactionalOperator(R2dbcTransactionManager transactionManager){
        return TransactionalOperator.create(transactionManager);
    }
}

MongoConfig:

/**
 * <br>mongo transaction manager</br>
 *
 * @author [email protected]
 * @since 2022/8/27
 */
@EnableReactiveMongoRepositories
@Configuration
public class MongoConfig {

    @Bean("mongoTransactionManager")
    public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Bean("mongoTransactionalOperator")
    public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager){
        return TransactionalOperator.create(transactionManager);
    }
}

我們通過別名的方式,創建兩個TransactionalOperator,這樣就可以解決R2bdc無法自動創建TransactionManager的問題。

3.1.3.2 驗證

我們在Controller中的TransactionalOperator指定名稱。程式碼如下所示。

@RestController
public class TransactionController {
    @Autowired
    private UserR2dbcRepository repository;
    
    @Autowired
    @Qualifier("r2dbcTransactionalOperator") // 在這指定使用哪個operator
    private TransactionalOperator operator;
		// ... 略
}

指定了具體的名稱,我們就可以接著在來測試一次。這次我們刪除Id=3,然後添加id=103的數據試試看。測試過程如下圖。

刪除Id=3,添加Id=103數據

還是和我們剛一下,出現了Duplicate entry的問題。我們要關注事物是否回滾。

接下來就是激動人心的時刻,我們直接查庫,看看事務是否回滾了。結果如下圖所示。

驗證結果

哇喔!棒!我們看到,資料庫查詢出來的結果中還是包含了Id=3的數據,那完全說明了事務回滾了!

至此我們的問題算是完全解決了,舒坦!(心裡長舒一口氣,解決問題就這麼簡單?)

3.2 偷雞

看了這麼多,我們都是手動,一步步驗證結果的,哪有沒有快捷的方式呢?說到這,那肯定是有的。

在使用R2dbc的時候,我們其實是沒有添加日誌的。我們可以打開日誌。可以看到操作是記錄了完整的日誌。我們添加日誌配置(log配置文件自己添加一下)。

logging.level.org.springframework.r2dbc=debug

3.2.1 再次驗證

添加完日誌,我們在執行一下刪除id=3,添加id=104的操作,看看日誌記錄了什麼。貼出來測試結果。

我們可以看到,日誌中清晰的記錄著,創建事務,回滾事務!完全驗證了我們的操作方案是對的,NO爬不浪~!

上述的所有操作,都可以通過日誌驗證,我就不一步步驗證,大家可以自己試驗一下~

四、總結

在使用新東西的時候,還是要多實驗,驗證結果!

遇到問題,不要慌,一步步來,就是干!

如有問題,歡迎指正,交流。