在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:

1、Job的调度模型与运行反馈;

2、Stage划分;

3、Stage提交:对应TaskSet的生成。

Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet。

接下来我们要讲的第二阶段Task调度与执行,则是Spark中Job的物理调度,它实际上分为两个主要阶段:

1、Task调度;

2、Task运行。

下面,我们分析下Task的调度。我们知道,在第一阶段的末尾,stage被提交后,每个stage被转化为一组task的集合--TaskSet,而紧接着,则调用taskScheduler.submitTasks()提交这些tasks,而TaskScheduler的主要职责,则是负责Job物理调度阶段--Task调度。TaskScheduler为scala中的一个trait,你可以简单的把它理解为Java中的接口,目前它仅仅有一个实现类TaskSchedulerImpl。

TaskScheduler负责低层次任务的调度,每个TaskScheduler为一个特定的SparkContext调度tasks。这些调度器获取到由DAGScheduler为每个stage提交至他们的一组Tasks,并负责将这些tasks发送到集群,以执行它们,在它们失败时重试,并减轻掉队情况(类似MapReduce的推测执行原理吧,在这里留个疑问)。这些调度器返回一些事件events给DAGScheduler。其源码如下:

  1. /**
  2. * Low-level task scheduler interface, currently implemented exclusively by
  3. * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
  4. * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
  5. * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
  6. * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
  7. * them, retrying if there are failures, and mitigating stragglers. They return events to the
  8. * DAGScheduler.
  9. *
  10. */
  11. private[spark] trait TaskScheduler {
  12. private val appId = "spark-application-" + System.currentTimeMillis
  13. def rootPool: Pool
  14. def schedulingMode: SchedulingMode
  15. def start(): Unit
  16. // Invoked after system has successfully initialized (typically in spark context).
  17. // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  18. // wait for slave registrations, etc.
  19. def postStartHook() { }
  20. // Disconnect from the cluster.
  21. def stop(): Unit
  22. // Submit a sequence of tasks to run.
  23. def submitTasks(taskSet: TaskSet): Unit
  24. // Cancel a stage.
  25. def cancelTasks(stageId: Int, interruptThread: Boolean)
  26. // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  27. def setDAGScheduler(dagScheduler: DAGScheduler): Unit
  28. // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  29. def defaultParallelism(): Int
  30. /**
  31. * Update metrics for in-progress tasks and let the master know that the BlockManager is still
  32. * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
  33. * indicating that the block manager should re-register.
  34. */
  35. def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
  36. blockManagerId: BlockManagerId): Boolean
  37. /**
  38. * Get an application ID associated with the job.
  39. *
  40. * @return An application ID
  41. */
  42. def applicationId(): String = appId
  43. /**
  44. * Process a lost executor
  45. */
  46. def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  47. /**
  48. * Get an application's attempt ID associated with the job.
  49. *
  50. * @return An application's Attempt ID
  51. */
  52. def applicationAttemptId(): Option[String]
  53. }

通过源码我们可以知道,TaskScheduler提供了实例化与销毁时必要的start()和stop()方法,并提供了提交Tasks与取消Tasks的submitTasks()和cancelTasks()方法,并且通过executorHeartbeatReceived()周期性的接收executor的心跳,更新运行中tasks的元信息,并让master知晓BlockManager仍然存活。

好了,结合源码,我们一步步来看吧。

