diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 57c8fce..2f7e4d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -23,9 +23,7 @@ import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -190,12 +188,8 @@ public static long getClusterTimeStamp() { protected static void setClusterTimeStamp(long timestamp) { clusterTimeStamp = timestamp; } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - this.conf = conf; - this.rmContext = new RMContextImpl(); - + + protected void internalServiceInit(Configuration conf) throws Exception { this.configurationProvider = ConfigurationProviderFactory.getConfigurationProvider(conf); this.configurationProvider.init(this.conf); @@ -242,16 +236,40 @@ protected void serviceInit(Configuration conf) throws Exception { if (this.rmContext.isHAEnabled()) { HAUtil.verifyAndSetConfiguration(this.conf); } + createAndInitActiveServices(); webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, YarnConfiguration.RM_BIND_HOST, WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); - this.rmLoginUGI = UserGroupInformation.getCurrentUser(); - super.serviceInit(this.conf); } + + @Override + protected void serviceInit(final Configuration conf) throws Exception { + this.conf = conf; + this.rmContext = new RMContextImpl(); + + // Set UGI and do login + // If security is enabled, use login user + // If security is not enabled, use current user + this.rmLoginUGI = UserGroupInformation.getCurrentUser(); + try { + doSecureLogin(); + } catch(IOException ie) { + throw new YarnRuntimeException("Failed to login", ie); + } + + // Do serviceInit + this.rmLoginUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + internalServiceInit(conf); + return null; + } + }); + } protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, Configuration conf) { @@ -1019,30 +1037,20 @@ protected boolean areActiveServicesRunning() { } synchronized void transitionToActive() throws Exception { - if (rmContext.getHAServiceState() == - HAServiceProtocol.HAServiceState.ACTIVE) { + if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) { LOG.info("Already in active state"); return; } LOG.info("Transitioning to active state"); - // use rmLoginUGI to startActiveServices. - // in non-secure model, rmLoginUGI will be current UGI - // in secure model, rmLoginUGI will be LoginUser UGI - this.rmLoginUGI.doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - try { - startActiveServices(); - return null; - } catch (Exception e) { - resetDispatcher(); - createAndInitActiveServices(); - throw e; - } - } - }); + try { + startActiveServices(); + } catch (Exception e) { + resetDispatcher(); + createAndInitActiveServices(); + throw e; + } rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE); LOG.info("Transitioned to active state"); @@ -1068,15 +1076,8 @@ synchronized void transitionToStandby(boolean initialize) rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); LOG.info("Transitioned to standby state"); } - - @Override - protected void serviceStart() throws Exception { - try { - doSecureLogin(); - } catch(IOException ie) { - throw new YarnRuntimeException("Failed to login", ie); - } - + + private void internalServiceStart() throws Exception { if (this.rmContext.isHAEnabled()) { transitionToStandby(true); } else { @@ -1084,12 +1085,29 @@ protected void serviceStart() throws Exception { } startWepApp(); - if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, + false)) { int port = webApp.port(); WebAppUtils.setRMWebAppPort(conf, port); } super.serviceStart(); } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Transitioning to active state"); + + // use rmLoginUGI to startActiveServices. + // in non-secure model, rmLoginUGI will be current UGI + // in secure model, rmLoginUGI will be LoginUser UGI + this.rmLoginUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + internalServiceStart(); + return null; + } + }); + } protected void doSecureLogin() throws IOException { InetSocketAddress socAddr = getBindAddress(conf);