diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index c5ba07222d4..e2325dce5b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -49,18 +49,18 @@ public void serviceInit(Configuration conf) * Wait till event thread enters WAITING state (i.e. waiting for new events). */ public void waitForEventThreadToWait() { - while (!isEventThreadWaiting()) { + do { Thread.yield(); - } + } while (!isEventThreadWaiting()); } /** * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + do { Thread.yield(); - } + } while (!isDrained()); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 541539d892f..86e480eeefd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -18,14 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import com.google.common.base.Supplier; import java.util.ArrayList; import java.util.Arrays; import 
java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -33,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; @@ -54,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica @@ -62,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -71,6 +77,7 @@ private final int GB = 1024; private YarnConfiguration conf; + private MockRM rm1; RMNodeLabelsManager mgr; @@ -105,6 +112,16 @@ public void setUp() throws Exception { ResourceScheduler.class); mgr = new NullRMNodeLabelsManager(); mgr.init(conf); + rm1 = null; + } + + 
@After + public void tearDown() throws Exception { + if (rm1 != null) { + rm1.close(); + rm1.waitForServiceToStop(1000); + rm1 = null; + } } @Test @@ -113,7 +130,7 @@ public void testSimpleIncreaseContainer() throws Exception { * Application has a container running, and the node has enough available * resource. Add a increase request to see if container will be increased */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -155,8 +172,6 @@ public RMNodeLabelsManager createNodeLabelManager() { verifyContainerIncreased(am1.allocate(null, null), containerId1, 3 * GB); verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 17 * GB); - - rm1.close(); } @Test @@ -166,7 +181,7 @@ public void testSimpleDecreaseContainer() throws Exception { * check queue's usage and container resource will be updated. */ final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -186,7 +201,7 @@ protected Dispatcher createDispatcher() { FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( rm1, app1.getApplicationId()); - checkUsedResource(rm1, "default", 3 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 3 * GB, null); Assert.assertEquals(3 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); @@ -204,9 +219,8 @@ protected Dispatcher createDispatcher() { verifyContainerDecreased(response, containerId1, 1 * GB); // Wait for scheduler to finish processing kill events.. 
- dispatcher.waitForEventThreadToWait(); + checkUsedResource(dispatcher, rm1, "default", 1 * GB, null); - checkUsedResource(rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); @@ -223,8 +237,6 @@ protected Dispatcher createDispatcher() { } } Assert.assertTrue(rmNodeReceivedDecreaseContainer); - - rm1.close(); } @Test @@ -235,7 +247,7 @@ public void testSimpleIncreaseRequestReservation() throws Exception { * Check resource usage after container reserved, finish a container, the * reserved container should be allocated. */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -327,8 +339,6 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(7 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 1 * GB); - - rm1.close(); } @Test @@ -337,7 +347,7 @@ public void testIncreaseRequestWithNoHeadroomLeft() throws Exception { * Application has two containers running, try to increase one of them, the * requested amount exceeds user's headroom for the queue. 
*/ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -399,7 +409,6 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getUsed().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); - rm1.close(); } @Test @@ -411,7 +420,7 @@ public void testExcessiveReservationWhenCancelIncreaseRequest() * Check resource usage after container reserved, finish a container & * cancel the increase request, reservation should be cancelled */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -507,8 +516,6 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test @@ -520,7 +527,7 @@ public void testExcessiveReservationWhenDecreaseSameContainer() * container should be decreased and reservation will be cancelled */ final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -567,7 +574,7 @@ protected Dispatcher createDispatcher() { ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8 * GB), null))); - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); @@ -581,11 +588,11 @@ protected Dispatcher createDispatcher() { Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied - checkPendingResource(rm1, "default", 6 * GB, null); + 
checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 10 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 10 * GB, null); Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(4 * GB, @@ -605,25 +612,20 @@ protected Dispatcher createDispatcher() { // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - dispatcher.waitForEventThreadToWait(); - /* Check statuses after reservation satisfied */ - // Increase request should be unreserved - Assert.assertTrue(app.getReservedContainers().isEmpty()); - Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Check statuses after reservation satisfied + waitForStatusesPostReservation(dispatcher, app, cs, nm1); // Pending resource will be changed since it's satisfied - checkPendingResource(rm1, "default", 0 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 1 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test @@ -637,7 +639,7 @@ public void testIncreaseContainerUnreservedWhenContainerCompleted() * container2, reserved part should be released as well. 
*/ final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -655,10 +657,8 @@ protected Dispatcher createDispatcher() { // app1 -> a1 RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( rm1, app1.getApplicationId()); - // Allocate two more containers am1.allocate( Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", @@ -673,67 +673,53 @@ protected Dispatcher createDispatcher() { sentRMContainerLaunched(rm1, containerId2); rm1.waitForState(Arrays.asList(nm1, nm2), containerId2, RMContainerState.RUNNING); - // am1 asks to change its AM container from 2GB to 8GB am1.sendContainerResizingRequest(Arrays.asList( UpdateContainerRequest .newInstance(0, containerId2, ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8 * GB), null))); - - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); - // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - /* Check reservation statuses */ // Increase request should be reserved Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Pending resource will not be changed since it's not satisfied - checkPendingResource(rm1, "default", 6 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 6 * GB, null); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - 
checkUsedResource(rm1, "default", 9 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 9 * GB, null); Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(3 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); - // Complete container2, container will be unreserved and completed am1.allocate(null, Arrays.asList(containerId2)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); am1.allocate(null, null); - - // Wait for scheduler to process all events. - dispatcher.waitForEventThreadToWait(); - - /* Check statuses after reservation satisfied */ - // Increase request should be unreserved - Assert.assertTrue(app.getReservedContainers().isEmpty()); - Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Check statuses after reservation satisfied + waitForStatusesPostReservation(dispatcher, app, cs, nm1); // Pending resource will be changed since it's satisfied - checkPendingResource(rm1, "default", 0 * GB, null); + checkPendingResource(dispatcher, rm1, "default", 0 * GB, null); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated - checkUsedResource(rm1, "default", 1 * GB, null); + checkUsedResource(dispatcher, rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default")) .getUser("user").getUsed().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test @@ -743,7 +729,7 @@ public void testIncreaseContainerUnreservedWhenApplicationCompleted() * Similar to testIncreaseContainerUnreservedWhenContainerCompleted, when * application finishes, reserved increase container should 
be cancelled */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -785,7 +771,7 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getPending().getMemorySize()); // NM1 do 1 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + final CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -822,13 +808,17 @@ public RMNodeLabelsManager createNodeLabelManager() { // Queue/user/application's usage will be updated checkUsedResource(rm1, "default", 0 * GB, null); // User will be removed - Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user")); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + return leafQueue.getUser("user") == null; + } + }, 1, 5000); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, @@ -863,7 +853,7 @@ public void testOrderOfIncreaseContainerRequestAllocation() * be increase sorted by priority, if priority is same, smaller containerId * container will get preferred */ - MockRM rm1 = new MockRM() { + rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -934,15 +924,13 @@ public RMNodeLabelsManager createNodeLabelManager() { app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(10 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); - - rm1.close(); } @Test (timeout = 60000) public void testDecreaseContainerWillNotDeadlockContainerAllocation() 
throws Exception { // create and start MockRM with our MyScheduler - MockRM rm = new MockRM() { + rm1 = new MockRM() { @Override public ResourceScheduler createScheduler() { CapacityScheduler cs = new MyScheduler(); @@ -950,22 +938,22 @@ public ResourceScheduler createScheduler() { return cs; } }; - rm.start(); + rm1.start(); // register a node - MockNM nm = rm.registerNode("h1:1234", 20 * GB); + MockNM nm = rm1.registerNode("h1:1234", 20 * GB); // submit an application -> app1 - RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm); // making sure resource is allocated - checkUsedResource(rm, "default", 3 * GB, null); + checkUsedResource(rm1, "default", 3 * GB, null); FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( - rm, app1.getApplicationId()); + rm1, app1.getApplicationId()); Assert.assertEquals(3 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); // making sure container is launched ContainerId containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); - sentRMContainerLaunched(rm, containerId1); + sentRMContainerLaunched(rm1, containerId1); // submit allocation request for a new container am1.allocate(Collections.singletonList(ResourceRequest.newInstance( Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)), @@ -979,30 +967,85 @@ public ResourceScheduler createScheduler() { .newInstance(0, containerId1, ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(GB), null))); - // verify that the containe resource is decreased + // verify that the container resource is decreased verifyContainerDecreased(response, containerId1, GB); + } - rm.close(); + private void checkPendingResource(final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, 
InterruptedException { + waitForResourcesToBeUpdated(null, rm, queueName, memory, label, false); } - private void checkPendingResource(MockRM rm, String queueName, int memory, - String label) { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - CSQueue queue = cs.getQueue(queueName); - Assert.assertEquals(memory, - queue.getQueueResourceUsage() - .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) - .getMemorySize()); + private void checkPendingResource(final DrainDispatcher dispatcher, + final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + waitForResourcesToBeUpdated(dispatcher, rm, queueName, memory, label, false); } - private void checkUsedResource(MockRM rm, String queueName, int memory, - String label) { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - CSQueue queue = cs.getQueue(queueName); - Assert.assertEquals(memory, - queue.getQueueResourceUsage() - .getUsed(label == null ? 
RMNodeLabelsManager.NO_LABEL : label) - .getMemorySize()); + private void checkUsedResource(final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + waitForResourcesToBeUpdated(null, rm, queueName, memory, label, true); + } + + private void checkUsedResource(final DrainDispatcher dispatcher, + final MockRM rm, final String queueName, + final int memory, final String label) + throws TimeoutException, InterruptedException { + waitForResourcesToBeUpdated(dispatcher, rm, queueName, memory, label, true); + } + + private void waitForResourcesToBeUpdated(final DrainDispatcher dispatcher, + final MockRM rm, + final String queueName, + final int memory, final String label, + final boolean usedResources) + throws TimeoutException, InterruptedException { + if (dispatcher != null) { + dispatcher.waitForEventThreadToWait(); + } + final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final CSQueue queue = cs.getQueue(queueName); + final String labelParam = label == null + ? RMNodeLabelsManager.NO_LABEL : label; + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + ResourceUsage rUsage = queue.getQueueResourceUsage(); + Resource resource = usedResources ? rUsage.getUsed(labelParam) + : rUsage.getPending(labelParam); + return memory == resource.getMemorySize(); + } + }, 1, 5000); + } + + /** + * Check statuses after reservation satisfied. + * Increase request should be unreserved. 
+ * @param dispatcher the dispatcher whose event thread is awaited first + * @param app the schedulerApp + * @param cs the scheduler managing the nodes + * @param nm the NM mock object + * @throws TimeoutException if the statuses are not reached within 5000 ms + * @throws InterruptedException if the wait is interrupted + */ + private void waitForStatusesPostReservation(final DrainDispatcher dispatcher, + final FiCaSchedulerApp app, + final CapacityScheduler cs, + final MockNM nm) + throws TimeoutException, InterruptedException { + dispatcher.waitForEventThreadToWait(); + /* Check statuses after reservation satisfied */ + // Increase request should be unreserved + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return (app.getReservedContainers().isEmpty() + && cs.getNode(nm.getNodeId()).getReservedContainer() == null); + } + }, 1, 5000); } private void verifyContainerIncreased(AllocateResponse response,