A Brief Read of the Spark Source: SparkSubmit
2024-09-05 15:57:49
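A Brief Read of the Spark Source: Job Submission with SparkSubmit
The main method
The main method mostly sets up logging: it creates an anonymous SparkSubmit subclass whose parsed arguments forward their log calls back to it, and then calls doSubmit.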
override def main(args: Array[String]): Unit = {
  /*
   * Initialize logging
   */
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
        .......

  /**
   * Submit the job
   */
  submit.doSubmit(args)
}
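The `self =>` alias in the excerpt above is what lets the inner anonymous `SparkSubmitArguments` reach the outer object's logging methods. Here is a minimal sketch of that pattern. `Arguments`, `Submitter`, `SelfAliasDemo` and the `captured` buffer are hypothetical stand-ins invented for illustration, not Spark classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for SparkSubmitArguments (not the real Spark class).
class Arguments(val args: Array[String]) {
  protected def logInfo(msg: => String): Unit = println(s"[args] $msg")
  def describe(): Unit = logInfo(s"parsed ${args.length} argument(s)")
}

// Hypothetical stand-in for SparkSubmit (not the real Spark class).
class Submitter {
  protected def logInfo(msg: => String): Unit = println(s"[submit] $msg")
  protected def parseArguments(args: Array[String]): Arguments = new Arguments(args)
  def doSubmit(args: Array[String]): Unit = parseArguments(args).describe()
}

object SelfAliasDemo {
  // Collects every message so the routing can be inspected afterwards.
  val captured = ArrayBuffer.empty[String]

  def build(): Submitter = new Submitter { self =>
    // The outer object decides where log output finally goes.
    override protected def logInfo(msg: => String): Unit = {
      captured += msg
      ()
    }

    override protected def parseArguments(args: Array[String]): Arguments =
      new Arguments(args) {
        // Inside this inner anonymous class `this` is the Arguments instance,
        // so the alias `self` is needed to call the outer Submitter's logInfo.
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
      }
  }
}
```

Calling `SelfAliasDemo.build().doSubmit(Array("--verbose"))` sends the message produced inside `Arguments` through the outer object's `logInfo`, so both classes share one output channel.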
The doSubmit method
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
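The dispatch at the end of doSubmit is a plain pattern match on an enumeration value. The sketch below mirrors that shape under stated assumptions: `Action`, `DispatchDemo`, `actionFor` and `dispatch` are hypothetical names, and the flag handling is greatly simplified compared with what `SparkSubmitArguments` actually does.

```scala
// Hypothetical stand-in for SparkSubmitAction; the real definition lives in Spark.
object Action extends Enumeration {
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}

object DispatchDemo {
  // Assumed, simplified flag handling for illustration only.
  def actionFor(args: Array[String]): Action.Value =
    if (args.contains("--kill")) Action.KILL
    else if (args.contains("--status")) Action.REQUEST_STATUS
    else if (args.contains("--version")) Action.PRINT_VERSION
    else Action.SUBMIT // anything else is treated as a normal submission

  // Same shape as the match in doSubmit, but returns a label instead of doing work.
  def dispatch(args: Array[String]): String = actionFor(args) match {
    case Action.SUBMIT         => "submit"
    case Action.KILL           => "kill"
    case Action.REQUEST_STATUS => "requestStatus"
    case Action.PRINT_VERSION  => "printVersion"
  }
}
```

Returning a label keeps the sketch free of side effects. In the real method each branch calls the corresponding handler directly.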
doSubmit eventually reaches runMain: for the SUBMIT action, submit(appArgs, uninitLog) goes on to call runMain.
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  /*
   * childArgs holds the parameters set for the job, plus mainClass and so on.
   * childMainClass is the STANDALONE_CLUSTER_SUBMIT_CLASS shown in the figure below,
   * which creates the Driver and communicates with the Master.
   */
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  if (args.verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logError(s"Failed to load class $childMainClass.")
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logError(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }
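The class-loading step in the excerpt above (build a loader over the classpath entries, resolve the main class by name, and turn the two failure modes into readable errors) can be approximated with plain JDK calls. This is a rough sketch under stated assumptions, not Spark's implementation: `LoadMainClassDemo` and `loadClass` are hypothetical names, it returns an `Either` instead of throwing `SparkUserAppException`, and it passes `initialize = false`, which may differ from what `Utils.classForName` does.

```scala
import java.net.{URL, URLClassLoader}

object LoadMainClassDemo {
  // Hypothetical helper: builds a loader over the given jar URLs and
  // resolves a class through it, reporting failures as Left(message).
  def loadClass(name: String, jars: Seq[URL] = Nil): Either[String, Class[_]] = {
    val parent = Thread.currentThread().getContextClassLoader
    val loader = new URLClassLoader(jars.toArray, parent)
    try {
      // initialize = false: resolve the class without running its static initializers.
      Right(Class.forName(name, false, loader))
    } catch {
      // The class itself could not be found on the classpath.
      case _: ClassNotFoundException => Left(s"Failed to load class $name.")
      // The class was found, but something it depends on is missing.
      case e: NoClassDefFoundError => Left(s"Failed to load $name: ${e.getMessage}")
    }
  }
}
```

For example, `LoadMainClassDemo.loadClass("java.lang.String")` yields a `Right`, while a name that is not on the classpath yields a `Left` carrying the error message.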