一個非侵入的Go事務管理庫——工作原理
- 2020 年 6 月 22 日
- 筆記
在上一篇文章「一個非侵入的Go事務管理庫——如何使用」中,我講述了如何使用事務庫。有些讀者可能讀過“清晰架構(Clean Architecture)的Go微服務: 事物管理” ,其中描述了事務管理系統的舊版本。那篇文章和本文之間會有一些重疊。因為大多數人可能還沒有讀過那篇文章或者即使讀了也忘記了它的內容。因此為了照顧多數讀者,本文還是從頭開始(假設你沒有讀過前文)。如果你讀過,那你可以直接跳過熟悉的部分。
好的事務庫對於使用它的應用程序是透明的。在Go的「sql」庫中,有兩種類型的數據庫鏈接,「sql.DB」和「sql.Tx」。當你不需要事務支持時,使用「sql.DB」;否則使用「sql.Tx」。為了讓這兩種不同場景共享相同的持久層代碼,我們需要對數據庫鏈接進行一個封裝來同時支持這兩種場景。我從“db transaction in golang” 里得到了這個想法。
數據庫層的接口
數據庫層是事務管理庫中處理數據庫訪問的最低層。應用程序不需要修改該層,只有事務管理庫需要這樣做。
數據庫訪問封裝
下面是可同時支持事務和非事務操作的共享數據庫訪問接口, 它在「gdb.go」中定義。
// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx)
// It should be able to work with all SQL data that follows SQL standard.
type SqlGdbc interface {
Exec(query string, args ...interface{}) (sql.Result, error)
Prepare(query string) (*sql.Stmt, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
QueryRow(query string, args ...interface{}) *sql.Row
// If need transaction support, add this interface
Transactioner
}
// Transactioner is the transaction interface for database handler
// It should only be applicable to SQL database
type Transactioner interface {
// Rollback a transaction
Rollback() error
// Commit a transaction
Commit() error
// TxEnd commits a transaction if no errors, otherwise rollback
// txFunc is the operations wrapped in a transaction
TxEnd(txFunc func() error) error
}
它有兩部分。一個是數據庫接口,它包含了常規的數據庫操作,如查詢表、更新表記錄。另一個事務接口,它包含里支持事務所需要的函數,如「提交」和「回滾」。「SqlGdbc」接口是兩者的結合。該接口將用於連接數據庫。
數據庫訪問接口的實現
下面是數據庫訪問接口的代碼實現。它在「sqlConnWrapper.go」文件中。它定義了兩個結構體,「SqlDBTx」是對「sql.DB」的封裝,將被非事務函數使用。「SqlConnTx」是對「sql.Tx」的封裝,將被事務函數使用。
// SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB
type SqlDBTx struct {
DB *sql.DB
}
// SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx
type SqlConnTx struct {
DB *sql.Tx
}
func (sdt *SqlDBTx) Exec(query string, args ...interface{}) (sql.Result, error) {
return sdt.DB.Exec(query, args...)
}
func (sdt *SqlDBTx) Prepare(query string) (*sql.Stmt, error) {
return sdt.DB.Prepare(query)
}
func (sdt *SqlDBTx) Query(query string, args ...interface{}) (*sql.Rows, error) {
return sdt.DB.Query(query, args...)
}
func (sdt *SqlDBTx) QueryRow(query string, args ...interface{}) *sql.Row {
return sdt.DB.QueryRow(query, args...)
}
func (sdb *SqlConnTx) Exec(query string, args ...interface{}) (sql.Result, error) {
return sdb.DB.Exec(query, args...)
}
func (sdb *SqlConnTx) Prepare(query string) (*sql.Stmt, error) {
return sdb.DB.Prepare(query)
}
func (sdb *SqlConnTx) Query(query string, args ...interface{}) (*sql.Rows, error) {
return sdb.DB.Query(query, args...)
}
func (sdb *SqlConnTx) QueryRow(query string, args ...interface{}) *sql.Row {
return sdb.DB.QueryRow(query, args...)
}
事務接口的實現
下面是「Transactioner」接口的代碼實現,它在文件 “txConn.go”中。我從“database/sql Tx — detecting Commit or Rollback”中得到這個想法。
因為「SqlDBTx」不支持事務,所以它的所有函數都返回「nil”。
// DB doesn't rollback, do nothing here
func (cdt *SqlDBTx) Rollback() error {
return nil
}
//DB doesnt commit, do nothing here
func (cdt *SqlDBTx) Commit() error {
return nil
}
// DB doesnt rollback, do nothing here
func (cdt *SqlDBTx) TxEnd(txFunc func() error) error {
return nil
}
func (sct *SqlConnTx) TxEnd(txFunc func() error) error {
var err error
tx := sct.DB
defer func() {
if p := recover(); p != nil {
log.Println("found p and rollback:", p)
tx.Rollback()
panic(p) // re-throw panic after Rollback
} else if err != nil {
log.Println("found error and rollback:", err)
tx.Rollback() // err is non-nil; don't change it
} else {
log.Println("commit:")
err = tx.Commit() // if Commit returns error update err with commit err
}
}()
err = txFunc()
return err
}
func (sct *SqlConnTx) Rollback() error {
return sct.DB.Rollback()
}
func (sct *SqlConnTx) Commit() error {
return sct.DB.Commit()
}
持久層的接口
在數據庫層之上是持久層,應用程序使用持久層來訪問數據庫表中的記錄。你需要定義一個函數在本層中實現對事務的支持。下面是持久層的事務接口,它位於「txDataService.go」文件中。
// TxDataInterface represents operations needed for transaction support.
type TxDataInterface interface {
// EnableTx is called at the end of a transaction and based on whether there is an error, it commits or rollback the
// transaction.
// txFunc is the business function wrapped in a transaction
EnableTx(txFunc func() error) error
}
以下是它的實現代碼。它只是調用下層數據庫中的函數「TxEnd()」,該函數已在數據庫層實現。下面的代碼不是事務庫的代碼(它是本文中惟一的不是事務庫中的代碼),你需要在應用程序中實現它。
func (uds *UserDataSql) EnableTx(txFunc func() error) error {
return uds.DB.TxEnd(txFunc)
}
獲取數據庫鏈接的代碼
除了我們上面描述的調用接口之外,在應用程序中你還需要先獲得數據庫鏈接。事務庫中有兩個函數可以完成這個任務。
返回”SqlGdbc”接口的函數
函數”Build()”(在”factory.go”中)將返回”SqlGdbc”接口。根據傳入的參數,它講返回滿足”SqlGdbc”接口的結構,如果需要事務支持就是「SqlConnTx」,不需要就是「SqlDBTx」。如果你不需要在應用程序中直接使用數據庫鏈接,那麼調用它是最好的。
// Build returns the SqlGdbc interface. This is the interface that you can use directly in your persistence layer
// If you don't need to cache sql.DB connection, you can call this function because you won't be able to get the sql.DB
// in SqlGdbc interface (if you need to do it, call BuildSqlDB()
func Build(dsc *config.DatabaseConfig) (gdbc.SqlGdbc, error) {
db, err := sql.Open(dsc.DriverName, dsc.DataSourceName)
if err != nil {
return nil, errors.Wrap(err, "")
}
// check the connection
err = db.Ping()
if err != nil {
return nil, errors.Wrap(err, "")
}
dt, err := buildGdbc(db, dsc)
if err != nil {
return nil, err
}
return dt, nil
}
func buildGdbc(sdb *sql.DB,dsc *config.DatabaseConfig) (gdbc.SqlGdbc, error){
var sdt gdbc.SqlGdbc
if dsc.Tx {
tx, err := sdb.Begin()
if err != nil {
return nil, err
}
sdt = &gdbc.SqlConnTx{DB: tx}
log.Println("buildGdbc(), create TX:")
} else {
sdt = &gdbc.SqlDBTx{sdb}
log.Println("buildGdbc(), create DB:")
}
return sdt, nil
}
返回數據庫鏈接的函數
函數”BuildSqlDB()”(在”factory.go”中)將返回”sql.DB”。它會忽略傳入的事務標識參數。應用程序在調用這個函數獲得數據庫鏈接後,還需要根據事務標識自己生成「SqlConnTx」或「SqlDBTx」。如果你需要在應用程序里緩存”sql.DB”,那麼你必須調用這個函數。
// BuildSqlDB returns the sql.DB. The calling function need to generate corresponding gdbc.SqlGdbc struct based on
// sql.DB in order to use it in your persistence layer
// If you need to cache sql.DB connection, you need to call this function
func BuildSqlDB(dsc *config.DatabaseConfig) (*sql.DB, error) {
db, err := sql.Open(dsc.DriverName, dsc.DataSourceName)
if err != nil {
return nil, errors.Wrap(err, "")
}
// check the connection
err = db.Ping()
if err != nil {
return nil, errors.Wrap(err, "")
}
return db, nil
}
局限性
首先,它只支持SQL數據庫的事務。如果你有一個NoSql數據庫,那麼它不支持(大多數NoSql數據庫不支持事務)。
其次,如果你的事務跨越數據庫(例如在不同的微服務之間),那麼它將無法工作。常用的做法是使用「Saga Pattern」。你可以為事務中的每個操作編寫一個補償操作,並在回滾階段逐個執行補償操作。在應用程序中添加「Saga」解決方案並不困難。你可能會問,為什麼不把「Saga」加到事務庫中呢? 這是一個有趣的問題。我覺得還是單獨為「Saga」建一個庫比較合適。
第三,它不支持嵌套事務(Nested Transaction),因此你需要手動確保在代碼中沒有嵌套事務。如果代碼庫不是太複雜,這很容易做到。如果你有一個非常複雜的代碼庫,其中有很多事務和非事務代碼混在一起,那麼你需要一個支持嵌套事務的解決方案。我沒有花時間研究如何添加嵌套事務,但它應該有一定的工作量。如果你對此感興趣,可以從“database/sql: nested transaction or save point support”開始。到目前為止,對於大多數場景,當前的解決方案可能是在代價不大的情況下的最佳方案。
如何擴展庫的功能
「SqlGdbc」接口沒有列出「sql」包中的所有函數,只列出我的應用程序中需要的函數。你可以輕鬆地擴展該接口以包含其他函數。
例如,如果需要將全鏈路跟蹤(詳情請見“Go微服務全鏈路跟蹤詳解”)擴展到數據庫中,則可能需要在上下文中傳遞到數據庫函數中。「sql」庫已經支持具有上下文的數據庫函數。你只需要找到它們並將它們添加到”SqlGdbc”接口中,然後在”sqlConnWrapper “中實現它們。然後在持久層中,需要使用上下文作為參數調用函數。
源碼:
完整源碼: “jfeng45/gtransaction”
索引:
2 “清晰架構(Clean Architecture)的Go微服務: 事物管理”
4 “database/sql Tx — detecting Commit or Rollback”
5 “Applying the Saga Pattern – GOTO Conference”
6 “database/sql: nested transaction or save point support”