首先,在DAGScheduler的submitMissingTasks()方法的最后,每个stage生成一组tasks后,即调用用TaskScheduler的submitTasks()方法提交task,代码如下:

  1. // 利用taskScheduler.submitTasks()提交task
  2. taskScheduler.submitTasks(new TaskSet(
  3. tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  4. // 记录提交时间
  5. stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

那么我们先来看下TaskScheduler的submitTasks()方法,在其实现类TaskSchedulerImpl中,代码如下:

  1. override def submitTasks(taskSet: TaskSet) {
  2. // 获取TaskSet中的tasks
  3. val tasks = taskSet.tasks
  4. logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  5. // 使用synchronized进行同步
  6. this.synchronized {
  7. // 创建TaskSetManager
  8. val manager = createTaskSetManager(taskSet, maxTaskFailures)
  9. // 获取taskSet对应的stageId
  10. val stage = taskSet.stageId
  11. // taskSetsByStageIdAndAttempt存储的是stageId->[taskSet.stageAttemptId->TaskSetManager]
  12. // 更新taskSetsByStageIdAndAttempt,将上述对应关系存入
  13. val stageTaskSets =
  14. taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  15. stageTaskSets(taskSet.stageAttemptId) = manager
  16. // 查看是否存在冲突的taskSet,如果存在,抛出IllegalStateException异常
  17. val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
  18. ts.taskSet != taskSet && !ts.isZombie
  19. }
  20. if (conflictingTaskSet) {
  21. throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
  22. s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
  23. }
  24. // 将TaskSetManager添加到schedulableBuilder中
  25. schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  26. // 如果不是本地任务,且不再接受任务
  27. if (!isLocal && !hasReceivedTask) {
  28. starvationTimer.scheduleAtFixedRate(new TimerTask() {
  29. override def run() {
  30. if (!hasLaunchedTask) {
  31. logWarning("Initial job has not accepted any resources; " +
  32. "check your cluster UI to ensure that workers are registered " +
  33. "and have sufficient resources")
  34. } else {
  35. this.cancel()
  36. }
  37. }
  38. }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
  39. }
  40. // 设置标志位hasReceivedTask为true
  41. hasReceivedTask = true
  42. }
  43. // 最后调用SchedulerBackend的reviveOffers()
  44. backend.reviveOffers()
  45. }

该方法首先从入参TaskSet中获取tasks;

接下来,在synchronized同步代码块内,主要完成以下几件事:

1、创建TaskSetManager,TaskSetManager主要用来干什么呢,后面我们会分析;

2、通过taskSet获取stageId;

3、更新数据结构taskSetsByStageIdAndAttempt,将映射关系stageId->[taskSet.stageAttemptId->TaskSetManager]存入,这里的TaskSetManager就是上面创建的TaskSetManager,taskSet.stageAttemptId是怎么赋值的呢?为了保证叙述的完整性,还是先留个小小的疑问吧;

4、查看是否存在冲突的taskSet,如果存在,抛出IllegalStateException异常;

5、将TaskSetManager添加到schedulableBuilder中;

6、最后调用SchedulerBackend的reviveOffers()。

下面慢慢分析上述流程,首先这个TaskSetManager是干什么呢?通过名字可以简单的推论出,它是TaskSet的管理者,主要在TaskSchedulerImpl中调度同一个TaskSet中的tasks。该类追踪每个task,当它们失败时重试(直到限制的最大次数),并通过延迟调度处理位置感知调度。该类最主要的接口就是resourceOffer()方法,该方法会询问TaskSet,它是否想要在一个节点上运行一个task,并在TaskSet中的task状态变更时通知它(比如完成等)。

现在再来看下taskSet的stageAttemptId,在DAGScheduler的submitMissingTasks()方法中调用TaskScheduler的submitTasks()方法提交task,构造TaskSet对象时,赋值给TaskSet的stageAttemptId字段的是stage.latestInfo.attemptId。而Stage的latestInfo是这样定义的:

  1. /** Returns the StageInfo for the most recent attempt for this stage. */
  2. // 返回该stage的最新尝试attempt的StageInfo
  3. def latestInfo: StageInfo = _latestInfo

即它是由_latestInfo来赋值的,那么_latestInfo呢?代码如下:

  1. /**
  2. * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
  3. * here, before any attempts have actually been created, because the DAGScheduler uses this
  4. * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
  5. * have been created).
  6. * 指向stage最新一次尝试的StageInfo对象。
  7. * 在任何尝试实际发生之前,都需要在这里被初始化,因为当一个Job启动时(任何stage尝试发生时)DAGScheduler使用
  8. * 这个StageInfo告诉SparkListeners。
  9. */
  10. private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

具体初始化过程我们不做过多讨论,我们只要知道,StageInfo中存在一个成员变量attemptId即可,而这个成员变量就是上面我们所说的taskSet的stageAttemptId。而StageInfo中attemptId的值,则是由Stage中nextAttemptId的值确定的,其定义如下:

  1. /** The ID to use for the next new attempt for this stage. */
  2. // 该stage下一次新尝试的id
  3. private var nextAttemptId: Int = 0

而它值的变化是怎么样的呢?答案就在Stage的makeNewStageAttempt()方法中,代码如下:

  1. /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  2. // 通过用一个最新的nextAttemptId创建的StageInfo对象来创建该stage的最新的一次尝试
  3. def makeNewStageAttempt(
  4. numPartitionsToCompute: Int,
  5. taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
  6. // 构造_latestInfo
  7. _latestInfo = StageInfo.fromStage(
  8. this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
  9. // nextAttemptId自增
  10. nextAttemptId += 1
  11. }

什么时候调用makeNewStageAttempt()方法呢?还记得《Spark源码分析之Stage提交》一文的最后,真正提交stage的方法submitMissingTasks()中第6步,标记新的stage attempt,并发送一个SparkListenerStageSubmitted事件吗,代码如下:

  1. // 标记新的stage attempt
  2. stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  3. // 发送一个SparkListenerStageSubmitted事件
  4. listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

也就是说,在每次提交stage时,即会调用该方法,创建一个新的_latestInfo对象,并对nextAttemptId进行自增。

好了,言归正传,继续往下看。第5步便是将TaskSetManager添加到schedulableBuilder中,那么这里就有两个问题:

1、schedulableBuilder是什么?

2、为什么要将TaskSetManager添加到schedulableBuilder中呢?

我们首先看下schedulableBuilder的定义及初始化。其定义代码如下:

  1. var schedulableBuilder: SchedulableBuilder = null

而它的初始化则是在TaskSchedulerImpl的initialize()方法中。如下:

  1. // 初始化
  2. def initialize(backend: SchedulerBackend) {
  3. // 赋值SchedulerBackend
  4. this.backend = backend
  5. // temporarily set rootPool name to empty
  6. // 临时将rootPool的名字设置为空
  7. rootPool = new Pool("", schedulingMode, 0, 0)
  8. // 调度构造器,分两种,FIFO和FAIR
  9. schedulableBuilder = {
  10. schedulingMode match {
  11. case SchedulingMode.FIFO =>
  12. new FIFOSchedulableBuilder(rootPool)
  13. case SchedulingMode.FAIR =>
  14. new FairSchedulableBuilder(rootPool, conf)
  15. }
  16. }
  17. schedulableBuilder.buildPools()
  18. }

这个方法同时也初始化了TaskSchedulerImpl中SchedulerBackend类型的backend对象,这个对象在最后一步会用到,我们稍后再说。

继续看schedulableBuilder,通过代码我们就能知道,这个schedulableBuilder是调度构造器,分FIFO和FAIR两种。至于这两种构造器的含义和区别,我们以后再分析。下面看下SchedulableBuilder的源码:

  1. /**
  2. * An interface to build Schedulable tree
  3. * buildPools: build the tree nodes(pools)
  4. * addTaskSetManager: build the leaf nodes(TaskSetManagers)
  5. */
  6. private[spark] trait SchedulableBuilder {
  7. def rootPool: Pool
  8. def buildPools()
  9. def addTaskSetManager(manager: Schedulable, properties: Properties)
  10. }

从上面的英文注释我们就能知道,SchedulableBuilder是一个构造调度树的接口,它提供了一个成员变量Pool类型的rootPool和两个主要方法:
        1、buildPools()方法:构造调度树节点(调度池);

2、addTaskSetManager()方法:构造叶子节点(TaskSetManagers)。

下面,我们以FIFOSchedulableBuilder为例,简单说下。FIFOSchedulableBuilder中buildPools()是个空方法,没什么可说的,我们主要分析下它的buildPools()方法,代码如下:

  1. override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  2. rootPool.addSchedulable(manager)
  3. }

可以看到,它实际上是调用的Pool的addSchedulable()方法。继续追踪:

  1. override def addSchedulable(schedulable: Schedulable) {
  2. require(schedulable != null)
  3. // 将schedulable加入到schedulableQueue队列,队列为ConcurrentLinkedQueue类型
  4. schedulableQueue.add(schedulable)
  5. // 将schedulable的name与schedulable的对应关系添加到schedulableNameToSchedulable集合,集合为ConcurrentHashMap类型
  6. schedulableNameToSchedulable.put(schedulable.name, schedulable)
  7. // 将this赋值给schedulable的parent,即形成schedulable为this子节点(即截至目前时点的叶子节点)的树形结构
  8. schedulable.parent = this
  9. }

而翻看TaskSetManager的源码可以知道,TaskSetManager就实现了Schedulable这个trait(特质,类似java的接口),也就意味着TaskSetManager是可以被调度的,这也就回答了上面的问题2。

好了,我们继续看最后一步,调用SchedulerBackend的reviveOffers()。问题又来了,问题不断啊。

1、SchedulerBackend是什么?

2、SchedulerBackend如何被初始化?

3、SchedulerBackend的reviveOffers()到底做了什么?

带着问题去学习终究是好的,它让我们有了暂时的目标。下面,我们一步步来分析。

SchedulerBackend是Spark中一个可插拔组件,可插拔意味着它可以有多种实现方式,后续我们会概略讲讲。按照字面意思,它就是调度器的一个后台服务或者实现,其主要作用就是在物理机器或者说worker就绪后,能够提供其上的资源并将tasks加载到那些机器或者worker上。

上文中我们已经预先提到过,在TaskSchedulerImpl的initialize()方法初始化schedulableBuilder时,同时也初始化了SchedulerBackend,即:

  1. // 赋值SchedulerBackend
  2. this.backend = backend

这个SchedulerBackend是被传递进来的,那么这时我们就要追溯到TaskSchedulerImpl实例化的时候了。在Spark应用环境的初始化时,其上下文信息SparkContext中存在以下代码:

  1. // Create and start the scheduler
  2. val (sched, ts) = SparkContext.createTaskScheduler(this, master)
  3. _schedulerBackend = sched
  4. _taskScheduler = ts

createTaskScheduler()方法主要就是根据给定的Master URL创建一个TaskScheduler。大致代码如下:

  1. /**
  2. * Create a task scheduler based on a given master URL.
  3. * Return a 2-tuple of the scheduler backend and the task scheduler.
  4. * 根据给定的Master URL创建一个TaskScheduler。
  5. */
  6. private def createTaskScheduler(
  7. sc: SparkContext,
  8. master: String): (SchedulerBackend, TaskScheduler) = {
  9. import SparkMasterRegex._
  10. // When running locally, don't try to re-execute tasks on failure.
  11. val MAX_LOCAL_TASK_FAILURES = 1
  12. master match {
  13. case "local" =>
  14. val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
  15. val backend = new LocalBackend(sc.getConf, scheduler, 1)
  16. scheduler.initialize(backend)
  17. (backend, scheduler)
  18. case LOCAL_N_REGEX(threads) =>
  19. def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
  20. // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
  21. val threadCount = if (threads == "*") localCpuCount else threads.toInt
  22. if (threadCount <= 0) {
  23. throw new SparkException(s"Asked to run locally with $threadCount threads")
  24. }
  25. val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
  26. val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
  27. scheduler.initialize(backend)
  28. (backend, scheduler)
  29. case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
  30. def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
  31. // local[*, M] means the number of cores on the computer with M failures
  32. // local[N, M] means exactly N threads with M failures
  33. val threadCount = if (threads == "*") localCpuCount else threads.toInt
  34. val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
  35. val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
  36. scheduler.initialize(backend)
  37. (backend, scheduler)
  38. // Standalone模式
  39. case SPARK_REGEX(sparkUrl) =>
  40. // 初始化TaskSchedulerImpl实例scheduler
  41. val scheduler = new TaskSchedulerImpl(sc)
  42. val masterUrls = sparkUrl.split(",").map("spark://" + _)
  43. // 初始化一个SparkDeploySchedulerBackend实例backend
  44. val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  45. // 调用TaskSchedulerImpl的initialize()方法,
  46. // 为其成员变量SchedulerBackend赋值SparkDeploySchedulerBackend
  47. scheduler.initialize(backend)
  48. // 返回backend和scheduler
  49. (backend, scheduler)
  50. case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
  51. // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
  52. val memoryPerSlaveInt = memoryPerSlave.toInt
  53. if (sc.executorMemory > memoryPerSlaveInt) {
  54. throw new SparkException(
  55. "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
  56. memoryPerSlaveInt, sc.executorMemory))
  57. }
  58. val scheduler = new TaskSchedulerImpl(sc)
  59. val localCluster = new LocalSparkCluster(
  60. numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
  61. val masterUrls = localCluster.start()
  62. val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  63. scheduler.initialize(backend)
  64. backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
  65. localCluster.stop()
  66. }
  67. (backend, scheduler)
  68. case "yarn-standalone" | "yarn-cluster" =>
  69. if (master == "yarn-standalone") {
  70. logWarning(
  71. "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
  72. }
  73. val scheduler = try {
  74. val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
  75. val cons = clazz.getConstructor(classOf[SparkContext])
  76. cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  77. } catch {
  78. // TODO: Enumerate the exact reasons why it can fail
  79. // But irrespective of it, it means we cannot proceed !
  80. case e: Exception => {
  81. throw new SparkException("YARN mode not available ?", e)
  82. }
  83. }
  84. val backend = try {
  85. val clazz =
  86. Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
  87. val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
  88. cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
  89. } catch {
  90. case e: Exception => {
  91. throw new SparkException("YARN mode not available ?", e)
  92. }
  93. }
  94. scheduler.initialize(backend)
  95. (backend, scheduler)
  96. case "yarn-client" =>
  97. val scheduler = try {
  98. val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
  99. val cons = clazz.getConstructor(classOf[SparkContext])
  100. cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  101. } catch {
  102. case e: Exception => {
  103. throw new SparkException("YARN mode not available ?", e)
  104. }
  105. }
  106. val backend = try {
  107. val clazz =
  108. Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
  109. val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
  110. cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
  111. } catch {
  112. case e: Exception => {
  113. throw new SparkException("YARN mode not available ?", e)
  114. }
  115. }
  116. scheduler.initialize(backend)
  117. (backend, scheduler)
  118. case MESOS_REGEX(mesosUrl) =>
  119. MesosNativeLibrary.load()
  120. val scheduler = new TaskSchedulerImpl(sc)
  121. val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
  122. val backend = if (coarseGrained) {
  123. new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
  124. } else {
  125. new MesosSchedulerBackend(scheduler, sc, mesosUrl)
  126. }
  127. scheduler.initialize(backend)
  128. (backend, scheduler)
  129. case SIMR_REGEX(simrUrl) =>
  130. val scheduler = new TaskSchedulerImpl(sc)
  131. val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
  132. scheduler.initialize(backend)
  133. (backend, scheduler)
  134. case zkUrl if zkUrl.startsWith("zk://") =>
  135. logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
  136. "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
  137. createTaskScheduler(sc, "mesos://" + zkUrl)
  138. case _ =>
  139. throw new SparkException("Could not parse Master URL: '" + master + "'")
  140. }
  141. }

可以看出,它是根据Spark的部署模式来确定创建何种TaskScheduler及SchedulerBackend的。我们就以常见的Standalone模式来说明,关键代码如下:

  1. // Standalone模式
  2. case SPARK_REGEX(sparkUrl) =>
  3. // 初始化TaskSchedulerImpl实例scheduler
  4. val scheduler = new TaskSchedulerImpl(sc)
  5. val masterUrls = sparkUrl.split(",").map("spark://" + _)
  6. // 初始化一个SparkDeploySchedulerBackend实例backend
  7. val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  8. // 调用TaskSchedulerImpl的initialize()方法,
  9. // 为其成员变量SchedulerBackend赋值SparkDeploySchedulerBackend
  10. scheduler.initialize(backend)
  11. // 返回backend和scheduler
  12. (backend, scheduler)

Standalone模式模式中,TaskScheduler的实现为TaskSchedulerImpl,而SchedulerBackend的实现为SparkDeploySchedulerBackend,并且在TaskScheduler生成后,随即调用其initialize()方法完成了初始化,也就确定了SchedulableBuilder和SchedulerBackend。

至此,前两个是什么以及如何初始化的问题我们都已得到答案,下面再看最后一个关于做什么的问题:SchedulerBackend的reviveOffers()到底做了什么?还是以Standalone模式来说明。SparkDeploySchedulerBackend中没有提供此方法,我们只能寄希望于其父类CoarseGrainedSchedulerBackend,果不其然,在CoarseGrainedSchedulerBackend中我们找到了reviveOffers()方法。但是,代码很简单:

  1. override def reviveOffers() {
  2. // 调用driverEndpoint的send()方法,发送一个ReviveOffers消息
  3. driverEndpoint.send(ReviveOffers)
  4. }

我们继续看driverEndpoint是什么鬼。driverEndpoint是RPC中driver端Endpoint的引用,其类型为RpcEndpointRef。在CoarseGrainedSchedulerBackend启动时的start()方法中,对driverEndpoint进行了赋值:

  1. // TODO (prashant) send conf instead of properties
  2. driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))

