6.深入TiDB:樂觀事務

本文基於 TiDB release-5.1進行分析,需要用到 Go 1.16以後的版本

我的部落格地址: //www.luozhiyun.com/archives/620

事務模型概述

由於 TiDB 的事務模型沿用了 Percolator 的事務模型。所以先從 Percolator 開始,關於 Percolator 論文沒看過的同學看這裡://www.luozhiyun.com/archives/609 我已經翻譯好了

Percolator 分散式事務

Percolator實現分散式事務主要基於3個實體:Client、TSO、BigTable。

  • Client是事務的發起者和協調者
  • TSO為分散式伺服器提供一個精確的,嚴格單調遞增的時間戳服務。
  • BigTable 是Google實現的一個多維持久化Map。

Percolator存儲一列數據的時候,會在BigTable中存儲多列數據:

  • data列(D列): 存儲 value
  • lock列(L列): 存儲用於分散式事務的鎖資訊
  • write列(W列):存儲用於分散式事務的提交時間(commit_timestamp)

Percolator的分散式寫事務是由2階段提交(後稱2PC)實現的。不過它對傳統2PC做了一些修改。一個寫事務事務開啟時,Client 會從 TSO 處獲取一個 timestamp 作為事務的開始時間(後稱為start_ts)。在提交之前,所有的寫操作都只是快取在記憶體里。提交時會經過 prewrite 階段和 commit階段,一個寫事務可以包含多個寫操作。

寫操作

Prewrite

  1. 在事務開啟時會從 TSO 獲取一個 timestamp 作為 start_ts;
  2. 在所有行的寫操作中選出一個作為 primary,其他的為 secondaries;
  3. 對primary行寫入L列,即上鎖,上鎖前會檢查是否有衝突:
    1. 檢查L列是否已經有別的客戶端已經上鎖,直接 Abort 整個事務;
    2. 檢查W列是否在本次事務開始時間之後有事務已提交,檢查 W列,是否有更新 [start_ts, +Inf) 之間是否存在相同 key 的數據 。如果存在,則說明存在 W列 conflict ,直接 Abort 整個事務;
  4. 如果沒有衝突的話,則上鎖,以 start_ts 作為 Bigtable 的 timestamp,將數據寫入 data 列,由於此時 write 列尚未寫入,因此數據對其它事務不可見;

Commit

如果 Prewrite 成功,則進入 Commit 階段:

  1. 從TSO處獲取一個timestamp作為事務的提交時間(後稱為commit_ts);
  2. 提交primary, 如果失敗,則abort事務;
  3. 檢查primary上的lock是否還存在,如果不存在,則abort。(其他事務有可能會認為當前事務已經失敗,從而清理掉當前事務的lock);
  4. 以commit_ts為timestamp, 寫入W列,value為start_ts,清理L列的數據。注意,此時為Commit Point,「寫W列」和「清理L列」由BigTable的單行事務保證ACID;
  5. 一旦primary提交成功,則整個事務成功。此時已經可以給客戶端返回成功了,再非同步的進行 secondary 提交。seconary 提交無需檢測 lock 列鎖是否還存在,一定不會失敗;

讀操作

  1. 檢查該行是否有 L 列,時間戳為 [0, start_ts],如果有,表示目前有其他事務正佔用此行,如果這個鎖已經超時則嘗試清除,否則等待超時或者其他事務主動解鎖;
  2. 如果步驟 1 發現鎖不存在,則可以安全的讀取;

TiDB 樂觀事務實現分析

begin 事務

每個 client connection 對應著一個 session , 事務相關數據的放在了 session 中, 它包含了對 KVStore 和 Txn 介面的引用。

func (s *session) NewTxn(ctx context.Context) error {
	if err := s.checkBeforeNewTxn(ctx); err != nil {
		return err
	}
	// 開啟事務
	txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
	if err != nil {
		return err
	}
	...
	return nil
}

KVStore 定義了Begin/BeginWithOption,用來創建開始一個事務。如上程式碼,session 的 NewTxn 方法中調用 KVStore 的 BeginWithOption 方法創建開始一個事務。

func (s *KVStore) Begin() (*KVTxn, error) {
	return s.BeginWithOption(DefaultStartTSOption())
}
 
