spark 笔记 7: DAGScheduler
在前面的 SparkContext 和 RDD 中都可以看到,真正的计算工作都是通过调用 DAGScheduler 的 runJob 方法来实现的,所以这是一个很重要的类。在看这个类的实现之前,需要对 actor 模式有一点了解:http://en.wikipedia.org/wiki/Actor_model 和 http://www.slideshare.net/YungLinHo/introduction-to-actor-model-and-akka ,粗略知道 actor 模式怎么实现就可以了。另外,应该先看看 DAG 相关的概念和论文:http://en.wikipedia.org/wiki/Directed_acyclic_graph 和 http://www.netlib.org/utk/people/JackDongarra/PAPERS/DAGuE_technical_report.pdf
/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
* stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
* minimal schedule to run the job. It then submits stages as TaskSets to an underlying
* TaskScheduler implementation that runs them on the cluster.
*
* In addition to coming up with a DAG of stages, this class also determines the preferred
* locations to run each task on, based on the current cache status, and passes these to the
* low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
* lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
*/
package org.apache.spark.scheduler

private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
extends Logging {
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
extends Actor with Logging {
/**
 * Actor start-up hook: hands `dagScheduler` to its own `taskScheduler` via
 * `setDAGScheduler`.
 */
override def preStart(): Unit = {
  // Wire the DAGScheduler into the taskScheduler up front so that eventProcessActor is
  // always valid by the time messages arrive.
  dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
}
/**
 * The main event loop of the DAG scheduler.
 *
 * Every incoming event is pattern-matched and forwarded to the matching handler method on
 * `dagScheduler`; this method contains no scheduling logic of its own.
 */
def receive = {
  // ---- Job / stage submission and cancellation ----
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(
      jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  // ---- Executor membership changes ----
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId)

  // ---- Task progress ----
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  // Only the event as a whole is forwarded, so the individual fields are left unbound.
  case completion @ CompletionEvent(_, _, _, _, _, _) =>
    dagScheduler.handleTaskCompletion(completion)

  // ---- Failure handling ----
  case TaskSetFailed(taskSet, reason) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
// Atomic counters, both starting at 0.
// NOTE(review): presumably the sources of fresh stage ids and job ids — the allocation
// sites are not visible in this excerpt.
private val nextStageId = new AtomicInteger(0)
private[scheduler] val nextJobId = new AtomicInteger(0)

// Bookkeeping tables. All are keyed by Int; judging by the names the keys are job ids,
// stage ids and shuffle ids respectively (confirm against the code that populates them).
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

// Supervisor actor created in the environment's actor system; it receives a reference to
// this scheduler.
private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

// Left uninitialised (null) here; the assignment happens outside this excerpt.
private[scheduler] var eventProcessActor: ActorRef = _
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
{
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
/**
 * Finds the earliest-created active job that needs the stage.
 *
 * The ids in `stage.jobIds` are tried in ascending order and the first one that is a key of
 * `jobIdToActiveJob` is returned; the result is `None` when no such id exists.
 * NOTE(review): "earliest-created" relies on job ids being handed out in increasing order.
 */
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
// That should take care of at least part of the priority inversion problem with
// cross-job dependencies.
private def activeJobForStage(stage: Stage): Option[Int] =
  stage.jobIds.toArray.sorted.find(jobId => jobIdToActiveJob.contains(jobId))
/**
 * Base type of the events that can be handled by the DAGScheduler.
 *
 * The DAGScheduler uses an event queue architecture where any thread can post an event
 * (e.g. a task finishing or a new job being submitted) but there is a single "logic" thread
 * that reads these events and makes decisions. This greatly simplifies synchronization.
 *
 * The trait is sealed, so every direct subtype has to be declared in the same source file.
 */
private[scheduler] sealed trait DAGSchedulerEvent
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
*
* Until start() is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
*/
private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
/**
* A SparkListenerEvent bus that relays events to its listeners
*/
private[spark] trait SparkListenerBus extends Logging {
// SparkListeners attached to this event bus. The buffer is an ArrayBuffer with the
// SynchronizedBuffer trait mixed in.
// NOTE(review): SynchronizedBuffer is deprecated in newer Scala versions — confirm before
// upgrading the Scala toolchain.
protected val sparkListeners = new ArrayBuffer[SparkListener]
with mutable.SynchronizedBuffer[SparkListener]
/** Attaches `listener` to this bus by appending it to `sparkListeners`. */
def addListener(listener: SparkListener): Unit = {
  sparkListeners += listener
}
/**
* Post an event to all attached listeners.
* This does nothing if the event is SparkListenerShutdown.
*/
def postToAll(event: SparkListenerEvent) {
/**
 * Runs `f` once for every attached listener.
 *
 * An `Exception` thrown by `f` for one listener is logged through `logError` and does not
 * prevent the remaining listeners from being visited.
 */
private def foreachListener(f: SparkListener => Unit): Unit = {
  // Invokes `f` on a single listener, logging (rather than propagating) any Exception.
  def invokeSafely(listener: SparkListener): Unit =
    try {
      f(listener)
    } catch {
      case e: Exception =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }

  for (listener <- sparkListeners) {
    invokeSafely(listener)
  }
}
}
最新文章
- 6.Linux的文件权限与目录配置
- linux环境下部署tomcat
- 我YY的一个移动应用运营模式
- samba服务搭建及管理
- cssText
- 多线程基本概论multithread
- 1.7.4.2 Local Parameters in Queries--局部参数
- 函数buf_pool_get
- PAT-乙级-1026. 程序运行时间(15)
- sqlserver 变量
- 对Jsp提交input标签空格和回车的处理
- Html+CSS命名规范:
- Android学习之Animation(二)
- 时区,GMT时间,UTC时间,UNIX时间戳
- vue 路由跳转,传参
- springBoot 自动配置原理--自己新建一个 starter
- windows 2008解决120天授权过期问题(亲测可用)
- bootstrap3在IE8下导航不显示,自动识别成手机模式
- [Linux] ssh-key 公钥文件格式
- XenServer:使用XenCenter开设VPS(多图完整版)