Copyright notice: this is an original article; do not reproduce it without permission.
Review:
How a Job is divided into Stages in Spark http://www.cnblogs.com/yourarebest/p/5342424.html

1. Stage submission in Spark

1. As covered in the review article, dividing a Job into Stages starts in the method handleJobSubmitted. Stage submission is also triggered from this method, as shown below:

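private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  // (1) Create finalStage for this jobId; see the article "How a Job is divided into Stages in Spark"
  // (2) Submit the Job; see the article "Job submission in Spark"
  // (3) Submit the stages, recursively submitting any missing parent Stage(s) first;
  //     a stage whose parents are still missing is added to waitingStages
  // (4) Submit the TaskSet (tasks)
  // ... (code for steps (1) and (2) omitted)
  // Step (3): submit finalStage, recursively submitting its missing parents first; see 2 below
  submitStage(finalStage)
  // Check for waiting or failed stages, which will be resubmitted
  submitWaitingStages()
}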
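The last call, submitWaitingStages, retries every stage parked in waitingStages. The sketch below is an assumption-based model of how that retry works in Spark 1.x (take a snapshot, clear the set, resubmit in order of firstJobId so stages of earlier jobs go first). PendingStage and the submitStage callback are hypothetical stand-ins, not Spark's classes.

```scala
import scala.collection.mutable

// Hypothetical toy stage: only the fields this sketch needs (not Spark's Stage class)
case class PendingStage(id: Int, firstJobId: Int)

// Sketch of the retry step, assuming it mirrors Spark 1.x's submitWaitingStages
def submitWaitingStages(waitingStages: mutable.Set[PendingStage],
                        submitStage: PendingStage => Unit): Unit = {
  // Snapshot first, then clear: submitStage may put a stage back if its parents are still missing
  val snapshot = waitingStages.toArray
  waitingStages.clear()
  // Stages belonging to earlier jobs are retried first
  for (stage <- snapshot.sortBy(_.firstJobId)) {
    submitStage(stage)
  }
}
```

Clearing the set before resubmitting means a stage that still has missing parents can add itself back to waitingStages without being retried twice in the same pass.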

2. The submitStage method is shown below. Starting from finalStage, it calls itself recursively to submit the Stages:

private def submitStage(stage: Stage) {
  // Find the active job this stage belongs to
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // Skip stages that are already waiting, running, or marked as failed
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      // No parent stages are missing, so this stage can be submitted now
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get) // see 3 below
      } else {
        // Submit the missing parents first, then park this stage in waitingStages
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
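To see the order in which this recursion submits stages, here is a toy model of submitStage over a three-stage chain (stage 2 depends on 1, which depends on 0). ToyStage, ToyScheduler, missingParents and completeStage are hypothetical simplifications: in Spark, getMissingParentStages walks the RDD lineage looking for shuffle dependencies whose map output is not yet available, whereas here each stage simply lists its parent stages, and "submitting" a stage only records its id.

```scala
import scala.collection.mutable

// Hypothetical toy stage: an id plus the stages it depends on (not Spark's Stage class)
case class ToyStage(id: Int, parents: List[ToyStage])

class ToyScheduler {
  val waitingStages = mutable.LinkedHashSet[ToyStage]()
  val runningStages = mutable.LinkedHashSet[ToyStage]()
  val failedStages = mutable.Set[ToyStage]()
  // Stands in for "this stage's output is available"
  val finishedStages = mutable.Set[ToyStage]()
  // Ids in the order submitMissingTasks would have been called
  val submitOrder = mutable.ArrayBuffer[Int]()

  // Hypothetical stand-in for getMissingParentStages: parents whose output is not available yet
  private def missingParents(stage: ToyStage): List[ToyStage] =
    stage.parents.filterNot(finishedStages.contains).sortBy(_.id)

  def submitStage(stage: ToyStage): Unit = {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = missingParents(stage)
      if (missing.isEmpty) {
        runningStages += stage       // where Spark would call submitMissingTasks(stage, jobId)
        submitOrder += stage.id
      } else {
        missing.foreach(submitStage) // submit the missing parents first (recursively)
        waitingStages += stage       // then wait until they finish
      }
    }
  }

  // Toy completion handler: mark the stage done, then retry everything that was waiting
  def completeStage(stage: ToyStage): Unit = {
    runningStages -= stage
    finishedStages += stage
    val retry = waitingStages.toList
    waitingStages.clear()
    retry.foreach(submitStage)
  }
}
```

Only stage 0 is submitted at first; stages 1 and 2 sit in waitingStages and are submitted one at a time as their parents complete.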

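3. The submitMissingTasks method is shown below. It handles submission of both the Stage and its TaskSet; only the Stage part is shown here. For TaskSet (tasks) submission, see the article "TaskSet (Tasks) submission in Spark".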

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingPartitions.clear()
  // First, depending on whether this is a ShuffleMapStage or a ResultStage, find the indices
  // of all its partitions and of the partitions that still have to be computed
  val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
    stage match {
      case stage: ShuffleMapStage =>
        val allPartitions = 0 until stage.numPartitions
        // Partitions whose map output is not available yet
        val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
        (allPartitions, filteredPartitions)
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        val allPartitions = 0 until job.numPartitions
        // Partitions for which the job has not received a result yet
        val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
        (allPartitions, filteredPartitions)
    }
  }
  // Create internal accumulators if the stage has none initialized. Otherwise reset them only
  // when the whole stage is computed, so values from a partially submitted stage are kept
  if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
    stage.resetInternalAccumulators()
  }
  // Get the properties of the job
  val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are serializable.
  // If tasks are not serializable, a SparkListenerStageCompleted event will be posted, which
  // should always come after the corresponding SparkListenerStageSubmitted event.
  outputCommitCoordinator.stageStart(stage.id)
  // Compute the preferred locations of each partition to compute
  val taskIdToLocations = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        val job = s.resultOfJob.get
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  // Post SparkListenerStageSubmitted; listeners such as JobProgressListener receive it
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  // ... (rest of the method omitted: building the tasks and submitting them as a TaskSet)
}
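The partition selection at the start of submitMissingTasks boils down to a filter over 0 until numPartitions. The sketch below assumes simplified, hypothetical stand-ins (SimpleShuffleMapStage and SimpleResultStage, with one Boolean per partition) in place of Spark's outputLocs and job.finished, and also restates the accumulator-reset condition as a function.

```scala
// Hypothetical stand-ins for the two stage kinds (not Spark's classes)
sealed trait SimpleStage { def numPartitions: Int }
// outputAvailable(i) == true means map output for partition i is already registered
case class SimpleShuffleMapStage(outputAvailable: Vector[Boolean]) extends SimpleStage {
  def numPartitions: Int = outputAvailable.length
}
// finished(i) == true means the job already has the result for partition i
case class SimpleResultStage(finished: Vector[Boolean]) extends SimpleStage {
  def numPartitions: Int = finished.length
}

// Returns (allPartitions, partitionsToCompute), mirroring the match at the top of submitMissingTasks
def partitionsFor(stage: SimpleStage): (Seq[Int], Seq[Int]) = {
  val all = 0 until stage.numPartitions
  val toCompute = stage match {
    case s: SimpleShuffleMapStage => all.filter(id => !s.outputAvailable(id))
    case r: SimpleResultStage     => all.filter(id => !r.finished(id))
  }
  (all, toCompute)
}

// Reset accumulators when none exist yet, or when every partition is being (re)computed
def shouldResetAccumulators(noAccumulatorsYet: Boolean, all: Seq[Int], toCompute: Seq[Int]): Boolean =
  noAccumulatorsYet || all == toCompute
```

For a shuffle map stage where partitions 0 and 2 already have map output, only partitions 1 and 3 are recomputed, and the accumulators are left alone because the stage is only partially resubmitted.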

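4. SparkListenerStageSubmitted. As mentioned earlier, the job-start event is handled by JobProgressListener's onJobStart method. In the same way, Stage submission is handled by its onStageSubmitted method, and the corresponding event type is SparkListenerStageSubmitted, as shown below: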

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
  val stage = stageSubmitted.stageInfo
  activeStages(stage.stageId) = stage
  pendingStages.remove(stage.stageId)
  val poolName = Option(stageSubmitted.properties).map {
    p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
  }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
  stageIdToInfo(stage.stageId) = stage
  val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
  stageData.schedulingPool = poolName
  stageData.description = Option(stageSubmitted.properties).flatMap {
    p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
  }
  val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
  stages(stage.stageId) = stage
  for (
    activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
    jobId <- activeJobsDependentOnStage;
    jobData <- jobIdToData.get(jobId)
  ) {
    jobData.numActiveStages += 1
    // If a stage retries again, it should be removed from completedStageIndices set
    jobData.completedStageIndices.remove(stage.stageId)
  }
}
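Both the pool name and the description come from the job's local properties (for example, a pool chosen with SparkContext.setLocalProperty("spark.scheduler.pool", ...)), and both lookups are null-safe. The sketch below isolates that pattern with plain java.util.Properties. The values "default" for SparkUI.DEFAULT_POOL_NAME and "spark.job.description" for SparkContext.SPARK_JOB_DESCRIPTION are assumptions hard-coded here rather than read from Spark.

```scala
import java.util.Properties

// Assumed values of SparkUI.DEFAULT_POOL_NAME and SparkContext.SPARK_JOB_DESCRIPTION
val DefaultPoolName = "default"
val JobDescriptionKey = "spark.job.description"

// Pool name: a null Properties and a missing key both fall back to the default pool
def poolNameOf(properties: Properties): String =
  Option(properties)
    .map(p => p.getProperty("spark.scheduler.pool", DefaultPoolName))
    .getOrElse(DefaultPoolName)

// Description: flatMap plus Option(...) turns both a null Properties and a missing key into None
def descriptionOf(properties: Properties): Option[String] =
  Option(properties).flatMap(p => Option(p.getProperty(JobDescriptionKey)))
```

Using map for the pool name and flatMap for the description matches the listener code: the pool always resolves to some name, while a stage may legitimately have no description.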

That completes Stage submission; the next article covers Task submission.
