分散式事務實戰

  • 2019 年 12 月 19 日
  • 筆記

轉載本文需註明出處:微信公眾號EAWorld,違者必究。

引言:

微服務倡導將複雜的單體應用拆分為若干個功能簡單、松耦合的服務,這樣可以降低開發難度、增強擴展性、便于敏捷開發,從而被越來越多的開發者和公司推崇運用。但系統微服務化後,一個看似簡單的功能,內部可能需要調用多個服務並操作多個資料庫實現,服務調用的分散式事務問題變的非常突出,幾乎可以說是無法避免。

分散式事務已經成為微服務落地最大的阻礙,也是最具挑戰性的一個技術難題。那麼我們在實際開發中需要如何去應對呢?本文將介紹在實際微服務開發中分散式事務的實戰。

目錄:

1.分散式事務講解

2.分散式事務解決方案-servicecomb-pack

3.分散式事務實戰講解

1. 分散式事務講解

1.1事務原理

在講分散式事務之前,先聊一下事務。簡單講事務是資料庫管理系統執行過程中的一個邏輯單元,它能保證要麼一組資料庫操作全部執行成功,要麼全部失敗,而做到這些的原理就是事務的ACID四大特性。

  • A. Atomic原子性的簡稱,事務作為一個整體來執行,要麼全部成功,要麼全部失敗。
  • C. Consistency一致性的簡稱,事務應確保數據從一個一致的狀態轉變為另一個一致的狀態。
  • I. Isolation隔離性的簡稱,多個事務並發執行時,一個事務的執行不影響其他事務的執行。
  • D.Durability持久性的檢查,已提交的事務修改數據會被持久保存。

1.2傳統單機資料庫事務

在傳統單體應用架構中,我們的業務數據通常都是存儲在一個資料庫中的,應用中的各個模組對資料庫直接進行操作。在這種場景中,事務是由資料庫提供的基於ACID特性來保證的。

例如,在一個用戶購物下單的場景中,涉及到用戶、訂單、支付、庫存等模組的一系列協同操作,如果其中一個模組出現問題,我們就可以通過資料庫提供的事務特性來保證本次下單操作要麼都成功,要麼都失敗。因為這些模組用的是同一個資料庫,所處的是同一個事務管理器,不需要做額外的其他操作就能保證事務的特性。

1.3微服務的分散式事務

從廣義上來講,分散式事務其實也是事務,只是區別於單機事務不同之處是:由於業務上的定義和系統微服務架構的設計,很多大型的業務流程都被拆分成了多個單一的基礎服務,而為了保證每個微服務都能獨立進行開發和部署運行,通常都會採用一個微服務一個資料庫的架構配套,然後將內部服務進行封裝,以Rest api方式對外暴露。這樣以往基於資料庫來實現的數據操作,就變成了多個對外提供微服務的微服務系統之間的協同操作。在這種情況下,原有的單機事務方式已經不能夠使用了,因為多個服務就意味著存在多個事務管理器和多個資源,單個微服務的本地事務管理器只能保證本地事務的ACID,為了在多個服務之間能保證業務的事務性,參與分散式事務的微服務通常會依託協調器來完成相關的一致性協調操作。

那我們在微服務系統實際開發中,如何去實現協調器以處理分散式事務呢,這裡的解決方案是採用華為提供的servicecomb-pack框架來解決這一問題。

2. 分散式事務解決方案:

servicecomb-pack

2.1補償方式

在講servicecomb-pack之前先了解兩個概念:不完美補償(saga)和完美補償(tcc)。

  1. saga:不完美補償,一般在系統中我們會專門為業務邏輯對應寫一個補償邏輯,如果業務邏輯執行失敗,就會去執行這個補償邏輯,我們稱這個補償邏輯為反向操作,這個反向操作同樣會留下操作痕迹,例如:在銀行系統中,客戶去ATM取錢,銀行會先對用戶賬戶進行扣款操作,如果本次取錢不成功,銀行系統會發出一筆沖正操作,將之前扣除的款項打回用戶賬戶,這個沖正操作在交易記錄裡面是開源查詢到的。
  2. tcc:完美補償,cancel階段會徹底清楚之前的業務邏輯操作,用戶是感知不到的。例如:在一個交易平台去發起交易,首先在try階段不會直接去扣除賬戶餘額,而且去檢查用戶的額度並刷新額度,然後在confirm階段才去真正操作賬戶。如果出現異常,那麼在cancel階段就需要去執行業務邏輯來取消try階段產生的後果,釋放在try階段被佔用的額度。整個過程只有等confirm執行完畢,交易才算完成。

