spark Master是spark集群的首脑,负责资源调度,任务分配,负载平衡等功能

以下是master启动流程概述

通过shell进行对master进行启动

首先看一下启动脚本more start-master.sh

此时我们知道最终调用的是org.apache.spark.deploy.master.Master

这是Master源码:

private[spark] object Master extends Logging {
  val systemName = "sparkMaster"
  private val actorName = "Master"

  //master启动的入口
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    //创建SparkConf
    val conf = new SparkConf
    //保存参数到SparkConf
    val args = new MasterArguments(argStrings, conf)
    //创建ActorSystem和Actor
    val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    //等待结束
    actorSystem.awaitTermination()
  }

  /**
   * Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaUrl(sparkUrl: String, protocol: String): String = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    AkkaUtils.address(protocol, systemName, host, port, actorName)
  }

  /**
   * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaAddress(sparkUrl: String, protocol: String): Address = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    Address(protocol, systemName, host, port)
  }

  /**
   * Start the Master and return a four tuple of:
   *   (1) The Master actor system
   *   (2) The bound port
   *   (3) The web UI bound port
   *   (4) The REST server bound port, if any
   */
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    //利用AkkaUtils创建ActorSystem
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
      securityManager = securityMgr)
    //通过ActorSystem创建Actor -> actorSystem.actorOf, 就会执行Master的构造方法->然后执行生命周期方法
    val actor = actorSystem.actorOf(
      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
    val timeout = AkkaUtils.askTimeout(conf)
    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
  }
}

最终会通过Master的main函数进行最jvm进程启动

最新文章

  1. android 帧动画
  2. 固定Table的头部和左边的列-在Knockout Js使用场景下
  3. EF架构~EF异步改造之路~仓储接口的改造~续
  4. 《Android深度探索》(卷1)HAL与驱动开发读后感:
  5. 网站用域名能访问,用域名IP不能访问的原因分析
  6. android studio问题rendering problems no render target selected
  7. ThinkPHP中的三大自动简介
  8. php编写验证码
  9. 【CSS3】---块状元素、内联元素(又叫行内元素)和内联块状元素
  10. Git 安装与使用(二)
  11. 【Flume NG用户指南】(2)构造
  12. (转载)测试工具monkey
  13. TextView 实现跑马灯效果
  14. JVM高级特性-三、垃圾收集之判断对象存活算法
  15. canvas探照灯效果
  16. 微信小程序上的map组件bindregionchange地图视野变化函数成功回调会产生2次值的问题?
  17. linux系统ansible一键完成三大服务器基础配置(剧本)
  18. 『Python CoolBook』C扩展库_其一_用法讲解
  19. Solidity的地址 数组如何判断是否包含一个给定的地址?
  20. Java内部类详解(一)

热门文章

  1. Python服务器开发三:Socket
  2. day_04 基本数据类型的结构和使用方法
  3. ios获取系统当前日期并以一定格式显示
  4. python list颠倒写法
  5. 【shell】grep使用正则表达式
  6. 关于python pip安装第三方库 jieba 中文分词工具后提示"ImportError: cannot import name 'Random'"报错问题
  7. 更改pip源地址为阿里云
  8. opencv加椒盐噪声
  9. html body标签 语法
  10. rabbitmq 和 kafka 简单的性能测试