I. Master Failover Mechanism

1. Overview

Spark's native standalone mode supports master failover: two Masters can actually be configured, and when the Active Master node goes down, the Standby Master can be switched over to become the new Active Master.

Spark Master failover can be based on one of two mechanisms: the file system or ZooKeeper. With the file-system-based mechanism, after the Active Master dies we have to switch over to the Standby Master manually; with the ZooKeeper-based mechanism, the switch happens automatically.
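As a rough illustration of how this choice is wired up, the following is a minimal sketch, simplified from the Spark 1.3-era Master.scala (constructor arguments abbreviated, surrounding fields elided; treat the exact argument lists as assumptions): the Master picks its persistence engine from the `spark.deploy.recoveryMode` setting.

    // Sketch: the Master selects a persistence engine by recovery mode
    val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

    val persistenceEngine = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        // Recovery state kept in ZooKeeper
        // (spark.deploy.zookeeper.url / spark.deploy.zookeeper.dir)
        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
      case "FILESYSTEM" =>
        // Recovery state kept on disk (spark.deploy.recoveryDirectory)
        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
      case _ =>
        // No recovery: persisted state is simply discarded
        new BlackHolePersistenceEngine()
    }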

So the failover mechanism discussed here really refers to the work the Standby Master performs once it takes over after the Active Master has died.

First, the Standby Master uses the persistence engine to read back the persisted storedApps, storedDrivers, and storedWorkers. There are two persistence engines: FileSystemPersistenceEngine and ZooKeeperPersistenceEngine. After reading, the Master checks whether any of storedApps, storedDrivers, or storedWorkers is non-empty; if so, it proceeds with recovery:

1. Re-register the persisted Application, Driver, and Worker information into the Master's internal caches.
2. Set the state of each recovered Application and Worker to UNKNOWN, then send the Standby Master's address to each Worker and to the Driver of each Application.
3. Any Driver or Worker that is still running normally will, on receiving the new Master's address, respond to it. As the Master receives these responses, it eventually runs the completeRecovery() method, which filters out the information of every Driver and Worker that never responded.
4. Finally, the Master calls its own schedule() method to schedule the Drivers and Applications still waiting for resources, e.g. launching a Driver on some worker, or launching the Executors an Application needs on Workers.
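Steps 1 and 2 live in a method the Spark 1.3-era Master calls beginRecovery(); the following is a hedged sketch of its shape rather than the verbatim source:

    def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
        storedWorkers: Seq[WorkerInfo]) {
      for (app <- storedApps) {
        logInfo("Trying to recover app: " + app.id)
        try {
          // Put the app back into the Master's internal caches, mark it
          // UNKNOWN, and tell its driver about the new Master
          registerApplication(app)
          app.state = ApplicationState.UNKNOWN
          app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
        } catch {
          case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
        }
      }

      for (driver <- storedDrivers) {
        // Drivers tied to now-lost workers are dealt with in completeRecovery()
        drivers += driver
      }

      for (worker <- storedWorkers) {
        logInfo("Trying to recover worker: " + worker.id)
        try {
          registerWorker(worker)
          worker.state = WorkerState.UNKNOWN
          worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
        } catch {
          case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
        }
      }
    }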

2. Selected source code

###The completeRecovery() method in Master.scala:

    /*
     * Complete the master failover.
     */
    def completeRecovery() {
      // Ensure "only-once" recovery semantics using a short synchronization period.
      synchronized {
        if (state != RecoveryState.RECOVERING) { return }
        state = RecoveryState.COMPLETING_RECOVERY
      }

      /*
       * Filter out the Applications and Workers whose state is still UNKNOWN,
       * then iterate over them, calling removeWorker and finishApplication
       * respectively, to clean up Applications and Workers that may have
       * failed, or may even be dead already.
       */
      // Kill off any workers and apps that didn't respond to us.
      workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
      apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

      // Reschedule drivers which were not claimed by any workers
      drivers.filter(_.worker.isEmpty).foreach { d =>
        logWarning(s"Driver ${d.id} was not found after master recovery")
        if (d.desc.supervise) {
          logWarning(s"Re-launching ${d.id}")
          relaunchDriver(d)
        } else {
          removeDriver(d.id, DriverState.ERROR, None)
          logWarning(s"Did not re-launch ${d.id} because it was not supervised")
        }
      }

      state = RecoveryState.ALIVE
      schedule()
      logInfo("Recovery complete - resuming operations!")
    }
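For reference, relaunchDriver(), called above for supervised drivers, is tiny; a hedged sketch of its Spark 1.3-era body:

    def relaunchDriver(driver: DriverInfo) {
      // Detach the driver from its dead worker, mark it for relaunch,
      // put it back on the waiting queue, and let schedule() place it again
      driver.worker = None
      driver.state = DriverState.RELAUNCHING
      waitingDrivers += driver
      schedule()
    }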

II. Registration Mechanism

1. Overview

The master's registration flows:

1. Worker. When a worker starts up, it actively registers itself with the master. On receiving the registration request, the master filters out workers whose state is DEAD; for a worker node in UNKNOWN state, it clears out the old worker's information and replaces it with the new worker's. It then writes the worker's registration info into the in-memory cache (a HashMap), persists the worker info with the persistence engine (file system or ZooKeeper), and calls schedule() (a sketch of this handler follows the list).
2. Driver. When an Application is submitted with spark-submit in cluster deploy mode, the Driver is registered first: its info is written into memory, it is added to the queue of drivers waiting to be scheduled, it is persisted to the file system / ZooKeeper with the persistence engine, and schedule() is called to perform resource scheduling.
3. Application. Once the Driver is up, it executes the application code we wrote and initializes the SparkContext; underneath, SparkDeploySchedulerBackend sends a RegisterApplication message through its ClientActor to the master to register the Application. The master writes the Application info into memory, adds the Application to the waiting queue, and persists the Application to the file system / ZooKeeper with the persistence engine.
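A hedged sketch of the worker-registration handler in the Spark 1.3-era Master (the WorkerInfo argument list is abbreviated from memory and should be treated as approximate):

    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        // registerWorker() drops DEAD workers, replaces UNKNOWN ones, and
        // puts the new worker into the in-memory caches
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register " +
            "worker at same address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }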

2. Application registration in Master.scala

    case RegisterApplication(description) => {
      // If this master's state is STANDBY, i.e. it is the Standby Master
      // rather than the Active Master, ignore the Application's
      // registration request
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        // Create an ApplicationInfo from the ApplicationDescription
        val app = createApplication(description, sender)
        // Register the Application: put it into the in-memory caches and add
        // it to the waiting-for-scheduling queue, waitingApps
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist the Application with the persistence engine
        persistenceEngine.addApplication(app)
        // Reply to the ClientActor of SparkDeploySchedulerBackend's AppClient
        // with a RegisteredApplication message
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }
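registerApplication() itself is mostly cache bookkeeping; a hedged sketch of its Spark 1.3-era body:

    def registerApplication(app: ApplicationInfo): Unit = {
      val appAddress = app.driver.path.address
      if (addressToApp.contains(appAddress)) {
        // The same driver address registering twice is ignored
        logInfo("Attempted to re-register application at same address: " + appAddress)
        return
      }
      applicationMetricsSystem.registerSource(app.appSource)
      // Put the app into the Master's in-memory caches...
      apps += app
      idToApp(app.id) = app
      actorToApp(app.driver) = app
      addressToApp(appAddress) = app
      // ...and onto the queue that schedule() will service
      waitingApps += app
    }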

III. State Change Mechanism

1. Driver state changes

###Master.scala 

    case DriverStateChanged(driverId, state, exception) => {
      state match {
        // If the Driver's state is ERROR, FINISHED, KILLED or FAILED,
        // remove the Driver
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }

###The removeDriver() method:

    def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
      // Use Scala's higher-order find() to look the driver up by driverId
      drivers.find(d => d.id == driverId) match {
        case Some(driver) =>
          logInfo(s"Removing driver: $driverId")
          // Remove the driver from the in-memory cache
          drivers -= driver
          if (completedDrivers.size >= RETAINED_DRIVERS) {
            val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
            completedDrivers.trimStart(toRemove)
          }
          // Add the driver to completedDrivers
          completedDrivers += driver
          // Remove the driver from the persistence engine
          persistenceEngine.removeDriver(driver)
          // Set the driver's state to its final state
          driver.state = finalState
          driver.exception = exception
          // Remove the driver from the worker it was running on
          driver.worker.foreach(w => w.removeDriver(driver))
          // Re-run scheduling
          schedule()
        case None =>
          logWarning(s"Asked to remove unknown driver: $driverId")
      }
    }
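For context, DriverStateChanged originates on the worker side: the Worker's DriverRunner runs the driver process and reports its terminal state, which the Worker relays to the Master and uses to release the driver's resources. A hedged sketch, assuming the Spark 1.3-era Worker.scala (logging elided; field names approximate):

    // In Worker.scala (sketch): relay the DriverRunner's terminal state to
    // the Master, then free the driver's resources on this worker
    case DriverStateChanged(driverId, state, exception) => {
      master ! DriverStateChanged(driverId, state, exception)
      val driver = drivers.remove(driverId).get
      finishedDrivers(driverId) = driver
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores
    }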

2. Executor state changes

###org.apache.spark.deploy.master.Master.scala

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      // Find the Executor's Application, then look the Executor up in that
      // Application's internal executor cache
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          // The executor was found
          val appInfo = idToApp(appId)
          exec.state = state
          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
          // Send an ExecutorUpdated message to the driver
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          // If the Executor has finished
          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // Remove the Executor from the Application's cache
            appInfo.removeExecutor(exec)
            // Remove the Executor from the cache of the Worker it ran on
            exec.worker.removeExecutor(exec)
            // Determine whether the Executor exited normally (exit status 0)
            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don't go into an infinite loop.
            if (!normalExit) {
              // Check whether the Application has reached the maximum retry
              // count, which is 10; that is, if the Executor fails on every
              // scheduling attempt, the Application is considered failed too
              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
                // Reschedule
                schedule()
              } else {
                // Otherwise, remove the Application
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
          }
        }
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }
    }

###The removeApplication() method:

    def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
      if (apps.contains(app)) {
        logInfo("Removing app " + app.id)
        // Remove the application from the application set (HashSet) and the
        // lookup caches
        apps -= app
        idToApp -= app.id
        actorToApp -= app.driver
        addressToApp -= app.driver.path.address
        if (completedApps.size >= RETAINED_APPLICATIONS) {
          val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
          completedApps.take(toRemove).foreach( a => {
            appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
            applicationMetricsSystem.removeSource(a.appSource)
          })
          completedApps.trimStart(toRemove)
        }
        // Add it to the completed-applications queue
        completedApps += app // Remember it in our history
        // Remove it from the queue of applications waiting to run
        waitingApps -= app

        // If application events are logged, use them to rebuild the UI
        rebuildSparkUI(app)

        for (exec <- app.executors.values) {
          // Stop the executor
          exec.worker.removeExecutor(exec)
          exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
          exec.state = ExecutorState.KILLED
        }
        app.markFinished(state)
        if (state != ApplicationState.FINISHED) {
          // Tell the driver that its application has been removed
          app.driver ! ApplicationRemoved(state.toString)
        }
        // Remove the application from the persistence engine
        persistenceEngine.removeApplication(app)
        // Reschedule
        schedule()

        // Tell all workers that the application has finished, so they can
        // clean up any app state
        workers.foreach { w =>
          w.actor ! ApplicationFinished(app.id)
        }
      }
    }
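A note on the trimming above (hedged to the Spark 1.3-era defaults): RETAINED_APPLICATIONS and RETAINED_DRIVERS bound how many completed applications and drivers the Master keeps around for its web UI; they are read from the spark.deploy.retainedApplications and spark.deploy.retainedDrivers settings (200 each by default in that era), and once the bound is reached, the oldest tenth of the queue is dropped in one go.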

IV. Resource Scheduling Algorithm

1. Analysis

First, if the master's state is not ALIVE, schedule() returns immediately; in other words, a standby master never schedules Applications or other resources.

Driver scheduling comes first. The driver is only registered with this master when the application is submitted in cluster deploy mode (for the standalone Master, spark-submit --deploy-mode cluster); in client mode, the driver is launched directly on the submitting machine and never registered, so the master cannot schedule it at all.

Application scheduling. There are two scheduling algorithms for Applications, spreadOutApps and non-spreadOutApps, with spreadOutApps the default; a small simulation contrasting the two follows this list.

- spreadOutApps spreads each Application's executors as evenly as possible across the workers. For example, with 20 cpu cores to assign and 10 workers, the loop passes over the workers twice, granting one core per worker per pass, so each worker ends up with 2 cores. A consequence: if spark-submit asks for 10 executors with 2 cores each (20 cores total) but only 2 workers are usable, this algorithm actually launches just 2 executors, each with 10 cores.
- Non-spreadOutApps is the exact opposite: each application is packed onto as few workers as possible. For example, with 10 workers of 10 cores each and an Application that needs 20 cores in total, only two workers are used, each filled with all 10 of its cores, and the remaining applications can only go to the next workers.
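To make the difference concrete, here is a small self-contained Scala simulation of the two placement strategies. This is illustrative code written for this article, not Spark source; workerCores and coresWanted are made-up inputs.

    object SchedulingDemo extends App {
      // Free cores on each worker (a made-up example cluster)
      val workerCores = Array(10, 10, 10, 10, 10, 10, 10, 10, 10, 10)
      val coresWanted = 20

      // spreadOut: round-robin one core at a time across all usable workers
      def spreadOut(free: Array[Int], wanted: Int): Array[Int] = {
        val assigned = new Array[Int](free.length)
        var toAssign = math.min(wanted, free.sum)
        var pos = 0
        while (toAssign > 0) {
          if (free(pos) - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % free.length
        }
        assigned
      }

      // non-spreadOut: fill each worker completely before moving to the next
      def packed(free: Array[Int], wanted: Int): Array[Int] = {
        val assigned = new Array[Int](free.length)
        var toAssign = math.min(wanted, free.sum)
        for (pos <- free.indices if toAssign > 0) {
          val grant = math.min(free(pos), toAssign)
          assigned(pos) = grant
          toAssign -= grant
        }
        assigned
      }

      println("spreadOut: " + spreadOut(workerCores, coresWanted).mkString(", "))
      // => 2, 2, 2, 2, 2, 2, 2, 2, 2, 2   (one executor with 2 cores per worker)
      println("packed:    " + packed(workerCores, coresWanted).mkString(", "))
      // => 10, 10, 0, 0, 0, 0, 0, 0, 0, 0 (two executors with 10 cores each)
    }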

2. Source code

###org.apache.spark.deploy.master.Master.scala

    private def schedule() {
      // First: if the master's state is not ALIVE, return immediately;
      // i.e. a standby master never schedules Applications or other resources
      if (state != RecoveryState.ALIVE) { return }

      // First schedule drivers, they take strict precedence over applications
      // Randomization helps balance drivers
      // Random.shuffle randomly permutes the elements of the collection
      // passed in. Take all previously registered workers, keep only those
      // whose state is ALIVE, and shuffle them randomly.
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      // Number of alive workers
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0

      // Driver scheduling comes first. A driver is only registered when the
      // application was submitted in cluster deploy mode; in client mode the
      // driver starts directly on the local machine, is never registered with
      // the master, and so can never be scheduled by it.
      // Iterate over the waitingDrivers ArrayBuffer
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        // Loop while numWorkersVisited < numWorkersAlive, i.e. while alive
        // workers remain unvisited and the current driver has not yet been
        // launched (launched is false)
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          // If this worker's free memory is at least what the driver needs,
          // and its free cpu count is at least what the driver needs
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            // Launch the driver
            launchDriver(worker, driver)
            // Remove the driver from the waitingDrivers queue
            waitingDrivers -= driver
            // Mark it launched
            launched = true
          }
          // Advance the pointer to the next worker
          curPos = (curPos + 1) % numWorkersAlive
        }
      }

      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      // Application scheduling: there are two scheduling algorithms,
      // spreadOutApps and non-spreadOutApps; spreadOutApps is the default
      if (spreadOutApps) {
        // Try to spread out each app among all the nodes, until it has all its cores
        // Iterate over the ApplicationInfos in waitingApps, keeping the
        // Applications that still have cores left to schedule
        for (app <- waitingApps if app.coresLeft > 0) {
          // From the workers, keep those whose state is ALIVE, then keep the
          // ones usable by this Application: the worker's free memory must be
          // at least what each of the Application's Executors needs, and the
          // worker must not already be running an Executor for this
          // Application. Sort the workers by free cpu count, descending.
          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
            .filter(canUse(app, _)).sortBy(_.coresFree).reverse
          val numUsable = usableWorkers.length
          // An array recording how many cpus to assign to each worker
          val assigned = new Array[Int](numUsable) // Number of cores to give on each node
          // How many cpus to assign in total: the minimum of the app's
          // remaining cores and the total free cores across usable workers
          var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
          // This algorithm spreads each Application's executors evenly across
          // the workers: e.g. with 20 cpu cores to assign and 10 workers, the
          // loop passes over the workers twice, granting one core per worker
          // per pass, so each worker ends up with 2 cores
          var pos = 0
          // Keep looping while cpus remain to be assigned
          while (toAssign > 0) {
            // If this worker's free cpu count exceeds what has already been
            // assigned to it, i.e. the worker still has assignable cpus
            if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
              // One fewer cpu left to assign overall, since we have just
              // decided to place one on this worker
              toAssign -= 1
              // One more cpu assigned to this worker
              assigned(pos) += 1
            }
            // Move the pointer to the next worker
            pos = (pos + 1) % numUsable
          }
          // Now that we've decided how many cores to give on each node, let's actually give them
          // Having assigned the cpu cores the Application asked for, iterate
          // over the workers again
          for (pos <- 0 until numUsable) {
            // If this worker was assigned any cores
            if (assigned(pos) > 0) {
              // Launch an Executor on the worker.
              // First, add an Executor in the Application's internal cache by
              // creating an ExecutorDesc, which records how many cpu cores
              // this Executor gets.
              // This is the Spark 1.3.0 Executor launch mechanism: in the
              // spark-submit script you can request a number of Executors,
              // each with so many cpus and so much memory, but under
              // spreadOutApps the actual Executor count and per-Executor cpus
              // may differ from that configuration, because assignment here
              // is based on total cpus. Say you ask for 3 Executors with 3
              // cpus each, and there are 9 workers with 1 cpu each: this
              // algorithm assigns one core per worker and launches one
              // Executor per worker, ending up with 9 Executors of 1 cpu
              // core each.
              val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
              // Launch the Executor on the worker
              launchExecutor(usableWorkers(pos), exec)
              // Set the application's state to RUNNING
              app.state = ApplicationState.RUNNING
            }
          }
        }
      } else {
        // Pack each app into as few nodes as possible until we've assigned all its cores
        // The non-spreadOutApps algorithm: place each application on as few
        // workers as possible, the exact opposite of spreadOutApps.
        // E.g. with 10 workers of 10 cores each and an Application needing
        // 20 cores in total, only two workers are used, each filled with all
        // 10 of its cores, and the remaining applications can only go to the
        // next workers.
        // Iterate over the workers that are ALIVE and still have free cores
        for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
          // Iterate over the applications that still have cores to assign
          for (app <- waitingApps if app.coresLeft > 0) {
            // If this worker can be used by the application
            if (canUse(app, worker)) {
              // Take the minimum of the worker's free cpus and the cpus the
              // application still needs
              val coresToUse = math.min(worker.coresFree, app.coresLeft)
              // If the worker has no free cpus left, assign nothing
              if (coresToUse > 0) {
                // Add an executor to the application
                val exec = app.addExecutor(worker, coresToUse)
                // Launch the executor on the worker
                launchExecutor(worker, exec)
                // Set the application's state to RUNNING
                app.state = ApplicationState.RUNNING
              }
            }
          }
        }
      }
    }

###The launchExecutor() method:

    def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      // Add the Executor to the worker's internal cache
      worker.addExecutor(exec)
      // Send a LaunchExecutor message to the worker's actor
      worker.actor ! LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
      // Send an ExecutorAdded message to the driver of the Executor's application
      exec.application.driver ! ExecutorAdded(
        exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
    }

###The canUse() method:

    def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
      worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
    }
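On the receiving end, a hedged sketch of the Spark 1.3-era Worker's LaunchExecutor handler (most ExecutorRunner constructor arguments and error handling elided; treat the details as approximate): it spawns an ExecutorRunner and reports the new executor's state back to the Master, which is exactly what feeds the ExecutorStateChanged handler in part III.

    // In Worker.scala (sketch)
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      // ExecutorRunner forks the actual executor JVM in the background
      val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_ /* ... */)
      executors(appId + "/" + execId) = manager
      manager.start()
      // Track the resources this executor consumes on the worker
      coresUsed += cores_
      memoryUsed += memory_
      // Report back to the master, feeding the ExecutorStateChanged handler
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)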