这个RpcEnv只是一个抽象类,它有两种实现,一个是基于AKKa的AkkaRpcEnv,另外一个则是基于Netty的NettyRpcEnv,默认的实现是Netty。通过下述RpcEnv的代码即可看出:

  1. private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  2. // 两种实现方式:
  3. // akka:org.apache.spark.rpc.akka.AkkaRpcEnvFactory
  4. // netty:org.apache.spark.rpc.netty.NettyRpcEnvFactory
  5. val rpcEnvNames = Map(
  6. "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
  7. "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  8. // 通过参数spark.rpc配置,默认为netty
  9. val rpcEnvName = conf.get("spark.rpc", "netty")
  10. val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  11. Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  12. }

下面,我们就看下Netty的概要实现,在NettyRpcEnv的setupEndpoint()方法中:

  1. override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  2. // 调用Dispatcher的registerRpcEndpoint()方法完成注册
  3. dispatcher.registerRpcEndpoint(name, endpoint)
  4. }

它是通过dispatcher来完成endpoint注册的,name为“CoarseGrainedScheduler”,RpcEndpoint为CoarseGrainedSchedulerBackend中通过createDriverEndpoint()方法创建的DriverEndpoint对象。代码如下:

  1. protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  2. new DriverEndpoint(rpcEnv, properties)
  3. }

