本篇文章讲述 Flink Application On Yarn 提交模式下,从命令提交到 AM 容器创建

1、脚本入口

flink run-application -t yarn-application hdfs:///TopSpeedWindowing.jar

以上是flink application 模式的 任务提交命令,可以发现,任务提交入口在 FLINK_HOME/bin 目录中的flink 脚本中

根据flink 脚本中的执行操作,可发现flink 脚本最终指向了 org.apache.flink.client.cli.CliFrontend 这个入口类

2、flink 程序入口类org.apache.flink.client.cli.CliFrontend

main 作为程序的入口方法,从main 方法开始进行代码跟踪

根据 CliFrontend 中的main 方法,可以发现,在执行命令前,进行了一些环境的信息输出,flink 配置加载,运行环境准备等工作,最后 执行了 parseAndRun 方法,开始执行。

进入parseAndRun 方法,发现,系统根据提交命令参数进行解析,提取执行命令的类型,根据命令类型不同,执行对应的操作。

进入 runApplication 方法 ,继续跟踪 application 模式下的任务提交逻辑

protected void runApplication(String[] args) throws Exception {
LOG.info("Running 'run-application' command."); final Options commandOptions = CliFrontendParser.getRunCommandOptions();
//todo 组装提交命令行对象
final CommandLine commandLine = getCommandLine(commandOptions, args, true); // todo 如果包含help 命令 比如 flink flink run-application --help 这种命令,那么就进行命令帮助输出
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
} // todo 获取激活的命令行对象
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine)); final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader); final ProgramOptions programOptions;
final Configuration effectiveConfiguration; // No need to set a jarFile path for Pyflink job.
// todo 如果是 pyflink 的命令入口
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
// todo 组装 pyflink 所需要的依赖配置
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
// todo 组装非pyflink 程序的配置
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
//todo 主要是做了将命令行中的配置覆盖 从 confDir 中读取的默认配置
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
} final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
// todo 部署应用
deployer.run(effectiveConfiguration, applicationConfiguration);
}

在 runApplication 方法中,程序进行了命令行对象的组装,程序运行配置的组装,然后进行任务提交,继续跟踪  deployer.run(effectiveConfiguration, applicationConfiguration);

发现在run 方法中,获取了一个集群描述器,然后进行了应用部署

由于是采用的yarn application 模式,因此使用 YarnClisterDescriptor,进入到 deployApplicationCluster 方法中,可发现 其继续进行了 部署模式校验,任务jar 的校验 、配置应用等操作,最后向集群执行 部署

在部署 flink 任务到 yarn 上时,入口程序指定的是 YarnApplicationClusterEntryPoint.class.getName()

根据  deployInternal 方法的调用,可以发现,在 yarn  application 模式下,部署并未传入 jobGraph ,此处也能说明,yarn  application 的 流图生成是在 AM 容器中完成的

在 deployInternal 方法中,执行了大量的校验工作,比如进行了  Kerberos 认证校验,yarn 的资源校验, yarn 的队列校验等等,一切校验通过后,就开始创建AM 容器

在创建AM 容器时,flink 进行了文件系统初始化 ,构造了 一个资源上传器,用于进行依赖的lib 包,配置文件的上传,上传能完成以后,设置了这些资源的classpath 信息,然后构造了AM 启动命令,由于yarn application 模式的启动入口传入的是YarnApplicationClusterEntryPoint,因此在构造 AM 中启动时运行的是 YarnApplicationClusterEntryPoint 的main 方法, 最后进行了 AM 容器提交

至此,flink 的任务终于提交到yarn 上,并开始创建AM 容器了

具体AM 容器中都干了些啥,咱们下回再说

最新文章

  1. 使用Hystrix提高系统可用性
  2. 【javascript 技巧】谈谈setTimeout的作用域以及this的指向问题
  3. Unix 复制文件至指定目录
  4. MyISAM 和InnoDB 区别 转
  5. css形状大全
  6. iOS-解决iOS8及以上设置applicationIconBadgeNumber报错的问题
  7. [转].net中的认证(authentication)与授权(authorization)
  8. poj 2632 Crashing Robots
  9. mysql view
  10. DWZ 验证 CLASS 规则
  11. [delphi技术]Delphi常见图象格式转换技术
  12. Relative与Absolute组合使用
  13. Windows Azure功能更新:弹性伸缩(autoscale)、监控报警、移动服务及网站服务商用、新的虚拟机镜像
  14. 关于redis内部的数据结构
  15. Spring中多个工程停多个资源文件ignoreUnresolvablePlaceholders配置
  16. SPOJ 839 OPTM - Optimal Marks (最小割)(权值扩大,灵活应用除和取模)
  17. HTML注释标签
  18. js页面加载完后执行(document.onreadystatechange 和 document.readyState)
  19. RedHat 6 yum 使用网易源
  20. UnicodeEncodeError: ‘ascii’ codec can’t encode

热门文章

  1. vue 修改单页标题 --- document.title
  2. java高级用法之:在JNA中将本地方法映射到JAVA代码中
  3. android软件简约记账app开发day10-主页面模块--头信息的展示,和之后功能完善的目标。
  4. Vue异步更新机制以及$nextTick原理
  5. asp.net core MVC 添加静态文件
  6. Nessus简单介绍与安装
  7. Windows平台安装SQLite3数据库
  8. 为什么 IPv6 难以取代 IPv4
  9. 第一个MVC程序
  10. maven install resources failed: newPosition < 0: (-1 < 0)