使用Seata徹底解決Spring Cloud中的分佈式事務問題!

  • 2019 年 11 月 23 日
  • 筆記

Seata是Alibaba開源的一款分佈式事務解決方案,致力於提供高性能和簡單易用的分佈式事務服務,本文將通過一個簡單的下單業務場景來對其用法進行詳細介紹。

什麼是分佈式事務問題?

單體應用

單體應用中,一個業務操作需要調用三個模塊完成,此時數據的一致性由本地事務來保證。

微服務應用

隨着業務需求的變化,單體應用被拆分成微服務應用,原來的三個模塊被拆分成三個獨立的應用,分別使用獨立的數據源,業務操作需要調用三個服務來完成。此時每個服務內部的數據一致性由本地事務來保證,但是全局的數據一致性問題沒法保證。

小結

在微服務架構中由於全局數據一致性沒法保證產生的問題就是分佈式事務問題。簡單來說,一次業務操作需要操作多個數據源或需要進行遠程調用,就會產生分佈式事務問題。

Seata簡介

Seata 是一款開源的分佈式事務解決方案,致力於提供高性能和簡單易用的分佈式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分佈式解決方案。

Seata原理和設計

定義一個分佈式事務

我們可以把一個分佈式事務理解成一個包含了若干分支事務的全局事務,全局事務的職責是協調其下管轄的分支事務達成一致,要麼一起成功提交,要麼一起失敗回滾。此外,通常分支事務本身就是一個滿足ACID的本地事務。這是我們對分佈式事務結構的基本認識,與 XA 是一致的。

協議分佈式事務處理過程的三個組件

  • Transaction Coordinator (TC):事務協調器,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾;
  • Transaction Manager (TM):控制全局事務的邊界,負責開啟一個全局事務,並最終發起全局提交或全局回滾的決議;
  • Resource Manager (RM):控制分支事務,負責分支註冊、狀態彙報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。

一個典型的分佈式事務過程

  • TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的 XID;
  • XID 在微服務調用鏈路的上下文中傳播;
  • RM 向 TC 註冊分支事務,將其納入 XID 對應全局事務的管轄;
  • TM 向 TC 發起針對 XID 的全局提交或回滾決議;
  • TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。

seata-server的安裝與配置

  • 我們先從官網下載seata-server,這裡下載的是seata-server-0.9.0.zip,下載地址:https://github.com/seata/seata/releases
  • 這裡我們使用Nacos作為註冊中心,Nacos的安裝及使用可以參考:Spring Cloud Alibaba:Nacos 作為註冊中心和配置中心使用
  • 解壓seata-server安裝包到指定目錄,修改conf目錄下的file.conf配置文件,主要修改自定義事務組名稱,事務日誌存儲模式為db及數據庫連接信息;
