­

5.深入TiDB:Insert 語句

本文基於 TiDB release-5.1進行分析,需要用到 Go 1.16以後的版本

我的博客地址://www.luozhiyun.com/archives/605

這篇文章我們看一下 TiDB 是插入數據是如何封裝的,索引是如何維護的,如果插入的數據發生了衝突會如何處理,類似INSERT IGNOREINSERT ON DUPLICATE KEY UPDATE插入語句是如何處理。

下面我們先構造一個表結構:

CREATE TABLE test_insert (a int primary key, b int, c int,d int,index b_index(b),unique index c_index(c) );

這個表結構中有一個主鍵、普通索引、唯一索引。

普通 Insert

構建執行計劃

普通插入 SQL 考慮的是類似下面這樣的語句:

INSERT INTO test.test_insert (a, b, c) VALUES (1, 1, 1);

首先會和 select 語法一樣先進行語法解析構建 ast 語法樹:

type InsertStmt struct {
	dmlNode
	 
	// sql 中的表信息
	Table       *TableRefsClause
	// 字段信息
	Columns     []*ColumnName
	// 要插入的數據
	Lists       [][]ExprNode
	...
}

我這裡展示的是幾個比較重要的字段,因為在插入數據的時候可以使用 :INSERT INTO t VALUES(),(),()... 這樣的語法,所以要插入的數據是一個切片:Lists。

然後制定查詢計劃,在制定查詢計劃的時候同樣會走到 PlanBuilder 的 Build 方法中,然後根據 ast 語法樹的類型 進入到 buildInsert 分支中:

func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) {
	b.optFlag |= flagPrunColumns
	switch x := node.(type) {
	case *ast.InsertStmt:
		return b.buildInsert(ctx, x)
	...
}

func (b *PlanBuilder) buildInsert(ctx context.Context, insert *ast.InsertStmt) (Plan, error) {
	// 獲取ast樹中表節點
	ts, ok := insert.Table.TableRefs.Left.(*ast.TableSource)
	if !ok {
		return nil, infoschema.ErrTableNotExists.GenWithStackByArgs()
	}
	// 獲取表的相關信息
	// 包含了表信息,庫信息,分區信息等
	tn, ok := ts.Source.(*ast.TableName)
	if !ok {
		return nil, infoschema.ErrTableNotExists.GenWithStackByArgs()
	}
	// 獲取其中表信息
	tableInfo := tn.TableInfo
	...
	// Build Schema with DBName otherwise ColumnRef with DBName cannot match any Column in Schema.
	// schema包含表的字段信息,主鍵字段等,names是表的字段信息切片
	schema, names, err := expression.TableInfo2SchemaAndNames(b.ctx, tn.Schema, tableInfo)
	if err != nil {
		return nil, err
	}
	// 根據表的id從緩存中獲取表的元數據
	// 這裡包含的信息比較多,有表名、字段信息、隱藏字段、所有索引、表的字符集編碼等
	tableInPlan, ok := b.is.TableByID(tableInfo.ID)
	if !ok {
		return nil, errors.Errorf("Can't get table %s.", tableInfo.Name.O)
	}
	// 構建插入執行計劃
	insertPlan := Insert{
		Table:         tableInPlan,
		Columns:       insert.Columns,
		tableSchema:   schema,
		tableColNames: names,
		IsReplace:     insert.IsReplace,
	}.Init(b.ctx)
	... 
	// 根據不同的語法執行不同的分支
	// Branch for `INSERT ... SET ...`.
	if len(insert.Setlist) > 0 { 
	// Branch for `INSERT ... VALUES ...`.
	} else if len(insert.Lists) > 0 {
		// 根據ast語法樹中的= ast.ExprNode 轉換成執行計劃的 expression.Expression
		err := b.buildValuesListOfInsert(ctx, insert, insertPlan, mockTablePlan, checkRefColumn)
		if err != nil {
			return nil, err
		}
	// Branch for `INSERT ... SELECT ...`.
	} else { 
	} 
	...
	return insertPlan, err
}

buildInsert 這個方法主要涉及兩個部分:

  • 補全表相關的元數據信息,包括 Database/Table/Column/Index 信息;
  • 處理 ast 語法樹中要插入的 Lists 中的數據,將 ast.ExprNode 轉換成 expression.Expression。

然後將構建好的 Insert 執行計劃返回。

需要注意的是,由於 Insert 語句比較簡單,沒什麼優化的空間,所以不會走 DoOptimize 進行物理優化:

finalPlan, cost, err := plannercore.DoOptimize(ctx, sctx, builder.GetOptFlag(), logic)

執行 Insert 計劃

func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
	 ...
	// 生成執行器
	e, err := a.buildExecutor()
	if err != nil {
		return nil, err
	}
	// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
	ctx = a.setPlanLabelForTopSQL(ctx)
	// handleNoDelay負責執行像 Insert 這種不需要返回數據的語句,只需要把語句執行完成即可
	if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled {
		return result, err
	}
	... 
	return &recordSet{
		executor:   e,
		stmt:       a,
		txnStartTS: txnStartTS,
	}, nil
}

