diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 8c5d00b..04d4ca9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -300,4 +300,17 @@ private static void checkAndSetRMRPCAddress(String prefix, String RMId, throwBadConfigurationException(errmsg); } } + + public static void setRpcAddressForRM(String rmId, int base, + Configuration conf) { + for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) { + setConfForRM(rmId, confKey, "0.0.0.0:" + (base + + YarnConfiguration.getRMDefaultPortNumber(confKey, conf)), conf); + } + } + + public static void setConfForRM(String rmId, String prefix, String value, + Configuration conf) { + conf.set(HAUtil.addSuffix(prefix, rmId), value); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 903dd94..f27ff43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -161,26 +161,6 @@ protected Thread failoverThread = null; private volatile boolean keepRunning; - private void setConfForRM(String rmId, String prefix, String value) { - conf.set(HAUtil.addSuffix(prefix, rmId), value); - } - - private void setRpcAddressForRM(String rmId, int base) { - setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - "0.0.0.0:" + (base + YarnConfiguration - .DEFAULT_RM_RESOURCE_TRACKER_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); - } - @Before public void setup() throws IOException { failoverThread = null; @@ -189,8 +169,9 @@ public void setup() throws IOException { conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); - setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); - setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + conf.set(YarnConfiguration.RM_HA_ID, RM2_NODE_ID); + HAUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf); + HAUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf); conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java index 41e1800..ad86fb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java @@ -54,18 +54,18 @@ public void initialize() throws Exception { startHACluster(0, false, false, true); attemptId = this.cluster.createFakeApplicationAttemptId(); - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); Token appToken = this.cluster.getResourceManager().getRMContext() .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(ClientRMProxy.getAMRMTokenService(conf)); + appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser() - .getUserName())); + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); UserGroupInformation.getCurrentUser().addToken(appToken); syncToken(appToken); + + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); } @After diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index cd22743..9116078 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -66,34 +66,15 @@ private MiniYARNCluster cluster; private ApplicationId fakeAppId; - - private void setConfForRM(String rmId, String prefix, String value) { - conf.set(HAUtil.addSuffix(prefix, rmId), value); - } - - private void setRpcAddressForRM(String rmId, int base) { - setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); - } - @Before public void setup() throws IOException { fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0); conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); - setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); - setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + HAUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf); + HAUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf); + conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index ee6f6be..ff9b820 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -107,7 +107,7 @@ private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; - private InetSocketAddress bindAddress; + private InetSocketAddress masterServiceAddress; private Server server; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -123,15 +123,18 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { } @Override - protected void serviceStart() throws Exception { - Configuration conf = getConfig(); - YarnRPC rpc = YarnRPC.create(conf); - - InetSocketAddress masterServiceAddress = conf.getSocketAddr( + protected void serviceInit(Configuration conf) throws Exception { + masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); Configuration serverConf = conf; // If the auth is not-simple, enforce it to be token-based. @@ -160,7 +163,7 @@ protected void serviceStart() throws Exception { } this.server.start(); - this.bindAddress = + this.masterServiceAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, @@ -170,7 +173,7 @@ protected void serviceStart() throws Exception { @Private public InetSocketAddress getBindAddress() { - return this.bindAddress; + return this.masterServiceAddress; } // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 0d9ee6d..5ed7d09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -42,25 +42,6 @@ private Configuration conf; private AtomicBoolean callbackCalled; - private void setConfForRM(String rmId, String prefix, String value) { - conf.set(HAUtil.addSuffix(prefix, rmId), value); - } - - private void setRpcAddressForRM(String rmId, int base) { - setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); - setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + - (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); - } - @Before public void setup() throws IOException { conf = new YarnConfiguration(); @@ -73,8 +54,8 @@ public void setup() throws IOException { conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); - setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); - setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + HAUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf); + HAUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf); conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index f8b27b3..27e6177 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -239,6 +239,7 @@ public void serviceInit(Configuration conf) throws Exception { rmIds.append("rm" + i); } conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); + conf.set(YarnConfiguration.RM_HA_ID, "rm0"); } Collection rmIdsCollection = HAUtil.getRMHAIds(conf); rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]); @@ -287,10 +288,11 @@ private void setHARMConfiguration(final int index, Configuration conf) { } private synchronized void initResourceManager(int index, Configuration conf) { - if (HAUtil.isHAEnabled(conf)) { - conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); + Configuration newConf = new YarnConfiguration(conf); + if (HAUtil.isHAEnabled(newConf)) { + newConf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); } - resourceManagers[index].init(conf); + resourceManagers[index].init(newConf); resourceManagers[index].getRMContext().getDispatcher().register( RMAppAttemptEventType.class, new EventHandler() { @@ -329,10 +331,11 @@ public void run() { } catch (Throwable t) { throw new YarnRuntimeException(t); } + Configuration conf = resourceManagers[index].getConfig(); LOG.info("MiniYARN ResourceManager address: " + - getConfig().get(YarnConfiguration.RM_ADDRESS)); + conf.get(YarnConfiguration.RM_ADDRESS)); LOG.info("MiniYARN ResourceManager web address: " + - WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); + WebAppUtils.getRMWebAppURLWithoutScheme(conf)); } @InterfaceAudience.Private @@ -352,7 +355,6 @@ public synchronized void restartResourceManager(int index) resourceManagers[index].stop(); resourceManagers[index] = null; } - Configuration conf = getConfig(); resourceManagers[index] = new ResourceManager(); initResourceManager(index, getConfig()); startResourceManager(index); @@ -433,6 +435,7 @@ public static String getHostname() { private class ResourceManagerWrapper extends AbstractService { private int index; + public ResourceManagerWrapper(int i) { super(ResourceManagerWrapper.class.getName() + "_" + i); index = i; @@ -448,10 +451,11 @@ protected synchronized void serviceInit(Configuration conf) @Override protected synchronized void serviceStart() throws Exception { startResourceManager(index); + Configuration conf = resourceManagers[index].getConfig(); LOG.info("MiniYARN ResourceManager address: " + - getConfig().get(YarnConfiguration.RM_ADDRESS)); - LOG.info("MiniYARN ResourceManager web address: " + - WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); + conf.get(YarnConfiguration.RM_ADDRESS)); + LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils + .getRMWebAppURLWithoutScheme(conf)); super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java index 8a3c9e7..ddb1616 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnCluster.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.server; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; import org.junit.Assert; import org.junit.Test; @@ -112,4 +114,39 @@ public void testTimelineServiceStartInMiniCluster() throws Exception { } } } + + @Test + public void testMultiRMConf() { + String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2"; + int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000; + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster"); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + HAUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf); + HAUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + + MiniYARNCluster cluster = + new MiniYARNCluster(TestMiniYarnCluster.class.getName(), + 2, 0, 1, 1); + cluster.init(conf); + Configuration conf1 = cluster.getResourceManager(0).getConfig(), + conf2 = cluster.getResourceManager(1).getConfig(); + Assert.assertFalse(conf1 == conf2); + Assert.assertEquals("0.0.0.0:18032", + conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID))); + Assert.assertEquals("0.0.0.0:28032", + conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID))); + Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID)); + + Assert.assertEquals("0.0.0.0:18032", + conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID))); + Assert.assertEquals("0.0.0.0:28032", + conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID))); + Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID)); + } }