工作原理图

 

源代码分析

包名:org.apache.spark.deploy.worker

启动driver入口点:registerWithMaster方法中的case LaunchDriver

) => DriverState.FINISHED
37                case _ => DriverState.FAILED
38              }
39            }
40   
41          finalState = Some(state)
42          // 向Driver所属worker发送DriverStateChanged消息
43          worker ! DriverStateChanged(driverId, state, finalException)
44        }
45      }.start()
46  }

 

LaunchExecutor

管理LaunchExecutor的启动

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
 
      // Create the executor's working directory
      // 创建executor本地工作目录
10        val executorDir = new File(workDir, appId + "/" + execId)
11        if (!executorDir.mkdirs()) {
12          throw new IOException("Failed to create directory " + executorDir)
13        }
14   
15        // Create local dirs for the executor. These are passed to the executor via the
16        // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
17        // application finishes.
18        val appLocalDirs = appDirectories.get(appId).getOrElse {
19          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
20            Utils.createDirectory(dir).getAbsolutePath()
21          }.toSeq
22        }
23        appDirectories(appId) = appLocalDirs
24        // 创建ExecutorRunner对象
25        val manager = new ExecutorRunner(
26          appId,
27          execId,
28          appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
29          cores_,
30          memory_,
31          self,
32          workerId,
33          host,
34          webUi.boundPort,
35          publicAddress,
36          sparkHome,
37          executorDir,
38          akkaUrl,
39          conf,
40          appLocalDirs, ExecutorState.LOADING)
41        // executor加入本地缓存
42        executors(appId + "/" + execId) = manager
43        manager.start()
44        // 增加worker已使用core
45        coresUsed += cores_
46        // 增加worker已使用memory
47        memoryUsed += memory_
48        // 通知master发送ExecutorStateChanged消息
49        master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
50      }
51      // 异常情况处理,通知master发送ExecutorStateChanged FAILED消息
52      catch {
53        case e: Exception => {
54          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
55          if (executors.contains(appId + "/" + execId)) {
56            executors(appId + "/" + execId).kill()
57            executors -= appId + "/" + execId
58          }
59          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
60            Some(e.toString), None)
61        }
62      }
63  }

 

总结

1、Worker、Driver、Application启动后都会向Master进行注册,并缓存到Master内存数据模型中
2、完成注册后发送LaunchExecutor、LaunchDriver到Worker
3、Worker收到消息后启动executor和driver进程,并调用Worker的ExecutorStateChanged和DriverStateChanged方法
4、发送ExecutorStateChanged和DriverStateChanged消息到Master的,根据各自的状态信息进行处理,最重要的是会调用schedule方法进行资源的重新调度

最新文章

  1. java 内部类 *** 最爱那水货
  2. CSS Hack大全-教你如何区分出IE6-IE10、FireFox、Chrome、Opera
  3. php大力力 [043节] 现在要做个删除前的提示功能
  4. json一些特点
  5. asmdisk opened & asmdisk cached
  6. Java SE学习之printf 日期转换符
  7. Qt之模型/视图(自定义按钮)(使用QStyleOption的子类进行drawControl,和我用的方法完全不一样)
  8. Entityframework 伪CodeFirst开发模式应用于Sqlite数据库
  9. .Net轻量级ORM-NPoco的使用方法-摘自NPoco国外官方Wiki
  10. beanshell postprocessor解决编码
  11. Oracle_建表
  12. JavaScript中Array数组的方法
  13. 解决mysql插入数据报错[Err] 1146 - Table 'performance_schema.session_status' doesn't exist
  14. JDK动态代理和cglib代理详解
  15. Python默认参数的坑
  16. 在sublime中安装使用TortoiseSVN-sublime使用心得(4)
  17. python百分号%—%s、%d、%f
  18. 前端- html -总结
  19. 【转载】Hybrid APP了解
  20. 2243: [SDOI2011]染色 树链剖分+线段树染色

热门文章

  1. java调用phantomjs采集ajax加载生成的网页
  2. win8系统 Reflect 破解
  3. PHP命名空间概念解析
  4. [51NOD1105]第k大的数(二分答案)
  5. bzoj2875: [Noi2012]随机数生成器
  6. busybox filesystem httpd php-5.5.31 sqlite3 webserver
  7. HDU 5312 Sequence (规律题)
  8. HDU 1213 How Many Tables (并查集,常规)
  9. 省常中模拟 Test4
  10. Darwin Streaming Server用vs2005编译运行过程