YARN分析系列之三 -- 从脚本入口分析 ResourceManager的初始化过程
由脚本找到 RM 主类
这部分,我们从脚本作为入口去逐步深入ResourceManager源码。
从 Hadoop 官方文档 中可以看到 ResourceManager 的启动命令为:
Usage: yarn resourcemanager [-format-state-store]
COMMAND_OPTIONS | Description |
---|---|
-format-state-store | Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running. |
-remove-application-from-state-store <appId> | Remove the application from RMStateStore. This should be run only when the ResourceManager is not running. |
定位到 源代码 hadoop-yarn-project > hadoop-yarn > bin > start-yarn.sh
# start resourceManager
HARM=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled >&-) # 查看配置,是否启用 ResourceManager 的 HA 机制
# 未启用 ResourceManager 的 HA 机制
if [[ ${HARM} = "false" ]]; then
echo "Starting resourcemanager"
hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
--config "${HADOOP_CONF_DIR}" \
--daemon start \
resourcemanager
(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
else # 启用ResourceManager的 HA 机制
logicals=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids >&-) # yarn.resoucemanager.ha.rm-ids 表示 RM 的逻辑Ids,多个按逗号分割
logicals=${logicals//,/ } # 按逗号分割成多个 RM id
for id in ${logicals}
do
rmhost=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey "yarn.resourcemanager.hostname.${id}" >&-)
RMHOSTS="${RMHOSTS} ${rmhost}" # 最终,RMHOSTS 变量会是由空格分割的 hostname 字符串
done
echo "Starting resourcemanagers on [${RMHOSTS}]"
hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \ # 运行 yarn 命令
--config "${HADOOP_CONF_DIR}" \
--daemon start \
--workers \
--hostnames "${RMHOSTS}" \
resourcemanager
(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? )) # 累加上一个命令的返回值
fi
首先解释 shell 分割字符串的语法:
$ aa='1,2,3';for i in ${aa//,/ }; do echo $i; done;
1
2
3
参照 官方的配置sample 会比较容易理解,下面已经启用了HA,并且 RM ids 有 rm1,rm2, 其中rm1 的hostname 是 master1, rm2 的 hostname 是 master2,:
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster1</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>master1</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>master2</value>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
然后再结合 yarn 脚本,可以得出,resourcemanager 的 入口类是 org.apache.hadoop.yarn.server.resourcemanager.ResourceManager,参数为 --config "${HADOOP_CONF_DIR}" --daemon start --workers --hostnames "${RMHOSTS}" 以及经由 shell函数 传递的参数值(不做具体分析)
分析 RM 服务初始化过程
分析ResouceManager 类继承关系
接下来,终于到了入口类 org.apache.hadoop.yarn.server.resourcemanager.ResourceManager, 该类在 hadoop-yarn-server-resourcemanager 的子 mudule 下。
先来看 RM 对象的 声明, 继承了 CompositeService 服务类,说明 RM 是一个组件服务,实现了ResourceManagerMXBean接口,可以交给 JMX 管理:
public class ResourceManager extends CompositeService
implements Recoverable, ResourceManagerMXBean
分析 ResourceManager 的入口函数
然后,找到 Main 函数:
public static void main(String argv[]) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); # 解析参数
argv = hParser.getRemainingArgs(); # --参数名 参数值之外的剩余以"-"开头的参数,第一次,没有指定剩余参数
// If -format-state-store, then delete RMStateStore; else startup normally
if (argv.length >= 1) {
if (argv[0].equals("-format-state-store")) {
deleteRMStateStore(conf);
} else if (argv[0].equals("-remove-application-from-state-store")
&& argv.length == 2) {
removeApplication(conf, argv[1]);
} else {
printUsage(System.err);
}
} else {
ResourceManager resourceManager = new ResourceManager();
// 初始化RM对象实例,在超类中初始化服务名称为 “ResouceManager” ,并实例化了状态模型成员字段 stateModel,初始化状态为 Service.State.NOTINITED ,后面详细介绍
ShutdownHookManager.get().addShutdownHook( // 添加服务组件关闭的回调函数
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf); // 初始化 RM 服务
resourceManager.start(); // 启动 RM 服务
}
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
System.exit(-1);
}
}
分析 ResourceManager的 初始化过程
@Override // 定义在其父类 AbstractService 中
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) { // 服务没有没有被初始化过
setConfig(conf); // 设值 conf 对象
try {
serviceInit(config); // 初始化服务
if (isInState(STATE.INITED)) { // 如果服务正确初始化
//if the service ended up here during init,
//notify the listeners
notifyListeners(); // 通知 listener
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
serviceInit 方法在 ResouceManager 类中有实现:
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
// 1. 初始化服务上下文
// RMContextImpl 保存了两类服务的上下文
// 一类是 serviceContext : 这类服务是 Always On 服务,即不考虑HA状态的一直运行的服务
// 一类是 activeServiceCotext : 活动的服务上下文,即需要运行在Active RM 节点上的服务
this.rmContext = new RMContextImpl();
rmContext.setResourceManager(this); // 2. 设置配置的provider
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
this.configurationProvider.init(this.conf);
rmContext.setConfigurationProvider(configurationProvider); // 3.加载 core-site.xml
loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE); // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
// Or use RM specific configurations to overwrite the common ones first
// if they exist
RMServerUtils.processRMProxyUsersConf(conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf); // 4. 加载 yarn-site.xml
loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
// 5. 配置校验
validateConfigs(this.conf); // 6. login
// Set HA configuration should be done before login
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) { // 如果RM 启用了 HA,设置 HA 的配置
HAUtil.verifyAndSetConfiguration(this.conf);
} // Set UGI and do login
// If security is enabled, use login user
// If security is not enabled, use current user
// 如果是启用了 安全认证,比如 kerberos,使用kerberos 登陆用户,否则默认使用当前用户
this.rmLoginUGI = UserGroupInformation.getCurrentUser();
try {
doSecureLogin();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
} // register the handlers for all AlwaysOn services using setupDispatcher().
// 7. 初始化所有的一直运行的服务的事件的handler
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher); // The order of services below should not be changed as services will be
// started in same order
// As elector service needs admin service to be initialized and started,
// first we add admin service then elector service
// 8. 创建 AdminService
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService); // elector must be added post adminservice
if (this.rmContext.isHAEnabled()) {
// If the RM is configured to use an embedded leader elector,
// initialize the leader elector.
if (HAUtil.isAutomaticFailoverEnabled(conf)
&& HAUtil.isAutomaticFailoverEmbedded(conf)) {
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
} // 9. 设置 Yarn Configuration
rmContext.setYarnConfiguration(conf);
// 10. 创建并初始化 Active Service
createAndInitActiveServices(false); // 11. 获取 yarn wenApp地址
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); // 12. 创建 RMApplicationHistoryWriter 服务
RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); // initialize the RM timeline collector first so that the system metrics
// publisher can bind to it
// 13. 创建 RM timeline collector
if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
} // 14. 设置 SystemMetricsPublisher
SystemMetricsPublisher systemMetricsPublisher =
createSystemMetricsPublisher();
addIfService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher); // 15. 注册 JMX
registerMXBean();
// 16. 调用父类的服务 init 方法
super.serviceInit(this.conf);
}
下面逐一查看初始化的各个子步骤
初始化服务上下文
public RMContextImpl() {
// 一直运行的服务上下文
this.serviceContext = new RMServiceContext();
// 只运行在 active RM 节点上的 上下文
this.activeServiceContext = new RMActiveServiceContext();
}
设置配置的 provider
这里使用了工厂模式和配置提供了默认的ConfigurationProvider ,并且用户可以实现 ConfigurationProvider 自定义 provider。
provider 其实在其他的源码中也经常用到。在这里,provider 提供了可以做一些内部的初始化以及返回 配置文件的 inputstream 流对象,关闭流对象等操作。对于处理解析配置的类来说,只需要一个输入流即可。
// ConfigurationProviderFactory 是一个工厂类
/**
* Creates an instance of {@link ConfigurationProvider} using given
* configuration.
* @param bootstrapConf
* @return configurationProvider
*/
@SuppressWarnings("unchecked")
public static ConfigurationProvider
getConfigurationProvider(Configuration bootstrapConf) {
Class<? extends ConfigurationProvider> defaultProviderClass;
try {
// 默认的 provider class 是org.apache.hadoop.yarn.LocalConfigurationProvider
defaultProviderClass = (Class<? extends ConfigurationProvider>)
Class.forName(
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException(
"Invalid default configuration provider class"
+ YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
}
ConfigurationProvider configurationProvider =
// 从缓存池中获取到该类的 构造方法,然后根据构造方法反射得到 provider实例
// 可以 通过 yarn.resourcemanager.configuration.provider-class 参数指定 provider
ReflectionUtils.newInstance(bootstrapConf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class),
bootstrapConf);
return configurationProvider;
}
加载 core-site.xml 文件
private void loadConfigurationXml(String configurationFile)
throws YarnException, IOException {
InputStream configurationInputStream =
this.configurationProvider.getConfigurationInputStream(this.conf,
configurationFile);
if (configurationInputStream != null) {
this.conf.addResource(configurationInputStream, configurationFile);
}
}
加载 yarn-site.xml
跟加载 core-site.xml 文件操作类似
校验配置文件
主要校验 最大尝试次数 和 过期会话时长 和 心跳间隔的关系
protected static void validateConfigs(Configuration conf) {
// validate max-attempts
int globalMaxAppAttempts =
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
if (globalMaxAppAttempts <= 0) {
throw new YarnRuntimeException("Invalid global max attempts configuration"
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
} // validate expireIntvl >= heartbeatIntvl
long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
long heartbeatIntvl =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
if (expireIntvl < heartbeatIntvl) {
throw new YarnRuntimeException("Nodemanager expiry interval should be no"
+ " less than heartbeat interval, "
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
+ ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
+ heartbeatIntvl);
}
}
用户登陆
第一步:校验是否启用了HA, 如果启用了HA,需要配置HA 的相关信息,因为 用户登陆,是每个节点都需要登陆的。
第二步:获取当前的用户, 如果启用了 kerberos,那么是当前登陆kerberos的用户,否则是当前用户
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation getCurrentUser() throws IOException {
AccessControlContext context = AccessController.getContext();
Subject subject = Subject.getSubject(context);
if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
return getLoginUser();
} else {
return new UserGroupInformation(subject);
}
}
第三步: 调用安全API登陆,并获取登陆用户
protected void doSecureLogin() throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName()); // if security is enable, set rmLoginUGI as UGI of loginUser
if (UserGroupInformation.isSecurityEnabled()) {
this.rmLoginUGI = UserGroupInformation.getLoginUser();
}
}
初始化所有一直运行的服务事件的handler
private Dispatcher setupDispatcher() {
// 创建 dispatcher
Dispatcher dispatcher = createDispatcher();
// 将 RMFatalEventType 事件的handler RMFatalEventDispatcher
// 注册到 dispatcher
dispatcher.register(RMFatalEventType.class,
new ResourceManager.RMFatalEventDispatcher());
return dispatcher;
} protected Dispatcher createDispatcher() {
return new AsyncDispatcher("RM Event dispatcher");
}
AsyncDispatcher 内部是 有一个 阻塞的 事件队列,有一个一直运行的 执行线程,当阻塞队列中有事件被放入,执行线程会把事件取出来,并获取事件的类型,从事件注册器Map<Class<? extends Enum>, EventHandler>中 获取到对应的 EventHandler 对象,并调用 该对象的 dispatch 方法。这样就完成了一次异步事件调用。
创建 AdminService
protected AdminService createAdminService() {
return new AdminService(this);
}
设置 Yarn Configuration
rmContext.setYarnConfiguration(conf);
// 调用了
public void setYarnConfiguration(Configuration yarnConfiguration) {
serviceContext.setYarnConfiguration(yarnConfiguration);
}
创建并初始化 Active Service
protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}
// 其中,init 方法如下
@Override
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
// 调用的 serviceInit 方法如下,后面具体分析 @Override
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable(); rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService); containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
rmContext.setContainerAllocationExpirer(containerAllocationExpirer); AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor); AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor); RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
addService(rmAppLifetimeMonitor);
rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor); RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
rmContext.setNodeLabelManager(nlm); AllocationTagsManager allocationTagsManager =
createAllocationTagsManager();
rmContext.setAllocationTagsManager(allocationTagsManager); PlacementConstraintManagerService placementConstraintManager =
createPlacementConstraintManager();
addService(placementConstraintManager);
rmContext.setPlacementConstraintManager(placementConstraintManager); // add resource profiles here because it's used by AbstractYarnScheduler
ResourceProfilesManager resourceProfilesManager =
createResourceProfileManager();
resourceProfilesManager.init(conf);
rmContext.setResourceProfilesManager(resourceProfilesManager); RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {
addService(delegatedNodeLabelsUpdater);
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
} recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); RMStateStore rmStore = null;
if (recoveryEnabled) {
rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
rmStore = new NullRMStateStore();
} try {
rmStore.setResourceManager(rm);
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to init state store", e);
throw e;
}
rmContext.setStateStore(rmStore); if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenRenewer = createDelegationTokenRenewer();
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
} // Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
addService(nodesListManager);
rmContext.setNodesListManager(nodesListManager); // Initialize the scheduler
scheduler = createScheduler();
scheduler.setRMContext(rmContext);
addIfService(scheduler);
rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); // Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(rmContext)); // Register event handler for RmAppAttemptEvents
rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(rmContext)); // Register event handler for RmNodes
rmDispatcher.register(
RMNodeEventType.class, new NodeEventDispatcher(rmContext)); nmLivelinessMonitor = createNMLivelinessMonitor();
addService(nmLivelinessMonitor); resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker); MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
if (fromActive) {
JvmMetrics.reattach(ms, jvmMetrics);
UserGroupInformation.reattachMetrics();
} else {
jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
} JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jvmMetrics.setPauseMonitor(pauseMonitor); // Initialize the Reservation system
if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
reservationSystem = createReservationSystem();
if (reservationSystem != null) {
reservationSystem.setRMContext(rmContext);
addIfService(reservationSystem);
rmContext.setReservationSystem(reservationSystem);
LOG.info("Initialized Reservation system");
}
} masterService = createApplicationMasterService();
createAndRegisterOpportunisticDispatcher(masterService);
addService(masterService) ;
rmContext.setApplicationMasterService(masterService); applicationACLsManager = new ApplicationACLsManager(conf); queueACLsManager = createQueueACLsManager(scheduler, conf); rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); clientRM = createClientRMService();
addService(clientRM);
rmContext.setClientRMService(clientRM); applicationMasterLauncher = createAMLauncher();
rmDispatcher.register(AMLauncherEventType.class,
applicationMasterLauncher); addService(applicationMasterLauncher);
if (UserGroupInformation.isSecurityEnabled()) {
addService(delegationTokenRenewer);
delegationTokenRenewer.setRMContext(rmContext);
} if(HAUtil.isFederationEnabled(conf)) {
String cId = YarnConfiguration.getClusterId(conf);
if (cId.isEmpty()) {
String errMsg =
"Cannot initialize RM as Federation is enabled"
+ " but cluster id is not configured.";
LOG.error(errMsg);
throw new YarnRuntimeException(errMsg);
}
federationStateStoreService = createFederationStateStoreService();
addIfService(federationStateStoreService);
LOG.info("Initialized Federation membership.");
} new RMNMInfo(rmContext, scheduler); if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
false)) {
SystemServiceManager systemServiceManager = createServiceManager();
addIfService(systemServiceManager);
} super.serviceInit(conf);
}
获取 yarn wenApp地址
// yarn.resourcemanager.bind-host 可以根据这个参数来动态指定 RM HOST
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
创建 RMApplicationHistoryWriter 服务
protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
return new RMApplicationHistoryWriter();
} RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
创建 RM timeline collector
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
return new RMTimelineCollectorManager(this);
} if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
设置 SystemMetricsPublisher
protected SystemMetricsPublisher createSystemMetricsPublisher() {
List<SystemMetricsPublisher> publishers =
new ArrayList<SystemMetricsPublisher>();
// 使用 v1
if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
publishers.add(publisherV1);
}
// 使用 v2
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// we're dealing with the v.2.x publisher
LOG.info("system metrics publisher with the timeline service V2 is "
+ "configured");
SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
rmContext.getRMTimelineCollectorManager());
publishers.add(publisherV2);
}
// 如果没有 publisher, 给一个 空的 publisher,这里运用了null object 模式,防止了空指针的出现。
if (publishers.isEmpty()) {
LOG.info("TimelineServicePublisher is not configured");
SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
publishers.add(noopPublisher);
} for (SystemMetricsPublisher publisher : publishers) {
addIfService(publisher);
} SystemMetricsPublisher combinedPublisher =
new CombinedSystemMetricsPublisher(publishers);
return combinedPublisher;
}
注册 JMX
/**
* Register ResourceManagerMXBean.
*/
private void registerMXBean() {
MBeans.register("ResourceManager", "ResourceManager", this);
}
调用父类的服务 init 方法
// 在这里,之前初始化过程中创建的任何被加入到服务列表中的服务,都会被初始化。
protected void serviceInit(Configuration conf) throws Exception {
List<Service> services = getServices();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": initing services, size=" + services.size());
}
for (Service service : services) {
service.init(conf);
}
super.serviceInit(conf);
}
// 奇怪,为什么不直接 返回呢?ArrayList 的构造方法里面做的事就是 Arrays.copyOf 的工作(浅拷贝),防止了外部应用更新或删除服务列表。这是一个建议的做法,还可以返回一个 iterator 对象
public List<Service> getServices() {
synchronized (serviceList) {
return new ArrayList<Service>(serviceList);
}
}
至此,初始化的大致代码,基本上走完了,后续涉及到哪部分代码,再回来具体看。
最新文章
- JavaScript作用域闭包简述
- js页面跳转整理(转载未整理)
- SSH框架整合配置所需JAR包(SSH整合)
- HashMap优雅的初始化方式以及引申
- edge.js架起node.js和.net互操作桥梁
- 第4章 管道与FIFO
- C#中使用Bogus创建模拟数据
- 三、Dockerfile的说明和编写
- 从零开始学python
- label 的for属性总结
- MySQL 使用join操作时出现重复数据
- Spring Boot 构建电商基础秒杀项目 (五) 用户注册
- 洛谷 P1162 填涂颜色
- Python学习之旅(三十三)
- iOS10 UIImageWriteToSavedPhotosAlbum程序奔溃crash解决办法
- 在移动端如何用swiper实现导航栏效果
- POJ 1330 Nearest Common Ancestors(LCA Tarjan算法)
- NodeJS遍历文件生产文件列表
- bzoj 1036: [ZJOI2008]树的统计Count 树链剖分+线段树
- Cordova 混合开发
热门文章
- [Example of Sklearn] - Example
- ASP.NET Page执行顺序(ASP.NET生命周期)
- intraweb 11.0.63 for delphi7 破解
- AI2XAML's Bug
- C#中的String.Format介绍
- WPF内实现与串口发送数据和接收数据
- 零元学Expression Design 4 - Chapter 7 使用内建功能「Clone」来达成Path的影分身之术
- WPF 自定义的图表(适用大量数据绘制)下
- leetcode先刷_Pascal&;#39;s Triangle II
- html5中 table数据导出到excel文件