diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 3c187f9..3acab96 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -975,6 +975,12 @@ public AMRMTokenSecretManager getAMRMTokenSecretManager(){ return this.amRmTokenSecretManager; } + @Private + public RMHAProtocolService getRMHAService() { + return this.haService; + } + + @Override public void recover(RMState state) throws Exception { // recover RMdelegationTokenSecretManager diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index dbb6507..52437da 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -30,13 +30,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.AbstractService; import 
org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMHAProtocolService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -87,7 +89,7 @@ } private NodeManager[] nodeManagers; - private ResourceManager resourceManager; + private ResourceManager[] resourceManagers; private ResourceManagerWrapper resourceManagerWrapper; @@ -103,12 +105,14 @@ /** * @param testName name of the test - * @param noOfNodeManagers the number of node managers in the cluster + * @param numResourceManagers the number of resource managers in the cluster + * @param numNodeManagers the number of node managers in the cluster * @param numLocalDirs the number of nm-local-dirs per nodemanager * @param numLogDirs the number of nm-log-dirs per nodemanager */ - public MiniYARNCluster(String testName, int noOfNodeManagers, - int numLocalDirs, int numLogDirs) { + public MiniYARNCluster( + String testName, int numResourceManagers, int numNodeManagers, + int numLocalDirs, int numLogDirs) { super(testName.replace("$", "")); this.numLocalDirs = numLocalDirs; this.numLogDirs = numLogDirs; @@ -157,20 +161,46 @@ public MiniYARNCluster(String testName, int noOfNodeManagers, this.testWorkDir = targetWorkDir; } - 
resourceManagerWrapper = new ResourceManagerWrapper(); - addService(resourceManagerWrapper); - nodeManagers = new CustomNodeManager[noOfNodeManagers]; - for(int index = 0; index < noOfNodeManagers; index++) { + resourceManagers = new CustomResourceManager[numResourceManagers]; + for (int i = 0; i < numResourceManagers; i++) { + resourceManagers[i] = new CustomResourceManager(); + addService(new ResourceManagerWrapper(i)); + } + nodeManagers = new CustomNodeManager[numNodeManagers]; + for(int index = 0; index < numNodeManagers; index++) { addService(new NodeManagerWrapper(index)); nodeManagers[index] = new CustomNodeManager(); } } - - @Override + + /** + * @param testName name of the test + * @param numNodeManagers the number of node managers in the cluster + * @param numLocalDirs the number of nm-local-dirs per nodemanager + * @param numLogDirs the number of nm-log-dirs per nodemanager + */ + public MiniYARNCluster(String testName, int numNodeManagers, + int numLocalDirs, int numLogDirs) { + this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs); + } + + @Override public void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf instanceof YarnConfiguration ? conf - : new YarnConfiguration( - conf)); + if (resourceManagers.length > 1) { + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + + StringBuilder rmIds = new StringBuilder(); + for (int i = 0; i < resourceManagers.length; i++) { + if (i != 0) { + rmIds.append(","); + } + rmIds.append("rm" + i); + } + conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); + } + super.serviceInit(conf instanceof YarnConfiguration + ? 
conf + : new YarnConfiguration(conf)); } public File getTestWorkDir() { @@ -178,7 +208,11 @@ public File getTestWorkDir() { } public ResourceManager getResourceManager() { - return this.resourceManager; + return this.resourceManagers[0]; + } + + public ResourceManager getResourceManager(int i) { + return this.resourceManagers[i]; } public NodeManager getNodeManager(int i) { @@ -195,8 +229,29 @@ public static String getHostname() { } private class ResourceManagerWrapper extends AbstractService { - public ResourceManagerWrapper() { - super(ResourceManagerWrapper.class.getName()); + private int index; + + public ResourceManagerWrapper(int i) { + super(ResourceManagerWrapper.class.getName() + "_" + i); + index = i; + } + + private void setNonHARMConfiguration(Configuration conf) { + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); + WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); + } + + private void setHARMConfiguration(Configuration conf) { + String rmId = "rm" + index; + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0"); + } } @Override @@ -206,22 +261,15 @@ protected synchronized void serviceInit(Configuration conf) if (!conf.getBoolean( YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { - // pick free random ports. 
- String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); - WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); + if (HAUtil.isHAEnabled(conf)) { + setHARMConfiguration(conf); + } else { + setNonHARMConfiguration(conf); + } } - resourceManager = new ResourceManager() { - @Override - protected void doSecureLogin() throws IOException { - // Don't try to login using keytab in the testcase. - }; - }; - resourceManager.init(conf); - resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class, + resourceManagers[index].init(conf); + resourceManagers[index].getRMContext().getDispatcher().register + (RMAppAttemptEventType.class, new EventHandler() { public void handle(RMAppAttemptEvent event) { if (event instanceof RMAppAttemptRegistrationEvent) { @@ -239,20 +287,20 @@ protected synchronized void serviceStart() throws Exception { try { new Thread() { public void run() { - resourceManager.start(); - }; + resourceManagers[index].start(); + } }.start(); int waitCount = 0; - while (resourceManager.getServiceState() == STATE.INITED + while (resourceManagers[index].getServiceState() == STATE.INITED && waitCount++ < 60) { LOG.info("Waiting for RM to start..."); Thread.sleep(1500); } - if (resourceManager.getServiceState() != STATE.STARTED) { + if (resourceManagers[index].getServiceState() != STATE.STARTED) { // RM could have failed. throw new IOException( "ResourceManager failed to start. 
Final state is " - + resourceManager.getServiceState()); + + resourceManagers[index].getServiceState()); } super.serviceStart(); } catch (Throwable t) { @@ -278,9 +326,9 @@ private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedExc @Override protected synchronized void serviceStop() throws Exception { - if (resourceManager != null) { + if (resourceManagers[index] != null) { waitForAppMastersToFinish(5000); - resourceManager.stop(); + resourceManagers[index].stop(); } super.serviceStop(); @@ -372,7 +420,7 @@ protected synchronized void serviceStart() throws Exception { new Thread() { public void run() { nodeManagers[index].start(); - }; + } }.start(); int waitCount = 0; while (nodeManagers[index].getServiceState() == STATE.INITED @@ -398,12 +446,48 @@ protected synchronized void serviceStop() throws Exception { super.serviceStop(); } } + + private class CustomResourceManager extends ResourceManager { + @Override + protected void doSecureLogin() throws IOException { + // Don't try to login using keytab in the testcase. + } + + @Override + protected RMHAProtocolService createRMHAProtocolService() { + return new RMHAProtocolService(this) { + @Override + protected void startHAAdminServer() { + // do nothing + } + }; + } + } private class CustomNodeManager extends NodeManager { @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. - }; + } + + private int getIndexOfActiveRM() { + if (resourceManagers.length == 1) + return 0; + + while (true) { + // HA is enabled. 
Wait until one of the RMs is Active + for (int i = 0; i < resourceManagers.length; i++) { + try { + if (HAServiceProtocol.HAServiceState.ACTIVE == + resourceManagers[i].getRMHAService().getServiceStatus().getState()) { + return i; + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -412,8 +496,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, healthChecker, metrics) { @Override protected ResourceTracker getRMClient() { - final ResourceTrackerService rt = resourceManager - .getResourceTrackerService(); + final ResourceTrackerService rt = + resourceManagers[getIndexOfActiveRM()].getResourceTrackerService(); final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -452,13 +536,11 @@ public RegisterNodeManagerResponse registerNodeManager( return response; } }; - }; + } @Override - protected void stopRMProxy() { - return; - } + protected void stopRMProxy() { } }; - }; + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java new file mode 100644 index 0000000..1dc820c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java @@ -0,0 +1,76 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestMiniYARNClusterForHA { + MiniYARNCluster cluster; + + @Before + public void setup() throws IOException, InterruptedException { + Configuration conf = new YarnConfiguration(); + + cluster = new MiniYARNCluster(TestMiniYARNClusterForHA.class.getName(), + 2, 1, 1, 1); + cluster.init(conf); + cluster.start(); + + cluster.getResourceManager(0).getRMHAService().transitionToActive(new + HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + + for (int i = 0; i < 3000; i++) { + if (HAServiceProtocol.HAServiceState.ACTIVE == + cluster.getResourceManager(0).getRMHAService().getServiceStatus(). 
+ getState()) { + return; + } + Thread.sleep(100); + } + fail("RM never turned active"); + } + + @Test + public void testClusterWorks() throws YarnException, InterruptedException { + ResourceManager rm = cluster.getResourceManager(0); + GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); + + for (int i = 0; i < 600; i++) { + if (1 == rm.getClientRMService().getClusterMetrics(req) + .getClusterMetrics().getNumNodeManagers()) { + return; + } + Thread.sleep(100); + } + fail("NodeManager never registered with the RM"); + } +}