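Finding the RM main class from the startup scripts

  In this part we use the startup scripts as the entry point and work our way step by step into the ResourceManager source code.

  The official Hadoop documentation gives the ResourceManager launch command as: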

  Usage: yarn resourcemanager [-format-state-store]

COMMAND_OPTIONS                                 Description
-format-state-store                             Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running.
-remove-application-from-state-store <appId>    Remove the application from RMStateStore. This should be run only when the ResourceManager is not running.

In the source tree, locate hadoop-yarn-project > hadoop-yarn > bin > start-yarn.sh

# start resourceManager
# Read the configuration to see whether ResourceManager HA is enabled
HARM=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled 2>&-)
if [[ ${HARM} = "false" ]]; then
  # ResourceManager HA is not enabled
  echo "Starting resourcemanager"
  hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      resourcemanager
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
else
  # ResourceManager HA is enabled
  # yarn.resourcemanager.ha.rm-ids holds the logical RM ids, separated by commas
  logicals=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids 2>&-)
  logicals=${logicals//,/ }   # replace commas with spaces so the loop sees one RM id per word
  for id in ${logicals}
  do
      rmhost=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey "yarn.resourcemanager.hostname.${id}" 2>&-)
      RMHOSTS="${RMHOSTS} ${rmhost}"   # RMHOSTS ends up as a space-separated string of hostnames
  done
  echo "Starting resourcemanagers on [${RMHOSTS}]"
  # run the yarn command on every RM host
  hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      --workers \
      --hostnames "${RMHOSTS}" \
      resourcemanager
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))   # add the previous command's exit status to the counter
fi

First, a note on the shell syntax used above to split a string:

$ aa='1,2,3';for i in ${aa//,/ }; do echo $i; done;
1
2
3

The official sample configuration makes this easier to follow. In the sample below HA is enabled and the RM ids are rm1 and rm2; the hostname of rm1 is master1 and the hostname of rm2 is master2:

<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster1</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>master1</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>master2</value>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
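The lookup that start-yarn.sh performs against this configuration can be sketched in Java as follows. This is only an illustration under stated assumptions: the configuration is modeled as a plain Map, and the class RmHostResolver and its method are hypothetical names, not part of Hadoop. Only the two property keys come from the sample above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Hadoop class: resolves RM hostnames the way the script does.
class RmHostResolver {

    // Read the comma-separated rm-ids, then look up one hostname per id.
    static List<String> resolveHosts(Map<String, String> conf) {
        List<String> hosts = new ArrayList<>();
        String ids = conf.getOrDefault("yarn.resourcemanager.ha.rm-ids", "");
        for (String id : ids.split(",")) {
            String trimmed = id.trim();
            if (trimmed.isEmpty()) {
                continue; // no ids configured, or a stray comma
            }
            String host = conf.get("yarn.resourcemanager.hostname." + trimmed);
            if (host != null) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        conf.put("yarn.resourcemanager.hostname.rm1", "master1");
        conf.put("yarn.resourcemanager.hostname.rm2", "master2");
        System.out.println(resolveHosts(conf));
    }
}
```

With the sample values, the resolved list contains master1 and master2, which corresponds to the space-separated RMHOSTS string built by the script.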

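Combined with the yarn script, we can conclude that the entry class of the resourcemanager is org.apache.hadoop.yarn.server.resourcemanager.ResourceManager. The yarn script is invoked with --config "${HADOOP_CONF_DIR}" --daemon start --workers --hostnames "${RMHOSTS}", plus the values passed along by the shell functions (not analyzed in detail here).

Analyzing RM service initialization

The ResourceManager class hierarchy

We have finally arrived at the entry class org.apache.hadoop.yarn.server.resourcemanager.ResourceManager, which lives in the hadoop-yarn-server-resourcemanager submodule.

Start with the class declaration. It extends CompositeService, which means the RM is a composite service made up of child services, and it implements the ResourceManagerMXBean interface, so it can be managed through JMX: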

public class ResourceManager extends CompositeService
implements Recoverable, ResourceManagerMXBean

The ResourceManager entry function

Next, find the main function:

public static void main(String argv[]) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); // parse the generic options
argv = hParser.getRemainingArgs(); // the "-"-prefixed arguments left over after the "--name value" pairs; on a first, plain start none are given
// If -format-state-store, then delete RMStateStore; else startup normally
if (argv.length >= 1) {
if (argv[0].equals("-format-state-store")) {
deleteRMStateStore(conf);
} else if (argv[0].equals("-remove-application-from-state-store")
&& argv.length == 2) {
removeApplication(conf, argv[1]);
} else {
printUsage(System.err);
}
} else {
ResourceManager resourceManager = new ResourceManager();
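    // Instantiate the RM. The superclass constructor sets the service name to "ResourceManager" and creates the stateModel field with the initial state Service.State.NOTINITED (covered in detail later)
ShutdownHookManager.get().addShutdownHook( // register a callback that shuts the composite service down
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf); // initialize the RM service
resourceManager.start(); // start the RM service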
}
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
System.exit(-1);
}
}

The ResourceManager initialization process

@Override // defined in the parent class AbstractService
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) { // the previous state was not INITED, so the service has not been initialized yet
setConfig(conf); // store the conf object
try {
serviceInit(config); // initialize the service
if (isInState(STATE.INITED)) { // the service initialized correctly
//if the service ended up here during init,
//notify the listeners
notifyListeners(); // notify the listeners
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
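The idempotent, state-guarded init above can be reduced to a small standalone sketch. This is a simplification under stated assumptions, not Hadoop's implementation: MiniService and its members are hypothetical names, the state model is a bare enum without transition validation, and listeners are left out.

```java
// Hypothetical, simplified stand-in for the AbstractService init logic.
class MiniService {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.NOTINITED;
    private int initCalls = 0;

    // Moves to the new state and returns the previous one.
    private State enterState(State next) {
        State old = state;
        state = next;
        return old;
    }

    public void init() {
        if (state == State.INITED) {
            return; // already initialized, nothing to do
        }
        synchronized (stateChangeLock) {
            // Only the caller that actually performs the transition runs serviceInit.
            if (enterState(State.INITED) != State.INITED) {
                try {
                    serviceInit();
                } catch (RuntimeException e) {
                    state = State.STOPPED; // a failed init leaves the service stopped
                    throw e;
                }
            }
        }
    }

    // Subclasses override this hook, as ResourceManager overrides serviceInit.
    protected void serviceInit() {
        initCalls++;
    }

    public State getState() { return state; }

    public int getInitCalls() { return initCalls; }

    public static void main(String[] args) {
        MiniService service = new MiniService();
        service.init();
        service.init(); // second call returns early
        System.out.println(service.getState() + " after " + service.getInitCalls() + " serviceInit call(s)");
    }
}
```

The point of the pattern is that callers can invoke init more than once without re-running the initialization hook, and a failure inside the hook does not leave the service claiming to be initialized.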

ResourceManager provides its own implementation of serviceInit:

@Override
protected void serviceInit(Configuration conf) throws Exception {
  this.conf = conf;
  // 1. Initialize the service context
  // RMContextImpl holds two kinds of service context:
  //   serviceContext: the "Always On" services, which keep running regardless of HA state
  //   activeServiceContext: the services that must run only on the active RM node
  this.rmContext = new RMContextImpl();
  rmContext.setResourceManager(this);

  // 2. Set up the configuration provider
  this.configurationProvider =
      ConfigurationProviderFactory.getConfigurationProvider(conf);
  this.configurationProvider.init(this.conf);
  rmContext.setConfigurationProvider(configurationProvider);

  // 3. Load core-site.xml
  loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);

  // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
  // Or use RM specific configurations to overwrite the common ones first
  // if they exist
  RMServerUtils.processRMProxyUsersConf(conf);
  ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);

  // 4. Load yarn-site.xml
  loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);

  // 5. Validate the configuration
  validateConfigs(this.conf);

  // 6. Login
  // Set HA configuration should be done before login
  this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
  if (this.rmContext.isHAEnabled()) { // if HA is enabled for the RM, apply the HA configuration
    HAUtil.verifyAndSetConfiguration(this.conf);
  }

  // Set UGI and do login
  // If security is enabled, use login user
  // If security is not enabled, use current user
  // With security such as Kerberos enabled, the Kerberos login user is used; otherwise the current user
  this.rmLoginUGI = UserGroupInformation.getCurrentUser();
  try {
    doSecureLogin();
  } catch(IOException ie) {
    throw new YarnRuntimeException("Failed to login", ie);
  }

  // register the handlers for all AlwaysOn services using setupDispatcher().
  // 7. Set up the event handlers for all Always On services
  rmDispatcher = setupDispatcher();
  addIfService(rmDispatcher);
  rmContext.setDispatcher(rmDispatcher);

  // The order of services below should not be changed as services will be
  // started in same order
  // As elector service needs admin service to be initialized and started,
  // first we add admin service then elector service
  // 8. Create the AdminService
  adminService = createAdminService();
  addService(adminService);
  rmContext.setRMAdminService(adminService);

  // elector must be added post adminservice
  if (this.rmContext.isHAEnabled()) {
    // If the RM is configured to use an embedded leader elector,
    // initialize the leader elector.
    if (HAUtil.isAutomaticFailoverEnabled(conf)
        && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      EmbeddedElector elector = createEmbeddedElector();
      addIfService(elector);
      rmContext.setLeaderElectorService(elector);
    }
  }

  // 9. Set the YARN configuration
  rmContext.setYarnConfiguration(conf);

  // 10. Create and initialize the active services
  createAndInitActiveServices(false);

  // 11. Get the YARN web app address
  webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
      YarnConfiguration.RM_BIND_HOST,
      WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

  // 12. Create the RMApplicationHistoryWriter service
  RMApplicationHistoryWriter rmApplicationHistoryWriter =
      createRMApplicationHistoryWriter();
  addService(rmApplicationHistoryWriter);
  rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

  // initialize the RM timeline collector first so that the system metrics
  // publisher can bind to it
  // 13. Create the RM timeline collector
  if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
    RMTimelineCollectorManager timelineCollectorManager =
        createRMTimelineCollectorManager();
    addService(timelineCollectorManager);
    rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
  }

  // 14. Set up the SystemMetricsPublisher
  SystemMetricsPublisher systemMetricsPublisher =
      createSystemMetricsPublisher();
  addIfService(systemMetricsPublisher);
  rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

  // 15. Register with JMX
  registerMXBean();

  // 16. Call the parent class's serviceInit method
  super.serviceInit(this.conf);
}

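Let's go through the initialization sub-steps one by one.

Initializing the service context

public RMContextImpl() {
// context for the Always On services
this.serviceContext = new RMServiceContext();
// context for the services that run only on the active RM node
this.activeServiceContext = new RMActiveServiceContext();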
}

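Setting up the configuration provider

A factory, driven by configuration, supplies the default ConfigurationProvider. Users can plug in their own provider by implementing ConfigurationProvider.

The provider pattern shows up often in other code bases as well. Here the provider can do some internal initialization, return an InputStream for a configuration file, and close its resources when it is done. The code that parses the configuration then only needs an input stream.

// ConfigurationProviderFactory is a factory class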
/**
* Creates an instance of {@link ConfigurationProvider} using given
* configuration.
* @param bootstrapConf
* @return configurationProvider
*/
@SuppressWarnings("unchecked")
public static ConfigurationProvider
getConfigurationProvider(Configuration bootstrapConf) {
Class<? extends ConfigurationProvider> defaultProviderClass;
try {
// the default provider class is org.apache.hadoop.yarn.LocalConfigurationProvider
defaultProviderClass = (Class<? extends ConfigurationProvider>)
Class.forName(
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException(
"Invalid default configuration provider class"
+ YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
}
ConfigurationProvider configurationProvider =
// Look up the class's constructor in the cache, then create the provider instance reflectively from it
// The provider can be chosen with the yarn.resourcemanager.configuration.provider-class parameter
ReflectionUtils.newInstance(bootstrapConf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class),
bootstrapConf);
return configurationProvider;
}
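The factory idea (a configured class name with a default fallback, instantiated through reflection) can be sketched without any Hadoop dependency. Everything named here is an assumption made for illustration: ConfigProvider, LocalProvider, RemoteProvider, ProviderFactory, and the key demo.provider.class are hypothetical, and the configuration is a plain Map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical provider contract for this sketch.
interface ConfigProvider {
    String name();
}

class LocalProvider implements ConfigProvider {
    public String name() { return "local"; }
}

class RemoteProvider implements ConfigProvider {
    public String name() { return "remote"; }
}

class ProviderFactory {
    // Hypothetical configuration key; the default falls back to LocalProvider.
    static final String PROVIDER_KEY = "demo.provider.class";
    static final String DEFAULT_PROVIDER = LocalProvider.class.getName();

    static ConfigProvider getProvider(Map<String, String> conf) {
        String className = conf.getOrDefault(PROVIDER_KEY, DEFAULT_PROVIDER);
        try {
            // asSubclass rejects classes that do not implement the contract.
            Class<? extends ConfigProvider> cls =
                Class.forName(className).asSubclass(ConfigProvider.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Invalid provider class " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(getProvider(conf).name());
        conf.put(PROVIDER_KEY, RemoteProvider.class.getName());
        System.out.println(getProvider(conf).name());
    }
}
```

The design choice worth noting is that callers depend only on the interface; swapping the implementation is a configuration change rather than a code change.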

Loading core-site.xml

private void loadConfigurationXml(String configurationFile)
throws YarnException, IOException {
InputStream configurationInputStream =
this.configurationProvider.getConfigurationInputStream(this.conf,
configurationFile);
if (configurationInputStream != null) {
this.conf.addResource(configurationInputStream, configurationFile);
}
}

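Loading yarn-site.xml

This works the same way as loading core-site.xml.

Validating the configuration

Two things are checked: the global maximum number of AM attempts must be a positive integer, and the NodeManager expiry interval must be no less than the heartbeat interval.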

protected static void validateConfigs(Configuration conf) {
// validate max-attempts
int globalMaxAppAttempts =
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
if (globalMaxAppAttempts <= 0) {
throw new YarnRuntimeException("Invalid global max attempts configuration"
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
}
// validate expireIntvl >= heartbeatIntvl
long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
long heartbeatIntvl =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
if (expireIntvl < heartbeatIntvl) {
throw new YarnRuntimeException("Nodemanager expiry interval should be no"
+ " less than heartbeat interval, "
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
+ ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
+ heartbeatIntvl);
}
}
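The two checks can be tried in isolation with a minimal sketch. The assumptions are stated plainly: ConfigValidator, the demo.* keys, and the default values are made up for this example, the configuration is a plain Map, and IllegalArgumentException stands in for YarnRuntimeException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical validator mirroring the two checks in validateConfigs.
class ConfigValidator {
    // Hypothetical keys and defaults, used only for this sketch.
    static final String MAX_ATTEMPTS = "demo.am.max-attempts";
    static final String EXPIRY_MS = "demo.nm.expiry-interval-ms";
    static final String HEARTBEAT_MS = "demo.nm.heartbeat-interval-ms";

    static void validate(Map<String, String> conf) {
        // Check 1: max attempts must be a positive integer.
        int maxAttempts = Integer.parseInt(conf.getOrDefault(MAX_ATTEMPTS, "2"));
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException(
                MAX_ATTEMPTS + "=" + maxAttempts + ", it should be a positive integer.");
        }
        // Check 2: the expiry interval must not be shorter than the heartbeat interval.
        long expiry = Long.parseLong(conf.getOrDefault(EXPIRY_MS, "600000"));
        long heartbeat = Long.parseLong(conf.getOrDefault(HEARTBEAT_MS, "1000"));
        if (expiry < heartbeat) {
            throw new IllegalArgumentException(
                EXPIRY_MS + "=" + expiry + " should be no less than "
                    + HEARTBEAT_MS + "=" + heartbeat);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        validate(conf); // the defaults pass both checks
        conf.put(HEARTBEAT_MS, "700000");
        try {
            validate(conf);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at startup is the reason for the second check: a node whose heartbeat interval exceeds the expiry interval would be declared lost before it ever had the chance to report in.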

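User login

Step 1: check whether HA is enabled. If it is, the HA settings have to be applied first, because every node needs to log in.

Step 2: get the current user. With Kerberos enabled this is the user currently logged in to Kerberos; otherwise it is the current user.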

@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation getCurrentUser() throws IOException {
AccessControlContext context = AccessController.getContext();
Subject subject = Subject.getSubject(context);
if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
return getLoginUser();
} else {
return new UserGroupInformation(subject);
}
}

Step 3: log in through the security API and get the login user

protected void doSecureLogin() throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());
// if security is enable, set rmLoginUGI as UGI of loginUser
if (UserGroupInformation.isSecurityEnabled()) {
this.rmLoginUGI = UserGroupInformation.getLoginUser();
}
}

Setting up the event handlers for the Always On services

private Dispatcher setupDispatcher() {
  // create the dispatcher
  Dispatcher dispatcher = createDispatcher();
  // register RMFatalEventDispatcher, the handler for RMFatalEventType events,
  // with the dispatcher
  dispatcher.register(RMFatalEventType.class,
      new ResourceManager.RMFatalEventDispatcher());
  return dispatcher;
}

protected Dispatcher createDispatcher() {
  return new AsyncDispatcher("RM Event dispatcher");
}

  Internally, AsyncDispatcher has a blocking event queue and a long-running worker thread. When an event is put on the queue, the worker thread takes it off, reads its type, looks up the matching EventHandler in the registry Map<Class<? extends Enum>, EventHandler>, and calls that handler's handle method. That completes one asynchronous event dispatch.
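The queue-plus-worker mechanism can be illustrated with a minimal sketch. It is not Hadoop's AsyncDispatcher: MiniAsyncDispatcher, MiniEvent, MiniHandler, DemoEventType, and DemoEvent are hypothetical names, and stopping is modeled with a sentinel event so that everything queued earlier is handled before stop returns.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event and handler contracts for this sketch.
interface MiniEvent {
    Enum<?> type();
}

interface MiniHandler {
    void handle(MiniEvent event);
}

enum DemoEventType { FATAL }

class DemoEvent implements MiniEvent {
    private final DemoEventType type;
    DemoEvent(DemoEventType type) { this.type = type; }
    public Enum<?> type() { return type; }
}

class MiniAsyncDispatcher {
    private static final MiniEvent STOP = () -> null; // sentinel that ends the worker loop
    private final BlockingQueue<MiniEvent> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, MiniHandler> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::runLoop, "mini-dispatcher");

    // Worker thread: block on the queue, look up the handler by event type, call it.
    private void runLoop() {
        try {
            while (true) {
                MiniEvent event = queue.take();
                if (event == STOP) {
                    return;
                }
                MiniHandler handler = handlers.get(event.type().getDeclaringClass());
                if (handler != null) {
                    handler.handle(event);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void register(Class<? extends Enum<?>> eventType, MiniHandler handler) {
        handlers.put(eventType, handler);
    }

    void start() { worker.start(); }

    // Returns immediately; the event is handled later on the worker thread.
    void dispatch(MiniEvent event) { queue.add(event); }

    // Queue the sentinel behind pending events, then wait for the worker to finish.
    void stop() {
        queue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
        dispatcher.register(DemoEventType.class,
            e -> System.out.println("handled " + e.type() + " on " + Thread.currentThread().getName()));
        dispatcher.start();
        dispatcher.dispatch(new DemoEvent(DemoEventType.FATAL));
        dispatcher.stop();
    }
}
```

The caller of dispatch never runs handler code itself, which is what decouples event producers from the components that react to the events.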

Creating the AdminService

protected AdminService createAdminService() {
return new AdminService(this);
}

Setting the YARN configuration

rmContext.setYarnConfiguration(conf);
// which calls
public void setYarnConfiguration(Configuration yarnConfiguration) {
serviceContext.setYarnConfiguration(yarnConfiguration);
}

Creating and initializing the active services

protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}
// The init method is as follows
@Override
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
// The serviceInit method it calls is shown below; it is analyzed in detail later
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  standByTransitionRunnable = new StandByTransitionRunnable();

  rmSecretManagerService = createRMSecretManagerService();
  addService(rmSecretManagerService);

  containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
  addService(containerAllocationExpirer);
  rmContext.setContainerAllocationExpirer(containerAllocationExpirer);

  AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
  addService(amLivelinessMonitor);
  rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

  AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
  addService(amFinishingMonitor);
  rmContext.setAMFinishingMonitor(amFinishingMonitor);

  RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
  addService(rmAppLifetimeMonitor);
  rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);

  RMNodeLabelsManager nlm = createNodeLabelManager();
  nlm.setRMContext(rmContext);
  addService(nlm);
  rmContext.setNodeLabelManager(nlm);

  AllocationTagsManager allocationTagsManager =
      createAllocationTagsManager();
  rmContext.setAllocationTagsManager(allocationTagsManager);

  PlacementConstraintManagerService placementConstraintManager =
      createPlacementConstraintManager();
  addService(placementConstraintManager);
  rmContext.setPlacementConstraintManager(placementConstraintManager);

  // add resource profiles here because it's used by AbstractYarnScheduler
  ResourceProfilesManager resourceProfilesManager =
      createResourceProfileManager();
  resourceProfilesManager.init(conf);
  rmContext.setResourceProfilesManager(resourceProfilesManager);

  RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
      createRMDelegatedNodeLabelsUpdater();
  if (delegatedNodeLabelsUpdater != null) {
    addService(delegatedNodeLabelsUpdater);
    rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
  }

  recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
      YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

  RMStateStore rmStore = null;
  if (recoveryEnabled) {
    rmStore = RMStateStoreFactory.getStore(conf);
    boolean isWorkPreservingRecoveryEnabled =
        conf.getBoolean(
            YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
            YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
    rmContext
        .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
  } else {
    rmStore = new NullRMStateStore();
  }

  try {
    rmStore.setResourceManager(rm);
    rmStore.init(conf);
    rmStore.setRMDispatcher(rmDispatcher);
  } catch (Exception e) {
    // the Exception from stateStore.init() needs to be handled for
    // HA and we need to give up master status if we got fenced
    LOG.error("Failed to init state store", e);
    throw e;
  }
  rmContext.setStateStore(rmStore);

  if (UserGroupInformation.isSecurityEnabled()) {
    delegationTokenRenewer = createDelegationTokenRenewer();
    rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
  }

  // Register event handler for NodesListManager
  nodesListManager = new NodesListManager(rmContext);
  rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
  addService(nodesListManager);
  rmContext.setNodesListManager(nodesListManager);

  // Initialize the scheduler
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);

  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);
  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

  // Register event handler for RmAppEvents
  rmDispatcher.register(RMAppEventType.class,
      new ApplicationEventDispatcher(rmContext));

  // Register event handler for RmAppAttemptEvents
  rmDispatcher.register(RMAppAttemptEventType.class,
      new ApplicationAttemptEventDispatcher(rmContext));

  // Register event handler for RmNodes
  rmDispatcher.register(
      RMNodeEventType.class, new NodeEventDispatcher(rmContext));

  nmLivelinessMonitor = createNMLivelinessMonitor();
  addService(nmLivelinessMonitor);

  resourceTracker = createResourceTrackerService();
  addService(resourceTracker);
  rmContext.setResourceTrackerService(resourceTracker);

  MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
  if (fromActive) {
    JvmMetrics.reattach(ms, jvmMetrics);
    UserGroupInformation.reattachMetrics();
  } else {
    jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
  }

  JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
  addService(pauseMonitor);
  jvmMetrics.setPauseMonitor(pauseMonitor);

  // Initialize the Reservation system
  if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
      YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
    reservationSystem = createReservationSystem();
    if (reservationSystem != null) {
      reservationSystem.setRMContext(rmContext);
      addIfService(reservationSystem);
      rmContext.setReservationSystem(reservationSystem);
      LOG.info("Initialized Reservation system");
    }
  }

  masterService = createApplicationMasterService();
  createAndRegisterOpportunisticDispatcher(masterService);
  addService(masterService);
  rmContext.setApplicationMasterService(masterService);

  applicationACLsManager = new ApplicationACLsManager(conf);

  queueACLsManager = createQueueACLsManager(scheduler, conf);

  rmAppManager = createRMAppManager();
  // Register event handler for RMAppManagerEvents
  rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);

  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);

  applicationMasterLauncher = createAMLauncher();
  rmDispatcher.register(AMLauncherEventType.class,
      applicationMasterLauncher);

  addService(applicationMasterLauncher);
  if (UserGroupInformation.isSecurityEnabled()) {
    addService(delegationTokenRenewer);
    delegationTokenRenewer.setRMContext(rmContext);
  }

  if(HAUtil.isFederationEnabled(conf)) {
    String cId = YarnConfiguration.getClusterId(conf);
    if (cId.isEmpty()) {
      String errMsg =
          "Cannot initialize RM as Federation is enabled"
              + " but cluster id is not configured.";
      LOG.error(errMsg);
      throw new YarnRuntimeException(errMsg);
    }
    federationStateStoreService = createFederationStateStoreService();
    addIfService(federationStateStoreService);
    LOG.info("Initialized Federation membership.");
  }

  new RMNMInfo(rmContext, scheduler);

  if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
      false)) {
    SystemServiceManager systemServiceManager = createServiceManager();
    addIfService(systemServiceManager);
  }

  super.serviceInit(conf);
}

Getting the YARN web app address

// yarn.resourcemanager.bind-host: this parameter can be used to set the RM bind host dynamically
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

Creating the RMApplicationHistoryWriter service

protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
  return new RMApplicationHistoryWriter();
}

RMApplicationHistoryWriter rmApplicationHistoryWriter =
    createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

Creating the RM timeline collector

private RMTimelineCollectorManager createRMTimelineCollectorManager() {
  return new RMTimelineCollectorManager(this);
}

if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
  RMTimelineCollectorManager timelineCollectorManager =
      createRMTimelineCollectorManager();
  addService(timelineCollectorManager);
  rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
}

Setting up the SystemMetricsPublisher

protected SystemMetricsPublisher createSystemMetricsPublisher() {
  List<SystemMetricsPublisher> publishers =
      new ArrayList<SystemMetricsPublisher>();
  // timeline service v1 is enabled
  if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
    SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
    publishers.add(publisherV1);
  }
  // timeline service v2 is enabled
  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    // we're dealing with the v.2.x publisher
    LOG.info("system metrics publisher with the timeline service V2 is "
        + "configured");
    SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
        rmContext.getRMTimelineCollectorManager());
    publishers.add(publisherV2);
  }
  // If no publisher is configured, add a no-op publisher. This is the null object pattern, which avoids null pointer errors.
  if (publishers.isEmpty()) {
    LOG.info("TimelineServicePublisher is not configured");
    SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
    publishers.add(noopPublisher);
  }

  for (SystemMetricsPublisher publisher : publishers) {
    addIfService(publisher);
  }

  SystemMetricsPublisher combinedPublisher =
      new CombinedSystemMetricsPublisher(publishers);
  return combinedPublisher;
}
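The null object and combined publisher ideas used in createSystemMetricsPublisher can be shown with a small standalone sketch. All names here (MetricsPublisher, NoOpPublisher, RecordingPublisher, CombinedPublisher, PublisherFactory) are hypothetical stand-ins, and the interface is cut down to a single method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-method publisher contract for this sketch.
interface MetricsPublisher {
    void appCreated(String appId);
}

// Null object: a valid publisher that deliberately does nothing.
class NoOpPublisher implements MetricsPublisher {
    public void appCreated(String appId) {
        // intentionally empty
    }
}

// Test-friendly publisher that remembers what it was told.
class RecordingPublisher implements MetricsPublisher {
    final List<String> seen = new ArrayList<>();
    public void appCreated(String appId) { seen.add(appId); }
}

// Fans each call out to every delegate.
class CombinedPublisher implements MetricsPublisher {
    private final List<MetricsPublisher> delegates;

    CombinedPublisher(List<MetricsPublisher> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    public void appCreated(String appId) {
        for (MetricsPublisher delegate : delegates) {
            delegate.appCreated(appId);
        }
    }
}

class PublisherFactory {
    // Never returns null: an empty configuration yields a no-op publisher.
    static MetricsPublisher create(List<MetricsPublisher> configured) {
        List<MetricsPublisher> publishers = new ArrayList<>(configured);
        if (publishers.isEmpty()) {
            publishers.add(new NoOpPublisher());
        }
        return new CombinedPublisher(publishers);
    }

    public static void main(String[] args) {
        MetricsPublisher publisher = create(new ArrayList<>());
        publisher.appCreated("app_1"); // safe even though nothing is configured
        System.out.println("published without a null check");
    }
}
```

Because the factory always hands back a usable object, calling code never needs an "is a publisher configured?" branch.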

Registering with JMX

/**
* Register ResourceManagerMXBean.
*/
private void registerMXBean() {
MBeans.register("ResourceManager", "ResourceManager", this);
}

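Calling the parent class's serviceInit method

// This is where every service that was added to the service list earlier in the initialization gets initialized.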
protected void serviceInit(Configuration conf) throws Exception {
List<Service> services = getServices();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": initing services, size=" + services.size());
}
for (Service service : services) {
service.init(conf);
}
super.serviceInit(conf);
}
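// Why not return the list directly? The ArrayList copy constructor does what Arrays.copyOf does (a shallow copy), which keeps outside code from adding to or removing from the service list. This is a recommended practice; returning an iterator is another option.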
public List<Service> getServices() {
synchronized (serviceList) {
return new ArrayList<Service>(serviceList);
}
}
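The effect of the defensive copy can be seen in a minimal sketch. The names Child and MiniComposite are hypothetical and the service contract is cut down to a single init method; this is an illustration of the idea, not Hadoop's CompositeService.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical child-service contract for this sketch.
interface Child {
    void init();
}

class MiniComposite {
    private final List<Child> serviceList = new ArrayList<>();

    void addService(Child child) {
        synchronized (serviceList) {
            serviceList.add(child);
        }
    }

    // Return a snapshot so callers cannot modify the internal list.
    List<Child> getServices() {
        synchronized (serviceList) {
            return new ArrayList<>(serviceList);
        }
    }

    // Iterate over the snapshot: children may register more services while init runs.
    void serviceInit() {
        for (Child child : getServices()) {
            child.init();
        }
    }

    public static void main(String[] args) {
        MiniComposite composite = new MiniComposite();
        composite.addService(() -> composite.addService(() -> { }));
        composite.serviceInit(); // no ConcurrentModificationException
        composite.getServices().clear(); // clears only the copy
        System.out.println("services registered: " + composite.getServices().size());
    }
}
```

One consequence of iterating over a snapshot is that a service added during serviceInit is registered but not initialized in that same pass.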

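That covers the broad outline of the initialization code. We will come back to the individual parts in detail when later sections need them.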
