以 Debug 形式,深入理解 Phoenix 全局索引

  • 2019 年 11 月 21 日
  • 筆記

圖片來自網絡,祝福發自內心^_^

文章作者:吳少傑

編輯整理:Hoh Xil

內容來源:作者授權

出品社區:DataFun

註:歡迎轉載,轉載請註明出處

導讀:上一篇文章:從理解 Phoenix 索引源碼開始,構建全局索引,採用靜態分析源碼的方式研究 Phoenix,後面會通過 debug 的形式深入研究,這樣可以直觀地觀察 Phoenix 運行的過程和邏輯,也有利於理解各個類的具體作用及其內部邏輯。

第六章

Debug 之前,需要寫一段簡單代碼,用以驅動調試過程。

  • PhoenixConnection connection = (PhoenixConnection)DriverManager.getConnection( "jdbc:phoenix:local:2181", "test", "test"); PhoenixStatement statement = (PhoenixStatement)connection.createStatement();String sql = "select /*+ INDEX(test idx_test_email) */ * from test where email='[email protected]'";PhoenixResultSet explainRes = (PhoenixResultSet)statement.executeQuery("explain "+sql);while (explainRes.next()){ logger.info("explain: {}",explainRes.getString(1));} PhoenixResultSet resultSet = (PhoenixResultSet)statement.executeQuery(sql);while (resultSet.next()){ logger.info("id: {},name: {},email: {}", resultSet.getString("id"), resultSet.getString("name"), resultSet.getString("email"));}explainRes.close();resultSet.close();statement.close();connection.close();} catch (SQLException e) {e.printStackTrace();}

請注意上面的代碼中:

connection/statement/resultSet 等類型已經被強制轉換成了 Phoenix 對應的類型,這樣方便IDEA直接跳轉到具體的實現,否則就會跳轉到 java 的 interface 中。這也算一個小技巧吧,調試其他源碼的時候,最好轉換成具體實現的類。

CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER TEST    SKIP-SCAN-JOIN TABLE 0        CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN RANGE SCAN OVER IDX_TEST_EMAIL ['[email protected]']            SERVER FILTER BY FIRST KEY ONLY    DYNAMIC SERVER FILTER BY "TEST.ID" IN ($2.$4)

上面是 explain 得到的 SQL 執行計劃,很明顯 Phoenix 使用了 idx_test_email 這個索引。

根據之前文章的分析,我們可以在 executeQuery 內部設置一個斷點,看一下解析後的 statement 的數據。

SelectStatement 這個類的變量不算多,但涵蓋了 select 查詢的所有信息。此處我們只關注幾個重要的變量:fromTable、hint、where。不過這裡要特別注意 hint 變量,是一個 HintNode 類型,擴展全文索引的時候,會修改這個類的。

之前也分析過,SelectStatement 會被編譯成 QueryPlan,下面是 QueryPlan 的具體字段值。

