TiKV源碼解析系列文章(十五)表達式計算框架
- 2019 年 12 月 4 日
- 筆記
上一篇 《TiKV 源碼解析系列文章(十四)Coprocessor 概覽》講到了 TiDB 為了最大化利用分散式計算能力,會盡量將 Selection 運算元、聚合運算元等運算元下推到 TiKV 節點上。本文將繼續介紹 Coprocessor 中表達式計算框架的源碼架構,帶大家看看 SQL 中的表達式是如何在 Coprocessor 中執行的。
什麼是表達式
比如說我們用這個 SQL 作為例子:
SELECT (count * price) AS sum FROM orders WHERE order_id < 100
其中 order_id < 10
就是一個表達式,它有一個列輸入參數: order_id
,輸出:Bool
。
RPN 表達式
因為 TiDB 下推的是樹狀結構表達式,所以我們需要選擇一種樹的遍歷方式, 這裡 Coprocessor 選擇了由下而上遞推的 RPN(逆波蘭表示法)。RPN 是樹的後序遍歷,後序遍歷在每個節點知道自己有幾個子節點的時候等價於原本的樹結構。
比如說我們有一個數學算式 2 *(3 + 4)+ 5
:

由於數學上習慣寫法是中序遍歷,我們通常要加上括弧消除歧義(比如加減和乘除的順序)。通過把操作符後移我們得到 RPN:2 3 4 + * 5 +
,這樣我們無需括弧就能無歧義地遍歷這個表達式:
1. 執行 RPN 的過程需要一個棧來快取中間結果,比如說對於 2 3 4 + * 5 +
,我們從左到右遍歷表達式,遇到值就壓入棧中。直到 +
操作符,棧中已經壓入了 2 3 4
。

2. 因為 +
是二元操作符,需要從棧中彈出兩個值 3 4
,結果為 7
,重新壓入棧中:


3. 此時棧中的值為 2 7
。

4. 下一個是 *
運算符,也需要彈出兩個值 2 7
,結果為 14
壓入棧中。

5. 接著壓入 5
。

6. 最後 +
運算符彈出 14 5
,結果為 19
,壓入棧。

7. 最後留在棧里的就是表達式的結果。
構建 RPN 表達式
以表達式 order_id < 10
下推為例,其下推的樹狀表達式如下圖所示,其中 ColumnRef(2)
表示列 order_id
,2
表示 order_id
列在該表結構中對應的 offset:

轉化為 RPN 表達式:

Coprocessor 中表達式的定義:
/// An expression in Reverse Polish notation, which is simply a list of RPN expression nodes. /// /// You may want to build it using `RpnExpressionBuilder`. #[derive(Debug, Clone)] pub struct RpnExpression(Vec<RpnExpressionNode>); /// A type for each node in the RPN expression list. #[derive(Debug, Clone)] pub enum RpnExpressionNode { /// Represents a function call. FnCall { func_meta: RpnFnMeta, args_len: usize, field_type: FieldType, implicit_args: Vec<ScalarValue>, }, /// Represents a scalar constant value. Constant { value: ScalarValue, field_type: FieldType, }, /// Represents a reference to a column in the columns specified in evaluation. ColumnRef { offset: usize }, }
執行 RPN 表達式
有了表達式後,接下來我們需要執行表達式,為此我們要使用一個棧結構來快取中間值。由於表達式中的操作符(RpnExpressionNode::FnCall
)不會被存入棧,我們定義了只包含值的 RpnStackNode
儲存中間值:
// A type for each node in the RPN evaluation stack. It can be one of a scalar value node or a /// vector value node. The vector value node can be either an owned vector value or a reference. #[derive(Debug)] pub enum RpnStackNode<'a> { /// Represents a scalar value. Comes from a constant node in expression list. Scalar { value: &'a ScalarValue, field_type: &'a FieldType, }, /// Represents a vector value. Comes from a column reference or evaluated result. Vector { value: RpnStackNodeVectorValue<'a>, field_type: &'a FieldType, }, }
注意,Coprocessor 中表達式是向量化計算的,每次都盡量會計算多行,通常為 1024 行,即 op([]value, []value)
而不是 op(value, value)
,從而減少分支並提高 Cache Locality。但運算數並不總是一個來自列的向量,還可能是用戶直接指定的常量(例如 SELECT a+1
中 a
是向量,但 1
只是標量)。因此,RpnStackNode
分兩種:
- 標量:由
Constant
生成。 - 向量:執行
ColumnRe f
生成,或是FnCall
調用返回的結果。
另外為了避免 Selection 運算元移動大量的數據,向量使用了間接的儲存方式,每個向量有真實數據和邏輯索引,只有邏輯索引中對應的真實數據才是邏輯有效的,這樣 Selection 運算元便可以只需改動邏輯索引而不需搬動大量的真實數據:
/// Represents a vector value node in the RPN stack. /// /// It can be either an owned node or a reference node. /// /// When node comes from a column reference, it is a reference node (both value and field_type /// are references). /// /// When nodes comes from an evaluated result, it is an owned node. #[derive(Debug)] pub enum RpnStackNodeVectorValue<'a> { Generated { physical_value: VectorValue, logical_rows: Arc<[usize]>, }, Ref { physical_value: &'a VectorValue, logical_rows: &'a [usize], }, }
接下來我們用上面的 order_id < 100
作為例子看看錶達式是如何執行的。
1. 首先我們準備好一個棧結構:

2. 接著逐一遍歷表達式,第一個取出的是 ColumnRef
,我們取出輸入 Selection 運算元的數據中對應 offset 的列的向量數據,並將向量壓入棧:

3. 接著是 Constant
,轉化為標量然後壓入棧:

4. 最後一個是 LT
運算符,它需要兩個入參,因此我們從棧中彈出兩個值作為參數調用 LT
,LT
會生成一個新的向量,將結果壓入棧:

5. 最後留在棧里的就是表達式的執行結果。

6. Selection 運算元根據結果的布爾值過濾原輸入的邏輯索引:

7. 這樣就間接的過濾出有效數據而不用改變 Physical Vector:

實現 RPN 表達式函數
實現表達式函數(FnCall
)是比較繁瑣的。比如對於二元操作符加法, 它既可以接受其中一元輸入常量,也可以接受來自列數據的向量。一種解決方法是將標量都重複填充為向量,這樣所有函數運算都是向量參數,但這個方法會有額外的標量拷貝開銷。為了避免這個開銷,Coprocessor 直接實現了向量與標量的運算,rpn_expr_codegen
提供了過程宏 #[rpn_fn]
,我們只需定義標量邏輯,過程宏將自動生成剩下帶有向量的邏輯。
下面我們來試著定義一個整數加法操作符,這裡入參和返回值都為標量即可,源碼的實現引入了泛型更進一步將其抽象為所有數值類型間的加法:
#[rpn_fn] #[inline] pub fn int_plus_int( lhs: &Option<Int>, rhs: &Option<Int>, ) -> Result<Option<Int>> { if let (Some(lhs), Some(rhs)) = (arg0, arg1) { lhs.checked_add(*rhs) .ok_or_else(|| Error::overflow("BIGINT", &format!("({} + {})", lhs, rhs)).into()) .map(Some) } else { Ok(None) } }
#[rpn_fn]
宏會分析這個操作符定義的參數數量和類型,自動生成既可以處理標量也可以處理向量的 int_plus_int_fn_meta()
,這個函數將可以放進 FnCall
被用於表達式計算:
pub fn int_plus_int_fn_meta( _ctx: &mut EvalContext, output_rows: usize, args: &[RpnStackNode<'_>], _extra: &mut RpnFnCallExtra<'_>, ) -> Result<VectorValue> { assert!(args.len() >= 2); let lhs = args[0]; let rhs = args[1]; let mut result: Vec<Int> = Vec::with_capacity(output_rows); match lhs { RpnStackNode::Scalar { value: ScalarValue::Int(lhs) , .. } => { match rhs { RpnStackNode::Scalar { value: ScalarValue::Int(rhs) , .. } => { let value = int_plus_int(lhs, rhs); result.push(result); } RpnStackNode::Vector { value: VectorValue::Int(rhs_vector) , .. } => { for rhs_row in rhs_vector.logical_rows() { let rhs = rhs_vector.physical_value[rhs_row]; let value = int_plus_int(lhs, rhs); result.push(result); } } _ => panic!("invalid expression") } } RpnStackNode::Vector { value: VectorValue::Int(lhs_vector) , .. } => { match rhs { RpnStackNode::Scalar { value: ScalarValue::Int(rhs) , .. } => { for lhs in lhs_vector { let value = int_plus_int(lhs, rhs); result.push(result); } } RpnStackNode::Vector { value: VectorValue::Int(rhs_vector) , .. } => { for (lhs, rhs) in lhs_vector.logical_rows().iter().zip(rhs_vector.logical_rows()) { let lhs = lhs_vector.physical_value[lhs_row]; let rhs = rhs_vector.physical_value[rhs_row]; let value = int_plus_int(lhs, rhs); result.push(result); } } _ => panic!("invalid expression") } } _ => panic!("invalid expression") } result }
注意:TiKV 源碼使用泛型展開生成邏輯程式碼,較為複雜,因此上面給出的這段是展開後的等價偽程式碼。
小結
以上就是 Coprocessor 表達式框架實現解析。下一篇我們將詳細介紹各運算元的內部實現。
TiKV 是一個開源的分散式事務 Key-Value 資料庫,支援跨行 ACID 事務,同時實現了自動水平伸縮、數據強一致性、跨數據中心高可用和雲原生等重要特性。作為一個基礎組件,TiKV 可作為構建其它系統的基石。目前,TiKV 已用於支援分散式 HTAP 資料庫—— TiDB 中,負責存儲數據,並已被多個行業的領先企業應用在實際生產環境。2019 年 5 月,CNCF 的 TOC(技術監督委員會)投票決定接受 TiKV 晉級為孵化項目。
· 源碼地址:https://github.com/tikv/tikv
· 更多資訊:https://tikv.org
文章轉載自PingCAP。