func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) {
	return newTiKVTxnWithOptions(s, options)
}

Begin/BeginWithOption調用圖如下:

Begin/BeginWithOption最終都會調用到 newTiKVTxnWithOptions 函數中。如果 startTS 為 nil ,則會去 PD服務獲取一個時間戳,作為事務的startTS,同時也是事務的唯一標識。

func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) {
	if options.TxnScope == "" {
		options.TxnScope = oracle.GlobalTxnScope
	}
	// 去PD服務獲取一個時間戳
	startTS, err := ExtractStartTS(store, options)
	if err != nil {
		return nil, errors.Trace(err)
	}
	snapshot := newTiKVSnapshot(store, startTS, store.nextReplicaReadSeed())
	newTiKVTxn := &KVTxn{
		snapshot:  snapshot,
		us:        unionstore.NewUnionStore(snapshot),
		store:     store,
		startTS:   startTS,
		startTime: time.Now(),
		valid:     true,
		vars:      tikv.DefaultVars,
		scope:     options.TxnScope,
	}
	return newTiKVTxn, nil
}

寫入數據

TiDB 在執行 insert/update/delete 等 DML 時,會調用memBuffer.Set(key, value) 將數據放入到 kv.Transaction 的 memBuffer 裡面,如果執行失敗,就調 StmtRollbackTxnState裡面的buf 清空 。具體實現可以看 tableCommon.AddRecord 函數:

func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
    ...
	var setPresume bool
	skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck
	if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck {
		// 如果是 LazyCheck ,那麼只讀取本地快取判斷是否存在
		if sctx.GetSessionVars().LazyCheckKeyNotExists() {
			var v []byte
			//只讀取本地快取判斷是否存在
			v, err = txn.GetMemBuffer().Get(ctx, key)
			if err != nil {
				setPresume = true
			}
			if err == nil && len(v) == 0 {
				err = kv.ErrNotExist
			}
		} else {
			//否則會通過rpc請求tikv從集群中校驗數據是否存在
			_, err = txn.Get(ctx, key)
		}
		if err == nil {
			handleStr := getDuplicateErrorHandleString(t, recordID, r)
			return recordID, kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
		} else if !kv.ErrNotExist.Equal(err) {
			return recordID, err
		}
	} 
	if setPresume {
		// 表示假定數據不存在
		err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
	} else {
        //將 Key-Value 寫到當前事務的快取中
		err = memBuffer.Set(key, value)
	}
	if err != nil {
		return nil, err
	}
    ...
}

AddRecord 將數據寫入的過程我在<5.深入TiDB:Insert 語句>分析過了,就不過多講解。

兩階段提交事務

TiDB 提交事務是通過調用 KVTxn 的 Commit 方法進行的。像 pecolator 論文中描述的協議一樣,這是一個兩階段提交的過程,Prewrite 階段與 Commit 階段。

Prewrite:

  1. TiDB 從當前要寫入的數據中選擇一個 Key 作為當前事務的 Primary Key。
  2. TiDB 從 PD 獲取所有數據的寫入路由資訊,並將所有的 Key 按照所有的路由進行分類。
  3. TiDB 並發向所有涉及的 TiKV 發起 prewrite 請求,TiKV 收到 prewrite 數據後,檢查數據版本資訊是否存在衝突、過期,符合條件給數據加鎖,鎖中記錄本次事務的開始時間戳 startTs。Prewrite 流程任何一步發生錯誤,都會進行回滾:刪除鎖標記 , 刪除版本為 startTs 的數據;
  4. TiDB 收到所有的 prewrite 成功。

當 Prewrite 階段完成以後,進入 Commit 階段,當前時間戳為 commitTs,TSO 會保證 commitTs > startTs。

Commit:

  1. TiDB 向 Primary Key 所在 TiKV 發起第二階段提交 commit 操作,TiKV 收到 commit 操作後,檢查數據合法性,清理 prewrite 階段留下的鎖。

twoPhaseCommitter

我們先看看整體的 twoPhaseCommitter 二階段提交的調用時序圖:

在程式碼實現上面首先會構建一個 twoPhaseCommitter,這個對象會用到在 begin 裡面創建的 KVTxn 對象的欄位:

