Spark Driver Program剖析

  SparkContext是通往Spark集群的唯一入口,是整個Application運行調度的核心。

一、Spark Driver Program

  Spark Driver Program(以下簡稱Driver)是運行Application的main函數並且新建SparkContext實例的程式。其實,初始化SparkContext是為了準備Spark應用程式的運行環境,在Spark中,由SparkContext負責與集群進行通訊、資源的申請、任務的分配和監控等。當Worker節點中的Executor運行完畢Task後,Driver同時負責將SparkContext關閉。通常也可以使用SparkContext來代表驅動程式(Driver)。

  Driver(SparkContext)整體架構圖如下所示。

   

二、SparkContext深度剖析

  SparkContext是通往Spark集群的唯一入口,可以用來在Spark集群中創建RDDs、累加器(Accumulators)和廣播變數(Broadcast Variables)。SparkContext也是整個Spark應用程式(Application)中至關重要的一個對象,可以說是整個Application運行調度的核心(不是指資源調度)。

  SparkContext的核心作用是初始化Spark應用程式運行所需要的核心組件,包括高層調度器(DAGScheduler)、底層調度器(TaskScheduler)和調度器的通訊終端(SchedulerBackend),同時還會負責Spark程式向Master註冊程式等。

  一般而言,通常為了測試或者學習Spark開發一個Application,在Application的main方法中,最開始幾行編寫的程式碼一般是這樣的:

  首先,創建SparkConf實例,設置SparkConf實例的屬性,以便覆蓋Spark默認配置文件spark-env.sh,spark-default.sh和log4j.properties中的參數;

  然後,SparkConf實例作為SparkContext類的唯一構造參數來實例化SparkContext實例對象。SparkContext在實例化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,而當RDD的action觸發了作業(Job)後,SparkContext會調用DAGScheduler將整個Job劃分成幾個小的階段(Stage),TaskScheduler會調度每個Stage的任務(Task)進行處理。

  還有,SchedulerBackend管理整個集群中為這個當前的Application分配的計算資源,即Executor。

  如果用一個車來比喻Spark Application,那麼SparkContext就是車的引擎,而SparkConf是關於引擎的配置參數。說明:只可以有一個SparkContext實例運行在一個JVM記憶體中,所以在創建新的SparkContext實例前,必須調用stop方法停止當前JVM唯一運行的SparkContext實例。

  Spark程式在運行時分為Driver和Executor兩部分:Spark程式編寫是基於SparkContext的,具體包含兩方面。

  Spark編程的核心基礎RDD是由SparkContext最初創建的(第一個RDD一定是由SparkContext創建的)。

  Spark程式的調度優化也是基於SparkContext,首先進行調度優化。

  Spark程式的註冊是通過SparkContext實例化時生產的對象來完成的(其實是SchedulerBackend來註冊程式)。

  Spark程式在運行時要通過Cluster Manager獲取具體的計算資源,計算資源獲取也是通過SparkContext產生的對象來申請的(其實是SchedulerBackend來獲取計算資源的)。

  SparkContext崩潰或者結束的時候,整個Spark程式也結束。

三、SparkContext源碼解析

  SparkContext是Spark應用程式的核心。我們運行WordCount程式,通過日誌來深入了解SparkContext。

  WordCount.scala的程式碼如下。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD
import org.apache.log4j.Logger
import org.apache.log4j.Level

object wordcount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ALL);
    
    // 第1步:創建Spark的配置對象SparkConf,設置Spark程式運行時的配置資訊,
    val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local")
    
    // 第2步:創建SparkContext對象
    val sc = new SparkContext(conf)
    
    // 第3步:根據具體的數據來源來創建RDD
    val lines = sc.textFile("helloSpark.txt", 1)
    
    // 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
    val words = lines.flatMap{line=>line.split(" ")}
    val pairs = words.map{word=>(word,1)}
    pairs.cache()
    val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("wordCountResult.log")
