diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f3da0a3..8b39812 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -502,8 +502,15 @@ public void apply(Resource cluster, ResourceCommitRequest schedulerContainer = allocation.getAllocatedOrReservedContainer(); - RMContainer rmContainer = schedulerContainer.getRmContainer(); + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey()) + <= 0) { + return; + } + + RMContainer rmContainer = schedulerContainer.getRmContainer(); reReservation = (!schedulerContainer.isAllocated()) && (rmContainer.getState() == RMContainerState.RESERVED); @@ -545,7 +552,8 @@ public void apply(Resource cluster, ResourceCommitRequest() { + public Object answer(InvocationOnMock invocation) throws Exception { + // clear resource request before applying the proposal for container_2 + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 0)), null, + Collections.emptyList(), null, null, + NULL_UPDATE_REQUESTS); + // trigger real apply which can raise NPE before YARN-6629 + try { + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + app.getCurrentAppAttempt().getAppAttemptId()); + schedulerApp.apply((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1], + (Boolean) invocation.getArguments()[2]); + // the proposal of removed request should be rejected + Assert.assertEquals(1, schedulerApp.getLiveContainers().size()); + } catch (Throwable e) { + Assert.fail(); + } + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean()); + + // rm allocates container_2 to reproduce the process that can raise NPE + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 1)), null, + Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); + spyCs.handle(new NodeUpdateSchedulerEvent( + spyCs.getNode(nm.getNodeId()).getRMNode())); + } }