TaskScheduler overview:

TaskScheduler is a pluggable task-scheduling interface; it schedules tasks through different SchedulerBackend implementations. Its main responsibilities are:

1. A TaskScheduler serves exactly one SparkContext and receives the TaskSets submitted to it by the DAGScheduler;

2. The TaskScheduler submits these tasks to the cluster for execution, retrying any task that fails; it returns the execution results for each TaskSet to the DAGScheduler;

3. The TaskScheduler handles straggler tasks (for example, out of 100 running tasks 99 finish quickly and 1 lags; a speculative copy of the slow task is started on another node, and whichever copy finishes first is used);

4. If shuffle output is lost, it reports that to the DAGScheduler;

5. It maintains a TaskSetManager for each TaskSet to track locality (resourceOffer --> findTask) and failure information;
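
To make these responsibilities concrete, here is a minimal sketch of the TaskScheduler trait, simplified from the Spark 1.x sources (exact members and signatures vary by version):

private[spark] trait TaskScheduler {
  def start(): Unit
  def stop(): Unit

  // Submit a set of tasks (one TaskSet per stage attempt) handed down by the DAGScheduler.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel all tasks of a stage.
  def cancelTasks(stageId: Int): Unit

  // The DAGScheduler to call back for task results and lost shuffle output.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Default level of parallelism in the cluster, used as a hint for sizing jobs.
  def defaultParallelism(): Int
}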

TaskSet.scala

private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int, // which stage this TaskSet belongs to
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + attempt
}
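
For example, attempt 0 of stage 3 gets the id "3.0"; this is the key under which the corresponding TaskSetManager is registered in activeTaskSets in submitTasks below.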

Task analysis:

1. A Task is the unit of execution inside an Executor; unlike MapReduce, there are no distinct map/reduce task types;

2. A Task typically reads its data from one of two sources: external storage or shuffle output;

3. A Task can run on any node in the cluster (in the worst case, data has to be transferred between cluster nodes);

4. A Task can still use data that was cached but has since been evicted;

5. For fault tolerance, shuffle output is written to disk or memory;
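
These points are visible in the Task base class itself. The following is a simplified sketch of the Spark 1.x Task (the real class also handles kill flags and metrics):

private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int)
  extends Serializable {

  // Called on the executor: set up a TaskContext, then delegate to the subclass.
  final def run(attemptId: Long): T = {
    val context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
    runTask(context)
  }

  // Implemented by ShuffleMapTask and ResultTask.
  def runTask(context: TaskContext): T

  // Locality preferences, consulted by TaskSetManager.findTask.
  def preferredLocations: Seq[TaskLocation] = Nil
}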

There are two kinds of Task in Spark:

1. ShuffleMapTask: its output serves as the input of subsequent operations

  A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner);

2. ResultTask: its output is the final result

  A task that sends back the output to the driver application.
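
As a rough sketch of how the two subclasses differ (signatures simplified, constructor parameters elided):

// ShuffleMapTask: writes each element of its partition into the bucket chosen by
// the partitioner and returns a MapStatus recording where the output lives.
private[spark] class ShuffleMapTask(...) extends Task[MapStatus](stageId, partitionId) {
  override def runTask(context: TaskContext): MapStatus = { ... }
}

// ResultTask: applies the user's function to its partition and sends the value
// back to the driver application.
private[spark] class ResultTask[T, U](...) extends Task[U](stageId, partitionId) {
  override def runTask(context: TaskContext): U = { ... } // func(context, rdd.iterator(split, context))
}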

Source code execution flow: TaskSchedulerImpl.scala

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures) // each TaskSet is wrapped in a TaskSetManager
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // register the TaskSetManager with the scheduler (FIFO/Fair, ...)
    hasReceivedTask = true
  }
  backend.reviveOffers() // ask for resources to run the tasks; backend is a SchedulerBackend, and this sends a ReviveOffers message to the DriverActor
}
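
Note that submitTasks does not itself place any task on an executor: it only registers the TaskSetManager and sends the asynchronous ReviveOffers message; the actual assignment happens when the DriverActor handles that message, as shown next.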

CoarseGrainedSchedulerBackend.scala

override def reviveOffers() {
  driverActor ! ReviveOffers
}

case ReviveOffers =>
  makeOffers()

def makeOffers() { // launch tasks on the free cores offered by the registered executors
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}
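
The WorkerOffer built in makeOffers is a small value class describing the free resources on one executor; in the Spark 1.x sources it is essentially:

// One offer of free resources on a single executor.
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)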

TaskSchedulerImpl.scala

// Obtain the TaskSetManagers, sorted by the FIFO or Fair scheduler
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  SparkEnv.set(sc.env)
  .....
  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
    do {
      launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        if (availableCpus(i) >= CPUS_PER_TASK) {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // locality and other factors decide which task runs here
            ...
            launchedTask = true
          }
        }
      }
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
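
The do/while above implements round-robin placement: for each TaskSetManager (in FIFO or Fair order) and for each locality level, every pass over the shuffled offers assigns at most one task per executor, and passes repeat until nothing more can be launched. For example, with three offers of 4 free cores each and CPUS_PER_TASK = 1, a 12-task TaskSet ends up spread 4/4/4 across the three executors instead of filling one executor first.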

TaskSetManager.scala

// Respond to an offer of a single executor from the scheduler by finding a task
resourceOffer {                                   // abbreviated; takes (execId, host, maxLocality)
  findTask(execId, host, allowedLocality) match { // find a task with a suitable locality level
    ......
  }
}

CoarseGrainedSchedulerBackend.scala

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task) // serialize each task
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { // abort if the serialized task exceeds the Akka frame size
      taskSet.abort(msg)
    } else {
      // send a launch request to the CoarseGrainedExecutorBackend
      executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}

CoarseGrainedExecutorBackend.scala

case LaunchTask(data) =>
  if (executor == null) { ... } // a CoarseGrainedExecutorBackend process holds exactly one Executor object
  val taskDesc = ser.deserialize[TaskDescription](data.value) // deserialize first, since the task was serialized when it was submitted
  executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)

Executor.scala

val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask) // the task arrives serialized
  runningTasks.put(taskId, tr)
  threadPool.execute(tr) // hand the TaskRunner to the executor's thread pool
}
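
Putting the pieces together, the flow traced above is:

TaskSchedulerImpl.submitTasks(taskSet)             // wrap the TaskSet in a TaskSetManager, add it to the FIFO/Fair pool
  -> CoarseGrainedSchedulerBackend.reviveOffers()  // DriverActor ! ReviveOffers
  -> makeOffers()                                  // one WorkerOffer per registered executor
  -> TaskSchedulerImpl.resourceOffers(offers)      // locality-aware, round-robin assignment
  -> TaskSetManager.resourceOffer -> findTask      // choose a task for one executor
  -> launchTasks(...)                              // serialize; executorActor ! LaunchTask
  -> CoarseGrainedExecutorBackend: case LaunchTask // deserialize the TaskDescription
  -> Executor.launchTask -> TaskRunner             // execute on the daemon cached thread pool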
