diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index 09a56ea..5efe8f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -121,6 +121,20 @@ mockito-all test + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test-jar + test + + + + org.apache.hadoop + hadoop-yarn-common + test-jar + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b28c0c9..0c42ec2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -187,7 +188,7 @@ // Handle to communicate with the Resource Manager @SuppressWarnings("rawtypes") - private AMRMClientAsync amRMClient; + private AMRMClientAsync amRMClient; // In both secure and non-secure modes, this points to the job-submitter. 
@VisibleForTesting @@ -227,7 +228,7 @@ // Allocated container count so that we know how many containers has the RM // allocated to us @VisibleForTesting - protected AtomicInteger numAllocatedContainers = new AtomicInteger(); + protected final AtomicInteger numAllocatedContainers = new AtomicInteger(); // Count of failed containers private AtomicInteger numFailedContainers = new AtomicInteger(); // Count of containers already requested from the RM @@ -801,8 +802,55 @@ public void onContainersCompleted(List completedContainers) { public void onContainersAllocated(List allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); - numAllocatedContainers.addAndGet(allocatedContainers.size()); + AMRMClientAsync amRMClient = ApplicationMaster.this + .amRMClient; for (Container allocatedContainer : allocatedContainers) { + synchronized (numAllocatedContainers) { + if (numAllocatedContainers.get() == numTotalContainers) { + // This may happen, see the following explanation: + // + // allocation progress AM request RM respond + // 1 5 0 + // 2 5 2 + // 3 3 1 + // 4 2 3 + // 5 0 2 + // 6 0 0 + // AM request: number of containers the AM asks for in the allocation request + // RM respond: number of newly allocated containers the RM returns to the AM + + // The table above shows the AM asking for 5 containers while the RM may + // give it 2+1+3+2=8 containers. + // + // Previously the AM told the RM how many containers it needed only + // once, in the first allocation request. With this patch, because + // we now call amRMClient.removeContainerRequest, the AM tells the RM + // how many containers it still needs on every allocation request, + // as in the table above.
Because the AM works out how + // many containers to ask for in an allocation request without + // counting the containers the RM is about to hand over + // in that same allocation response, the last few allocation + // requests may yield more containers than expected; we + // stop here when that happens. + LOG.debug("APP_MASTER gets more containers from RM !"); + break; + } else { + numAllocatedContainers.addAndGet(1); + } + } + List> matchingRequests = + amRMClient.getMatchingRequests(allocatedContainer.getPriority(), + ResourceRequest.ANY, allocatedContainer.getResource()); + if (matchingRequests != null && !matchingRequests.isEmpty()) { + for (ContainerRequest containerRequest : matchingRequests.get(0)) { + // When the RM fails over, the AM resends all of its outstanding requests + // to the RM, so the AM should remove the requests for allocated containers from + // AMRMClientImpl#remoteRequestsTable to avoid asking for more + // containers than it needs. + amRMClient.removeContainerRequest(containerRequest); + break; + } + } LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId().getHost() @@ -1174,7 +1222,7 @@ RMCallbackHandler getRMCallbackHandler() { } @VisibleForTesting - void setAmRMClient(AMRMClientAsync client) { + void setAmRMClient(AMRMClientAsync client) { this.amRMClient = client; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 0fed14d..cb036e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -85,6 +85,11 @@ public void testDSAppMasterAllocateHandler() throws Exception { master.setNumTotalContainers(targetContainers); Mockito.doNothing().when(mockClient) .addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); + Mockito.when(mockClient.getMatchingRequests(Matchers.any(Priority.class), + Matchers.any(String.class), Matchers.any(Resource.class))) + .thenReturn(null); + Mockito.doNothing().when(mockClient).removeContainerRequest( + Matchers.any(AMRMClient.ContainerRequest.class)); ApplicationMaster.RMCallbackHandler handler = master.getRMCallbackHandler(); @@ -98,7 +103,6 @@ public void testDSAppMasterAllocateHandler() throws Exception { handler.onContainersAllocated(containers); Assert.assertEquals("Wrong container allocation count", 1, master.getAllocatedContainers()); - Mockito.verifyZeroInteractions(mockClient); 
Assert.assertEquals("Incorrect number of threads launched", 1, master.threadsLaunched); @@ -111,10 +115,10 @@ public void testDSAppMasterAllocateHandler() throws Exception { ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4); containers.add(generateContainer(id4)); handler.onContainersAllocated(containers); - Assert.assertEquals("Wrong final container allocation count", 4, + Assert.assertEquals("Wrong final container allocation count", 2, master.getAllocatedContainers()); - Assert.assertEquals("Incorrect number of threads launched", 4, + Assert.assertEquals("Incorrect number of threads launched", 2, master.threadsLaunched); // make sure we handle completion events correctly diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 967d172..9f749f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -68,14 +68,14 @@ @Before public void setup() throws Exception { - setupInternal(NUM_NMS); + setupInternal(1, NUM_NMS, new YarnConfiguration()); } - protected void setupInternal(int numNodeManager) throws Exception { - + protected void setupInternal(int numResourceManager, int numNodeManager, + YarnConfiguration conf) throws Exception { LOG.info("Starting up YARN cluster"); - - conf = new YarnConfiguration(); + this.conf = conf; + 
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); @@ -84,10 +84,9 @@ protected void setupInternal(int numNodeManager) throws Exception { if (yarnCluster == null) { yarnCluster = - new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, - numNodeManager, 1, 1); + new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), + numResourceManager, numNodeManager, 1, 1); yarnCluster.init(conf); - yarnCluster.start(); waitForNMsToRegister(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java index b62b091..778f9c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java @@ -25,9 +25,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.junit.After; import org.junit.Assert; import 
org.junit.Before; import org.junit.Test; @@ -44,7 +46,7 @@ @Before public void setup() throws Exception { distShellTest = new TestDistributedShell(); - distShellTest.setupInternal(NUM_NMS); + distShellTest.setupInternal(1, NUM_NMS, new YarnConfiguration()); } private void initializeNodeLabels() throws IOException { @@ -162,4 +164,9 @@ public void run() { return maxRunningContainersOnNMs; } } + + @After + public void tearDown() throws IOException { + distShellTest.tearDown(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithRMHA.java new file mode 100644 index 0000000..8011018 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithRMHA.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + +import static org.junit.Assert.assertEquals; + +public class TestDistributedShellWithRMHA extends ClientBaseWithFixes { + + private static final Log LOG = + LogFactory.getLog(TestDistributedShellWithRMHA.class); + + MiniYARNCluster yarnCluster; + TestDistributedShell distShellTest; + + private static final String RM1_NODE_ID = "rm1"; + private static final int RM1_PORT_BASE = 10000; + private static final String RM2_NODE_ID = "rm2"; + private static final int RM2_PORT_BASE = 20000; + + @Before + public void setup() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster"); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_STORE, 
ZKRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000); + conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); + HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf); + HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + + distShellTest = new TestDistributedShell(); + distShellTest.setupInternal(2, 1, conf); + yarnCluster = distShellTest.yarnCluster; + } + + @Test + public void testAllocatesExpectedNumOfContainersWhenRMFailover() throws + Exception { + + String[] args = { + "--jar", + TestDistributedShell.APPMASTER_JAR, + "--num_containers", + "10", + "--shell_command", + "sleep 3", + "--master_memory", + "128", + "--container_memory", + "128" + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + // Start Failover thread + Thread t = new Thread(new FailoverThread()); + t.start(); + + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + Assert.assertTrue(result); + yarnCluster.getNMDispatcher(0).await(); + // Check the number of completed containers on the NM: there should be 11 + // in total, i.e. 1 AM container and 10 shell containers + Assert.assertEquals(11, yarnCluster.getNodeManagerMetrics(0) + .getCompletedContainers()); + } + + private void failover() + throws IOException, InterruptedException, YarnException { + int activeRMIndex = yarnCluster.getActiveRMIndex(); + yarnCluster.stopResourceManager(activeRMIndex); + assertEquals("Failover failed", (activeRMIndex + 1) % 2, + yarnCluster.getActiveRMIndex()); + } + + class FailoverThread implements Runnable { + // Sampling interval in milliseconds (500 ms) + final static int SAMPLING_INTERVAL_MS = 500; + + @Override + public
void run() { + boolean containerComplete = false; + ConcurrentMap containers = yarnCluster + .getNodeManager(0).getNMContext().getContainers(); + while (true) { + for (Container c : containers.values()) { + LOG.info(c.getContainerId() + " " + c.getContainerState()); + // Fail over when one of the containers completes. + if (c.getContainerState() == ContainerState.DONE) { + containerComplete = true; + break; + } + } + if (containerComplete) { + break; + } + try { + Thread.sleep(SAMPLING_INTERVAL_MS); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + try { + failover(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + @After + public void tearDown() throws Exception { + distShellTest.tearDown(); + super.tearDown(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 185ba12..9975ac0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -262,7 +262,7 @@ protected void serviceInit(Configuration conf) throws Exception { addService(del); // NodeManager level dispatcher - this.dispatcher = new AsyncDispatcher(); + this.dispatcher = createAsyncDispatcher(); dirsHandler = new LocalDirsHandlerService(metrics); nodeHealthChecker = @@ -600,4 +600,8 @@ public static void main(String[] args) throws IOException { public NodeStatusUpdater getNodeStatusUpdater() { return nodeStatusUpdater; } + + protected AsyncDispatcher createAsyncDispatcher() { + return new AsyncDispatcher(); + } }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index d659a65..68017c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -43,7 +43,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -591,10 +594,25 @@ protected synchronized void serviceStop() throws Exception { } private class CustomNodeManager extends NodeManager { + private DrainDispatcher dispatcher; + @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. 
} + + public NodeManagerMetrics getNodeManagerMetrics() { + return this.metrics; + } + + public AsyncDispatcher createAsyncDispatcher() { + dispatcher = new DrainDispatcher(); + return dispatcher; + } + + public DrainDispatcher getNMDispatcher() { + return this.dispatcher; + } } private class ShortCircuitedNodeManager extends CustomNodeManager { @@ -750,4 +768,12 @@ protected void doSecureLogin() throws IOException { public int getNumOfResourceManager() { return this.resourceManagers.length; } + + public NodeManagerMetrics getNodeManagerMetrics(int i) { + return ((CustomNodeManager) this.getNodeManager(i)).getNodeManagerMetrics(); + } + + public DrainDispatcher getNMDispatcher(int i) { + return ((CustomNodeManager) this.getNodeManager(i)).getNMDispatcher(); + } }