那么这个DriverEndpoint是什么类呢?我们发现它继承自ThreadSafeRpcEndpoint,继而继承RpcEndpoint这个类。这里,我们只要知道这个RpcEndpoint是进程间消息传递调用的一个端点,定义了消息触发的函数。当一个消息到来时,方法调用顺序为  onStart, receive, onStop。它的生命周期为constructor -> onStart -> receive* -> onStop。

为什么要用RpcEndpoint呢?很简单,Task的调度与执行是在一个分布式集群上进行的,自然需要进程间的通讯。

继续分析,那么上面提到的driverEndpoint是如何赋值的呢?我们继续看Dispatcher的registerRpcEndpoint()方法,因为最终是由它向上返回RpcEndpointRef来完成driverEndpoint的赋值的。代码如下:

  1. // 注册RpcEndpoint
  2. // name为“Master”,endpoint为Master对象
  3. def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  4. // 创建RpcEndpointAddress
  5. val addr = RpcEndpointAddress(nettyEnv.address, name)
  6. // 创建NettyRpcEndpointRef
  7. val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  8. // 同步代码块
  9. synchronized {
  10. if (stopped) {
  11. throw new IllegalStateException("RpcEnv has been stopped")
  12. }
  13. // ConcurrentHashMap的putIfAbsent()方法确保不会重复创建EndpointData
  14. if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
  15. throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
  16. }
  17. val data = endpoints.get(name)
  18. endpointRefs.put(data.endpoint, data.ref)
  19. receivers.offer(data)  // for the OnStart message
  20. }
  21. endpointRef
  22. }