這裡根據執行計劃生成執行器的過程和 Select 是一致的,我們簡單看一下。buildExecutor 方法最後會將執行計劃轉化成 InsertExec 結構體,後續的執行都由這個結構進行。

![Frame 2](//img.luozhiyun.com/Frame 2.png)

在生成完執行計劃之後會進入到 handleNoDelay 執行 SQL 語句。後面的執行流程比較長,我們省略一些中間環節:

![Frame 3](//img.luozhiyun.com/Frame 3-3229191.png)

insertRows 會主要做的就是根據字段類型,獲取數據之後做數據填充。

func insertRows(ctx context.Context, base insertCommon) (err error) {
	// 獲取 InsertValues 實例
	e := base.insertCommon()
	...
	// 設置填充函數
	evalRowFunc := e.fastEvalRow
	// 如果要插入的數據不是常量,那麼會使用evalRow函數
	if !e.allAssignmentsAreConstant {
		evalRowFunc = e.evalRow
	}

	rows := make([][]types.Datum, 0, len(e.Lists)) 
	for i, list := range e.Lists {
		e.rowCount++
		var row []types.Datum
		row, err = evalRowFunc(ctx, list, i)
		if err != nil {
			return err
		}
		...
	}  
	// 批量設置自增id
	rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
	if err != nil {
		return err
	}
	// 將數據寫入存儲引擎中
	err = base.exec(ctx, rows)
	if err != nil {
		return err
	} 
	return nil
}

insertRows 在填充數據的時候會判斷數據類型,如果要處理的數據有非常量,比如有需要依賴其他字段設值、函數等等,這個時候會使用 evalRow 方法進行填充,否則使用 fastEvalRow 進行填充。最後將數據處理好之後會調用 InsertExec 的 exec 方法將數據寫入存儲引擎中。

func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
	...
	for i, row := range rows {
		... 
		err := e.addRecord(ctx, row)
		if err != nil {
			return err
		}
	}
	...
	return nil
}

在 exec 方法中會遍歷所有的數據,然後調用 addRecord 方法進行處理。

tidb3

InsertExec 的 addRecord 方法最終會調用到 TableCommon 的 AddRecord。

func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
	txn, err := sctx.Txn(true)
	if err != nil {
		return nil, err
	}
    ... 
	writeBufs := sessVars.GetWriteStmtBufs() 
	// 獲取記錄行的key
	key := t.RecordKey(recordID) 
	// 格式化數據行
	writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd)
	if err != nil {
		return nil, err
	}
	value := writeBufs.RowValBuf
	// 檢測該key在本地緩存中是否存在
	var setPresume bool
	skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck
	if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck {
		// 如果是 LazyCheck ,那麼只讀取本地緩存判斷是否存在
		if sctx.GetSessionVars().LazyCheckKeyNotExists() {
			var v []byte
			//只讀取本地緩存判斷是否存在
			v, err = txn.GetMemBuffer().Get(ctx, key)
			if err != nil {
				setPresume = true
			}
			if err == nil && len(v) == 0 {
				err = kv.ErrNotExist
			}
		} else {
			//否則會通過rpc請求tikv從集群中校驗數據是否存在
			_, err = txn.Get(ctx, key)
		}
		if err == nil {
			handleStr := getDuplicateErrorHandleString(t, recordID, r)
			return recordID, kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
		} else if !kv.ErrNotExist.Equal(err) {
			return recordID, err
		}
	}
	// 將 Key-Value 寫到當前事務的緩存中
	if setPresume {
		err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
	} else {
		err = memBuffer.Set(key, value)
	}
	if err != nil {
		return nil, err
	}  
	// 構造 Index 數據
	h, err := t.addIndices(sctx, recordID, r, txn, createIdxOpts)
	if err != nil {
		return h, err
	}

	...
	return recordID, nil
}