func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
   return &twoPhaseCommitter{
      store:         txn.store,
      txn:           txn,
      startTS:       txn.StartTS(),
      sessionID:     sessionID,
      regionTxnSize: map[uint64]int{},
      ttlManager: ttlManager{
         ch: make(chan struct{}),
      },
      isPessimistic: txn.IsPessimistic(),
      binlog:        txn.binlog,
   }, nil
}

mutations

由於事務數據都是存放在快取中的,所以 twoPhaseCommitter 會通過 initKeysAndMutations 方法將當前事務的快取中的數據轉成 mutations:

func (c *twoPhaseCommitter) initKeysAndMutations() error {
	var size, putCnt, delCnt, lockCnt, checkCnt int

	txn := c.txn
	// 當前事務的數據都存放在 memBuf 中
	// memBuffer里的 key 是有序排列
	memBuf := txn.GetMemBuffer()
	sizeHint := txn.us.GetMemBuffer().Len()
	c.mutations = newMemBufferMutations(sizeHint, memBuf)
	c.isPessimistic = txn.IsPessimistic()
	filter := txn.kvFilter

	var err error
	// 遍歷 memBuffer 可以順序的收集到事務里需要修改的 key
	for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
		_ = err
		key := it.Key()
		flags := it.Flags()
		var value []byte
		var op pb.Op

		if !it.HasValue() {
			...
		} else {
			value = it.Value()
			...
		}

		var isPessimistic bool
		if flags.HasLocked() {
			isPessimistic = c.isPessimistic
		}
		c.mutations.Push(op, isPessimistic, it.Handle())
		size += len(key) + len(value)

		if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists {
			c.primaryKey = key
		}
	}
	...

	commitDetail := &util.CommitDetails{WriteSize: size, WriteKeys: c.mutations.Len()}
	metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
	metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
	c.hasNoNeedCommitKeys = checkCnt > 0
	// 計算事務的 TTL 時間
	c.lockTTL = txnLockTTL(txn.startTime, size)
	c.priority = txn.priority.ToPB()
	c.syncLog = txn.syncLog
	c.resourceGroupTag = txn.resourceGroupTag
	c.setDetail(commitDetail)
	return nil
}

當前事務的數據都存放在 memBuf 中,所以我們需要遍歷 memBuf 可以順序的收集到事務里需要修改的 key。

在這裡還會調用 txnLockTTL 根據事務的大小計算事務的 TTL 時間。如果一個事務的 key 通過 prewrite加鎖後,事務沒有執行完,tidb-server 就掛掉了,這時候集群內其他 tidb-server 是無法讀取這個 key 的,如果沒有 TTL,就會死鎖。設置了 TTL 之後,讀請求就可以在 TTL 超時之後執行清鎖,然後讀取到數據。

func txnLockTTL(startTime time.Time, txnSize int) uint64 { 
	lockTTL := defaultLockTTL
	// 當事務大小大於16KB
	if txnSize >= txnCommitBatchSize {
		sizeMiB := float64(txnSize) / bytesPerMiB
		// 6000 * 事務大小平方根
		lockTTL = uint64(float64(ttlFactor) * math.Sqrt(sizeMiB))
		//最小為3s
		if lockTTL < defaultLockTTL {
			lockTTL = defaultLockTTL
		}
		//最大為20s
		if lockTTL > ManagedLockTTL {
			lockTTL = ManagedLockTTL
		}
	}
 
	elapsed := time.Since(startTime) / time.Millisecond
	return lockTTL + uint64(elapsed)
}

TTL 和事務的大小的平方根成正比,並控制在一個最小值和一個最大值之間,最大20s,最小3s。

prewrite

在執行 之前,先會調用 twoPhaseCommitter 的 prewriteMutations 方法進行一些預處理工作。

func (c *twoPhaseCommitter) prewriteMutations(bo *Backoffer, mutations CommitterMutations) error {
	... 
	return c.doActionOnMutations(bo, actionPrewrite{}, mutations)
}

