diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 97e25d5cafb..be936794fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -375,7 +375,7 @@ public long getSkipNodeInterval(){ } protected void containerLaunchedOnNode( - ContainerId containerId, SchedulerNode node) { + ContainerId containerId, RMNode nm, SchedulerNode node) { readLock.lock(); try { // Get the application for the finished container @@ -384,21 +384,23 @@ protected void containerLaunchedOnNode( if (application == null) { LOG.info("Unknown application " + containerId.getApplicationAttemptId() .getApplicationId() + " launched container " + containerId - + " on node: " + node); + + " on node: " + nm); this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + new RMNodeCleanContainerEvent(nm.getNodeID(), containerId)); return; } - application.containerLaunchedOnNode(containerId, node.getNodeID()); - node.containerStarted(containerId); + application.containerLaunchedOnNode(containerId, nm.getNodeID()); + if (node != null) { + node.containerStarted(containerId); + } } finally { readLock.unlock(); } } protected void containerIncreasedOnNode(ContainerId containerId, - SchedulerNode node, Container increasedContainerReportedByNM) { + RMNode nm, Container increasedContainerReportedByNM) { /* * No lock is required, as this method is protected by scheduler's writeLock */ @@ -408,9 +410,9 @@ protected void containerIncreasedOnNode(ContainerId containerId, if (application == null) { LOG.info("Unknown application " + containerId.getApplicationAttemptId() .getApplicationId() + " increased container " + containerId - + " on node: " + node); + + " on node: " + nm); this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + new RMNodeCleanContainerEvent(nm.getNodeID(), containerId)); return; } @@ -418,7 +420,7 @@ protected void containerIncreasedOnNode(ContainerId containerId, if (rmContainer == null) { // Some unknown container sneaked into the system. Kill it. this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + new RMNodeCleanContainerEvent(nm.getNodeID(), containerId)); return; } rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId, @@ -1061,15 +1063,14 @@ public SchedulerNode getNode(NodeId nodeId) { // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), - schedulerNode); + containerLaunchedOnNode(launchedContainer.getContainerId(), nm, schedulerNode); } // Processing the newly increased containers List newlyIncreasedContainers = nm.pullNewlyIncreasedContainers(); for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), schedulerNode, container); + containerIncreasedOnNode(container.getId(), nm, container); } // Processing the update exist containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index eea017848a1..350239b8186 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -143,6 +143,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -4491,6 +4492,27 @@ public void testRemovedNodeDecomissioningNode() throws Exception { Mockito.spy(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + // mock some container + List containerUpdates = new ArrayList<>(); + UpdatedContainerInfo containerInfo = mock(UpdatedContainerInfo.class); + List containerStatusList = new ArrayList<>(); + ContainerStatus containerStatus = mock(ContainerStatus.class); + ContainerId containerId = mock(ContainerId.class); + ApplicationId applicationId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId,1); + when(containerId.getApplicationAttemptId()).thenReturn(applicationAttemptId); + when(containerStatus.getContainerId()).thenReturn(containerId); + containerStatusList.add(containerStatus); + when(containerInfo.getNewlyLaunchedContainers()).thenReturn(containerStatusList); + containerUpdates.add(containerInfo); + when(spyNode.pullContainerUpdates()).thenReturn(containerUpdates); + SchedulerApplication schedulerApplication = mock(SchedulerApplication.class); + FiCaSchedulerApp fiCaSchedulerApp = + new FiCaSchedulerApp(applicationAttemptId, + "test", null, null, resourceManager.getRMContext()); + when(schedulerApplication.getCurrentAppAttempt()).thenReturn(fiCaSchedulerApp); + ((CapacityScheduler) resourceManager.getResourceScheduler()) + .getSchedulerApplications().put(applicationId, schedulerApplication); resourceManager.getResourceScheduler().handle( new NodeUpdateSchedulerEvent(spyNode)); }