AddRecord 主要做這麼幾件事:

  • 獲取記錄行的key,序列化 value,將 Key-Value 寫到當前事務的緩存中;
  • 構造 Index 數據;

TiDB 中存儲的數據是全局有序 的,並且數據會以 Key-Value的形式存儲在 TiDB 中。

所以 TiDB 對每個表分配一個 TableID,每一個索引都會分配一個 IndexID,每一行分配一個 RowID(如果表有整數型的 Primary Key,那麼會用 Primary Key 的值當做 RowID),其中 TableID 在整個集群內唯一,IndexID/RowID 在表內唯一,這些 ID 都是 int64 類型。

每行數據按照如下規則進行編碼成 Key-Value pair:

Key: tablePrefix{tableID}_recordPrefixSep{rowID}
Value: [col1, col2, col3, col4]

那麼對應的代碼實現則會調用 RecordKey 方法獲得一個這樣的 Key:

t.indexPrefix = tablecodec.GenTableIndexPrefix(physicalTableID)

func (t *TableCommon) RecordKey(h kv.Handle) kv.Key {
	return tablecodec.EncodeRecordKey(t.recordPrefix, h)
}

這個 Key 分別由 tableID 與 rowID 構成;

對於 Unique Index 數據,會按照如下規則編碼成 Key-Value pair:

Key: tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue
Value: rowID

對於非Unique Index 數據,可能有多行數據的 ColumnsValue是一樣的,所以會按照如下規則編碼成 Key-Value pair:

Key: tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue_rowID
Value: null

對應的 Index 實現則會調用 addIndices 方法,最後調用到 GenIndexKey 生成Key:

tidb4

func GenIndexKey(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo,
	phyTblID int64, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error) {
	// 校驗是否是唯一鍵
	if idxInfo.Unique { 
		distinct = true
		// 唯一鍵是允許 null 值的
		for _, cv := range indexedValues {
			if cv.IsNull() {
				distinct = false
				break
			}
		}
	} 
	//如果是字符串,那麼需要按字段長度裁切
	TruncateIndexValues(tblInfo, idxInfo, indexedValues)
	// 按 tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue 拼接
	key = GetIndexKeyBuf(buf, RecordRowKeyLen+len(indexedValues)*9+9)
	key = appendTableIndexPrefix(key, phyTblID)
	key = codec.EncodeInt(key, idxInfo.ID)
	key, err = codec.EncodeKey(sc, key, indexedValues...)
	if err != nil {
		return nil, false, err
	}
	if !distinct && h != nil {
		// 如果是非Unique Index 數據,還需要拼接上 rowID
		if h.IsInt() { 
			key, err = codec.EncodeKey(sc, key, types.NewDatum(h.IntValue()))
		} else {
			key = append(key, h.Encoded()...)
		}
	}
	return
}

GenIndexKey 這裡會按照上面說到的規則進行拼接。

最後所有的 Key Value 構造完畢之後會將值寫入到當前事務緩存中,等待提交。

func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
    ...
	var setPresume bool
	skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck
	if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck {
		// 如果是 LazyCheck ,那麼只讀取本地緩存判斷是否存在
		if sctx.GetSessionVars().LazyCheckKeyNotExists() {
			var v []byte
			//只讀取本地緩存判斷是否存在
			v, err = txn.GetMemBuffer().Get(ctx, key)
			if err != nil {
				setPresume = true
			}
			if err == nil && len(v) == 0 {
				err = kv.ErrNotExist
			}
		} else {
			//否則會通過rpc請求tikv從集群中校驗數據是否存在
			_, err = txn.Get(ctx, key)
		}
		if err == nil {
			handleStr := getDuplicateErrorHandleString(t, recordID, r)
			return recordID, kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
		} else if !kv.ErrNotExist.Equal(err) {
			return recordID, err
		}
	}
	//將 Key-Value 寫到當前事務的緩存中
	if setPresume {
		// 表示假定數據不存在
		err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
	} else {
		err = memBuffer.Set(key, value)
	}
	if err != nil {
		return nil, err
	}
    ...
}

