Spark Driver Program剖析
SparkContext是通往Spark集群的唯一入口,是整個Application運行調度的核心。
一、Spark Driver Program
Spark Driver Program(以下簡稱Driver)是運行Application的main函數並且新建SparkContext實例的程式。其實,初始化SparkContext是為了準備Spark應用程式的運行環境,在Spark中,由SparkContext負責與集群進行通訊、資源的申請、任務的分配和監控等。當Worker節點中的Executor運行完畢Task後,Driver同時負責將SparkContext關閉。通常也可以使用SparkContext來代表驅動程式(Driver)。
Driver(SparkContext)整體架構圖如下所示。
二、SparkContext深度剖析
SparkContext是通往Spark集群的唯一入口,可以用來在Spark集群中創建RDDs、累加器(Accumulators)和廣播變數(Broadcast Variables)。SparkContext也是整個Spark應用程式(Application)中至關重要的一個對象,可以說是整個Application運行調度的核心(不是指資源調度)。
SparkContext的核心作用是初始化Spark應用程式運行所需要的核心組件,包括高層調度器(DAGScheduler)、底層調度器(TaskScheduler)和調度器的通訊終端(SchedulerBackend),同時還會負責Spark程式向Master註冊程式等。
一般而言,通常為了測試或者學習Spark開發一個Application,在Application的main方法中,最開始幾行編寫的程式碼一般是這樣的:
首先,創建SparkConf實例,設置SparkConf實例的屬性,以便覆蓋Spark默認配置文件spark-env.sh,spark-default.sh和log4j.properties中的參數;
然後,SparkConf實例作為SparkContext類的唯一構造參數來實例化SparkContext實例對象。SparkContext在實例化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,而當RDD的action觸發了作業(Job)後,SparkContext會調用DAGScheduler將整個Job劃分成幾個小的階段(Stage),TaskScheduler會調度每個Stage的任務(Task)進行處理。
還有,SchedulerBackend管理整個集群中為這個當前的Application分配的計算資源,即Executor。
如果用一個車來比喻Spark Application,那麼SparkContext就是車的引擎,而SparkConf是關於引擎的配置參數。說明:只可以有一個SparkContext實例運行在一個JVM記憶體中,所以在創建新的SparkContext實例前,必須調用stop方法停止當前JVM唯一運行的SparkContext實例。
Spark程式在運行時分為Driver和Executor兩部分:Spark程式編寫是基於SparkContext的,具體包含兩方面。
Spark編程的核心基礎RDD是由SparkContext最初創建的(第一個RDD一定是由SparkContext創建的)。
Spark程式的調度優化也是基於SparkContext,首先進行調度優化。
Spark程式的註冊是通過SparkContext實例化時生產的對象來完成的(其實是SchedulerBackend來註冊程式)。
Spark程式在運行時要通過Cluster Manager獲取具體的計算資源,計算資源獲取也是通過SparkContext產生的對象來申請的(其實是SchedulerBackend來獲取計算資源的)。
SparkContext崩潰或者結束的時候,整個Spark程式也結束。
三、SparkContext源碼解析
SparkContext是Spark應用程式的核心。我們運行WordCount程式,通過日誌來深入了解SparkContext。
WordCount.scala的程式碼如下。
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.HadoopRDD import org.apache.log4j.Logger import org.apache.log4j.Level object wordcount { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ALL); // 第1步:創建Spark的配置對象SparkConf,設置Spark程式運行時的配置資訊, val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local") // 第2步:創建SparkContext對象 val sc = new SparkContext(conf) // 第3步:根據具體的數據來源來創建RDD val lines = sc.textFile("helloSpark.txt", 1) // 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等 val words = lines.flatMap{line=>line.split(" ")} val pairs = words.map{word=>(word,1)} pairs.cache() val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("wordCountResult.log") // val wordCountsOdered = pairs.reduceByKey(_+_).map( // pair=>(pair._2,pair._1) // ).sortByKey(false).map(pair=>(pair._2,pair._1)) // wordCountsOdered.collect.foreach(wordNumberPair=>println(wordNumberPair._1+" : "+wordNumberPair._2)) // while(true){ // // } sc.stop() } }
在Eclipse中運行wordcount程式碼,日誌顯示如下:
程式一開始,日誌里顯示的是:INFO SparkContext: Running Spark version 2.0.1,日誌中間部分是一些隨著SparkContext創建而創建的對象,另一條比較重要的日誌資訊,作業啟動了並正在運行:INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:58。
在程式運行的過程中會創建TaskScheduler、DAGScheduler和SchedulerBackend,它們有各自的功能。DAGScheduler是面向Job的Stage的高層調度器;TaskScheduler是底層調度器。SchedulerBackend是一個介面,根據具體的ClusterManager的不同會有不同的實現。程式列印結果後便開始結束。日誌顯示:INFO SparkContext: Successfully stopped SparkContext。
通過這個例子可以感受到Spark程式的運行到處都可以看到SparkContext的存在,我們將SparkContext作為Spark源碼閱讀的入口,來理解Spark的所有內部機制。
我們從一個整體去看SparkContext創建的實例對象。首先,SparkContext構建的頂級三大核心為DAGScheduler、TaskScheduler、SchedulerBackend,其中,DAGScheduler是面向Job的Stage的高層調度器;TaskScheduler是一個介面,是底層調度器,根據具體的ClusterManager的不同會有不同的實現,Standalone模式下具體的實現是TaskSchedulerImpl。SchedulerBackend是一個介面,根據具體的ClusterManager的不同會有不同的實現。Standalone模式下具體的實現是StandaloneSchedulerBackend。下圖為SparkContext整體運行圖
從整個程式運行的角度講,SparkContext包含四大核心對象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。StandaloneSchedulerBackend有三大核心功能:負責與Master連接,註冊當前程式RegisterWithMaster;接收集群中為當前應用程式分配的計算資源Executor的註冊並管理Executors;負責發送Task到具體的Executor執行。
第一步:程式一開始運行時會實例化SparkContext里的對象,所有不在方法里的成員都會被實例化!一開始實例化時第一個關鍵的程式碼是createTaskScheduler,它位於SparkContext的PrimaryConstructor中,當它實例化時會直接被調用,這個方法返回的是taskScheduler和dagScheduler的實例,然後基於這個內容又構建了DAGScheduler,最後調用taskScheduler的start()方法。要先創建taskScheduler,然後再創建dagScheduler,因為taskScheduler是受dagScheduler管理的。
SparkContext.scala的源碼如下。
// Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor _taskScheduler.start()
第二步:調用createTaskScheduler,這個方法創建了TaskSchedulerImpl和StandaloneSchedulerBackend,createTaskScheduler方法的第一個入參是SparkContext,傳入的this對象是在應用程式中創建的sc,第二個入參是master的地址。
以下是wordcount.scala創建SparkConf和SparkContext的上下文資訊
// 第1步:創建Spark的配置對象SparkConf,設置Spark程式運行時的配置資訊, val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local") // 第2步:創建SparkContext對象 val sc = new SparkContext(conf)
當SparkContext調用createTaskScheduler方法時,根據集群的條件創建不同的調度器,例如,createTaskScheduler第二個入參master如傳入local參數,SparkContext將創建TaskSchedulerImpl實例及LocalSchedulerBackend實例,在測試程式碼的時候,可以嘗試傳入local[*]或者是local[2]的參數,然後跟蹤程式碼,看看創建了什麼樣的實例對象。
SparkContext中的SparkMasterRegex對象定義不同的正則表達式,從master字元串中根據正則表達式適配master資訊。
SparkContext.scala的源碼如下。
/** * A collection of regexes for extracting information from the master string. */ private object SparkMasterRegex { // Regular expression used for local[N] and local[*] master formats val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r // Regular expression for connecting to Spark deploy clusters val SPARK_REGEX = """spark://(.*)""".r // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url val MESOS_REGEX = """mesos://(.*)""".r }
這是設計模式中的策略模式,它會根據實際需要創建出不同的SchedulerBackend的子類。
SparkContext.scala的createTaskScheduler方法的源碼如下。
/** * Create a task scheduler based on a given master URL. * Return a 2-tuple of the scheduler backend and the task scheduler. */ private def createTaskScheduler( sc: SparkContext, master: String, deployMode: String): (SchedulerBackend, TaskScheduler) = { import SparkMasterRegex._ // When running locally, don't try to re-execute tasks on failure. val MAX_LOCAL_TASK_FAILURES = 1 master match { case "local" => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1) scheduler.initialize(backend) (backend, scheduler)
在實際生產環境下,我們都是用集群模式,即以spark://開頭,此時在程式運行時,框架會創建一個TaskSchedulerImpl和StandaloneSchedulerBackend的實例,在這個過程中也會初始化taskscheduler,把StandaloneSchedulerBackend的實例對象作為參數傳入。StandaloneSchedulerBackend被TaskSchedulerImpl管理,最後返回TaskScheduler和StandaloneSchdeulerBackend。
SparkContext.scala的源碼如下。
case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler)
createTaskScheduler方法執行完畢後,調用了taskscheduler.start()方法來正式啟動taskscheduler,這裡雖然調用了taskscheduler.start方法,但實際上是調用了taskSchedulerImpl的start方法,因為taskSchedulerImpl是taskScheduler的子類。
Task默認失敗重試次數是4次,如果任務不容許失敗,就可以調大這個參數。調大spark.task.maxFailures參數有助於確保重要的任務失敗後可以重試多次。
初始化TaskSchedulerImpl:調用createTaskScheduler方法時會初始化TaskSchedulerImpl,然後把StandaloneSchedulerBackend當作參數傳進去,初始化TaskSchedulerImpl時首先是創建一個Pool來初定義資源分布的模式Scheduling Mode,默認是先進先出(FIFO)的模式。
回到taskScheduler start方法,taskScheduler.start方法調用時會再調用schedulerbackend的start方法。
SchedulerBackend包含多個子類,分別是LocalSchedulerBackend、CoarseGrainedScheduler-Backend和StandaloneSchedulerBackend、MesosCoarseGrainedSchedulerBackend、YarnScheduler-Backend。
StandaloneSchedulerBackend的start方法調用了CoarseGraninedSchedulerBackend的start方法,通過StandaloneSchedulerBackend註冊程式把command提交給Master:Command (“org.apache.spark.executor.CoarseGrainedExecutorBackend”, args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)來創建一個StandaloneAppClient的實例。
Master髮指令給Worker去啟動Executor所有的進程時載入的Main方法所在的入口類就是command中的CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中啟動Executor(Executor是先註冊,再實例化),Executor通過執行緒池並發執行Task,然後再調用它的run方法。
回到StandaloneSchedulerBackend.scala的start方法:其中創建了一個很重要的對象,即StandaloneAppClient對象,然後調用它的client.start()方法。
在start方法中創建一個ClientEndpoint對象。
StandaloneAppClient.scala的star方法的源碼如下。
def start() { // Just launch an rpcEndpoint; it will call back into the listener. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))) }
ClientEndpoint是一個RpcEndPoint,首先調用自己的onStart方法,接下來向Master註冊。
調用registerWithMaster方法,從registerWithMaster調用tryRegisterAllMasters,開一條新的執行緒來註冊,然後發送一條資訊(RegisterApplication的case class)給Master。
Master收到RegisterApplication資訊後便開始註冊,註冊後再次調用schedule()方法。
四、總結
從SparkContext創建taskSchedulerImpl初始化不同的實例對象來完成最終向Master註冊的任務,中間包括調用scheduler的start方法和創建StandaloneAppClient來間接創建ClientEndPoint完成註冊工作。
我們把SparkContext稱為天堂之門,SparkContext開啟天堂之門:Spark程式是通過SparkContext發布到Spark集群的;SparkContext導演天堂世界:Spark程式的運行都是在SparkContext為核心的調度器的指揮下進行的;SparkContext關閉天堂之門:SparkContext崩潰或者結束的時候整個Spark程式也結束。