SparkContext Source Code Walkthrough

SparkContext does a few main things, as follows:

1. Create the TaskScheduler

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

Which concrete TaskScheduler gets created depends on the deploy mode (standalone, YARN, Mesos, and so on), and the method returns a (SchedulerBackend, TaskScheduler) pair:

private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    // Standalone mode
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case MESOS_REGEX(mesosUrl) =>
      MesosNativeLibrary.load()
      val scheduler = new TaskSchedulerImpl(sc)
      val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
      val backend = if (coarseGrained) {
        new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
      } else {
        new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
      }
      scheduler.initialize(backend)
      (backend, scheduler)

    // Handle every other master URL, i.e. external cluster managers such as YARN, Mesos or
    // Kubernetes (everything other than local and standalone spark:// URLs).
    case masterUrl =>
      // This lookup is the key step: it resolves the external cluster manager;
      // for YARN the concrete implementation is YarnClusterManager.
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
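To make the dispatch concrete, here is a minimal sketch (the app name and master strings are only examples) of which branch of the match above a given master URL lands in:

import org.apache.spark.{SparkConf, SparkContext}

// "local"             -> "local" branch: TaskSchedulerImpl + LocalSchedulerBackend, single thread
// "local[2]"          -> LOCAL_N_REGEX branch: LocalSchedulerBackend with 2 threads
// "local[*, 3]"       -> LOCAL_N_FAILURES_REGEX branch: all cores, up to 3 task failures
// "spark://host:7077" -> SPARK_REGEX branch: StandaloneSchedulerBackend
// "yarn"              -> falls through to the external-cluster-manager branch (YarnClusterManager)
val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
sc.stop()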

After SparkContext has created the TaskSchedulerImpl, its initialization is triggered by the following call:

scheduler.initialize(backend)

TaskSchedulerImpl's initialize method:

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

One important responsibility of TaskSchedulerImpl is deciding the scheduling order of jobs and starting the speculative execution mechanism for tasks.
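The scheduling mode that initialize() pattern-matches on comes from configuration. A minimal sketch (the allocation-file path below is only an example):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")
  .setMaster("local[*]")
  // FIFO is the default and builds a FIFOSchedulableBuilder;
  // FAIR builds a FairSchedulableBuilder, whose buildPools() reads pool definitions.
  .set("spark.scheduler.mode", "FAIR")
  // Optional: where FAIR mode looks for pool definitions (weight, minShare, per-pool mode).
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)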

SparkContext also calls into TaskSchedulerImpl in one more place:

_taskScheduler.start()

The method invoked is:

override def start() {
  // Call SchedulerBackend.start()
  backend.start()
  // Here Spark checks whether speculative execution is enabled
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
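For reference, the configuration that this start() method (and the speculation checker it schedules) reacts to can be set as below. This is just a sketch: the master host and all values are illustrative, not recommendations.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("speculation-demo")
  .setMaster("spark://master-host:7077")      // speculation is skipped when isLocal is true
  .set("spark.speculation", "true")           // enables the checkSpeculatableTasks() timer above
  .set("spark.speculation.interval", "100ms") // how often the timer fires (SPECULATION_INTERVAL_MS)
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median marks a task as slow
  .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish before checking

val sc = new SparkContext(conf)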