diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 8a2c539..159e6c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -187,12 +188,53 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + private void handleContainerStatus(ContainerStatus containerStatus) { + ApplicationAttemptId appAttemptId = + containerStatus.getContainerId().getApplicationAttemptId(); + RMApp rmApp = + rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (rmApp == null) { + LOG.error("Received finished container : " + + containerStatus.getContainerId() + + " for unknown application " + appAttemptId.getApplicationId() + + " Skipping."); + return; + } + + RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + if (rmAppAttempt == null) { + LOG.warn("Received finished container : " + + containerStatus.getContainerId() + + ", but the application doesn't know about this attempt " + + appAttemptId + " Skipping."); + return; + } + + Container 
masterContainer = rmAppAttempt.getMasterContainer(); + if (masterContainer == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received finished container : " + + containerStatus.getContainerId() + " for an application attempt" + + " that doesn't have a master container. Ignoring."); + } + return; + } + + if (masterContainer.getId().equals(containerStatus.getContainerId()) + && containerStatus.getState() == ContainerState.COMPLETE) { + // sending master container finished event. + RMAppAttemptContainerFinishedEvent evt = + new RMAppAttemptContainerFinishedEvent(appAttemptId, + containerStatus); + rmContext.getDispatcher().getEventHandler().handle(evt); + } + } + @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - NodeId nodeId = request.getNodeId(); String host = nodeId.getHost(); int cmPort = nodeId.getPort(); @@ -204,29 +246,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getContainerStatuses()); for (ContainerStatus containerStatus : request.getContainerStatuses()) { - ApplicationAttemptId appAttemptId = - containerStatus.getContainerId().getApplicationAttemptId(); - RMApp rmApp = - rmContext.getRMApps().get(appAttemptId.getApplicationId()); - if (rmApp != null) { - RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); - if (rmAppAttempt != null) { - if (rmAppAttempt.getMasterContainer().getId() - .equals(containerStatus.getContainerId()) - && containerStatus.getState() == ContainerState.COMPLETE) { - // sending master container finished event. 
- RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, - containerStatus); - rmContext.getDispatcher().getEventHandler().handle(evt); - } - } - } else { - LOG.error("Received finished container :" - + containerStatus.getContainerId() - + " for non existing application :" - + appAttemptId.getApplicationId()); - } + handleContainerStatus(containerStatus); } } RegisterNodeManagerResponse response = recordFactory diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 697a180..3e90ec8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -35,9 +35,11 @@ import javax.crypto.SecretKey; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -629,7 +631,9 @@ public Container getMasterContainer() { } } - private void setMasterContainer(Container container) { + @InterfaceAudience.Private + @VisibleForTesting + public void setMasterContainer(Container container) { masterContainer = container; } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 303e0fb..e4f648b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -26,6 +26,9 @@ import java.util.HashMap; import java.util.List; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .RMAppAttemptImpl; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -468,6 +471,9 @@ private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health, ClusterMetrics.getMetrics().getUnhealthyNMs()); } + /** + * Test verifies node registration with completed containers + */ @Test public void testNodeRegistrationWithContainers() throws Exception { rm = new MockRM(); @@ -478,13 +484,22 @@ public void testNodeRegistrationWithContainers() throws Exception { MockNM nm = rm.registerNode("host1:1234", 8192); nm.nodeHeartbeat(true); - // Register node with some container statuses + // Case 1: RMAppAttempt is null (unknown attempt id) ContainerStatus status = ContainerStatus.newInstance( ContainerId.newInstance(ApplicationAttemptId.newInstance( app.getApplicationId(), 2), 1), ContainerState.COMPLETE, "Dummy Completed", 0); + nm.registerNode(Collections.singletonList(status)); + assertEquals("Incorrect number of nodes", 1, + rm.getRMContext().getRMNodes().size()); - // The following shouldn't throw NPE + // Case 2: 
Master container is null + RMAppAttemptImpl currentAttempt = + (RMAppAttemptImpl) app.getCurrentAppAttempt(); + currentAttempt.setMasterContainer(null); + status = ContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, "Dummy Completed", 0); nm.registerNode(Collections.singletonList(status)); assertEquals("Incorrect number of nodes", 1, rm.getRMContext().getRMNodes().size());