理解stage, 关键就是理解Narrow DependencyWide Dependency, 可能还是觉得比较难理解
关键在于是否需要shuffle, 不需要shuffle是可以随意并发的, 所以stage的边界就是需要shuffle的地方, 如下图很清楚

并且Stage分为两种,

shuffle map stage, in which case its tasks' results are input for another stage
其实就是,非最终stage, 后面还有其他的stage, 所以它的输出一定是需要shuffle并作为后续的输入
result stage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)
最终的stage, 没有输出, 而是直接产生结果或存储

 

1 stage class

这个注释写的很清楚
可以看到stage的RDD参数只有一个RDD, final RDD, 而不是一系列的RDD
因为在一个stage中的所有RDD都是map, partition不会有任何改变, 只是在data依次执行不同的map function
所以对于task scheduler而言, 一个RDD的状况就可以代表这个stage

/**
* A stage is a set of independent tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* another stage, or a result stage, in which case its tasks directly compute the action that
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*/
private[spark] class Stage(
val id: Int,
val rdd: RDD[_], // final RDD
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage], // 父stage
val jobId: Int,
callSite: Option[String])
extends Logging { val isShuffleMap = shuffleDep != None // 是否是shuffle map stage, 取决于是否有shuffleDep
val numPartitions = rdd.partitions.size
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) // 用于buffer每个shuffle中每个maptask的MapStatus
var numAvailableOutputs = 0 private var nextAttemptId = 0
 
  def isAvailable: Boolean = {
if (!isShuffleMap) {
true
} else {
numAvailableOutputs == numPartitions
}
}

}

 

2 newStage

如果是shuffle map stage, 需要在这里向mapOutputTracker注册shuffle

  /**
* Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
* as a result stage for the final RDD used directly in an action. The stage will also be
* associated with the provided jobId.
*/
private def newStage(
rdd: RDD[_],
shuffleDep: Option[ShuffleDependency[_,_]],
jobId: Int,
callSite: Option[String] = None)
: Stage =
{
if (shuffleDep != None) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
}
val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
stageToInfos(stage) = StageInfo(stage)
stage
}

3 getMissingParentStages

可以根据final stage的deps找出所有的parent stage

  private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
if (getCacheLocs(rdd).contains(Nil)) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] => // 如果发现ShuffleDependency, 说明遇到新的stage
val mapStage = getShuffleMapStage(shufDep, stage.jobId) // check shuffleToMapStage, 如果该stage已经被创建则直接返回, 否则newStage
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] => // 对于NarrowDependency, 说明仍然在这个stage中
visit(narrowDep.rdd)
}
}
}
}
}
visit(stage.rdd)
missing.toList
}

最新文章

  1. C#开发中常用方法2------json转DataTable
  2. WBS功能分解
  3. USB通信协议——深入理解
  4. Beta版本冲刺Day6
  5. Poj-2250-Compromise
  6. LeetCode 122
  7. iOS安全——代码混淆&amp;反编译
  8. 连接ACCESS的AccessHelper.cs类
  9. [转]Oracle查询树形数据的叶节点和子节点
  10. 新安装 wampserver 出现 You don&#39;t have permission to access / on this server. 或者访问数据库出现You don&#39;t have permission to access /phpmyadmin/ on this server.(解决方法)转
  11. CFLAGS/CPPFLAGS/CXXFLAGS in Makefile介绍
  12. centos directory server
  13. Android 通过应用程序来设置系统的日期和时间中的
  14. juggle dsl语法介绍及codegen浅析
  15. python-给微信好友自动发送天气预报和每日一句
  16. PMP:10.项目采购管理
  17. 大型运输行业实战_day01_2_需求文档
  18. HTML图片热区map area的用法(转)
  19. Splash runjs() 方法
  20. 教你玩转产品管理系统iClap(基础功能篇)

热门文章

  1. cocos2dx 3.x 开发环境搭建
  2. iOS_文件上传进度条的实现思路-AFNettworking
  3. 自定义流水号的autocode
  4. 《TCP/IP图解》读书笔记
  5. 基于html5和css3响应式全屏滚动页面切换效果
  6. 应有dataGridView控件
  7. Maven基础命令
  8. 修复安卓的bug
  9. 关于如何自定义handler
  10. SQL on Hadoop 的真相(1)