plan = {ScanPlan@3907}  splits = null scans = null allowPageFilter = true isSerial = false isDataToScanWithinThreshold = false serialRowsEstimate = null serialBytesEstimate = null serialEstimateInfoTs = null tableRef = {TableRef@3910}  tableRefs = {SingletonImmutableSet@3911}  size = 1 context = {StatementContext@3912}   resolver = {FromCompiler$ProjectedTableColumnResolver@3920}   binds = {BindManager@3921}   scan = {Scan@3922} "{"loadColumnFamiliesOnDemand":true,"filter":"EMAIL = '[email protected]'","startRow":"","stopRow":"","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":2147483647,"maxVersions":1,"timeRange":[0,9223372036854775807]}"  expressions = {ExpressionManager@3923}   aggregates = {AggregationManager@3924}   dateFormat = "yyyy-MM-dd HH:mm:ss.SSS"  dateFormatter = {FastDateFormat@3926} "FastDateFormat[yyyy-MM-dd HH:mm:ss.SSS]"  timeFormat = "yyyy-MM-dd HH:mm:ss.SSS"  timeFormatter = {FastDateFormat@3926} "FastDateFormat[yyyy-MM-dd HH:mm:ss.SSS]"  timestampFormat = "yyyy-MM-dd HH:mm:ss.SSS"  timestampFormatter = {FastDateFormat@3926} "FastDateFormat[yyyy-MM-dd HH:mm:ss.SSS]"  dateFormatTimeZone = {ZoneInfo@3927} "sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]"  numberFormat = "#,##0.###"  tempPtr = {ImmutableBytesWritable@3929} ""  statement = {PhoenixStatement@3903}   dataColumns = {LinkedHashMap@3930}  size = 0  currentTime = -1  scanRanges = {ScanRanges@3931} "ScanRanges[[]]"  sequences = {SequenceManager@3932}   currentTable = {TableRef@3910}   whereConditionColumns = {ArrayList@3933}  size = 1  subqueryResults = {HashMap@3934}  size = 0  readMetricsQueue = {ReadMetricQueue@3935}   overAllQueryMetrics = {OverAllQueryMetrics@3936}   queryLogger = null  isClientSideUpsertSelect = false statement = {PhoenixStatement$ExecutableSelectStatement@3904} "SELECT /*+ INDEX(TEST IDX_TEST_EMAIL) */  *  FROM TEST  WHERE EMAIL = '[email protected]'" projection = {RowProjector@3913} "[ID,NAME,EMAIL]" paramMetaData = {PhoenixParameterMetaData@3914}  limit = null offset = null orderBy = {OrderByCompiler$OrderBy@3915}  groupBy = {GroupByCompiler$GroupBy$1@3916}  parallelIteratorFactory = {ParallelIteratorFactory$1@3917}  dynamicFilter = null dataPlan = null estimatedRows = null estimatedSize = null estimateInfoTimestamp = null getEstimatesCalled = false

大家可以重點查看 scan 字段的邏輯,簡單來看就是對 TEST 表的全文檢索,filter 是 EMAIL='[email protected]'。到這裡還沒有走索引,需要繼續執行代碼。

plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan);

上面代碼是對 QueryPlan 進行優化,也就是執行計劃優化。這裡要具體看一下優化後的 plan 是怎麼樣的。

plan = {HashJoinPlan@3967}  statement = {SelectStatement@3970} "SELECT /*+ NO_INDEX */ TEST.* FROM TEST  Semi JOIN (SELECT /*+ INDEX(TEST IDX_TEST_EMAIL) */ 1 $3,":ID" $4 FROM "IDX_TEST_EMAIL"  WHERE "0:EMAIL" = '[email protected]') ON ("ID" = $2.$4)" joinInfo = {HashJoinInfo@3971}  subPlans = {HashJoinPlan$HashSubPlan[1]@3972}   0 = {HashJoinPlan$HashSubPlan@3978}    index = 0   plan = {TupleProjectionPlan@3979}     tupleProjector = {TupleProjector@3984} "TUPLE-PROJECTOR {[1, "ID"] ==> [INTEGER, VARCHAR]}"    postFilter = null    delegate = {ScanPlan@3985}      splits = null     scans = null     allowPageFilter = true     isSerial = false     isDataToScanWithinThreshold = false     serialRowsEstimate = null     serialBytesEstimate = null     serialEstimateInfoTs = null     tableRef = {TableRef@3987}      tableRefs = {SingletonImmutableSet@3988}  size = 1     context = {StatementContext@3989}      statement = {SelectStatement@3990} "SELECT /*+ INDEX(TEST IDX_TEST_EMAIL) */ 1 $3,":ID" $4 FROM "IDX_TEST_EMAIL"  WHERE "0:EMAIL" = '[email protected]'"     projection = {RowProjector@3991} "[1,"ID"]"     paramMetaData = {PhoenixParameterMetaData@3992}      limit = null     offset = null     orderBy = {OrderByCompiler$OrderBy@3993}      groupBy = {GroupByCompiler$GroupBy$1@3994}      parallelIteratorFactory = {ParallelIteratorFactory$1@3995}      dynamicFilter = null     dataPlan = null     estimatedRows = null     estimatedSize = null     estimateInfoTimestamp = null     getEstimatesCalled = false   hashExpressions = null   singleValueOnly = false   keyRangeLhsExpression = {RowKeyColumnExpression@3980} ""TEST.ID""   keyRangeRhsExpression = {ProjectedColumnExpression@3981} "$2.$4" recompileWhereClause = false tableRefs = {HashSet@3974}  size = 2 maxServerCacheTimeToLive = 30000 serverCacheLimit = 104857600 dependencies = {HashMap@3975}  size = 0 hashClient = null firstJobEndTime = null keyRangeExpressions = null estimatedRows = null estimatedBytes = null estimateInfoTs = null getEstimatesCalled = false delegate = {ScanPlan@3976}