2.2servicecomb-pack

servicecomb-pack出自於華為微服務框架servicecomb,是一個開源的分散式事務最終一致性解決方案,該項目已交由Apache軟體基金會孵化,目前已經在apache畢業了。0.3.0版本之前叫servicecomb-saga,現版本已經改名為servicecomb-pack。

servicecomb-pack架構主要包含兩個組件:alpha和Omega

  • alpha:alpha其實就是一個server端,需要用戶自行編譯運行,它的作用就是上述中的分散式事務協調器,主要作用是和Omega客戶端進行通訊,接收omega發過來的事務事件,然後進行持久化存儲事務以及修改協調子事務的狀態,從而保證全局事務中的所有子事務狀態都一致,即要麼全執行完成,要麼全執行失敗。
  • omega:Omega端其實可以看成是一個微服務中內嵌的agent,主要作用是監控本地子事務的執行情況並向alpha-server端發送子事務執行事件以及傳遞全局事務ID,並在異常情況下會根據alpha下發的操作事件進行相應的補償操作。

從上圖中我們大致可以了解整個servicecomb-pack是如何運轉的,但是有一個疑問點,alpha-server端是怎麼知道多個Omega發送過來的子事務是屬於同一個全局事務的呢?其實在分散式事務開始點會生成一個全局事務ID,然後在調用子事務所處的服務時,會把這個全局事務ID傳遞給子事務,然後alpha端會會把這個全局事務ID和Omega傳遞過來的子事務事件綁定並持久化到資料庫中,這樣就會形成一個完整的事務調用鏈,我們通過這個全局事務ID就可以完整的追蹤到整個分散式事務的執行情況。

Omega會以切面編程的方式嚮應用程式注入相關的處理模組,幫助我們構建分散式事務調用的上下文。Omega在事務處理初始階段處理事務的相關準備的操作,在事務執行完畢做一些清理的操作,例如創建分散式事務起始事件,以及相關的子事件,根據事務的執行的成功或者失敗生產相關的事務終止或者失敗事件。這樣帶來的好處是用戶的程式碼只需要添加幾個annotation 來描述分散式事務執行範圍,以及與本地的事務處理恢復的相關函數資訊,Omega就能通過切面注入的程式碼能夠追蹤與本地事務的執行情況。Omega會將本地事務執行的情況以事件的方式通知給Alpha。由於單個Omega不可能知曉一個分散式事務下其他參與服務的執行情況, 這樣就需要Alpha扮演一個十分重要的協調者的角色。Alpha將收集到的分散式事務事件資訊整理匯總,通過分析這些事件之間的關係可以了解到分散式事務的執行情況, Alpha通過向Omega下發相關的執行指令由Omega執行相關提交或恢復操作,實現分散式事務的最終一致性。

在了解的Pack實現的部分細節之後, 我們可以從下圖進一步了解ServiceComb Pack架構下,Alpha與Omega內部各模組之間的關係圖[1]。

整個架構分為三個部分,一個是Alpha協調器,另外一個就是注入到微服務實例中的Omega,以及Alpha與Omega之間的交互協議, 目前ServiceComb Pack支援Saga 以及TCC兩種分散式事務協調協議實現。