func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCommitAction, mutations CommitterMutations) error {
	if mutations.Len() == 0 {
		return nil
	}
	//按照region分組對mutations進行分組
	groups, err := c.groupMutations(bo, mutations)
	if err != nil {
		return errors.Trace(err)
	}
	...
	//進一步的分批處理
	return c.doActionOnGroupMutations(bo, action, groups)
}

groupMutations

首先會調用 groupMutations 對 mutations 按照 region 分組。整個分組流程如下:

先對mutations按照region分組,如果某個region的mutations 太多, 則會先發送CmdSplitRegion命令給TiKV, TiKV對那個region先做個split, 然後再開始提交。

doActionOnGroupMutations

分組完之後會調用 doActionOnGroupMutations 會對每個group的 mutations 做進一步的分批處理。然後調用 doActionOnBatches 進行 prewrite 處理,整個調用圖如下:

程式碼的主要執行邏輯如下:

func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
	action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups))) 
	... 
	batchBuilder := newBatched(c.primary())
	//每個分組內按16KB大小再分批
	for _, group := range groups {
		batchBuilder.appendBatchMutationsBySize(group.region, group.mutations, sizeFunc, txnCommitBatchSize)
	}
	firstIsPrimary := batchBuilder.setPrimary()

	actionCommit, actionIsCommit := action.(actionCommit)
	...
	//commit先同步的提交primary key所在的batch
	if firstIsPrimary &&
		((actionIsCommit && !c.isAsyncCommit()) || actionIsCleanup || actionIsPessimiticLock) {
		// primary should be committed(not async commit)/cleanup/pessimistically locked first
		err = c.doActionOnBatches(bo, action, batchBuilder.primaryBatch())
		... 
		//提交完之後將primary key所在的batch移除
		batchBuilder.forgetPrimary()
	}
	// Already spawned a goroutine for async commit transaction.
	// 其它的key由go routine後台非同步的提交
	if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
		secondaryBo := retry.NewBackofferWithVars(context.Background(), CommitSecondaryMaxBackoff, c.txn.vars)
		go func() {
			... 
			e := c.doActionOnBatches(secondaryBo, action, batchBuilder.allBatches())
			...
		}()
	} else {
		//執行 prewrite
		err = c.doActionOnBatches(bo, action, batchBuilder.allBatches())
	}
	return errors.Trace(err)
}

doActionOnGroupMutations 裡面還參雜了 commit 程式碼,可以先忽略。

下面跟著上面的流程圖 doActionOnGroupMutations 會進入到 actionPrewrite 的 handleSingleBatch 方法中詳細說說這個方法。在講這個方法之前先看看主要的執行邏輯:

下面來看看程式碼的具體實現:

func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) (err error) {
	...
	// 獲取事務的大小
	txnSize := uint64(c.regionTxnSize[batch.region.id])
	// 因為region的缺失導致的重試,所以不知道事務大小,這裡重置事務大小為最大值
	if action.retry {
		txnSize = math.MaxUint64
	} 
	tBegin := time.Now()
	attempts := 0
	// 構建 Request
	req := c.buildPrewriteRequest(batch, txnSize)
	// 構建 RegionRequestSender
	sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
	 
	for {
		//嘗試次數
		attempts++
		// 如果請求超過了1分鐘,那麼列印一條日誌
		if time.Since(tBegin) > slowRequestThreshold {
			logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
			tBegin = time.Now()
		}
		//發送請求
		resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
		// Unexpected error occurs, return it
		if err != nil {
			return errors.Trace(err)
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return errors.Trace(err)
		}
		// 如果遇到了regionError, 則需要重新調用doActionOnMutations重新分組,重新嘗試
		if regionErr != nil {
			...
			err = c.doActionOnMutations(bo, actionPrewrite{true}, batch.mutations)
			return errors.Trace(err)
		}

		if resp.Resp == nil {
			return errors.Trace(tikverr.ErrBodyMissing)
		}
		prewriteResp := resp.Resp.(*pb.PrewriteResponse)
		keyErrs := prewriteResp.GetErrors()
		if len(keyErrs) == 0 {
			//如果沒有keyError,並且Batch是primary,則啟動一個tllManager
			if batch.isPrimary { 
				// 如果事務大於32M,那麼開啟ttlManager定時發送TxnHeartBeat心跳
				if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
					prewriteResp.OnePcCommitTs == 0 {
					c.run(c, nil)
				}
			} 
			...
			return nil
		}
		var locks []*Lock
		for _, keyErr := range keyErrs { 
			// 該key已存在
			if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
				e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
				err = c.extractKeyExistsErr(e)
				if err != nil {
					atomic.StoreUint32(&c.prewriteFailed, 1)
				}
				return err
			} 
			// 從 keyErr 中抽取出衝突的lock
			lock, err1 := extractLockFromKeyErr(keyErr)
			if err1 != nil {
				atomic.StoreUint32(&c.prewriteFailed, 1)
				return errors.Trace(err1)
			} 
			locks = append(locks, lock)
		}
		start := time.Now()
		//嘗試解決這些locks,獲取鎖的過期時間
		msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, locks)
		if err != nil {
			return errors.Trace(err)
		}
		atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start)))
		if msBeforeExpired > 0 {
			// 過期時間大於0,那麼sleep等待
			err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
}