//    val wordCountsOdered = pairs.reduceByKey(_+_).map(
//      pair=>(pair._2,pair._1)    
//    ).sortByKey(false).map(pair=>(pair._2,pair._1))
//    wordCountsOdered.collect.foreach(wordNumberPair=>println(wordNumberPair._1+" : "+wordNumberPair._2))
//    while(true){
//      
//    }
    sc.stop()
    
    
  }
}

  在Eclipse中運行wordcount程式碼,日誌顯示如下:

   

  程式一開始,日誌里顯示的是:INFO SparkContext: Running Spark version 2.0.1,日誌中間部分是一些隨著SparkContext創建而創建的對象,另一條比較重要的日誌資訊,作業啟動了並正在運行:INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:58。

  在程式運行的過程中會創建TaskScheduler、DAGScheduler和SchedulerBackend,它們有各自的功能。DAGScheduler是面向Job的Stage的高層調度器;TaskScheduler是底層調度器。SchedulerBackend是一個介面,根據具體的ClusterManager的不同會有不同的實現。程式列印結果後便開始結束。日誌顯示:INFO SparkContext: Successfully stopped SparkContext。

   

  通過這個例子可以感受到Spark程式的運行到處都可以看到SparkContext的存在,我們將SparkContext作為Spark源碼閱讀的入口,來理解Spark的所有內部機制。

  我們從一個整體去看SparkContext創建的實例對象。首先,SparkContext構建的頂級三大核心為DAGScheduler、TaskScheduler、SchedulerBackend,其中,DAGScheduler是面向Job的Stage的高層調度器;TaskScheduler是一個介面,是底層調度器,根據具體的ClusterManager的不同會有不同的實現,Standalone模式下具體的實現是TaskSchedulerImpl。SchedulerBackend是一個介面,根據具體的ClusterManager的不同會有不同的實現。Standalone模式下具體的實現是StandaloneSchedulerBackend。下圖為SparkContext整體運行圖

   

  從整個程式運行的角度講,SparkContext包含四大核心對象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。StandaloneSchedulerBackend有三大核心功能:負責與Master連接,註冊當前程式RegisterWithMaster;接收集群中為當前應用程式分配的計算資源Executor的註冊並管理Executors;負責發送Task到具體的Executor執行。

  第一步:程式一開始運行時會實例化SparkContext里的對象,所有不在方法里的成員都會被實例化!一開始實例化時第一個關鍵的程式碼是createTaskScheduler,它位於SparkContext的PrimaryConstructor中,當它實例化時會直接被調用,這個方法返回的是taskScheduler和dagScheduler的實例,然後基於這個內容又構建了DAGScheduler,最後調用taskScheduler的start()方法。要先創建taskScheduler,然後再創建dagScheduler,因為taskScheduler是受dagScheduler管理的。

  SparkContext.scala的源碼如下。

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

  第二步:調用createTaskScheduler,這個方法創建了TaskSchedulerImpl和StandaloneSchedulerBackend,createTaskScheduler方法的第一個入參是SparkContext,傳入的this對象是在應用程式中創建的sc,第二個入參是master的地址。

  以下是wordcount.scala創建SparkConf和SparkContext的上下文資訊

    // 第1步:創建Spark的配置對象SparkConf,設置Spark程式運行時的配置資訊,
    val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local")
    
    // 第2步:創建SparkContext對象
    val sc = new SparkContext(conf)

  當SparkContext調用createTaskScheduler方法時,根據集群的條件創建不同的調度器,例如,createTaskScheduler第二個入參master如傳入local參數,SparkContext將創建TaskSchedulerImpl實例及LocalSchedulerBackend實例,在測試程式碼的時候,可以嘗試傳入local[*]或者是local[2]的參數,然後跟蹤程式碼,看看創建了什麼樣的實例對象。

  SparkContext中的SparkMasterRegex對象定義不同的正則表達式,從master字元串中根據正則表達式適配master資訊。

  SparkContext.scala的源碼如下。