返回的RpcEndpointRef为NettyRpcEndpointRef类型,而RpcEndpointRef则是一个远程RpcEndpoint的引用,通过它可以给远程RpcEndpoint发送消息,可以是同步可以是异步,它映射一个地址。这么看来,我们在远端(ps:另外的机器或者进程)注册了一个RpcEndpoint,即DriverEndpoint,而在本地端(当前机器或者进程)则持有一个RpcEndpoint的引用,即NettyRpcEndpointRef,可以由它来往远端发送消息,那么发送的是什么消息呢?我们现在返回CoarseGrainedSchedulerBackend中的reviveOffers()方法,发现发送的是ReviveOffers消息。这里只是发送,具体处理还要看远端的RpcEndpoint,即DriverEndpoint。通过上面我们可以知道,RpcEndpoint的服务流程为onStart()-->receive()--> onStop(),每当消息来临时,DriverEndpoint都会调用receive()方法来处理。关键代码如下:

  1. // 如果是ReviveOffers事件,则调用makeOffers()方法
  2. case ReviveOffers =>
  3. makeOffers()

继续追踪其makeOffers()方法,代码如下:

  1. // Make fake resource offers on all executors
  2. // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)
  3. private def makeOffers() {
  4. // Filter out executors under killing
  5. // 过滤掉under killing的executors
  6. val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  7. // 获取workOffers,即资源
  8. val workOffers = activeExecutors.map { case (id, executorData) =>
  9. // 创建WorkerOffer对象
  10. new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  11. }.toSeq
  12. // 调用scheduler的resourceOffers()方法,分配资源
  13. // 调用launchTasks()方法,启动tasks
  14. launchTasks(scheduler.resourceOffers(workOffers))
  15. }

