Preface

  Spark treats each piece of work submitted by the user as a job. At submission time the job is first expressed as a chain of RDDs, and a DAG (directed acyclic graph) is built from the dependencies between those RDDs. DAGScheduler then cuts the DAG into stages according to the kind of dependency between RDDs; inside each stage one task is created per RDD partition, and the tasks are finally wrapped into TaskSets and sent to TaskScheduler to be scheduled for execution.

  RDD Dependencies

  Narrow dependency (NarrowDependency): the partitions of the downstream RDD map one-to-one onto the partitions of the upstream RDD (each upstream partition is consumed by at most one downstream partition);

  Wide dependency (ShuffleDependency): a single partition of the downstream RDD depends on multiple partitions of the upstream RDD.
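
  For example (the variable names are illustrative):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)
val mapped  = pairs.mapValues(_ + 1)   // NarrowDependency: partition i of mapped reads only partition i of pairs
val grouped = pairs.groupByKey()       // ShuffleDependency: each partition of grouped reads from all partitions of pairs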

  Partitioner

  When the dependency between two RDDs is a shuffle dependency, Spark uses a partition calculator, the Partitioner, to decide which partition of the downstream RDD each record produced by an upstream partition is written to.

  Most of Spark's APIs default to HashPartitioner, which takes the key's hashCode modulo the number of partitions to obtain the key's partition index. HashPartitioner's weakness: in Java the hashCode of an array is based on the array object itself rather than its contents, so if the key type is an array, keys with identical contents may be assigned to different partitions.
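
  A minimal sketch of the idea (not Spark's exact source; the real HashPartitioner likewise sends null keys to partition 0 and keeps the modulo non-negative):

def hashPartition(key: Any, numPartitions: Int): Int = {
  if (key == null) 0
  else {
    // hashCode can be negative, so correct the remainder into [0, numPartitions)
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}

// The array pitfall: identical contents but identity-based hashCodes,
// so the two keys are not guaranteed to land in the same partition.
val k1 = Array(1, 2, 3)
val k2 = Array(1, 2, 3)
hashPartition(k1, 8) == hashPartition(k2, 8)  // may be false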

  RangePartitioner: it first draws a sample from the whole RDD and sorts it to obtain an array of boundary keys, rangeBounds; a key's partition index in the downstream RDD is then found by locating which range of rangeBounds the key falls into.
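
  A toy sketch of the lookup, assuming integer keys and ignoring Spark's sampling and binary-search details:

// With rangeBounds Array(10, 20): keys <= 10 -> partition 0, 11..20 -> 1, > 20 -> 2.
def rangePartition(key: Int, rangeBounds: Array[Int]): Int = {
  val i = rangeBounds.indexWhere(key <= _)
  if (i == -1) rangeBounds.length else i
}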

  You can of course also supply a custom partitioner: write a class that extends the abstract class Partitioner and override numPartitions, getPartition, and equals.
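
  For example, a hypothetical two-partition scheme that routes even integer keys to partition 0 and odd ones to partition 1:

import org.apache.spark.Partitioner

class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case i: Int => if (i % 2 == 0) 0 else 1
    case _ => 0 // non-integer keys all go to partition 0
  }

  // equals/hashCode let Spark recognize two RDDs as co-partitioned and skip a shuffle
  override def equals(other: Any): Boolean = other.isInstanceOf[EvenOddPartitioner]
  override def hashCode: Int = numPartitions
}

// usage: pairRdd.partitionBy(new EvenOddPartitioner)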

  DAGScheduler

  DAGScheduler cuts the RDDs of the DAG into stages by dependency type and records the dependencies between those stages, so that stages with no dependency on each other compute in parallel while dependent stages run serially, improving resource utilization and overall efficiency. Stages come in two kinds: the most downstream ResultStage, and ShuffleMapStages whose output must be shuffled.
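
  A classic word count, for instance, is cut into exactly two stages at the reduceByKey shuffle boundary (the input path is a placeholder):

val counts = sc.textFile("hdfs://...")   // read + map side: ShuffleMapStage
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)                    // shuffle boundary; below runs in the ResultStage
counts.collect()                         // the action that triggers job submission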

  1. JobListener & JobWaiter & ActiveJob & DAGSchedulerEventProcessLoop

  The trait JobListener listens for the result of every task in a job; JobWaiter implements JobListener and is what finally decides whether the job succeeded or failed. ActiveJob represents a job that has been activated. DAGSchedulerEventProcessLoop is DAGScheduler's internal event loop: other Spark components drive DAGScheduler by posting DAGSchedulerEvents to it, and the loop processes each event by calling the corresponding DAGScheduler method. It has three methods: doOnReceive(), which dispatches each kind of event to the matching DAGScheduler handler, plus onError() and onStop().
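
  The dispatch in doOnReceive() is a plain pattern match over the event types; abridged from the Spark sources, with most cases elided:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)
  // ... the remaining DAGSchedulerEvent cases are routed the same way
}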

  2. Submitting a Job

  Every action operator triggered in a Spark program submits one job: the action hands the RDD lineage to DAGScheduler.runJob, which internally calls submitJob.
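
  For reference, roughly how an action reaches DAGScheduler (simplified from the Spark sources; withScope and the SparkContext.runJob overloads are omitted):

// RDD.collect: every action ultimately funnels into SparkContext.runJob...
def collect(): Array[T] = {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
// ...and SparkContext.runJob delegates to dagScheduler.runJob(rdd, func, partitions, ...)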

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  // record the job's start time
  val start = System.nanoTime
  // submit the job; submitJob is asynchronous, so it returns a JobWaiter immediately
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // wait for the job to finish: log on success, log and rethrow on failure
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

  Inside submitJob, a JobWaiter is created for the job and a JobSubmitted event is posted to DAGSchedulerEventProcessLoop.

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  // get the job's maximum partition count; throw if any requested partition does not exist
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }
  // generate the next job ID
  val jobId = nextJobId.getAndIncrement()
  // if the job has zero partitions, return a JobWaiter with totalTasks = 0;
  // the JobWaiter marks its jobPromise as Success, i.e. the job completed successfully
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  // otherwise create a JobWaiter that waits for completion and post a JobSubmitted
  // event to DAGSchedulerEventProcessLoop
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  // return the JobWaiter
  waiter
}

  On receiving the JobSubmitted event, DAGSchedulerEventProcessLoop calls DAGScheduler's handleJobSubmitted method, which creates the ResultStage and submits it, implemented as follows:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  // create the ResultStage; this can fail, e.g. if an HDFS file backing a HadoopRDD
  // was deleted, in which case JobWaiter's jobFailed method must be called
  try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    ...
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)
  // create the ActiveJob
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  // clear the cached partition locations of RDDs in cacheLocs
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  // record the job submission time
  val jobSubmissionTime = clock.getTimeMillis()
  // record the mapping from the jobId to the newly created ActiveJob in jobIdToActiveJob
  jobIdToActiveJob(jobId) = job
  // add the new ActiveJob to the activeJobs set
  activeJobs += job
  // let finalStage's _activeJob field hold the new ActiveJob
  finalStage.setActiveJob(job)
  // get the StageInfo of every stage belonging to the current job
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // post a SparkListenerJobStart event to LiveListenerBus
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // submit the ResultStage
  submitStage(finalStage)
}

  3. Building Stages

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // some initial checks on the RDD
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  // get the list of all parent stages
  val parents = getOrCreateParentStages(rdd, jobId)
  // generate the ResultStage's ID
  val id = nextStageId.getAndIncrement()
  // create the ResultStage
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  // record the ResultStage and its ID in the stageIdToStage cache
  stageIdToStage(id) = stage
  // update the mapping between the job's ID and the ResultStage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

// get or create all parent stages of the given RDD; the stages are assigned to the job with the given jobId
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

// get the shuffle dependencies that are direct parents of the given RDD
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    // if a ShuffleMapStage has already been created for this ShuffleDependency, return it directly
    case Some(stage) =>
      stage
    // otherwise call getMissingAncestorShuffleDependencies to find every ancestor
    // ShuffleDependency without a ShuffleMapStage, then create and register one for each
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // finally create and register the ShuffleMapStage for the current ShuffleDependency
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
  val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        } // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  // the ShuffleDependency's rdd becomes the RDD of the ShuffleMapStage about to be created
  val rdd = shuffleDep.rdd
  // initial checks on the RDD
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  // the length of the RDD's partition array becomes the ShuffleMapStage's task count
  val numTasks = rdd.partitions.length
  // get all parent stages of the ShuffleMapStage about to be created
  val parents = getOrCreateParentStages(rdd, jobId)
  // generate the new ShuffleMapStage's ID
  val id = nextStageId.getAndIncrement()
  // create the ShuffleMapStage
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  // register the new ShuffleMapStage's mappings
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  // MapStatus check: a stage can be retried, so this stage may have run before and some of
  // its map tasks may already have succeeded, with their MapStatus cached in
  // mapOutputTrackerMaster; the current attempt can reuse those and avoid recomputation
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
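
  To make the traversal concrete, here is a small example of a lineage with two shuffles (variable names are made up):

val a = sc.parallelize(1 to 100).map(i => (i % 10, i))
val b = a.reduceByKey(_ + _)                  // ShuffleDependency #1
val c = b.map { case (k, v) => (v % 3, k) }
val d = c.reduceByKey(_ + _)                  // ShuffleDependency #2
d.collect()
// createResultStage(d's RDD, ...) calls getShuffleDependencies, which stops at the
// nearest shuffle (#2); getMissingAncestorShuffleDependencies then walks further up
// and finds #1, so two ShuffleMapStages are created before the final ResultStage.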

  4. Submitting the ResultStage

  Starting from the ResultStage passed in, submitStage walks up through the parent stages level by level; task sets are then submitted via TaskScheduler.submitTasks beginning with the most upstream stages, and the ResultStage's own task set is submitted last.

private def submitStage(stage: Stage) {
  // get the jobId of the job the current stage belongs to
  val jobId = activeJobForStage(stage)
  // check whether the jobId maps to an ActiveJob
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // if the current stage has not been submitted yet
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // get all unsubmitted parent stages of the current stage
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      // if there is no unsubmitted parent stage, submit all unsubmitted tasks of the
      // current stage; otherwise submit the unsubmitted parent stages first and add
      // the current stage to the waitingStages set
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    // if no ActiveJob is found for the current stage, abort every job that depends on it
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
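
  Stages parked in waitingStages are not forgotten: each time a parent stage completes, DAGScheduler re-examines the waiting stages and submits those whose parents are now all available (the submitWaitingChildStages path in the Spark sources), so the recursion above eventually works its way down the DAG until the ResultStage itself runs.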

  