Omega包含了與分析用戶分散式事務邏輯相關的事務註解模組(Transaction Annotation)以及事務攔截器(Transaction Interceptor);分散式事務執行相關的事務上下文(Transaction Context),事務回調(Transaction Callback) ,事務執行器(Transaction Executor);以及負責與Alpha進行通訊的事務傳輸(Transaction Transport)模組。

  • 事務註解模組是分散式事務的用戶介面,用戶將這些標註添加到自己的業務程式碼之上用以描述與分散式事務相關的資訊,這樣Omega就可以按照分散式事務的協調要求進行相關的處理。如果大家擴展自己的分散式事務,也可以通過定義自己的事務標註來實現。
  • 事務攔截器這個模組我們可以藉助AOP手段,在用戶標註的程式碼基礎上添加相關的攔截程式碼,獲取到與分散式事務以及本地事務執行相關的資訊,並藉助事務傳輸模組與Alpha進行通訊傳遞事件。
  • 事務上下文為Omega內部提供了一個傳遞事務調用資訊的一個手段,藉助前面提到的全局事務ID以及本地事務ID的對應關係,Alpha可以很容易檢索到與一個分散式事務相關的所有本地事務事件資訊。
  • 事務執行器主要是為了處理事務調用超時設計的模組。由於Alpha與Omega之間的連接有可能不可靠,Alpha端很難判斷Omega本地事務執行超時是由Alpha與Omega直接的網路引起的還是Omega自身調用的問題,因此設計了事務執行器來監控Omega的本地的執行情況,簡化Omega的超時操作。目前Omega的預設實現是直接調用事務方法,由Alpha的後台服務通過掃描事件表的方式來確定事務執行時間是否超時。
  • 事務回調在Omega與Alpha建立連接的時候就會向Alpha進行註冊,當Alpha需要進行相關的協調操作的時候,會直接調用Omega註冊的回調方法進行通訊。由於微服務實例在雲化場景啟停會很頻繁,我們不能假設Alpha一直能找到原有註冊上的事務回調, 因此我們建議微服務實例是無狀態的,這樣Alpha只需要根據服務名就能找到對應的Omega進行通訊。
  • 事務傳輸模組負責Omega與Alpha之間的通訊,在具體的實現過程中,Pack通過定義相關的Grpc描述介面文件定義了TCC 以及Saga的事務交互方法, 同時也定義了與交互相關的事件[2]。

3. 分散式事務實戰

如何在項目中運用servicecomb-pack,需要進行以下步驟:

3.1 alpha-server配置

3.1.1編譯alpha-server

1. 環境準備

  • JDK1.8
  • Maven3.x

2. 源碼獲取

Github地址:https://github.com/apache/servicecomb-pack

$ git clone:https://github.com/apache/servicecomb-pack.git

$ git checkout 0.4.0

3. 修改配置文件

找到alpha-server/src/main/resource/application.yaml,修改datasource資訊為本地資訊即可

4. 本地構建alpha-server

$ cd servicecomb-pack

$ mvn clean install -DskipTests -Pspring-boot-2

在執行完命令後,可在alpha/alpha-server/target/saga/alpha-server-${version}-exec.jar中找到alpha-server的可執行jar包

5. 初始化資料庫

可在alphaalpha-serversrcmainresources目錄下找到schema-mysql.sql和schema-postgresql.sql兩個sql文件,可自行根據所選資料庫進行初始化即可。

6. 啟動alpha-server

java -Dspring.profiles.active=prd -D"spring.datasource.url=jdbc:postgresql://${host_address}:5432/saga?useSSL=false" -jar alpha-server-${saga_version}-exec.jar

*注意:請在執行命令前將${saga_version}和${host_address}更改為實際值

至此,alpha-server全局事務管理器已經啟動成功。

3.1.2替換postgresql為mysql

目前alpha-server支援pg和mysql兩種資料庫,默認為pg,如需改為mysql,需要進行如下操作:

1. 安裝並運行mysql

2. 修改pom文件,添加依賴

alpha-server/pom.xml,添加mysql依賴

dependency>    <groupId>mysql</groupId>    <artifactId>mysql-connector-java</artifactId>    <scope>runtime</scope>    <version>8.0.15</version>  </dependency>

3. 修改配置文件

找到alpha-server/src/main/resource/application.yaml,修改datasource資訊為本地資訊即可

