diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d3186da..424a5de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2392,7 +2392,7 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) { if (attemptId != null) { FiCaSchedulerApp app = getApplicationAttempt(attemptId); - if (app != null) { + if (app != null && attemptId.equals(app.getApplicationAttemptId())) { if (app.accept(cluster, request)) { app.apply(cluster, request); LOG.info("Allocation proposal accepted"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index bf1f6eb..111844b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,29 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; - import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -90,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.Application; -import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -123,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -131,13 +112,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. - ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -162,9 +145,27 @@ import org.junit.Test; import org.mockito.Mockito; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -4415,4 +4416,120 @@ public void testConvertLeafQueueToParentQueue() throws Exception { Assert.assertEquals(b1.getState(), QueueState.RUNNING); Assert.assertTrue(!b1.getChildQueues().isEmpty()); } + + + @Test (timeout = 30000) + public void testCommitOutdatedProposal() + throws Exception { + // init RM & NMs & Nodes + Configuration conf = new YarnConfiguration(); + final MockRM rm = new MockRM(conf); + rm.start(); + final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + List nmLst = new ArrayList<>(); + nmLst.add(nm1); + nmLst.add(nm2); + + // init scheduler & nodes + while ( + ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker() + .nodeCount() < 2) { + Thread.sleep(10); + } + Assert.assertEquals(2, + ((AbstractYarnScheduler) rm.getRMContext().getScheduler()) + .getNodeTracker().nodeCount()); + CapacityScheduler scheduler = + (CapacityScheduler) rm.getRMContext().getScheduler(); + SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId()); + SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId()); + + // launch app + RMApp app = rm.submitApp(200, "app", "user", null, false, "default", + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); + FiCaSchedulerApp schedulerApp = + scheduler.getApplicationAttempt(am.getApplicationAttemptId()); + + // request 1 containers and running on nm2 + Priority priority = Priority.newInstance(0); + ResourceRequest rr = ResourceRequest + .newInstance(priority, "*", Resources.createResource(5 * GB), 1); + am.allocate(Arrays.asList(rr), null); + List containers = + kickOnceAndWaitUntilContainerAllocated(am, Arrays.asList(nm2), 1, + 100000); + for (Container container : containers) { + sentRMContainerLaunched(rm, container.getId()); + rm.waitForState(nm2, container.getId(), RMContainerState.RUNNING); + } + // nm1 runs 1 containers(app1-container_01/AM) + // nm2 runs 1 containers(app1-container_02) + Assert.assertEquals(1, sn1.getNumContainers()); + Assert.assertEquals(1, sn2.getNumContainers()); + + // kill app attempt1 + scheduler.handle( + new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(), + RMAppAttemptState.KILLED, true)); + // wait until app attempt1 removed on nm1 + while (sn1.getCopiedListOfRunningContainers().size() == 1) { + Thread.sleep(100); + } + // wait until app attempt2 launched on nm1 + while (sn1.getCopiedListOfRunningContainers().size() == 0) { + nm1.nodeHeartbeat(true); + Thread.sleep(100); + } + + // generate reserved proposal of stopped app attempt + // and it could be committed for async-scheduling + // this kind of proposal should be skipped + Resource reservedResource = Resources.createResource(5 * GB); + Container container = Container.newInstance( + ContainerId.newContainerId(am.getApplicationAttemptId(), 3), + sn2.getNodeID(), sn2.getHttpAddress(), reservedResource, + Priority.newInstance(0), null); + RMContainer rmContainer = + new RMContainerImpl(container, + SchedulerRequestKey.create(ResourceRequest.newInstance(Priority.newInstance(0), "*", reservedResource, 1)), + am.getApplicationAttemptId(), + sn2.getNodeID(), "user", rm.getRMContext()); + SchedulerContainer reservedContainer = + new SchedulerContainer(schedulerApp, scheduler.getNode(sn2.getNodeID()), + rmContainer, "", false); + ContainerAllocationProposal reservedForAttempt1Proposal = + new ContainerAllocationProposal(reservedContainer, null, + reservedContainer, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, reservedResource); + List reservedProposals = new ArrayList<>(); + reservedProposals.add(reservedForAttempt1Proposal); + ResourceCommitRequest request = + new ResourceCommitRequest(null, reservedProposals, null); + scheduler.tryCommit(scheduler.getClusterResource(), request); + Assert.assertNull("Outdated proposal should not be accepted!", sn2.getReservedContainer()); + + // close rm + rm.close(); + } + + private List kickOnceAndWaitUntilContainerAllocated(MockAM am, + List nmLst, int expectNumContainers, int timeoutInMs) throws Exception { + int timeSpent = 0; + List conts = new ArrayList(); + for (MockNM nm : nmLst) { + nm.nodeHeartbeat(true); + } + while (true) { + conts.addAll(am.allocate(new ArrayList(), null) + .getAllocatedContainers()); + if (timeSpent < timeoutInMs && conts.size() < expectNumContainers) { + Thread.sleep(100); + timeSpent += 100; + } else { + return conts; + } + } + } }