diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 5c0b718..3f6ab5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -427,6 +427,18 @@ public boolean accept(Resource cluster, // accepted & confirmed, it will become RESERVED state if (schedulerContainer.getRmContainer().getState() == RMContainerState.RESERVED) { + // Check if node currently reserved by other application, there may + // be some outdated proposals in async-scheduling environment + if (schedulerContainer.getRmContainer() != schedulerContainer + .getSchedulerNode().getReservedContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Try to re-reserve a container, but the node is " + + "already reserved by another container" + + schedulerContainer.getSchedulerNode() + .getReservedContainer().getContainerId()); + } + return false; + } // Set reReservation == true reReservation = true; } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 9854a15..a1d7293 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -18,6 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -25,13 +33,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class TestCapacitySchedulerAsyncScheduling { private final int GB = 1024; @@ -140,4 +163,165 @@ public RMNodeLabelsManager createNodeLabelManager() { rm.close(); } + + // Testcase for YARN-6678 + @Test(timeout = 30000) + public void testCommitOutdatedReservedProposal() throws Exception { + // disable async-scheduling for simulating complex since scene + Configuration disableAsyncConf = new Configuration(conf); + disableAsyncConf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + + // init RM & NMs & Nodes + final MockRM rm = new MockRM(disableAsyncConf); + rm.start(); + final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + + // init scheduler nodes + while (((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount() < 2) { + Thread.sleep(100); + } + Assert.assertEquals(2, + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount()); + + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + final SchedulerNode sn1 = + ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId()); + final SchedulerNode sn2 = + ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId()); + + // submit app1, am1 is running on nm1 + RMApp app = rm.submitApp(200, "app", "user", null, "default"); + final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + // submit app2, am2 is running on nm1 + RMApp app2 = rm.submitApp(200, "app", "user", null, "default"); + final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + // allocate and launch 2 containers for app1 + allocateAndLaunchContainers(am, nm1, rm, 1, + Resources.createResource(5 * GB), 0, 2); + allocateAndLaunchContainers(am, nm2, rm, 1, + Resources.createResource(5 * GB), 0, 3); + + // nm1 runs 3 containers(app1-container_01/AM, app1-container_02, + // app2-container_01/AM) + // nm2 runs 1 container(app1-container_03) + Assert.assertEquals(3, sn1.getNumContainers()); + Assert.assertEquals(1, sn2.getNumContainers()); + + // reserve 1 container(app1-container_04) for app1 on nm1 + ResourceRequest rr2 = ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(5 * GB), 1); + am.allocate(Arrays.asList(rr2), null); + nm1.nodeHeartbeat(true); + // wait app1-container_04 reserved on nm1 + while (true) { + if (sn1.getReservedContainer() != null) { + break; + } + Thread.sleep(100); + } + Assert.assertNotNull(sn1.getReservedContainer()); + + final CapacityScheduler cs = (CapacityScheduler) scheduler; + final CapacityScheduler spyCs = Mockito.spy(cs); + final AtomicBoolean isFirstReserve = new AtomicBoolean(true); + final AtomicBoolean isChecked = new AtomicBoolean(false); + // handle CapacityScheduler#tryCommit, + // reproduce the process that can raise IllegalStateException before + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + ResourceCommitRequest request = + (ResourceCommitRequest) invocation.getArguments()[1]; + if (request.getContainersToReserve().size() > 0 && isFirstReserve + .compareAndSet(true, false)) { + // release app1-container_03 on nm2 + RMContainer killableContainer = + sn2.getCopiedListOfRunningContainers().get(0); + cs.completedContainer(killableContainer, ContainerStatus + .newInstance(killableContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL); + Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size()); + // unreserve app1-container_04 on nm1 + // and allocate app1-container_05 on nm2 + cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode())); + while (sn2.getCopiedListOfRunningContainers().size() < 1) { + Thread.sleep(100); + } + Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size()); + Assert.assertNull(sn1.getReservedContainer()); + + // reserve app2-container_02 on nm1 + ResourceRequest rr3 = ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(5 * GB), 1); + am2.allocate(Arrays.asList(rr3), null); + cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); + while (sn1.getReservedContainer() == null) { + Thread.sleep(100); + } + Assert.assertNotNull(sn1.getReservedContainer()); + + // call real apply + try { + cs.tryCommit((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + isChecked.set(true); + } else { + cs.tryCommit((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + } + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class)); + + spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); + + while (!isChecked.get()) { + Thread.sleep(100); + } + } + + private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, + int nContainer, Resource resource, int priority, int startContainerId) + throws Exception { + am.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(priority), "*", resource, + nContainer)), null); + ContainerId lastContainerId = ContainerId + .newContainerId(am.getApplicationAttemptId(), + startContainerId + nContainer - 1); + Assert.assertTrue( + rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED)); + // Acquire them, and NM report RUNNING + am.allocate(null, null); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + for (int cId = startContainerId; + cId < startContainerId + nContainer; cId++) { + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), cId); + RMContainer rmContainer = cs.getRMContainer(containerId); + if (rmContainer != null) { + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + } else { + Assert.fail("Cannot find RMContainer"); + } + rm.waitForState(nm, + ContainerId.newContainerId(am.getApplicationAttemptId(), cId), + RMContainerState.RUNNING); + } + } }