Seata分散式事務框架Sample

前言

阿里官方給出了seata-sample地址,官方自己也對Sample提供了很多類型,可以查看學習。 我這裡選擇演示SpringBoot+MyBatis。
QQ截圖20210608171746.png
該聚合工程共包括5個module:

  • sbm-account-service
  • sbm-business-service
  • sbm-common-service
  • sbm-order-service
  • sbm-storage-service

不同Module之間的服務通訊使用Rest方式通訊。

準備工作

創建資料庫

在sql/all_in_one_sql里是演示中需要的sql腳本,共創建了3個schema: db_account, db_order, db_storage, 分別演示三個不同的資料庫,每個schema里都有一張undo_log表。
QQ截圖20210608172250.png

啟動Seata-Server

Seata-Server扮演TM的角色,在官網下載//seata.io/zh-cn/blog/download.html,最新版本為1.4.2。
QQ截圖20210608172642.png
/conf/registry.conf 中有兩個大的節點registry – 註冊中心配置選項,config – 配置中心配置選項。

# 註冊中心配置
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "//localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  etcd3 {
    cluster = "default"
    serverAddr = "//localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

# 配置中心配置
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
    dataId = "seataServer.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  apollo {
    appId = "seata-server"
    ## apolloConfigService will cover apolloMeta
    apolloMeta = "//192.168.1.204:8801"
    apolloConfigService = "//192.168.1.204:8080"
    namespace = "application"
    apolloAccesskeySecret = ""
    cluster = "seata"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
  etcd3 {
    serverAddr = "//localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

/conf/file.conf – 只有當registry.conf下 config.type=file時才載入file.config中的參數。config.type等於其他值的話則不需要file.config。 seata-server也提供了file.conf.example, 詳細的參數介紹也可以查看//seata.io/zh-cn/docs/user/configurations.html

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "file"
  ## rsa decryption public key
  publicKey = ""
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"
    user = "mysql"
    password = "mysql"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    ## redis mode: single、sentinel
    mode = "single"
    ## single mode property
    single {
      host = "127.0.0.1"
      port = "6379"
    }
    ## sentinel mode property
    sentinel {
      masterName = ""
      ## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
      sentinelHosts = ""
    }
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    maxTotal = 100
    queryLimit = 100
  }
}

啟動服務./bin/seata-server.bat ,默認打開了8091埠。
QQ截圖20210608173152.png

啟動Sample服務

依此啟動account-service,business-service,order-service,storage-service。business-service作為業務邏輯的入口分別調用order-service和storage-service。這裡就看到了關鍵註解GlobalTransactional。
4個服務的默認埠分別是8081,8082, 8083,8084。
QQ截圖20210609110539.png

BusinessService

@Service
public class BusinessService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);

    @Autowired
    private StorageClient storageClient;
    @Autowired
    private OrderClient orderClient;

    /**
     * 減庫存,下訂單
     *
     * @param userId
     * @param commodityCode
     * @param orderCount
     */
    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageClient.deduct(commodityCode, orderCount);
        orderClient.create(userId, commodityCode, orderCount);
    }
}

OrderClient

@Slf4j
@Component
public class OrderClient {

    @Autowired
    private RestTemplate restTemplate;

    public void create(String userId, String commodityCode, int orderCount) {
        String url = "//127.0.0.1:8082/api/order/debit?userId=" + userId + "&commodityCode=" + commodityCode + "&count=" + orderCount;
        try {
            restTemplate.getForEntity(url, Void.class);
        } catch (Exception e) {
            log.error("create url {} ,error:", url);
            throw new RuntimeException();
        }
    }
}

StorageClient

@Slf4j
@Component
public class StorageClient {

    @Autowired
    private RestTemplate restTemplate;

    public void deduct(String commodityCode, int orderCount) {
        System.out.println("business to storage " + RootContext.getXID());
        String url = "//127.0.0.1:8081/api/storage/deduct?commodityCode=" + commodityCode + "&count=" + orderCount;
        try {
            restTemplate.getForEntity(url, Void.class);
        } catch (Exception e) {
            log.error("deduct url {} ,error:", url, e);
            throw new RuntimeException();
        }
    }
}

運行Sample

模擬正常事務提交

運行Sample前,我們先查看下當前三個服務資料庫的數據。
db_account.account_tbl
QQ截圖20210609105849.png
db_storage.storage_tbl
QQ截圖20210609105855.png
業務介面在business-service/BusinessController里,我們先來執行下購買下單正常的提交流程。
QQ截圖20210609110716.png
執行介面//localhost:8094/api/business/purchase/commit/, 執行後,seata-server控制台上會顯示全局事務執行的具體日誌和執行成功的日誌。
QQ截圖20210609111114.png
執行後我們再查看下資料庫。
db_account.account_tbl。 user_id對應1001的賬戶減去了5元(9995)。
QQ截圖20210609111238.png
db_storage.storage_tbl
QQ截圖20210609111351.png
db_order.order_tbl
QQ截圖20210609111421.png

模擬全局事務回滾

執行介面//localhost:8084/api/business/purchase/rollback, 這裡我們想查看undo_log表的數據,則在BusinessSerivce#purchase斷點。
QQ截圖20210609111852.png
db_storage.undo_log
QQ截圖20210609111936.png
QQ截圖20210609112112.png

undo_log里最重要的是關注beforeImage和afterImage節點。

全局事務回滾後,undo_log表會清空數據。