Spark Submit Task Entry Point: Source Code Analysis

  • March 30, 2020
  • Notes

When we submit Spark jobs, we usually run the jar directly on a machine that has the Spark client installed, with a command like the following:

/data/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-submit \
    --class com.tencent.th.dwd.t_dwd_evt_user_action_log_s \
    --total-executor-cores 300 \
    --conf spark.sql.shuffle.partitions=500 \
    SparkV2-1.0.1.jar repartition_num=300

So what exactly is this spark-submit entry point? Let's take a look:

cat /data/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-submit

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
export SPARK_HOME=/data/opt/spark-2.3.1-bin-hadoop2.7/
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

The script first resolves the SPARK_HOME directory and then launches the compiled class org.apache.spark.deploy.SparkSubmit through spark-class. So what does this entry class actually do? Let's look at the source code:

def main(args: Array[String]): Unit = {
  // Parse and initialize the command-line arguments passed in
  val appArgs = new SparkSubmitArguments(args)
  // If verbose output was requested, print the parsed arguments
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  // Dispatch on the requested action
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
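The KILL and REQUEST_STATUS branches correspond to the --kill and --status flags of spark-submit. As a rough illustration of how the action gets resolved (this is only a simplified sketch of the idea, not Spark's actual argument parser), consider:

object ActionDemo {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action

  // Simplified stand-in for what SparkSubmitArguments does internally:
  // --kill and --status switch the action, otherwise it defaults to SUBMIT.
  def resolveAction(args: Seq[String]): Action =
    if (args.contains("--kill")) Kill
    else if (args.contains("--status")) RequestStatus
    else Submit

  def main(args: Array[String]): Unit = {
    println(resolveAction(Seq("--kill", "driver-20200330-0001"))) // Kill
    println(resolveAction(Seq("--class", "com.example.Main")))    // Submit
  }
}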

If the submitted command is valid, the SUBMIT action is taken and the application is launched through submit():

/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
  /** Prepare the launch environment, which yields four values:
   *    (1) childArgs: arguments for the child process
   *    (2) childClasspath: classpath entries for the child process
   *    (3) sysProps: system properties
   *    (4) childMainClass: the name of the child main class
   */
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  // Run the Spark application
  def doRunMain(): Unit = {
    // Run under a proxy user if one was requested
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
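The proxy-user branch above is the standard Hadoop impersonation pattern. For reference, here is a minimal standalone sketch of the same pattern, assuming hadoop-common is on the classpath; the user name "etl_user" is purely illustrative:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserDemo {
  def main(args: Array[String]): Unit = {
    // Impersonate a (hypothetical) "etl_user" on top of the real logged-in user,
    // the same pattern SparkSubmit follows when --proxy-user is given
    val proxyUser = UserGroupInformation.createProxyUser(
      "etl_user", UserGroupInformation.getCurrentUser())
    proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        // Anything executed here runs under the proxy user's identity
        println(s"Current user: ${UserGroupInformation.getCurrentUser().getUserName}")
      }
    })
  }
}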

Whether or not a proxy user is created, execution eventually reaches runMain. Inside runMain, verbose output is handled first, and then the jars on the child classpath are loaded:

for (jar <- childClasspath) {
  addJarToClasspath(jar, loader)
}
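Conceptually, adding a jar here amounts to handing the jar's file URL to a classloader whose addURL method has been exposed. The snippet below is a simplified sketch of that idea, not Spark's actual MutableURLClassLoader / addJarToClasspath implementation:

import java.io.File
import java.net.{URL, URLClassLoader}

// Expose URLClassLoader's protected addURL so jars can be appended at runtime
class MutableClassLoader(parent: ClassLoader)
    extends URLClassLoader(Array.empty[URL], parent) {
  def addJar(path: String): Unit = addURL(new File(path).toURI.toURL)
}

object ClasspathDemo {
  def main(args: Array[String]): Unit = {
    val loader = new MutableClassLoader(getClass.getClassLoader)
    loader.addJar("SparkV2-1.0.1.jar") // jar name taken from the submit command above
    // The user class can now be resolved through this loader before it is invoked
  }
}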

Next come the two core steps. First, load the class to be executed:

mainClass = Utils.classForName(childMainClass)

Second, look up the entry point (the main method) of that class:

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

Finally, invoke the user class's main method via reflection:

mainMethod.invoke(null, childArgs.toArray)
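Putting the last three steps together, the following self-contained sketch shows the same load-then-invoke-via-reflection pattern; DemoApp and ReflectiveLaunch are hypothetical names, not Spark classes:

import java.lang.reflect.Method

// Hypothetical user application standing in for the --class passed to spark-submit
object DemoApp {
  def main(args: Array[String]): Unit =
    println(s"DemoApp started with: ${args.mkString(", ")}")
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Step 1: load the class by name, like Utils.classForName(childMainClass)
    val mainClass: Class[_] = Class.forName("DemoApp")
    // Step 2: look up the static main(String[]) entry point
    val mainMethod: Method = mainClass.getMethod("main", classOf[Array[String]])
    // Step 3: invoke it via reflection; the receiver is null because main is static
    mainMethod.invoke(null, Array("repartition_num=300"))
  }
}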

Overall, the code behind the submission entry point is quite clear and easy to follow.
