Spark SQL源码解析(三)Analysis阶段分析

Spark SQL原理解析前言:
Spark SQL源码剖析(一)SQL解析框架Catalyst流程概述

Spark SQL源码解析(二)Antlr4解析Sql并生成树

Analysis阶段概述

首先,这里需要引入一个新概念,前面介绍SQL parse阶段,会使用antlr4,将一条SQL语句解析成语法树,然后使用antlr4的访问者模式遍历生成语法树,也就是Logical Plan。但其实,SQL parse这一阶段生成的Logical Plan是被称为Unresolved Logical Plan。所谓unresolved,就是说SQL语句中的对象都是未解释的。

比如说一条语句SELECT col FROM sales,当我们不知道col的具体类型(Int,String,还是其他),甚至是否在sales表中有col这一个列的时候,就称之为是Unresolved的。

而在analysis阶段,主要就是解决这个问题,也就是将Unresolved的变成Resolved的。Spark SQL通过使用Catalyst rule和Catalog来跟踪数据源的table信息。并对Unresolved应用如下的rules(rule可以理解为一条一条的规则,当匹配到树某些节点的时候就会被应用)。

  • 从Catalog中,查询Unresolved Logical Plan中对应的关系(relations)
  • 根据输入属性名(比如上述的col列),映射到具体的属性
  • 确定哪些属性引用相同的值并赋予它们唯一的ID(这个是论文中的内容,看不是很明白,不过主要是方便后面优化器实现的)
  • Propagating and coercing types through expressions,这个看着也是有点迷,大概是对数据进行强制转换,方便后续对1 + col 这样的数据进行处理。

而处理过后,就会真正生成一棵Resolved Logical Plan,接下来就去看看源码里面是怎么实现的吧。

Analysis阶段详细解析

通过跟踪调用代码,在调用完SQL parse的内容后,就会跑去org.apache.spark.sql.execution.QueryExecution这个类中执行,后面包括Logical Optimization阶段,Physical Planning阶段,生成RDD任务阶段都是在这个类中进行调度的。不过此次只介绍Analysis。

在QueryExecution中,会去调用org.apache.spark.sql.catalyst.Analyzer这个类,这个类是继承自org.apache.spark.sql.catalyst.rules.RuleExecutor,记住这个,后面还有很多个阶段都是通过继承这个类实现的,实现原理也和Analysis阶段相似。

继承自RuleExecutor的类,包括这里的Analyzer类,都是在自身实现大量的rule,然后注册到batch变量中,这里大概贴点代码瞅瞅。

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {
  
  ......其他代码
    lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.ResolveCoalesceHints,
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
      
    ......其他代码
  

先大概说下batches这个变量吧,batches是由Batch的列表构成。而Batch的具体签名如下:

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  ......其他代码
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  ......其他代码
}

一个Batch由策略Strategy,和一组Rule构成,其中策略Strategy主要是区分迭代次数用的,按我的理解,某些rule可以迭代多次,越多次效果会越好,类似机器学习的学习过程。而策略Strategy会规定迭代一次还是固定次数。而rule就是具体的应用规则了,这个先略过。

在Analyzer这个类中,你会发现很大篇幅的代码都是各种各样rule的实现。然后最终,Analyzer会去调用super.execute()方法,也就是调用父类(RuleExecutor)的方法执行具体逻辑。而父类又会去调用这个batches变量,循环来与Sql Parse阶段生成的Unresolved Logical Plan做匹配,匹配到了就执行具体的验证。还是贴下代码看看吧。

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    //遍历Analyzer中定义的batchs变量
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
	  //这里的continue决定是否再次循环,由batch的策略(固定次数或单次),以及该batch对plan的作用效果这两者控制
      while (continue) {
	    //调用foldLeft让batch中每条rule应用于plan,然后就是执行对应rule规则逻辑了
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime

            if (!result.fastEquals(plan)) {
              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
            queryExecutionMetrics.incNumExecution(rule.ruleName)

            // Run the structural integrity checker against the plan after each rule.
            if (!isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }

            result
        }
        iteration += 1
		//策略的生效地方
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
            if (Utils.isTesting) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
            |=== Result of Batch ${batch.name} ===
            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
          """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }

}

其实这个类的逻辑不难懂,就是遍历batchs变量,而每个batch又会去使用scala的foldLeft函数,遍历应用里面的每条rule。然后根据Batch的策略以及将新生成的Plan与旧的Plan比较,决定是否要再次遍历。然后最后将新生成的Plan输出。

如果不清楚scala的foldLeft函数内容,可以百度下看看,不难懂的。然后跟RuleExecutor有关的基本都是这个套路,区别只在于rule的不同。

接下来我们来看看具体是如果应用一条rule,将Unresolved LogicalPlan转换成Resolved LogicalPlan吧。

Rule介绍

前面说到,在Analyzer中重写了Batchs变量,Batchs包含多个Batch,每个Batch又有多个Rule,所以不可能全部看过来,庆幸的是,要了解Unresolved LogicalPlan转换成Resolved LogicalPlan,只需要看一个Rule就行,那就是ResolveRelations这个Rule,我们就只介绍这个Rule来管中窥豹。

各自Rule基本都是object类型,也就是静态的,且继承自Rule这个抽象类,Rule很简单,就一个ruleName变量喝一个apply方法用以实现逻辑,然后就没了。所以重点还是在继承后的实现逻辑。

前面提到,从Unresolved到Resolved的过程,可以理解为就是将SQL语句中的类型和字段,映射到实体表中的字段信息。而存储实体表元数据信息的,是Catalog,到具体的类,是org.apache.spark.sql.catalyst.catalog.SessionCatalog。

我们来看看具体的逻辑代码:

object ResolveRelations extends Rule[LogicalPlan] {
    ......其他代码
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)
    }
	......其他代码
}

逻辑其实也蛮简单的,就是匹配UnresolvedRelation(就是Unresolved的节点),然后递归去Catlog中获取对应的元数据信息,递归将它及子节点变成Resoulved。不过还有个要提的是,SQL中对应的,有可能是文件数据,或是数据库中的表,抑或是视图(view),针对文件数据是不会转换的,转换成Resolved会在后面进行。而表和视图则会立即转换。

最后,接上一篇的例子,接着来看看,经过Analysis阶段后,LogicalPlan变成什么样吧,上一篇SQL parse使用的示例代码:

    //生成DataFrame
    val df = Seq((1, 1)).toDF("key", "value")
    df.createOrReplaceTempView("src")
    //调用spark.sql
    val queryCaseWhen = sql("select key from src ")

经过上次介绍的SQL parse后是变成这样:

'Project ['key]
+- 'UnresolvedRelation `src`

这里的涵义上篇已介绍,不再赘述,而经过本次的Analysis后,则会变成这样

Project [key#5]
+- SubqueryAlias `src`
   +- Project [_1#2 AS key#5, _2#3 AS value#6]
      +- LocalRelation [_1#2, _2#3]

可以发现,主要就是对UnresolvedRelation进行展开,现在我们可以发现src有两个字段,分别是key和value及其对应的别名(1#2,2#3)。这里还有一个SubqueryAlias,这个我也不是很明白,按源码里面的说法,这里的subquery仅用来提供属性的作用域信息,Analysis阶段过后就就可以将其删除,所以在Optimization阶段后会发现SubqueryAlias消失了。

小结

OK,那今天就先介绍到这里吧,主要综述了Analysis的内容,然后介绍RuleExecution的逻辑,最后简单看了个Rule的具体内容以及承接SQL parse阶段的例子。有兴趣的童鞋可以自己去顺着思路翻源码看看。

以上~