由於在設計上,TiDB 與 TiKV 是分層的結構,為了保證高效率的執行,在 LazyCheck 模式下,在事務內只有讀操作是必須從存儲引擎獲取數據,而所有的寫操作都事先放在單 TiDB 實例內事務自有的 memDbBuffer 中,在事務提交時才一次性將事務寫入 TiKV。

如上面代碼所示,在調用 AddRecord 時,會根據 Key 從 MemBuffer 中判斷是否存在,不存在那麼在操作 memBuffer 的時候會打上標記 SetPresumeKeyNotExists 表示假設插入不會發生衝突,不需要去 TiKV 中檢查衝突數據是否存在,只將這些數據標記為待檢測狀態。最後到提交過程中,統一將整個事務里待檢測數據做一次批量檢測。

下面通過一個官方的例子來說明一下 LazyCheck 模式下 MySQL 和 TiDB 的區別:

MySQL:

mysql> CREATE TABLE t (i INT UNIQUE);
Query OK, 0 rows affected (0.15 sec)

mysql> INSERT INTO t VALUES (1);
Query OK, 1 row affected (0.01 sec)

mysql> BEGIN;
Query OK, 0 rows affected (0.00 sec)

mysql> INSERT INTO t VALUES (1);
ERROR 1062 (23000): Duplicate entry '1' for key 'i'
mysql> COMMIT;
Query OK, 0 rows affected (0.11 sec)

TiDB:

mysql> CREATE TABLE t (i INT UNIQUE);
Query OK, 0 rows affected (1.04 sec)

mysql> INSERT INTO t VALUES (1);
Query OK, 1 row affected (0.12 sec)

mysql> BEGIN;
Query OK, 0 rows affected (0.01 sec)

mysql> INSERT INTO t VALUES (1);
Query OK, 1 row affected (0.00 sec)

mysql> COMMIT;
ERROR 1062 (23000): Duplicate entry '1' for key 'i'

可以看出來,對於 INSERT 語句 TiDB 是在事務提交的時候才做衝突檢測而 MySQL 是在語句執行的時候做的檢測。

最後讓我們用一幅圖來再回顧一下整個流程:

tidb5

INSERT IGNORE

INSERT IGNORE和普通 Insert 不同的是當 INSERT 的時候遇到唯一約束衝突後,忽略當前 INSERT 的行,並記一個 warning。當語句執行結束後,可以通過 SHOW WARNINGS看到哪些行沒有被插入。

為了實現這個目的又不影響性能,TiDB 通過 batchCheckAndInsert 批量檢測來校驗數據是否衝突:

func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
	...
	sessVars := e.ctx.GetSessionVars()
	defer sessVars.CleanBuffers()
	ignoreErr := sessVars.StmtCtx.DupKeyAsWarning
	 	// 判斷是否有 OnDuplicate 語句
	if len(e.OnDuplicate) > 0 {
		...
        // 判斷是否包含 IGNORE 語句
	} else if ignoreErr {
        // 判斷是否重複,不重複則插入
		err := e.batchCheckAndInsert(ctx, rows, e.addRecord)
		if err != nil {
			return err
		}
        // 普通 Insert
	} else {
		...
	} 
	return nil
}

在 InsertExec 的 exec 方法中如果 SQL 語句包含 IGNORE 會進入到 IF 判斷的第二個分支中調用 batchCheckAndInsert 方法進行衝突校驗。

func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error) error {
	...
	start := time.Now()
	// 獲取行數據中需要校驗的key,如主鍵,唯一鍵
	toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
	if err != nil {
		return err
	}
	// 獲取事務處理器
	txn, err := e.ctx.Txn(true)
	if err != nil {
		return err
	}   
	// 批量從 tikv 中根據傳入的 key 獲取數據,存入到緩存中
	if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
		return err
	}  
	for i, r := range toBeCheckedRows {
		if r.ignored {
			continue
		}
		skip := false
		// 判斷主鍵
		if r.handleKey != nil {
			// 從緩存中判斷key是否存在,存在則重複
			_, err := txn.Get(ctx, r.handleKey.newKey)
			if err == nil {
				e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
				continue
			}
			if !kv.IsErrNotFound(err) {
				return err
			}
		}
		// 判斷唯一鍵
		for _, uk := range r.uniqueKeys {
			// 從緩存中判斷key是否存在,存在則重複
			_, err := txn.Get(ctx, uk.newKey)
			if err == nil {
				// If duplicate keys were found in BatchGet, mark row = nil.
				e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
				skip = true
				break
			}
			if !kv.IsErrNotFound(err) {
				return err
			}
		}
		// 沒有衝突,調用 addRecord 添加數據
		if !skip {
			e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
			err = addRecord(ctx, rows[i])
			if err != nil {
				return err
			}
		}
	} 
	return nil
}