service {    #vgroup->rgroup    vgroup_mapping.fsp_tx_group = "default" #修改事務組名稱為:fsp_tx_group,和客戶端自定義的名稱對應    #only support single node    default.grouplist = "127.0.0.1:8091"    #degrade current not support    enableDegrade = false    #disable    disable = false    #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent    max.commit.retry.timeout = "-1"    max.rollback.retry.timeout = "-1"  }    ## transaction log store  store {    ## store mode: file、db    mode = "db" #修改此處將事務信息存儲到數據庫中      ## database store    db {      ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.      datasource = "dbcp"      ## mysql/oracle/h2/oceanbase etc.      db-type = "mysql"      driver-class-name = "com.mysql.jdbc.Driver"      url = "jdbc:mysql://localhost:3306/seat-server" #修改數據庫連接地址      user = "root" #修改數據庫用戶名      password = "root" #修改數據庫密碼      min-conn = 1      max-conn = 3      global.table = "global_table"      branch.table = "branch_table"      lock-table = "lock_table"      query-limit = 100    }  }
  • 由於我們使用了db模式存儲事務日誌,所以我們需要創建一個seat-server數據庫,建表sql在seata-server的/conf/db_store.sql中;
  • 修改conf目錄下的registry.conf配置文件,指明註冊中心為nacos,及修改nacos連接信息即可;
registry {    # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa    type = "nacos" #改為nacos      nacos {      serverAddr = "localhost:8848" #改為nacos的連接地址      namespace = ""      cluster = "default"    }  }
  • 先啟動Nacos,再使用seata-server中/bin/seata-server.bat文件啟動seata-server。

數據庫準備

創建業務數據庫

  • seat-order:存儲訂單的數據庫;
  • seat-storage:存儲庫存的數據庫;
  • seat-account:存儲賬戶信息的數據庫。

初始化業務表

order表

CREATE TABLE `order` (    `id` bigint(11) NOT NULL AUTO_INCREMENT,    `user_id` bigint(11) DEFAULT NULL COMMENT '用戶id',    `product_id` bigint(11) DEFAULT NULL COMMENT '產品id',    `count` int(11) DEFAULT NULL COMMENT '數量',    `money` decimal(11,0) DEFAULT NULL COMMENT '金額',    PRIMARY KEY (`id`)  ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;    ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT '訂單狀態:0:創建中;1:已完結' AFTER `money` ;

storage表

CREATE TABLE `storage` (                           `id` bigint(11) NOT NULL AUTO_INCREMENT,                           `product_id` bigint(11) DEFAULT NULL COMMENT '產品id',                           `total` int(11) DEFAULT NULL COMMENT '總庫存',                           `used` int(11) DEFAULT NULL COMMENT '已用庫存',                           `residue` int(11) DEFAULT NULL COMMENT '剩餘庫存',                           PRIMARY KEY (`id`)  ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;    INSERT INTO `seat-storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');

account表

CREATE TABLE `account` (    `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',    `user_id` bigint(11) DEFAULT NULL COMMENT '用戶id',    `total` decimal(10,0) DEFAULT NULL COMMENT '總額度',    `used` decimal(10,0) DEFAULT NULL COMMENT '已用餘額',    `residue` decimal(10,0) DEFAULT '0' COMMENT '剩餘可用額度',    PRIMARY KEY (`id`)  ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;    INSERT INTO `seat-account`.`account` (`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');

創建日誌回滾表

使用Seata還需要在每個數據庫中創建日誌表,建表sql在seata-server的/conf/db_undo_log.sql中。

完整數據庫示意圖

製造一個分佈式事務問題

這裡我們會創建三個服務,一個訂單服務,一個庫存服務,一個賬戶服務。當用戶下單時,會在訂單服務中創建一個訂單,然後通過遠程調用庫存服務來扣減下單商品的庫存,再通過遠程調用賬戶服務來扣減用戶賬戶裏面的餘額,最後在訂單服務中修改訂單狀態為已完成。該操作跨越三個數據庫,有兩次遠程調用,很明顯會有分佈式事務問題。

客戶端配置

  • 對seata-order-service、seata-storage-service和seata-account-service三個seata的客戶端進行配置,它們配置大致相同,我們下面以seata-order-service的配置為例;
  • 修改application.yml文件,自定義事務組的名稱;
spring:    cloud:      alibaba:        seata:          tx-service-group: fsp_tx_group #自定義事務組名稱需要與seata-server中的對應
  • 添加並修改file.conf配置文件,主要是修改自定義事務組名稱;
service {    #vgroup->rgroup    vgroup_mapping.fsp_tx_group = "default" #修改自定義事務組名稱    #only support single node    default.grouplist = "127.0.0.1:8091"    #degrade current not support    enableDegrade = false    #disable    disable = false    #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent    max.commit.retry.timeout = "-1"    max.rollback.retry.timeout = "-1"    disableGlobalTransaction = false  }
  • 添加並修改registry.conf配置文件,主要是將註冊中心改為nacos;
registry {    # file 、nacos 、eureka、redis、zk    type = "nacos" #修改為nacos      nacos {      serverAddr = "localhost:8848" #修改為nacos的連接地址      namespace = ""      cluster = "default"    }  }
  • 在啟動類中取消數據源的自動創建:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)  @EnableDiscoveryClient  @EnableFeignClients  public class SeataOrderServiceApplication {        public static void main(String[] args) {          SpringApplication.run(SeataOrderServiceApplication.class, args);      }    }
  • 創建配置使用Seata對數據源進行代理:
/**   * 使用Seata對數據源進行代理   * Created by macro on 2019/11/11.   */  @Configuration  public class DataSourceProxyConfig {        @Value("${mybatis.mapperLocations}")      private String mapperLocations;        @Bean      @ConfigurationProperties(prefix = "spring.datasource")      public DataSource druidDataSource(){          return new DruidDataSource();      }        @Bean      public DataSourceProxy dataSourceProxy(DataSource dataSource) {          return new DataSourceProxy(dataSource);      }        @Bean      public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {          SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();          sqlSessionFactoryBean.setDataSource(dataSourceProxy);          sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()                  .getResources(mapperLocations));          sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());          return sqlSessionFactoryBean.getObject();      }    }
  • 使用@GlobalTransactional註解開啟分佈式事務:
package com.macro.cloud.service.impl;    import com.macro.cloud.dao.OrderDao;  import com.macro.cloud.domain.Order;  import com.macro.cloud.service.AccountService;  import com.macro.cloud.service.OrderService;  import com.macro.cloud.service.StorageService;  import io.seata.spring.annotation.GlobalTransactional;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.stereotype.Service;    /**   * 訂單業務實現類   * Created by macro on 2019/11/11.   */  @Service  public class OrderServiceImpl implements OrderService {        private static final Logger LOGGER = LoggerFactory.getLogger(OrderServiceImpl.class);        @Autowired      private OrderDao orderDao;      @Autowired      private StorageService storageService;      @Autowired      private AccountService accountService;        /**       * 創建訂單->調用庫存服務扣減庫存->調用賬戶服務扣減賬戶餘額->修改訂單狀態       */      @Override      @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)      public void create(Order order) {          LOGGER.info("------->下單開始");          //本應用創建訂單          orderDao.create(order);            //遠程調用庫存服務扣減庫存          LOGGER.info("------->order-service中扣減庫存開始");          storageService.decrease(order.getProductId(),order.getCount());          LOGGER.info("------->order-service中扣減庫存結束:{}",order.getId());            //遠程調用賬戶服務扣減餘額          LOGGER.info("------->order-service中扣減餘額開始");          accountService.decrease(order.getUserId(),order.getMoney());          LOGGER.info("------->order-service中扣減餘額結束");            //修改訂單狀態為已完成          LOGGER.info("------->order-service中修改訂單狀態開始");          orderDao.update(order.getUserId(),0);          LOGGER.info("------->order-service中修改訂單狀態結束");            LOGGER.info("------->下單結束");      }  }

分佈式事務功能演示

  • 運行seata-order-service、seata-storage-service和seata-account-service三個服務;
  • 數據庫初始信息狀態:
  • 調用接口進行下單操作後查看數據庫:http://localhost:8180/order/create?userId=1&productId=1&count=10&money=100
  • 我們在seata-account-service中製造一個超時異常後,調用下單接口:
/**   * 賬戶業務實現類   * Created by macro on 2019/11/11.   */  @Service  public class AccountServiceImpl implements AccountService {        private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);      @Autowired      private AccountDao accountDao;        /**       * 扣減賬戶餘額       */      @Override      public void decrease(Long userId, BigDecimal money) {          LOGGER.info("------->account-service中扣減賬戶餘額開始");          //模擬超時異常,全局事務回滾          try {              Thread.sleep(30*1000);          } catch (InterruptedException e) {              e.printStackTrace();          }          accountDao.decrease(userId,money);          LOGGER.info("------->account-service中扣減賬戶餘額結束");      }  }
  • 此時我們可以發現下單後數據庫數據並沒有任何改變;
  • 我們可以在seata-order-service中注釋掉@GlobalTransactional來看看沒有Seata的分佈式事務管理會發生什麼情況:
/**   * 訂單業務實現類   * Created by macro on 2019/11/11.   */  @Service  public class OrderServiceImpl implements OrderService {        /**       * 創建訂單->調用庫存服務扣減庫存->調用賬戶服務扣減賬戶餘額->修改訂單狀態       */      @Override  //    @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)      public void create(Order order) {          LOGGER.info("------->下單開始");          //省略代碼...          LOGGER.info("------->下單結束");      }  }
  • 由於seata-account-service的超時會導致當庫存和賬戶金額扣減後訂單狀態並沒有設置為已經完成,而且由於遠程調用的重試機制,賬戶餘額還會被多次扣減。

參考資料

Seata官方文檔:https://github.com/seata/seata/wiki

使用到的模塊

springcloud-learning  ├── seata-order-service -- 整合了seata的訂單服務  ├── seata-storage-service -- 整合了seata的庫存服務  └── seata-account-service -- 整合了seata的賬戶服務

項目源碼地址

https://github.com/macrozheng/springcloud-learning