diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8b1f8b4..789339f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2483,4 +2483,10 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, writeLock.unlock(); } } + + @VisibleForTesting + public void updateApplicationAttempt( + FiCaSchedulerApp app) { + this.applications.get(app.getApplicationId()).setCurrentAppAttempt(app); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 260b57a..14fd290 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -65,14 +65,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; - import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -81,7 +79,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -423,26 +420,25 @@ public boolean accept(Resource cluster, commonCheckContainerAllocation(cluster, allocation, schedulerContainer); } else { + // When reserve a resource (state == NEW is for new container, + // state == RUNNING is for increase container). + // Just check if the node is not already reserved by someone + if (schedulerContainer.getSchedulerNode().getReservedContainer() + != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Try to reserve a container, but the node is " + + "already reserved by another container=" + + schedulerContainer.getSchedulerNode() + .getReservedContainer().getContainerId()); + } + return false; + } // Container reserved first time will be NEW, after the container // accepted & confirmed, it will become RESERVED state if (schedulerContainer.getRmContainer().getState() == RMContainerState.RESERVED) { // Set reReservation == true reReservation = true; - } else { - // When reserve a resource (state == NEW is for new container, - // state == RUNNING is for increase container). - // Just check if the node is not already reserved by someone - if (schedulerContainer.getSchedulerNode().getReservedContainer() - != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Try to reserve a container, but the node is " - + "already reserved by another container=" - + schedulerContainer.getSchedulerNode() - .getReservedContainer().getContainerId()); - } - return false; - } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ff8b206..d89e765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -111,6 +112,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -140,6 +142,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; import java.net.InetSocketAddress; @@ -154,6 +158,7 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -4414,4 +4419,160 @@ public void testConvertLeafQueueToParentQueue() throws Exception { Assert.assertEquals(b1.getState(), QueueState.RUNNING); Assert.assertTrue(!b1.getChildQueues().isEmpty()); } + + @Test (timeout = 60000) + public void testIllegalStateExceptionWhenReserveResource() + throws Exception { + Configuration conf = new YarnConfiguration(); + + // init RM & NMs & Nodes + final MockRM rm = new MockRM(conf); + rm.start(); + + final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + List nmLst = new ArrayList<>(); + nmLst.add(nm1); + nmLst.add(nm2); + + // init scheduler nodes + while (((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount() < 2) { + Thread.sleep(100); + } + Assert.assertEquals(2, + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount()); + + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + final SchedulerNode sn1 = + ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId()); + final SchedulerNode sn2 = + ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId()); + + // submit app1, am1 is running on nm1 + RMApp app = rm.submitApp(200, "app", "user", null, "default"); + final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + // submit app2, am2 is running on nm1 + RMApp app2 = rm.submitApp(200, "app", "user", null, "default"); + final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + // resource-request 1 : request 2 containers + final Priority priority = Priority.newInstance(0); + ResourceRequest rr = ResourceRequest + .newInstance(priority, "*", Resources.createResource(5 * GB), 2); + am.allocate(Arrays.asList(rr), null); + + kickOnceAndWaitUntilContainerAllocated(am, nmLst, 2, 100000); + + // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, app2-container_01/AM) + // nm2 runs 1 containers(app1-container_03) + Assert.assertEquals(3, sn1.getNumContainers()); + Assert.assertEquals(1, sn2.getNumContainers()); + + // resource-request 2 : request 1 containers that should be reserved on nm1 + ResourceRequest rr2 = ResourceRequest + .newInstance(priority, "*", Resources.createResource(5 * GB), 1); + am.allocate(Arrays.asList(rr2), null); + nm1.nodeHeartbeat(true); + // wait container reserved on nm1 + while (true) { + if (sn1.getReservedContainer() != null) { + break; + } + Thread.sleep(100); + } + // nm1 reserved 1 container(app1-container_04) + Assert.assertNotNull(sn1.getReservedContainer()); + + // spy + final CapacityScheduler cs = (CapacityScheduler) scheduler; + final FiCaSchedulerApp schedulerApp = + cs.getApplicationAttempt(app.getCurrentAppAttempt().getAppAttemptId()); + final FiCaSchedulerApp spySchedulerApp = Mockito.spy(schedulerApp); + cs.updateApplicationAttempt(spySchedulerApp); + + final AtomicBoolean isFirstReserve = new AtomicBoolean(true); + final AtomicBoolean isSuccess = new AtomicBoolean(false); + final AtomicBoolean isChecked = new AtomicBoolean(false); + // handle FiCaSchedulerApp#apply + // clear resource request before real apply + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + ResourceCommitRequest request = + (ResourceCommitRequest) invocation.getArguments()[1]; + if (request.getContainersToReserve().size() > 0 && isFirstReserve + .compareAndSet(true, false)) { + // release app1-container_03 on nm2 + RMContainer killableContainer = + sn2.getCopiedListOfRunningContainers().get(0); + cs.completedContainer(killableContainer, ContainerStatus + .newInstance(killableContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL); + Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size()); + // unreserve app1-container_04 on nm1 + // and allocate app1-container_05 on nm2 + cs.nodeUpdate(cs.getRMContext().getRMNodes().get(nm2.getNodeId())); + while (sn2.getCopiedListOfRunningContainers().size() < 1) { + Thread.sleep(100); + } + Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size()); + Assert.assertNull(sn1.getReservedContainer()); + + // reserve app2-container_02 on nm1 + ResourceRequest rr3 = ResourceRequest + .newInstance(priority, "*", Resources.createResource(5 * GB), 1); + am2.allocate(Arrays.asList(rr3), null); + cs.nodeUpdate(cs.getRMContext().getRMNodes().get(nm1.getNodeId())); + while (sn1.getReservedContainer() == null) { + Thread.sleep(100); + } + Assert.assertNotNull(sn1.getReservedContainer()); + + // call real apply + boolean isAccept = schedulerApp + .accept((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + isSuccess.set(!isAccept); + isChecked.set(true); + return isAccept; + } else { + return schedulerApp.accept((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + } + } + }).when(spySchedulerApp) + .accept(Mockito.any(Resource.class), Mockito.any(ResourceCommitRequest.class)); + + // kick nm1 to fulfill reserved app1-container_04 + nm1.nodeHeartbeat(true); + + while (!isChecked.get()) { + Thread.sleep(100); + } + Assert.assertTrue(isSuccess.get()); + } + + + private void kickOnceAndWaitUntilContainerAllocated(MockAM am, + List nmLst, int expectNumContainers, int timeoutInMs) throws Exception { + int timeSpent = 0; + List conts = new ArrayList(); + // kick the scheduler + for (MockNM nm : nmLst) { + nm.nodeHeartbeat(true); + } + while (true) { + conts.addAll(am.allocate(new ArrayList(), null) + .getAllocatedContainers()); + if (timeSpent < timeoutInMs && conts.size() < expectNumContainers) { + Thread.sleep(100); + timeSpent += 100; + } else { + break; + } + } + } }