這一段代碼比較長,但是也很好理解。

  • getKeysNeedCheck 作用是根據所有的 rows 數據封裝好裏面唯一鍵和主鍵的key,按照 TiKV 中存儲的格式封裝,我在上面普通 Insert 已經講過了,這裡就不再重複貼出 Key 的規則;
  • prefetchUniqueIndices 是根據 toBeCheckedRows 裏面封裝好的 Key 通過 BatchGet 發送 RPC 請求批量去 TiKV 獲取數據,然後存入到緩存中;
  • 然後會遍歷 toBeCheckedRows 這裏面的主鍵和唯一鍵,通過 txn.Get從緩存中判斷key是否存在,存在則重複;
  • 最後如果不衝突,那麼會調用 addRecord 將數據緩存到本地事務中。

tidb8

ON DUPLICATE

ON DUPLICATE 指的是INSERT ON DUPLICATE KEY UPDATE語句,它是幾種 INSERT 語句中最為複雜的。其語義的本質是包含了一個 INSERT 和 一個 UPDATE。

它的入口在 InsertExec 執行 exec 方法的時候:

func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
	...
	sessVars := e.ctx.GetSessionVars()
	defer sessVars.CleanBuffers()
	ignoreErr := sessVars.StmtCtx.DupKeyAsWarning
	 	// 判斷是否有 OnDuplicate 語句
	if len(e.OnDuplicate) > 0 {
		err := e.batchUpdateDupRows(ctx, rows)
		if err != nil {
			return err
		}
        // 判斷是否包含 IGNORE 語句
	} else if ignoreErr {
        ...
        // 普通 Insert
	} else {
		...
	} 
	return nil
}

INSERT IGNORE相同,首先會進入 IF 分支,判斷是否包含 ON DUPLICATE執行語句,然後執行 batchUpdateDupRows 方法。

func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
	...
	// 構造唯一鍵和主鍵的key
	toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
	if err != nil {
		return err
	}
	txn, err := e.ctx.Txn(true)
	if err != nil {
		return err
	}

	// 根據key填充對應的緩存
	if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
		return err
	}
	for i, r := range toBeCheckedRows {
		if r.handleKey != nil {
			handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
			if err != nil {
				return err
			}
			// 根據主鍵判斷是否有衝突,如果有衝突 err 則為 nil
			err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
			if err == nil {
				continue
			}
			if !kv.IsErrNotFound(err) {
				return err
			}
		}
		// 如果主鍵沒有衝突,那麼判斷唯一鍵是否有衝突
		for _, uk := range r.uniqueKeys {
			val, err := txn.Get(ctx, uk.newKey)
			if err != nil {
				if kv.IsErrNotFound(err) {
					continue
				}
				return err
			}
			handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
			if err != nil {
				return err
			} 
			err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
			if err != nil { 
				return err
			} 
			newRows[i] = nil
			break
		} 
		// 如果主鍵和唯一鍵都沒有衝突,那麼執行正常插入邏輯
		if newRows[i] != nil {
			err := e.addRecord(ctx, newRows[i])
			if err != nil {
				return err
			}
		}
	}
	if e.stats != nil {
		e.stats.CheckInsertTime += time.Since(start)
	}
	return nil
}

batchUpdateDupRows 方法首先會構造唯一鍵和主鍵的 key ,然後調用 prefetchDataCache 方法根據 Key 值一次性獲取 TiKV 對應值填充緩存。

之後遍歷構造好的 toBeCheckedRows ,先調用 updateDupRow 方法判斷主鍵判斷是否有衝突,如果主鍵沒有衝突,那麼判斷唯一鍵是否有衝突,都沒有衝突則執行正常插入邏輯。

tidb6