好了,留个尾巴,明天再继续分析吧~

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50687992

最新文章

  1. UIScrollView的其他属性
  2. .NET LINQ查询语法与方法语法
  3. Appium移动自动化测试之安装Appium
  4. javascript设计模式6
  5. 安卓开发之APK安装之后自动在桌面上创建快捷图标
  6. -force_load
  7. BT是如何下载的
  8. 不要用for循环去遍历LinkedList
  9. Zabbix-server 3.4 安装详细和修改web界面中文出现的乱码(一)
  10. class-dump 安装使用详解
  11. 空间、域名与IP之间的关系?
  12. [PHP] 数据结构-反转链表PHP实现
  13. ASP.NET MVC4分页Site.CSS
  14. Hadoop框架之HDFS的shell操作
  15. MySQL知识小结
  16. vuex数据管理-数据适配
  17. (转)LVS安装使用详解
  18. FreeMarker自定义TemplateDirectiveModel
  19. mysql(什么是关系型数据库?)
  20. Tornado服务端基本的配置文件(Python)

热门文章

  1. AtCoder Regular Contest 090 F - Number of Digits
  2. Scala不使用null 而使用Option,None,Some的好处
  3. NOIP 2016 天天爱跑步 80分暴力
  4. JS-JavaScript String 对象-string对象方法1:fromCharCode()、charCodeAt()
  5. Guava源码学习(零)前言
  6. 分享Kali Linux 2017.1镜像
  7. luogu P3092 [USACO13NOV]没有找零No Change
  8. springboot 2.0 整合 同时支持jsp+html跳转
  9. BUG_ON&amp;&amp;WARN_ON&amp;BUILD_BUG_ON
  10. cordova热更新插件调试