Spark Series (8): How the Worker Works
2024-10-13 00:44:04
How it works (diagram)
Source code analysis
Package: org.apache.spark.deploy.worker
Driver launch entry point: the case LaunchDriver branch of the Worker's message handler
    // ...
            ) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

        finalState = Some(state)
        // Send a DriverStateChanged message to the worker that owns this driver
        worker ! DriverStateChanged(driverId, state, finalException)
      }
    }.start()
  }
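The tail of the listing above picks the driver's final state before reporting it to the worker. A minimal, self-contained sketch of that decision is given here. The `DriverStateSketch` object, its `finalState` helper, the treatment of exit code 0 as success, and the precedence of a kill request over a recorded exception are all assumptions made for illustration, since the earlier part of the listing is not shown.

```scala
object DriverStateSketch {
  // Simplified stand-in for Spark's DriverState values (hypothetical, illustration only).
  sealed trait State
  case object Finished extends State
  case object Failed extends State
  case object Killed extends State
  case object Error extends State

  // Derive the final state from how the driver process ended.
  // Assumption: a kill request wins over an exception, which wins over the exit code.
  def finalState(
      killed: Boolean,
      exception: Option[Exception],
      exitCode: Option[Int]): State =
    if (killed) Killed
    else if (exception.isDefined) Error
    else exitCode match {
      case Some(0) => Finished // assumed: exit code 0 means success
      case _ => Failed         // any other code, or no code at all
    }
}
```

Whatever state is chosen, the listing then stores it in `finalState` and sends it to the worker in a `DriverStateChanged` message.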
LaunchExecutor
Handles requests to launch an executor
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.get(appId).getOrElse {
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              Utils.createDirectory(dir).getAbsolutePath()
            }.toSeq
          }
          appDirectories(appId) = appLocalDirs
          // Create the ExecutorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            akkaUrl,
            conf,
            appLocalDirs, ExecutorState.LOADING)
          // Add the executor to the worker's local cache
          executors(appId + "/" + execId) = manager
          manager.start()
          // Increase the worker's used core count
          coresUsed += cores_
          // Increase the worker's used memory
          memoryUsed += memory_
          // Notify the master with an ExecutorStateChanged message
          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
        }
        // On failure, notify the master with an ExecutorStateChanged FAILED message
        catch {
          case e: Exception => {
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None)
          }
        }
      }
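The bookkeeping in this handler (cache the runner under the key appId/execId, then charge its cores and memory to the worker) can be tried out in isolation. The sketch below is a simplified model, not Spark code: `WorkerBookkeepingSketch`, `Runner`, `Bookkeeping`, and the `release` method are hypothetical names introduced here, and `release` is only an assumed counterpart for when an executor ends.

```scala
import scala.collection.mutable

object WorkerBookkeepingSketch {
  // Hypothetical stand-in for ExecutorRunner: it only records what it was given.
  final case class Runner(appId: String, execId: Int, cores: Int, memoryMb: Int)

  // Hypothetical, simplified model of the worker's resource accounting.
  class Bookkeeping(val totalCores: Int, val totalMemoryMb: Int) {
    val executors = mutable.HashMap.empty[String, Runner]
    var coresUsed = 0
    var memoryUsed = 0

    def coresFree: Int = totalCores - coresUsed
    def memoryFree: Int = totalMemoryMb - memoryUsed

    // Mirrors the success path of the handler: cache the runner under
    // "appId/execId", then charge its resources to the worker.
    def launch(appId: String, execId: Int, cores: Int, memoryMb: Int): Runner = {
      val runner = Runner(appId, execId, cores, memoryMb)
      executors(appId + "/" + execId) = runner
      coresUsed += cores
      memoryUsed += memoryMb
      runner
    }

    // Assumed counterpart: drop the runner from the cache and release its
    // resources. Unknown keys are ignored.
    def release(appId: String, execId: Int): Unit =
      executors.remove(appId + "/" + execId).foreach { r =>
        coresUsed -= r.cores
        memoryUsed -= r.memoryMb
      }
  }
}
```

For example, after `new WorkerBookkeepingSketch.Bookkeeping(8, 4096).launch("app-1", 0, 2, 1024)` the model reports 2 cores and 1024 MB in use, with 6 cores and 3072 MB free.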
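Summary
1. After startup, Workers, Drivers, and Applications all register with the Master, which caches them in its in-memory data model.
2. Once registration completes, the Master sends LaunchExecutor and LaunchDriver messages to the Worker.
3. On receiving these messages, the Worker starts the executor and driver processes, which report back to the Worker through ExecutorStateChanged and DriverStateChanged messages.
4. The Worker sends ExecutorStateChanged and DriverStateChanged messages to the Master, which handles each according to the reported state; most importantly, it calls the schedule method to reschedule resources.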
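The message flow in steps 2 to 4 can be sketched with plain method calls in place of actor messages. Everything in the block is a hypothetical toy model: the message classes carry far fewer fields than the real ones, states are plain strings, and the rule that the master reschedules only when an executor reaches a terminal state is an assumption of this sketch.

```scala
import scala.collection.mutable.ListBuffer

object MessageFlowSketch {
  // Hypothetical, heavily simplified messages.
  sealed trait Message
  final case class LaunchExecutor(appId: String, execId: Int) extends Message
  final case class ExecutorStateChanged(appId: String, execId: Int, state: String)
    extends Message

  // Assumed set of terminal executor states for this sketch.
  private val terminalStates = Set("EXITED", "FAILED", "KILLED")

  // Toy master: records every state change and counts how often it reschedules.
  class Master {
    val log = ListBuffer.empty[ExecutorStateChanged]
    var scheduleCalls = 0

    def schedule(): Unit = scheduleCalls += 1

    def receive(msg: Message): Unit = msg match {
      case change: ExecutorStateChanged =>
        log += change
        // Step 4: a finished executor frees resources, so reschedule.
        if (terminalStates.contains(change.state)) schedule()
      case _ => ()
    }
  }

  // Toy worker: "starts" the executor and forwards state changes to the master.
  class Worker(master: Master) {
    def receive(msg: Message): Unit = msg match {
      case LaunchExecutor(appId, execId) =>
        // Step 3: report the initial state right after launching.
        master.receive(ExecutorStateChanged(appId, execId, "LOADING"))
      case change: ExecutorStateChanged =>
        // A later state change coming from the runner is passed on as-is.
        master.receive(change)
    }
  }
}
```

Sending `LaunchExecutor("app-1", 0)` to the toy worker logs one LOADING change on the master without rescheduling; a later `ExecutorStateChanged("app-1", 0, "EXITED")` is forwarded and triggers one `schedule()` call.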