updateDupRow 會判斷 Key 值在緩存中是否存在,存在則調用 doDupRowUpdate ;doDupRowUpdate 中會根據 ON DUPLICATE 中的字段更新新的數據行中的值,並將被更新過的字段打上 flag 之後調用 updateRecord 函數。

func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table,
	onDup bool, memTracker *memory.Tracker) (bool, error) {
	 
	txn, err := sctx.Txn(false)
	if err != nil {
		return false, err
	}
	 
	changed, handleChanged := false, false
	...
	for i, col := range t.Cols() {
		// 這裡是新舊數據進行比較,如果相同返回0
		cmp, err := newData[i].CompareDatum(sc, &oldData[i])
		if err != nil {
			return false, err
		}
		//這裡表明新舊數據不同
		if cmp != 0 {
			changed = true //設置標記位,表示有數據被修改
			modified[i] = true 
			...
			// 如果是主鍵更改,設置 handleChanged
			if col.IsPKHandleColumn(t.Meta()) {
				handleChanged = true 
				if err := rebaseAutoRandomValue(sctx, t, &newData[i], col); err != nil {
					return false, err
				}
			}
			// 如果是主鍵更改,設置 handleChanged
			if col.IsCommonHandleColumn(t.Meta()) {
				handleChanged = true
			}
			// 表示該字段沒有被更改
		} else {
			if mysql.HasOnUpdateNowFlag(col.Flag) && modified[i] { 
				onUpdateSpecified[i] = true
			}
			modified[i] = false
		}
	} 
	// 如果數據行沒有變化,直接返回
	if !changed {
		...
		return false, nil
	} 
 	// 這裡如果是主鍵被更改了,那麼會先將原數據刪除,再添加一條新的數據
	if handleChanged {
		if updated, err := func() (bool, error) {
			txn, err := sctx.Txn(true)
			if err != nil {
				return false, err
			}
			memBuffer := txn.GetMemBuffer()
			sh := memBuffer.Staging()
			defer memBuffer.Cleanup(sh)

			if err = t.RemoveRecord(sctx, h, oldData); err != nil {
				return false, err
			}

			_, err = t.AddRecord(sctx, newData, table.IsUpdate, table.WithCtx(ctx))
			if err != nil {
				return false, err
			}
			memBuffer.Release(sh)
			return true, nil
		}(); err != nil {
			if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
				return false, nil
			}
			return updated, err
		}
	} else { 
		// 更新記錄行
		if err = t.UpdateRecord(ctx, sctx, h, oldData, newData, modified); err != nil {
			if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
				return false, nil
			}
			return false, err
		}

	}
	... 
	return true, nil
}

updateRecord 會判斷行數據有沒有被更改,如果有被更改,那麼分為兩種情況:

  1. 主鍵被更改了,那麼會先將原數據刪除,再添加一條新的數據;
  2. 唯一鍵被更改會調用 UpdateRecord 更新記錄行;
func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
	txn, err := sctx.Txn(true)
	if err != nil {
		return err
	} 
	memBuffer := txn.GetMemBuffer()
	...
	// 重建索引記錄
	err = t.rebuildIndices(sctx, txn, h, touched, oldData, newData, table.WithCtx(ctx))
	if err != nil {
		return err
	}
	// 構建行記錄key
	key := t.RecordKey(h)
	sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
	// 構建行記錄value
	value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, rd) 
	if err != nil {
		return err
	}
	// 將數據添加到事務緩存中
	if err = memBuffer.Set(key, value); err != nil { 
		return err
	}
	memBuffer.Release(sh)
	...
	return nil
}

UpdateRecord 中執行的邏輯和 AddRecord 有點類似,首先會調用 rebuildIndices 將舊的索引記錄刪除,重新構建新的索引;然後根據當前的行記錄構建 key-value 添加到事務緩存中。

最後用一張圖總結一下這個過程:

tidb7

總結

這篇文章 debug 用了蠻長時間的,想要弄清楚其中的邏輯非常不容易,但是還有一些地方沒弄明白,如在執行 ON DUPLICATE會更新數據行,那麼數據一致性怎麼保證的?這些疑問我想到時候留給事務章節去弄明白。

Reference

//pingcap.com/zh/blog/tidb-source-code-reading-4

//pingcap.com/zh/blog/tidb-source-code-reading-16

掃碼_搜索聯合傳播樣式-白色版 1

Tags: