Spark 源码浅读-任务提交SparkSubmit

main方法

main方法先创建一个SparkSubmit实例(其中重写了日志相关的方法,用于初始化日志),然后调用该实例的doSubmit方法。

/**
 * Entry point of the submit tool.
 *
 * Creates an anonymous `SparkSubmit` whose `parseArguments` returns a
 * `SparkSubmitArguments` that forwards its log calls to the enclosing instance,
 * then passes the raw command-line arguments to `doSubmit`.
 *
 * @param args raw command-line arguments
 */
override def main(args: Array[String]): Unit = {
  // Logging setup: the SparkSubmitArguments built by parseArguments delegates
  // logInfo / logWarning to the enclosing SparkSubmit instance (`self`).
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

        // ....... (the excerpt elides the remaining code of the
        // `new SparkSubmit() { ... }` body at this point)
      }
    }
  }

  // Submit the job.
  submit.doSubmit(args)
}

  doSubmit方法

/**
 * Parses the command-line arguments and dispatches on the requested action.
 *
 * Logging is initialized first (if needed); the result is remembered in
 * `uninitLog` and handed to `submit` for the SUBMIT action.
 *
 * @param args raw command-line arguments, forwarded to `parseArguments`
 */
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }

  // Route to the handler that matches the parsed action.
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

  当action为SUBMIT时,doSubmit方法会调用submit方法,并最终调用runMain方法。

/**
 * Prepares the submit environment and loads the class that is to be launched.
 *
 * Steps visible in this excerpt:
 *  1. resolve child arguments, classpath, Spark configuration and main class name
 *     through `prepareSubmitEnvironment`;
 *  2. un-initialize logging when `uninitLog` is set;
 *  3. in verbose mode, log the resolved settings (configuration is redacted first);
 *  4. add every classpath entry to the submit class loader;
 *  5. load `childMainClass`, failing with `CLASS_NOT_FOUND_EXIT_STATUS` if it cannot
 *     be loaded.
 *
 * @param args      parsed submit arguments
 * @param uninitLog whether `Logging.uninitialize()` should be called before continuing
 * @throws SparkUserAppException with `CLASS_NOT_FOUND_EXIT_STATUS` when loading
 *                               `childMainClass` raises `ClassNotFoundException` or
 *                               `NoClassDefFoundError`
 */
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // childArgs holds the parameters configured for the job, together with mainClass etc.
  // Author's note (not verifiable from this excerpt): in the scenario being walked
  // through, childMainClass is STANDALONE_CLUSTER_SUBMIT_CLASS, which creates the
  // Driver and communicates with the Master. The figure the note refers to is missing.
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  if (args.verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }

  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logError(s"Failed to load class $childMainClass.")
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logError(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // NOTE: the excerpt stops here; the remainder of runMain is not shown.
}

  

最新文章

  1. 由js apply与call方法想到的js数据类型(原始类型和引用类型)
  2. adminLTE的自动化菜单
  3. DIP依赖倒置原则
  4. FreeBSD-安装与配置(10.3@VMware)
  5. 安装pyspider
  6. ACM: hihicoder #1174 : 拓扑排序·一 STL- queue
  7. 用 xampp 在 windows/Linux 下搭建代理服务器
  8. springMVC之HelloWorld
  9. Jquery 扩展方法
  10. 改变UIView 的位置 Center和Frame
  11. ubuntu设置ip和dns
  12. 再次轻度破解EXE文件
  13. HDFS Safemode问题
  14. CodeForces 721A
  15. java udp socket(双通信)
  16. Kotlin实现《第一行代码》案例“酷欧天气”
  17. EffectiveJava阅读笔记(一)
  18. zabbix之 自定义内存使用率监控报警
  19. 修复cocos2dx的Label,WP8下不能换行的问题
  20. fis入门-单文件编译之文件优化(optimize)

热门文章

  1. FL Studio 插件使用教程 —— 3x Osc(下)
  2. 苹果电脑下载器Folx有没有自动下载功能
  3. Spring 事件监听机制及原理分析
  4. 数学分析理论(rudin版)笔记:实数系和复数系.1
  5. 【译】理解Rust中的局部移动
  6. TensorFlow安装方法:附带坑解决办法
  7. Alpha冲刺-第七次冲刺笔记
  8. 用了Dapper之后就不要再见到SqlConnection咯
  9. python数据更新
  10. java并发编程实战《二》java内存模型