diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8acf7d5..a611f0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -423,8 +423,12 @@ public boolean isPlaceBlacklisted(String resourceName, updateMetricsForAllocatedContainer(type, node, containerAllocated); } - return schedulerKeyToPlacementSets.get(schedulerKey).allocate( - schedulerKey, type, node); + if (schedulerKeyToPlacementSets.containsKey(schedulerKey)) { + return schedulerKeyToPlacementSets.get(schedulerKey) + .allocate(schedulerKey, type, node); + } else { + return null; + } } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 64e0df4..25d8cf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -41,6 +41,8 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; import org.apache.commons.logging.Log; @@ -64,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -132,6 +135,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -170,6 +174,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -4808,4 +4814,52 @@ private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, } } } + + @Test (timeout = 60000) + public void testClearRequestsBeforeApplyTheProposal() + throws Exception { + // init RM & NMs & Nodes + final MockRM rm = new MockRM(new CapacitySchedulerConfiguration()); + rm.start(); + final MockNM nm = rm.registerNode("h1:1234", 200 * GB); + + // submit app + final RMApp app = rm.submitApp(200, "app", "user"); + final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + + // spy capacity scheduler to handle CapacityScheduler#tryCommit, + // reproduce the process that can raise NPE + final Priority priority = Priority.newInstance(1); + final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final CapacityScheduler spyCs = Mockito.spy(cs); + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + ResourceCommitRequest request = + (ResourceCommitRequest) invocation.getArguments()[1]; + // clear resource request before applying the proposal for container_1 + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 0)), + Collections.emptyList(), null, null, + NULL_UPDATE_REQUESTS); + // apply proposal for container_1 + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + app.getCurrentAppAttempt().getAppAttemptId()); + schedulerApp.apply((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class)); + + // rm allocates container_1 + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 1)), + Collections.emptyList(), null, null, + NULL_UPDATE_REQUESTS); + spyCs.handle(new NodeUpdateSchedulerEvent( + spyCs.getNode(nm.getNodeId()).getRMNode())); + } }