diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 2045eb63092..b2140db86b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -42,18 +42,18 @@ public DrainDispatcher(BlockingQueue eventQueue) { * Wait till event thread enters WAITING state (i.e. waiting for new events). */ public void waitForEventThreadToWait() { - while (!isEventThreadWaiting()) { + do { Thread.yield(); - } + } while (!isEventThreadWaiting()); } /** * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + do { Thread.yield(); - } + } while (!isDrained()); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index c1b1c52edef..64436f83337 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -24,6 +24,11 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.junit.After; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -75,6 +80,7 @@ private YarnConfiguration conf; + private MockRM rm1; RMNodeLabelsManager mgr; class MyScheduler extends CapacityScheduler { @@ -109,6 +115,16 @@ public void setUp() throws Exception { ResourceScheduler.class); mgr = new NullRMNodeLabelsManager(); mgr.init(conf); + rm1 = null; + } + + @After + public void tearDown() throws Exception { + if (rm1 != null) { + rm1.close(); + rm1.waitForServiceToStop(1000); + rm1 = null; + } } @Test @@ -117,7 +133,7 @@ public void testSimpleIncreaseContainer() throws Exception { * Application has a container running, and the node has enough available * resource. Add a increase request to see if container will be increased */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -167,8 +183,6 @@ public RMNodeLabelsManager createNodeLabelManager() { verifyContainerIncreased(am1.allocate(null, null), containerId1, 3 * GB); verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 17 * GB); - - rm1.close(); } @Test @@ -177,8 +191,8 @@ public void testSimpleDecreaseContainer() throws Exception { * Application has a container running, try to decrease the container and * check queue's usage and container resource will be updated. */ - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM() { + DrainDispatcher dispatcher = new DrainDispatcher(); + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -206,7 +220,7 @@ protected Dispatcher createDispatcher() { FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( rm1, app1.getApplicationId()); - checkUsedResource(rm1, "default", 3 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 3 * GB, null); Assert.assertEquals(3 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); @@ -224,9 +238,7 @@ protected Dispatcher createDispatcher() { verifyContainerDecreased(response, containerId1, 1 * GB); // Wait for scheduler to finish processing kill events.. - dispatcher.waitForEventThreadToWait(); - - checkUsedResource(rm1, "default", 1 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); @@ -243,8 +255,6 @@ protected Dispatcher createDispatcher() { } } Assert.assertTrue(rmNodeReceivedDecreaseContainer); - - rm1.close(); } @Test @@ -255,7 +265,7 @@ public void testSimpleIncreaseRequestReservation() throws Exception { * Check resource usage after container reserved, finish a container, the * reserved container should be allocated. */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -297,7 +307,6 @@ public RMNodeLabelsManager createNodeLabelManager() { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); sentRMContainerLaunched(rm1, containerId1); - // am1 asks to change its AM container from 1GB to 3GB am1.sendContainerResizingRequest(Arrays.asList( UpdateContainerRequest @@ -313,7 +322,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + /* Check reservation statuses */ // Increase request should be reserved Assert.assertFalse(app.getReservedContainers().isEmpty()); @@ -337,7 +346,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // Now container should be increased verifyContainerIncreased(am1.allocate(null, null), containerId1, 7 * GB); - + /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); @@ -355,8 +364,6 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(7 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 1 * GB); - - rm1.close(); } @Test @@ -365,7 +372,7 @@ public void testIncreaseRequestWithNoHeadroomLeft() throws Exception { * Application has two containers running, try to increase one of them, the * requested amount exceeds user's headroom for the queue. */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -435,7 +442,6 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getUsed().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); - rm1.close(); } @Test @@ -447,7 +453,7 @@ public void testExcessiveReservationWhenCancelIncreaseRequest() * Check resource usage after container reserved, finish a container & * cancel the increase request, reservation should be cancelled */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -504,7 +510,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + /* Check reservation statuses */ // Increase request should be reserved Assert.assertFalse(app.getReservedContainers().isEmpty()); @@ -534,7 +540,7 @@ public RMNodeLabelsManager createNodeLabelManager() { Resources.createResource(1 * GB), null))); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); @@ -551,8 +557,6 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test @@ -563,8 +567,8 @@ public void testExcessiveReservationWhenDecreaseSameContainer() * the increase request reserved, it decreases the reserved container, * container should be decreased and reservation will be cancelled */ - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM() { + DrainDispatcher dispatcher = new DrainDispatcher(); + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -619,7 +623,7 @@ protected Dispatcher createDispatcher() { ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8 * GB), null))); - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); @@ -627,17 +631,17 @@ protected Dispatcher createDispatcher() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + /* Check reservation statuses */ // Increase request should be reserved Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 10 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 10 * GB, null); Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(4 * GB, @@ -657,25 +661,20 @@ protected Dispatcher createDispatcher() { // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - dispatcher.waitForEventThreadToWait(); - /* Check statuses after reservation satisfied */ - // Increase request should be unreserved - Assert.assertTrue(app.getReservedContainers().isEmpty()); - Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Check statuses after reservation satisfied + waitForStatusesPostReservation(dispatcher, app, cs, nm1); // Pending resource will be changed since it's satisfied - checkPendingResource(rm1, "default", 0 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 1 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test @@ -688,8 +687,8 @@ public void testIncreaseContainerUnreservedWhenContainerCompleted() * So increase container request will be reserved. When app releases * container2, reserved part should be released as well. */ - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM() { + DrainDispatcher dispatcher = new DrainDispatcher(); + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -741,7 +740,7 @@ protected Dispatcher createDispatcher() { ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8 * GB), null))); - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); @@ -755,11 +754,11 @@ protected Dispatcher createDispatcher() { Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 9 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 9 * GB, null); Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(3 * GB, @@ -773,27 +772,20 @@ protected Dispatcher createDispatcher() { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); am1.allocate(null, null); - // Wait for scheduler to process all events. - dispatcher.waitForEventThreadToWait(); - - /* Check statuses after reservation satisfied */ - // Increase request should be unreserved - Assert.assertTrue(app.getReservedContainers().isEmpty()); - Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Check statuses after reservation satisfied + waitForStatusesPostReservation(dispatcher, app, cs, nm1); // Pending resource will be changed since it's satisfied - checkPendingResource(rm1, "default", 0 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 1 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test @@ -806,7 +798,7 @@ public void testIncreaseContainerUnreservedWhenApplicationCompleted() * Similar to testIncreaseContainerUnreservedWhenContainerCompleted, when * application finishes, reserved increase container should be cancelled */ - MockRM rm1 = new MockRM(conf) { + rm1 = new MockRM(conf) { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -903,13 +895,11 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, int mem, int priority, int startContainerId) - throws Exception { + throws Exception { am.allocate(Arrays .asList(ResourceRequest.newInstance(Priority.newInstance(priority), "*", Resources.createResource(mem), nContainer)), @@ -939,7 +929,7 @@ public void testOrderOfIncreaseContainerRequestAllocation() * be increase sorted by priority, if priority is same, smaller containerId * container will get preferred */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -1018,15 +1008,13 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(10 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test (timeout = 60000) public void testDecreaseContainerWillNotDeadlockContainerAllocation() throws Exception { // create and start MockRM with our MyScheduler - MockRM rm = new MockRM() { + rm1 = new MockRM() { @Override public ResourceScheduler createScheduler() { CapacityScheduler cs = new MyScheduler(); @@ -1034,30 +1022,30 @@ public ResourceScheduler createScheduler() { return cs; } }; - rm.start(); + rm1.start(); // register a node - MockNM nm = rm.registerNode("h1:1234", 20 * GB); + MockNM nm = rm1.registerNode("h1:1234", 20 * GB); // submit an application -> app1 MockRMAppSubmissionData data = - MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm) + MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm1) .withAppName("app") .withUser("user") .withAcls(null) .withQueue("default") .withUnmanagedAM(false) .build(); - RMApp app1 = MockRMAppSubmitter.submit(rm, data); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + RMApp app1 = MockRMAppSubmitter.submit(rm1, data); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm); // making sure resource is allocated - checkUsedResource(rm, "default", 3 * GB, null); + checkUsedResource(rm1, "default", 3 * GB, null); FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( - rm, app1.getApplicationId()); + rm1, app1.getApplicationId()); Assert.assertEquals(3 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); // making sure container is launched ContainerId containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); - sentRMContainerLaunched(rm, containerId1); + sentRMContainerLaunched(rm1, containerId1); // submit allocation request for a new container am1.allocate(Collections.singletonList(ResourceRequest.newInstance( Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)), @@ -1073,28 +1061,75 @@ public ResourceScheduler createScheduler() { Resources.createResource(GB), null))); // verify that the containe resource is decreased verifyContainerDecreased(response, containerId1, GB); + } - rm.close(); + private void checkPendingResource(final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + checkPendingResource(null, rm, queueName, memory, label); } - private void checkPendingResource(MockRM rm, String queueName, int memory, - String label) { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - CSQueue queue = cs.getQueue(queueName); - Assert.assertEquals(memory, - queue.getQueueResourceUsage() - .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) - .getMemorySize()); + private void checkPendingResource(final DrainDispatcher dispatcher, + final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + waitForResourcesToBeUpdated(dispatcher, rm, queueName, memory, label, false); } - private void checkUsedResource(MockRM rm, String queueName, int memory, - String label) { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - CSQueue queue = cs.getQueue(queueName); - Assert.assertEquals(memory, - queue.getQueueResourceUsage() - .getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label) - .getMemorySize()); + private void checkUsedResource(final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + checkUsedResource(null, rm, queueName, memory, label); + } + + private void checkUsedResource(final DrainDispatcher dispatcher, + final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + waitForResourcesToBeUpdated(dispatcher, rm, queueName, memory, label, true); + } + + private void waitForResourcesToBeUpdated(final DrainDispatcher dispatcher, + final MockRM rm, + final String queueName, + final int memory, final String label, + final boolean usedResources) + throws TimeoutException, InterruptedException { + if (dispatcher != null) { + dispatcher.waitForEventThreadToWait(); + } + final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final CSQueue queue = cs.getQueue(queueName); + final String labelParam = label == null + ? RMNodeLabelsManager.NO_LABEL : label; + GenericTestUtils.waitFor(() -> { + ResourceUsage rUsage = queue.getQueueResourceUsage(); + Resource resource = usedResources ? rUsage.getUsed(labelParam) + : rUsage.getPending(labelParam); + return memory == resource.getMemorySize(); + }, 1, 5000); + } + + + /** + * Check statuses after reservation satisfied. Increase request should be unreserved. + * + * @param app the schedulerApp + * @param cs the scheduler managing the nodes + * @param nm the NM mock object + * @throws TimeoutException + * @throws InterruptedException + */ + private void waitForStatusesPostReservation(final DrainDispatcher dispatcher, + final FiCaSchedulerApp app, + final CapacityScheduler cs, + final MockNM nm) + throws TimeoutException, InterruptedException { + dispatcher.waitForEventThreadToWait(); + /* Check statuses after reservation satisfied */ + // Increase request should be unreserved + GenericTestUtils.waitFor(() -> (app.getReservedContainers().isEmpty() + && cs.getNode(nm.getNodeId()).getReservedContainer() == null), 1, 5000); } private void verifyContainerIncreased(AllocateResponse response,