這是優化後的執行計劃,SQL 語句被優化成 semi join 了。有沒有很熟悉的感覺?作者在 HBase Meetup 中,有提到過這種優化方案,就是直接用 join 強制走索引。那麼這裡有必要簡單介紹一下 semi joiin 的概念。

所謂的 semi-join 是指 semi-join 子查詢。 當一張表在另一張表找到匹配的記錄之後,半連接 ( semi-jion ) 返回第一張表中的記錄。與條件連接相反,即使在右節點中找到幾條匹配的記錄,左節點 的表也只會返回一條記錄。另外,右節點的表一條記錄也不會返回。半連接通常使用 IN 或 EXISTS 作為連接條件。 https://blog.csdn.net/lppl010_/article/details/80301757

作者竟誤打誤撞,竟然猜出了 Phoenix 索引使用的原理。讀者一定要深刻理解這個機制,有助於我們實現全文檢索索引。也要注意優化後的 plan 類型是 HashJoinPlan。

/** *  * Interface for an executable query plan * *  * @since 0.1 */public interface QueryPlan extends StatementPlan ;/** * Get a result iterator to iterate over the results * @return result iterator for iterating over the results * @throws SQLException */public ResultIterator iterator() throws SQLException;/** *  * @return whether underlying {@link ResultScanner} can be picked up in a round-robin  * fashion. Generally, selecting scanners in such a fashion is possible if rows don't * have to be returned back in a certain order. * @throws SQLException  */public boolean useRoundRobinIterator() throws SQLException;

在分析 HashJoinPlan 之前,先看一下 QueryPlan 接口的定義及其重要的函數。

iterator 返回一個 ResultIterator 用來迭代數據。那麼 HashJoinPlan 的這個 iterator 方法就非常重要了,當然 iterator 有幾種不同的重載形式,也需要關注下。

根據對 HashJoinPlan 的調試追蹤,定位到了以下函數:

public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException

這個函數的代碼有點多,但仔細分析其邏輯,可以知道該函數前半段,將涉及到的子查詢放到 Future 後台執行,所有子查詢結束後其數據放到 ServerCache,dependencies 存放各個子查詢對應的 ServerCache。最終用 dependencies 改寫當前 QueryPlan 的 delegate。簡單來說就是後台並行執行子查詢,將結果用以改寫當前執行計劃的 scan 對象。

iterator = {RoundRobinResultIterator@4319}  threshold = 2147483646 numScannersCacheExhausted = 0 resultIterators = {ParallelIterators@4325} "ResultIterators [name=PARALLEL,id=8f373aba-e360-94d0-ae67-3774e06c338e,scans=[[{"loadColumnFamiliesOnDemand":true,"startRow":"1","stopRow":"1\x00","batch":-1,"cacheBlocks":true,"totalColumns":3,"maxResultSize":-1,"families":{"0":["\x00\x00\x00\x00","\x80\x0B","\x80\x0C"]},"caching":2147483647,"maxVersions":1,"timeRange":[0,9223372036854775807]}]]]"  iteratorFactory = {ParallelIteratorFactory$1@4328}   initFirstScanOnly = false  scans = {ArrayList@4329}  size = 1  splits = {SingletonImmutableList@4330}  size = 1  physicalTableName = {byte[4]@4331}   plan = {ScanPlan@3968}   scanId = "8f373aba-e360-94d0-ae67-3774e06c338e"  mutationState = {MutationState@4333}   scanGrouper = {DefaultParallelScanGrouper@3967}   allFutures = {ArrayList@4334}  size = 0  estimatedRows = {Long@4335} 1  estimatedSize = {Long@4336} 206  estimateInfoTimestamp = {Long@4337} 0  hasGuidePosts = false  scan = {Scan@4249} "{"loadColumnFamiliesOnDemand":true,"startRow":"1","stopRow":"1\x00","batch":-1,"cacheBlocks":true,"totalColumns":3,"maxResultSize":-1,"families":{"0":["\x00\x00\x00\x00","\x80\x0B","\x80\x0C"]},"caching":2147483647,"maxVersions":1,"timeRange":[0,9223372036854775807]}"  useStatsForParallelization = true  caches = {HashMap@3974}  size = 0  dataPlan = null  context = {StatementContext@4269}   tableRef = {TableRef@4338}   groupBy = {GroupByCompiler$GroupBy$1@4339}   orderBy = {OrderByCompiler$OrderBy@4340}   hint = {HintNode@4341} "/*+ NO_INDEX */ "  limit = null  offset = null openIterators = {ArrayList@4326}  size = 0 index = 0 closed = false plan = {ScanPlan@3968}  numParallelFetches = 0

