diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 0ad2957..433b6d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
@@ -52,9 +53,12 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
@@ -774,4 +778,113 @@ public RMNodeLabelsManager createNodeLabelManager() {
 
     rm1.close();
   }
+
+  /**
+   * Checks that container ids of an application do not leap: app2 first
+   * gets a container reserved on h1 and is only allocated a container (on h2)
+   * after app1 releases one, and the largest container id of app2 is then
+   * expected to be 2.
+   */
+  @Test (timeout = 120000)
+  public void testContainerAllocationWithContainerIdLeap() throws Exception {
+    CapacitySchedulerConfiguration csc = new CapacitySchedulerConfiguration();
+    csc.setFloat(
+        CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+        1.0f);
+    csc.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csc) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    // Close the RM even if an assertion fails, so that a failing run does
+    // not leave a started MockRM behind for the tests that follow.
+    try {
+      rm1.getRMContext().setNodeLabelManager(mgr);
+      rm1.start();
+      MockNM nm1 = rm1.registerNode("h1:1234", 8192);
+      MockNM nm2 = rm1.registerNode("h2:1234", 8192);
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+      // launch app1 in the default queue, with its 3G AM running on h1
+      RMApp app1 = rm1.submitApp(3072, "app", "user", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+      // request 3 containers of 3G each on any host
+      am1.allocate("*", 3072, 3, new ArrayList<ContainerId>());
+
+      // do a bunch of node heartbeat
+      for (int i = 0; i < 10; i++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      }
+
+      // Now we should have 4 containers allocated, 2 on h1 and 2 on h2
+      FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+          am1.getApplicationAttemptId());
+      Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+      // AM allocate call without any new request or release
+      am1.allocate(null, null);
+
+      // launch app2 in the default queue, with its 1G AM running on h1 too
+      RMApp app2 = rm1.submitApp(1024, "app", "user", null, "default");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+      // app2 asks for one 2.5G container on any host
+      am2.allocate("*", 2560, 1, new ArrayList<ContainerId>());
+
+      // Now on nm1, it has 8G - 3G * 2 - 1G = 1G resource available,
+      // not enough for the pending resource request, so
+      // one container should be reserved on nm1 for app2
+      for (int i = 0; i < 10; i++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      }
+      FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+          ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+      Assert.assertNotNull(
+          cs.getSchedulerNode(nm1.getNodeId()).getReservedContainer());
+      Assert.assertEquals(schedulerApp2.getApplicationAttemptId(),
+          cs.getSchedulerNode(nm1.getNodeId()).getReservedContainer()
+              .getApplicationAttemptId());
+
+      // do a bunch of node heartbeat
+      for (int i = 0; i < 10; i++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      }
+
+      // Nothing besides the AM container should be allocated to app2
+      Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+      // app1 releases its container with id 3
+      am1.allocate(null, Arrays.asList(ContainerId
+          .newContainerId(schedulerApp1.getApplicationAttemptId(), 3L)));
+
+      // do an allocation on node2
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+      // Now app2 should have one more container besides its AM container
+      Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+      // Find the largest container id among the live containers of app2
+      int maxContainerId = 0;
+      for (RMContainer sa : schedulerApp2.getLiveContainers()) {
+        if (sa.getContainerId().getId() > maxContainerId) {
+          maxContainerId = sa.getContainerId().getId();
+        }
+      }
+
+      // Max container Id of app2 should be 2, i.e. the ids must not leap
+      Assert.assertEquals(2, maxContainerId);
+    } finally {
+      rm1.close();
+    }
+  }
 }