spring:    profiles: mysql    datasource:      username: ${username}      password: ${password}      url: jdbc:mysql://${host_address}:${port}/${database_name}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false      platform: mysql      continue-on-error: false      driver-class-name: com.mysql.cj.jdbc.Driver

(左右滑動查看全部程式碼)

*注意:${username},${password},${host_address},${port},${database_name}需替換為實際值

4. 本地構建alpha-server(和上面步驟一致)

5. 啟動alpha-server

java -Dspring.profiles.active=mysql -Dloader.path=./plugins -D"spring.datasource.url=jdbc:mysql://${host_address}:3306/${database_name}? serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false " -jar alpha-server-${saga_version}-exec.jar

(左右滑動查看全部程式碼)

*注意:請在執行命令前將${saga_version}和${host_address}更改為實際值

至此,alpha-server端已經配置編譯完成。

3.2 Omega配置

配置完alpha-server之後,就相當於分散式事務的協調器已經配置完成,剩下的就是omega的配置,也就是在實際開發中如何運用servicecomb-pack去處理分散式事務。本次講解會結合一個實際案例:購物系統中的下單流程和刪除產品流程來分別講解saga模式和tcc模式如何使用的。

3.2.1 環境準備

本次案例:購物系統是採用分散式微服務架構,整體分為三個微服務應用:orderManage訂單管理應用、productManage產品管理應用、stockManage庫存管理應用

1. 添加依賴

分別在三個應用的pom文件中添加Omega所需的依賴:

<dependency>      <groupId>org.apache.servicecomb.pack</groupId>      <artifactId>omega-spring-starter</artifactId>      <version>${servicecomb-pack.version}</version>  </dependency>  <dependency>      <groupId>org.apache.servicecomb.pack</groupId>      <artifactId>omega-transport-resttemplate</artifactId>      <version>${servicecomb-pack.version}</version>  </dependency>  <!--非必需 -->  <dependency>      <groupId>org.apache.servicecomb.pack</groupId>      <artifactId>omega-spring-cloud-consul-starter</artifactId>      <version>${servicecomb-pack.version}</version>  </dependency>  <dependency>      <groupId>org.apache.servicecomb.pack</groupId>      <artifactId>omega-spring-cloud-eureka-starter</artifactId>      <version>${servicecomb-pack.version}</version>  </dependency>

(左右滑動查看全部程式碼)

*注意:請將${servicecomb-pack.version}更改為實際的版本號(推薦版本為0.4.0)

*注意:如需做集群,omega-spring-cloud-consul-starter和omega-spring-cloud-eureka-starter二選一,視項目的註冊中心而定。

2. 修改配置文件

分別在三個應用的application.yml配置文件中添加alpha-server配置,具體配置如下:

#配置alpha-server地址  alpha:    cluster:      address: 10.15.15.172:8080  omega:    enabled: true

(左右滑動查看全部程式碼)

注意:application.name一定不要過長,因為instanceId的格式是application.name+IP,並且長度為36,否則alpha-server事務持久化會報錯

以上兩個屬性配置為必填,因為alpha-server會依據application.name去查找對應的Omega,其他應用配置自行添加,address可根據alpha-server中的配置實際添加

至此,環境準備已經完畢,下面開始進行應用程式碼編寫。

3.2.2 saga模式程式碼編寫

在本次案例中,我們以一個下單流程來講解saga模式下程式碼是如何編寫的。下單流程包括:點擊下單、查詢庫存、支付、更新庫存;訂單應用作為起始服務,調用庫存應用和產品應用,這兩個應用對應的服務作為參與服務(子事務),在訂單應用下單,訂單應用使用rest template向產品應用發起調用校驗產品庫存,然後訂單應用向庫存應用發起支付請求(子事務1),支付成功後訂單應用再向庫存應用發起請求更新庫存(子事務2)。

1. @SagaStart

首先需要在應用程式碼中描述出saga事務的邊界,作為分散式事務的起始點,因此我們需要在訂單應用中的createOrder()方法上添加該註解@SagaStart:

2. @ Compensable