handleSingleBatch 裡面有個循環會發請求到 TiKV,如果失敗,那麼會根據返回的錯誤來判斷是否需要重試。需要注意的是,如果事務大於32M,那麼開啟ttlManager定時發送TxnHeartBeat心跳,因為大事務處理時間比較長。

commit

commit 和 上面的 prewrite 執行流程類似,在 twoPhaseCommitter 的 execute 方法中執行完 prewriteMutations 之後會調用到 commitTxn 方法中,最後在 doActionOnBatches 方法中進行分批處理。

doActionOnBatches 方法會調用到 actionCommit 的 handleSingleBatch 方法進行事務的提交。

actionCommit 的 handleSingleBatch 執行流程其實和上面的 prewrite 也是類似的邏輯:

handleSingleBatch 首先也會調用 NewRequest 初始化一個 Request 結構體作為請求體,然後進入到循環結構中,調用 RegionRequestSender 的 SendReq 向 TiKV 發起請求;

如果返回 regionErr 錯誤,那麼會重新調用 doActionOnMutations 重新分組之後再請求;如果返回的錯誤裡面 GetCommitTsExpired 不為空,那麼會調用 getTimestampWithRetry 方法重新獲取 commitTS 之後再重試提交。

func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
	req := tikvrpc.NewRequest(...) 
	sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
	for {
		// 重試次數
		attempts++
		...
		//向 tikv 發起提交請求
		resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
		...
		// 如果遇到了regionError, 則需要重新調用doActionOnMutations重新分組,重新嘗試
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return errors.Trace(err)
		}
		if regionErr != nil {
			...
			// 重新分組
			err = c.doActionOnMutations(bo, actionCommit{true}, batch.mutations)
			return errors.Trace(err)
		} 
		commitResp := resp.Resp.(*pb.CommitResponse)
		 
		if keyErr := commitResp.GetError(); keyErr != nil {
			if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
				// 重新獲取 commitTS
				commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
				...
				continue
			}
			...
		}
		break
	} 
	return nil
}

總結

由於TiDB的二階段提交是通過 Percolator 分散式事務模型實現的,所以本篇文章首先從 Percolator 分散式事務模型給大家講解一下它裡面主要的寫操作和讀操作的實現步驟;

然後再帶入到 TiDB 的二階段提交中,從程式碼中和大家剖析實現原理。整個提交的過程大致如圖所示:

Reference

//pingcap.com/zh/blog/tidb-source-code-reading-19

//asktug.com/t/topic/1495

//pingcap.com/zh/blog/tidb-source-code-reading-19

//pingcap.com/zh/blog/percolator-and-txn

//www.luozhiyun.com/archives/609

//pingcap.com/zh/blog/tikv-source-code-reading-12

//github.com/tikv/sig-transaction/tree/master/design/async-commit

//pingcap.com/zh/blog/async-commit-principle

//zhuanlan.zhihu.com/p/59115828

//mysql.taobao.org/monthly/2018/11/02/

//pingcap.com/zh/blog/best-practice-optimistic-transaction

//pingcap.com/zh/blog/tidb-transaction-model

掃碼_搜索聯合傳播樣式-白色版 1