/**
 * A collection of regexes for extracting information from the master string.
 */
private object SparkMasterRegex {
  // Regular expression used for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
  // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
  val MESOS_REGEX = """mesos://(.*)""".r
}

  這是設計模式中的策略模式,它會根據實際需要創建出不同的SchedulerBackend的子類。

  SparkContext.scala的createTaskScheduler方法的源碼如下。

  /**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

  在實際生產環境下,我們都是用集群模式,即以spark://開頭,此時在程式運行時,框架會創建一個TaskSchedulerImpl和StandaloneSchedulerBackend的實例,在這個過程中也會初始化taskscheduler,把StandaloneSchedulerBackend的實例對象作為參數傳入。StandaloneSchedulerBackend被TaskSchedulerImpl管理,最後返回TaskScheduler和StandaloneSchdeulerBackend。

  SparkContext.scala的源碼如下。

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

  createTaskScheduler方法執行完畢後,調用了taskscheduler.start()方法來正式啟動taskscheduler,這裡雖然調用了taskscheduler.start方法,但實際上是調用了taskSchedulerImpl的start方法,因為taskSchedulerImpl是taskScheduler的子類。

  Task默認失敗重試次數是4次,如果任務不容許失敗,就可以調大這個參數。調大spark.task.maxFailures參數有助於確保重要的任務失敗後可以重試多次。

  初始化TaskSchedulerImpl:調用createTaskScheduler方法時會初始化TaskSchedulerImpl,然後把StandaloneSchedulerBackend當作參數傳進去,初始化TaskSchedulerImpl時首先是創建一個Pool來初定義資源分布的模式Scheduling Mode,默認是先進先出(FIFO)的模式。

  回到taskScheduler start方法,taskScheduler.start方法調用時會再調用schedulerbackend的start方法。

  SchedulerBackend包含多個子類,分別是LocalSchedulerBackend、CoarseGrainedScheduler-Backend和StandaloneSchedulerBackend、MesosCoarseGrainedSchedulerBackend、YarnScheduler-Backend。

  StandaloneSchedulerBackend的start方法調用了CoarseGraninedSchedulerBackend的start方法,通過StandaloneSchedulerBackend註冊程式把command提交給Master:Command (“org.apache.spark.executor.CoarseGrainedExecutorBackend”, args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)來創建一個StandaloneAppClient的實例。

  Master髮指令給Worker去啟動Executor所有的進程時載入的Main方法所在的入口類就是command中的CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中啟動Executor(Executor是先註冊,再實例化),Executor通過執行緒池並發執行Task,然後再調用它的run方法。

  回到StandaloneSchedulerBackend.scala的start方法:其中創建了一個很重要的對象,即StandaloneAppClient對象,然後調用它的client.start()方法。

  在start方法中創建一個ClientEndpoint對象。

  StandaloneAppClient.scala的star方法的源碼如下。

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

  ClientEndpoint是一個RpcEndPoint,首先調用自己的onStart方法,接下來向Master註冊。

  調用registerWithMaster方法,從registerWithMaster調用tryRegisterAllMasters,開一條新的執行緒來註冊,然後發送一條資訊(RegisterApplication的case class)給Master。

  Master收到RegisterApplication資訊後便開始註冊,註冊後再次調用schedule()方法。

四、總結

  從SparkContext創建taskSchedulerImpl初始化不同的實例對象來完成最終向Master註冊的任務,中間包括調用scheduler的start方法和創建StandaloneAppClient來間接創建ClientEndPoint完成註冊工作。

  我們把SparkContext稱為天堂之門,SparkContext開啟天堂之門:Spark程式是通過SparkContext發布到Spark集群的;SparkContext導演天堂世界:Spark程式的運行都是在SparkContext為核心的調度器的指揮下進行的;SparkContext關閉天堂之門:SparkContext崩潰或者結束的時候整個Spark程式也結束。