@Compensable所代表的是本地子事務,因此需要在創建支付和更新庫存的方法上添加此註解來標註該邏輯為子事務,並且在Compensable的compensationMethod屬性中描述補償方法。注意補償方法和本地事務方法的參數必須一致,否則Omega在系統啟動進行參數檢查的時候報找不到恢復方法的錯誤。

支付:

支付對應補償方法:

更新庫存:

更新庫存補償方法:

*注意:實現的服務和補償方法必須滿足冪等的要求

*注意:默認情況下,超時需要顯示聲明

*注意:若全局事務起點與子事務重合,需同時聲明@SagaStart和@Compensable註解

*注意:補償方法的入參必須與try方法入參一致,否則啟動時會報錯(alpha-server找不到補償方法)

3.2.3 tcc模式程式碼編寫

下面我們會以刪除庫存流程來講解tcc模式是如何編寫程式碼的。刪除庫存流程:由產品應用發起(分散式事務起始),調用庫存應用刪除對應產品的庫存資訊(tcc子事務)。

本次調用使用的是feign的方式,因此需要在產品應用中的pom文件添加相應的依賴:

1. @TccStart

我們以產品應用中的delete方法作為分散式事務起始點,因此在該方法上添加註解@TccStart:

2. @ Participate

在子事務所處的方法上添加該註解,並通過confirmMethod 以及cancelMethod屬性定義相關確認以及取消方法名。這裡需要注意的是這裡提到的confirm,cancel方法的參數必須和try方法的相同。

Confirm邏輯:

Cancel邏輯:

*注意:confirm和cancel方法的入參必須和try方法一致

*注意:目前tcc模式還不支援timeout

3.2.4事件資訊獲取

默認情況下,8080埠用來處理Omega處發起的grpc請求,用來做事務上下文等操作;而8090埠則用於處理查詢alpha處的事件資訊。

1. saga-事件資訊查詢api

統計所有事件狀態:

http://${alpha-server.address:port}/saga/stats

統計最近事件狀態:

http://${alpha-server.address:port}/saga/recent

根據事件狀態查詢事件列表:

http://${alpha-server.address:port}/saga/transactions

根據服務名稱查詢對應的分散式事件列表:

http://${alpha-server.address:port}/saga/findTransactions

2. tcc-事件資訊查詢api

Tcc目前沒有提供正式的查詢介面。但是有測試介面,在AlphaTccEventController中,可自行根據測試介面修改源碼,重新編譯即可。

目前alpha-server提供的事件查詢api不多,若有其他需求,用戶可自行編寫介面對資料庫進行查詢。

本文所有觀點都出自個人見解,疏漏、錯誤之處在所難免,歡迎大家指正,希望能夠與大家一起交流和進步。

[1]引用自:

http://servicecomb.apache.org/cn/docs/distributed-transaction-of-services-1/

[2]引用自:

http://servicecomb.apache.org/cn/docs/distributed-transaction-of-services-1/

精選提問:

問1:TCC實現的是強一致事務么?

答:可以這麼理解,tcc分為三個步驟try、confirm、cancel,每個子事務都需要實現這三個步驟。Try部分完成業務的準備工作,confirm部分完成業務的提交,cancel部分完成事務的回滾,只有confirm階段完成才算整個事務的完成。

問2:全局事務起點與子事務重合這個怎麼理解?

答:就是比如一個業務方法即是事務的起點,也屬於這個分散式事務內的一個子事務。這種情況下就需要同時聲明@SagaStart 和 @Compensable 的註解或者@TccStart和@Participate 註解。

問3:cancel操作是怎麼做到的?是通過undo log做的,還是通過補償語句呢?

答:不管是saga模式還是tcc模式下的cance補償邏輯,都是由alpha-server協調器進行下髮指令給omega端;omega端再通過子事務註解內聲明的cancel方法去找到對應的補償方法,然後執行補償方法內的邏輯。這個邏輯肯定是由自己根據當前的業務邏輯去實現的,比如扣庫存,一般就是把扣除的庫存加回去之類的操作。具體底層實現可以參見源碼中的「GrpcCompensateStreamObserver」類。