diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 09a56ea..e6f8431 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -121,6 +121,12 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b28c0c9..c309233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -30,6 +30,7 @@
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -187,7 +188,7 @@
 
   // Handle to communicate with the Resource Manager
   @SuppressWarnings("rawtypes")
-  private AMRMClientAsync amRMClient;
+  private AMRMClientAsync<ContainerRequest> amRMClient;
 
   // In both secure and non-secure modes, this points to the job-submitter.
   @VisibleForTesting
@@ -227,7 +228,7 @@
   // Allocated container count so that we know how many containers has the RM
   // allocated to us
   @VisibleForTesting
-  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
+  protected final AtomicInteger numAllocatedContainers = new AtomicInteger();
   // Count of failed containers
   private AtomicInteger numFailedContainers = new AtomicInteger();
   // Count of containers already requested from the RM
@@ -802,7 +803,55 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
       LOG.info("Got response from RM for container ask, allocatedCnt="
           + allocatedContainers.size());
       numAllocatedContainers.addAndGet(allocatedContainers.size());
+      AMRMClientAsync<ContainerRequest> amRMClient = ApplicationMaster.this
+          .amRMClient;
       for (Container allocatedContainer : allocatedContainers) {
+        synchronized (numAllocatedContainers) {
+          if (numAllocatedContainers.get() == numTotalContainers) {
+            // This may happen, see the following explanation:
+            //
+            // allocation progress    AM request    RM response
+            //         1                  5              0
+            //         2                  5              2
+            //         3                  3              1
+            //         4                  2              3
+            //         5                  0              2
+            //         6                  0              0
+            // AM request:  num of containers the AM asks for in that request
+            // RM response: num of newly allocated containers the RM returns
+
+            // The table above shows the AM asking for 5 containers in
+            // total, but the RM may end up giving it 2 + 1 + 3 + 2 = 8.
+            //
+            // Previously the AM told the RM the number of containers it
+            // needed only once, in the first allocation request. After this
+            // patch, since we call amRMClient.removeContainerRequest, the AM
+            // tells the RM how many containers it still needs on every
+            // allocation round, as in the table above. Because the AM
+            // computes the number of containers to ask for without counting
+            // the containers the RM is about to hand back in the same
+            // allocation response, the last few allocation requests may
+            // receive more containers than expected, so we break out here
+            // if that happens.
+            LOG.debug("AM got more containers from the RM than expected");
+            break;
+          } else {
+            numAllocatedContainers.addAndGet(1);
+          }
+        }
+        List<? extends Collection<ContainerRequest>> matchingRequests =
+            amRMClient.getMatchingRequests(allocatedContainer.getPriority(),
+                ResourceRequest.ANY, allocatedContainer.getResource());
+        if (!matchingRequests.isEmpty()) {
+          for (ContainerRequest containerRequest : matchingRequests.get(0)) {
+            // When the RM fails over, the AM re-sends all of its outstanding
+            // requests to the new RM. Remove already-allocated containers
+            // from AMRMClientImpl#remoteRequestsTable so that the AM does
+            // not ask for more containers than it needs.
+            amRMClient.removeContainerRequest(containerRequest);
+            break;
+          }
+        }
         LOG.info("Launching shell command on a new container."
             + ", containerId=" + allocatedContainer.getId()
             + ", containerNode=" + allocatedContainer.getNodeId().getHost()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 967d172..bdc2b14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -68,14 +68,14 @@
 
   @Before
   public void setup() throws Exception {
-    setupInternal(NUM_NMS);
+    setupInternal(1, NUM_NMS, new YarnConfiguration());
   }
 
-  protected void setupInternal(int numNodeManager) throws Exception {
+  protected void setupInternal(int numResourceManager, int numNodeManager,
+      Configuration conf) throws Exception {
 
     LOG.info("Starting up YARN cluster");
 
-    conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
@@ -84,10 +84,9 @@ protected void setupInternal(int numNodeManager) throws Exception {
 
     if (yarnCluster == null) {
       yarnCluster =
-          new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
-              numNodeManager, 1, 1);
+          new MiniYARNCluster(TestDistributedShell.class.getSimpleName(),
+              numResourceManager, numNodeManager, 1, 1);
       yarnCluster.init(conf);
-
       yarnCluster.start();
 
       waitForNMsToRegister();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
index b62b091..390eff8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -44,7 +45,7 @@
   @Before
   public void setup() throws Exception {
     distShellTest = new TestDistributedShell();
-    distShellTest.setupInternal(NUM_NMS);
+    distShellTest.setupInternal(1, NUM_NMS, new YarnConfiguration());
   }
 
   private void initializeNodeLabels() throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithRMHA.java
new file mode 100644
index 0000000..fad65f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithRMHA.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestDistributedShellWithRMHA extends ClientBaseWithFixes {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestDistributedShellWithRMHA.class);
+
+  MiniYARNCluster yarnCluster;
+
+  private static final String RM1_NODE_ID = "rm1";
+  private static final int RM1_PORT_BASE = 10000;
+  private static final String RM2_NODE_ID = "rm2";
+  private static final int RM2_PORT_BASE = 20000;
+
+  @Before
+  public void setup() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+    conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000);
+    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+    HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
+    HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
+
+    TestDistributedShell distShellTest = new TestDistributedShell();
+    distShellTest.setupInternal(2, 1, conf);
+    yarnCluster = distShellTest.yarnCluster;
+  }
+
+  @Test
+  public void testAllocatesExpectedNumOfContainersWhenRMFailover()
+      throws Exception {
+
+    String[] args = {
+        "--jar",
+        TestDistributedShell.APPMASTER_JAR,
+        "--num_containers",
+        "10",
+        "--shell_command",
+        "sleep 3",
+        "--master_memory",
+        "128",
+        "--container_memory",
+        "128"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    // Start the failover thread before running the client.
+    Thread t = new Thread(new FailoverThread());
+    t.start();
+
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    Assert.assertTrue(result);
+
+    // Check the number of completed containers on the NM: it should be 11
+    // in total, i.e. 1 AM container plus 10 shell containers.
+    Assert.assertEquals(11, yarnCluster.getNodeManagerMetrics(0)
+        .getCompletedContainers());
+  }
+
+  private void failover()
+      throws IOException, InterruptedException, YarnException {
+    int activeRMIndex = yarnCluster.getActiveRMIndex();
+    yarnCluster.stopResourceManager(activeRMIndex);
+    assertEquals("Failover failed", (activeRMIndex + 1) % 2,
+        yarnCluster.getActiveRMIndex());
+  }
+
+  /**
+   * Monitors containers on the NM and triggers an RM failover once the
+   * first container completes.
+   */
+  class FailoverThread implements Runnable {
+    // Sampling interval, in milliseconds.
+    static final int SAMPLING_INTERVAL_MS = 500;
+
+    @Override
+    public void run() {
+      boolean containerComplete = false;
+      ConcurrentMap<ContainerId, Container> containers = yarnCluster
+          .getNodeManager(0).getNMContext().getContainers();
+      while (true) {
+        for (Container c : containers.values()) {
+          LOG.info(c.getContainerId() + " " + c.getContainerState());
+          // Fail over once one of the containers completes.
+          if (c.getContainerState() == ContainerState.DONE) {
+            containerComplete = true;
+            break;
+          }
+        }
+        if (containerComplete) {
+          break;
+        }
+        try {
+          Thread.sleep(SAMPLING_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+      try {
+        failover();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index d659a65..2a18d67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -595,6 +596,10 @@ protected synchronized void serviceStop() throws Exception {
     protected void doSecureLogin() throws IOException {
       // Don't try to login using keytab in the testcase.
     }
+
+    public NodeManagerMetrics getNodeManagerMetrics() {
+      return this.metrics;
+    }
   }
 
   private class ShortCircuitedNodeManager extends CustomNodeManager {
@@ -750,4 +755,8 @@ protected void doSecureLogin() throws IOException {
   public int getNumOfResourceManager() {
     return this.resourceManagers.length;
   }
+
+  public NodeManagerMetrics getNodeManagerMetrics(int i) {
+    return ((CustomNodeManager) this.getNodeManager(i)).getNodeManagerMetrics();
+  }
 }