這是最終返回的 iterator 對象。可以看出 Scan 對象的 startRow/stopRow 已經被替換成了 email='[email protected]' 的 ID 值,也就是1。到此為止也就通過索引表改寫了源 SQL 的執行計劃。

debug 分析到這裡就結束了,仍然還有很多細節沒有探討清楚,感興趣的讀者可以自行 debug。

semi join 生效條件 https://blog.csdn.net/lppl010_/article/details/80301699

第七章

經過前面的分析,我們知道 PhoenixSQL 會經過優化器改寫、優化,索引會被翻譯成對應的索引表。接下來會介紹 SQL 各種不同的形式及其優化後的共同特點,以便擴展全文索引。

考慮到 SQL 編譯的複雜性,以及自身精力的有限性,Phoenix 自身的執行優化引擎的源碼不再分析,我會設計一些 SQL,用以探測優化後的執行 SQL 形式,由此來觀察 Phoenix 查詢索引的方式。

通過前面的文章我們知道,在 PhoenixStatement.executeQuery 方法通過 QueryOptimizer 對初步生成的執行計划進行了優化。

plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan);

此處我們需要查看優化後的 plan 對應的 SQL,該如何查看呢?總不能下一個斷點,每次都停到這裡看一下吧。這裡介紹一個小技巧,就是在不中斷程序的情況下,打印調試代碼的變量值。

在斷點標誌上面點擊鼠標右鍵,會彈出會話框。可以看出居然還可以根據某個條件進行中斷,不過這不是我們關注的點。

點擊 「More」 出現下面的對話框:

上面有 「Evaluate and log」 可以填寫我們要打印的內容,此處可以用上下文的變量,然後單擊 done,然後 debug 執行我們的代碼,可以看到執行的每個 SQL 都有對應的、優化後的 SQL。

認真觀察上面的輸出,我們發現一個規律:所有能走索引的 SQL 都會用對應索引表的表名替代原表名,並且過濾條件也會透傳給索引表。

可是這個規律說明什麼呢?

這就意味着,查詢外部索引時,所有的信息都已經有了!包括查詢的字段,過濾條件,聚合條件等。我們只需要根據這些條件查詢外部索引就行了。

總結

經過前面的分析,我們已經對 Phoenix 索引的創建、維護、使用有了簡單的了解。接下來用思維導圖為大家簡單總結下:

上面的圖畫的比較簡單,但可以用來幫助讀者分析實現全文檢索的基本過程。其實還是有很多細節沒有講解清楚的。比如字段如何投射、聚合如何實現、排序如何實現,索引表的 ROWKEY 如何編碼、事務表有沒有特殊的地方、如何做單元測試、索引字段如何與 ES(SOLR) 進行映射、Phoenix查詢條件如何映射成 ES 查詢代碼、全文檢索實現過程如何更加通用以便適配 ES 和 SOLR、如何度量全文檢索的性能、創建索引時如何將屬性透傳給 ES。

具體實現的過程還是比較複雜的,另外 Phoenix 的代碼質量並不是特別高,很多地方實現方式不統一,既要兼顧對源碼的侵入性小,又要兼顧實現的通用型,作者還是着實下了一番功夫的。

作者介紹

吳少傑,愛好大數據生態的技術和框架,對數倉架構和實時計算比較熟悉,目前主要從事大數據開發和架構的工作

——END——

文章推薦:

Apache Beam 架構原理及應用實踐

快手 Druid 精確去重的設計和實現

基於Flink的嚴選實時數倉實踐

關於 DataFun:

您的「在看」,我的動力!?