diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58..44a6875 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -30,6 +30,7 @@ NodeAction getNodeAction(); List getContainersToCleanup(); + List getFinishedContainersPulledByAM(); List getApplicationsToCleanup(); @@ -43,6 +44,7 @@ void setNMTokenMasterKey(MasterKey secretKey); void addAllContainersToCleanup(List containers); + void addFinishedContainersPulledByAM(List containers); void addAllApplicationsToCleanup(List applications); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95a..e9296f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,6 +46,7 @@ boolean viaProto = false; private List containersToCleanup = null; + private List 
finishedContainersPulledByAM = null; private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -73,6 +74,9 @@ private void mergeLocalToBuilder() { if (this.applicationsToCleanup != null) { addApplicationsToCleanupToProto(); } + if (this.finishedContainersPulledByAM != null) { + addFinishedContainersPulledByAMToProto(); + } if (this.containerTokenMasterKey != null) { builder.setContainerTokenMasterKey( convertToProtoFormat(this.containerTokenMasterKey)); @@ -199,6 +203,12 @@ public void setDiagnosticsMessage(String diagnosticsMessage) { return this.containersToCleanup; } + @Override + public List getFinishedContainersPulledByAM() { + initFinishedContainersPulledByAM(); + return this.finishedContainersPulledByAM; + } + private void initContainersToCleanup() { if (this.containersToCleanup != null) { return; @@ -212,6 +222,19 @@ private void initContainersToCleanup() { } } + private void initFinishedContainersPulledByAM() { + if (this.finishedContainersPulledByAM != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getFinishedContainersPulledByAmList(); + this.finishedContainersPulledByAM = new ArrayList(); + + for (ContainerIdProto c : list) { + this.finishedContainersPulledByAM.add(convertFromProtoFormat(c)); + } + } + @Override public void addAllContainersToCleanup( final List containersToCleanup) { @@ -221,6 +244,15 @@ public void addAllContainersToCleanup( this.containersToCleanup.addAll(containersToCleanup); } + @Override + public void addFinishedContainersPulledByAM( + final List finishedContainersPulledByAM) { + if (finishedContainersPulledByAM == null) + return; + initFinishedContainersPulledByAM(); + this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM); + } + private void addContainersToCleanupToProto() { maybeInitBuilder(); builder.clearContainersToCleanup(); @@ -256,6 +288,41 @@ public void remove() { builder.addAllContainersToCleanup(iterable); } + private void addFinishedContainersPulledByAMToProto() { + maybeInitBuilder(); + builder.clearFinishedContainersPulledByAm(); + if (finishedContainersPulledByAM == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = finishedContainersPulledByAM.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllFinishedContainersPulledByAm(iterable); + } + @Override public List getApplicationsToCleanup() { initApplicationsToCleanup(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 29cd64e..600f54d 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated ContainerIdProto finished_containers_pulled_by_am = 9; } message NMContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a479be2..43770c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -311,7 +311,7 @@ public void run() { public static class NMContext implements Context { private NodeId nodeId = null; - private final ConcurrentMap applications = + protected final ConcurrentMap applications = new ConcurrentHashMap(); protected final ConcurrentMap containers = new ConcurrentSkipListMap(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b52b0fb..7c913b0 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -104,11 +104,6 @@ // Duration for which to track recently stopped container. private long durationToTrackStoppedContainers; - // This is used to track the current completed containers when nodeheartBeat - // is called. These completed containers will be removed from NM context after - // nodeHeartBeat succeeds and the response from the nodeHeartBeat is - // processed. - private final Set previousCompletedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -125,7 +120,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); - this.previousCompletedContainers = new HashSet(); } @Override @@ -331,7 +325,7 @@ protected void registerWithRM() return appList; } - private NodeStatus getNodeStatus(int responseId) { + private NodeStatus getNodeStatus(int responseId) throws IOException { NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); @@ -352,11 +346,19 @@ private NodeStatus getNodeStatus(int responseId) { // Iterate through the NMContext and clone and get all the containers' // statuses. If it's a completed container, add into the - // recentlyStoppedContainers and previousCompletedContainers collections. + // recentlyStoppedContainers collections. 
@VisibleForTesting - protected List getContainerStatuses() { + protected List getContainerStatuses() throws IOException { List containerStatuses = new ArrayList(); for (Container container : this.context.getContainers().values()) { + ContainerId containerId = container.getContainerId(); + ApplicationId applicationId = container.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!this.context.getApplications().containsKey(applicationId)) { + context.getContainers().remove(containerId); + context.getNMStateStore().removeContainer(containerId); + continue; + } org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); containerStatuses.add(containerStatus); @@ -381,10 +383,18 @@ private NodeStatus getNodeStatus(int responseId) { } // These NMContainerStatus are sent on NM registration and used by YARN only. - private List getNMContainerStatuses() { + private List getNMContainerStatuses() throws IOException { List containerStatuses = new ArrayList(); for (Container container : this.context.getContainers().values()) { + ContainerId containerId = container.getContainerId(); + ApplicationId applicationId = container.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!this.context.getApplications().containsKey(applicationId)) { + context.getContainers().remove(containerId); + context.getNMStateStore().removeContainer(containerId); + continue; + } NMContainerStatus status = container.getNMContainerStatus(); containerStatuses.add(status); @@ -402,26 +412,31 @@ private NodeStatus getNodeStatus(int responseId) { @Override public void addCompletedContainer(ContainerId containerId) { - synchronized (previousCompletedContainers) { - previousCompletedContainers.add(containerId); - } synchronized (recentlyStoppedContainers) { removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); + if 
(!recentlyStoppedContainers.containsKey(containerId)) { + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } } } - private void removeCompletedContainersFromContext() { - synchronized (previousCompletedContainers) { - if (!previousCompletedContainers.isEmpty()) { - for (ContainerId containerId : previousCompletedContainers) { - this.context.getContainers().remove(containerId); - } - LOG.info("Removed completed containers from NM context: " - + previousCompletedContainers); - previousCompletedContainers.clear(); - } + @VisibleForTesting + @Private + public void removeCompletedContainersFromContext( + ListcontainerIds) throws IOException { + Set removedContainers = new HashSet(); + + // If the AM has pulled the completedContainer it can be removed + for (ContainerId containerId : containerIds) { + context.getContainers().remove(containerId); + context.getNMStateStore().removeContainer(containerId); + removedContainers.add(containerId); + } + + if (!removedContainers.isEmpty()) { + LOG.info("Removed completed containers from NM context: " + + removedContainers); } } @@ -454,7 +469,7 @@ public boolean isContainerRecentlyStopped(ContainerId containerId) { return recentlyStoppedContainers.containsKey(containerId); } } - + @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -542,7 +557,9 @@ public void run() { // don't want to remove the completed containers before resync // because these completed containers will be reported back to RM // when NM re-registers with RM. 
- removeCompletedContainersFromContext(); + // Only remove the cleanedup containers that are acked + removeCompletedContainersFromContext(response + .getFinishedContainersPulledByAM()); lastHeartBeatID = response.getResponseId(); List containersToCleanup = response diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index acda2a9..1b6c0b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -247,6 +248,10 @@ public RegisterNodeManagerResponse registerNodeManager( // put the completed container into the context getNMContext().getContainers().put( testCompleteContainer.getContainerId(), container); + getNMContext().getApplications().put( + testCompleteContainer.getContainerId() + .getApplicationAttemptId().getApplicationId(), + mock(Application.class)); } else { // second register contains the completed container info. 
List statuses = @@ -382,9 +387,17 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { if (containersShouldBePreserved) { Assert.assertFalse(containers.isEmpty()); Assert.assertTrue(containers.containsKey(existingCid)); + Assert.assertEquals(ContainerState.RUNNING, + containers.get(existingCid) + .cloneAndGetContainerStatus().getState()); } else { - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); + // ensure that containers are empty or are completed before + // restart nodeStatusUpdater + if (!containers.isEmpty()) { + Assert.assertEquals(ContainerState.COMPLETE, + containers.get(existingCid) + .cloneAndGetContainerStatus().getState()); + } } super.rebootNodeStatusUpdaterAndRegisterWithRM(); } @@ -465,7 +478,11 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { try { // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); + if (!containers.isEmpty()) { + for (Container container: containers.values()) + Assert.assertEquals(ContainerState.COMPLETE, + container.cloneAndGetContainerStatus().getState()); + } super.rebootNodeStatusUpdaterAndRegisterWithRM(); // After this point new containers are free to be launched, except // containers from previous RM diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f2a3a4a..2ec585f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -180,7 +181,7 @@ public RegisterNodeManagerResponse registerNodeManager( Map> map = new HashMap>(); for (ContainerStatus cs : containers) { - ApplicationId applicationId = + ApplicationId applicationId = cs.getContainerId().getApplicationAttemptId().getApplicationId(); List appContainers = map.get(applicationId); if (appContainers == null) { @@ -205,10 +206,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); - + ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); - + if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); @@ -419,7 +420,7 @@ protected void stopRMProxy() { } private class MyNodeManager extends NodeManager { - + private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -433,7 +434,7 @@ public MyNodeStatusUpdater3 getNodeStatusUpdater() { return this.nodeStatusUpdater; } } - + private class MyNodeManager2 extends NodeManager { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; @@ -467,7 +468,7 @@ protected void serviceStop() throws Exception { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } - // + // private class MyResourceTracker2 implements ResourceTracker { public 
NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -478,7 +479,7 @@ protected void serviceStop() throws Exception { public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -493,7 +494,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus nodeStatus = request.getNodeStatus(); nodeStatus.setResponseId(heartBeatID++); - + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); @@ -501,7 +502,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nhResponse; } } - + private class MyResourceTracker3 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -513,7 +514,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) MyResourceTracker3(Context context) { this.context = context; } - + @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, @@ -564,6 +565,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL; private Context context; + private final ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + private final ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + private final ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.RUNNING); + private final ContainerStatus 
containerStatus5 = + createContainerStatus(5, ContainerState.COMPLETE); public MyResourceTracker4(Context context) { this.context = context; @@ -583,6 +592,8 @@ public RegisterNodeManagerResponse registerNodeManager( @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { + List finishedContainersPulledByAM = new ArrayList + (); try { if (heartBeatID == 0) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() @@ -594,10 +605,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) Assert.assertEquals(statuses.size(), 2); Assert.assertEquals(context.getContainers().size(), 2); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); boolean container2Exist = false, container3Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( @@ -619,23 +626,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // test passes. 
throw new YarnRuntimeException("Lost the heartbeat response"); - } else if (heartBeatID == 2) { + } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); Assert.assertEquals(statuses.size(), 4); Assert.assertEquals(context.getContainers().size(), 4); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); - ContainerStatus containerStatus4 = - createContainerStatus(4, ContainerState.RUNNING); - ContainerStatus containerStatus5 = - createContainerStatus(5, ContainerState.COMPLETE); - - boolean container2Exist = false, container3Exist = false, container4Exist = - false, container5Exist = false; + boolean container2Exist = false, container3Exist = false, + container4Exist = false, container5Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( containerStatus2.getContainerId())) { @@ -664,6 +662,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } Assert.assertTrue(container2Exist && container3Exist && container4Exist && container5Exist); + + if (heartBeatID == 3) { + finishedContainersPulledByAM.add(containerStatus3.getContainerId()); + } + } else if (heartBeatID == 4) { + List statuses = + request.getNodeStatus().getContainersStatuses(); + Assert.assertEquals(statuses.size(), 3); + Assert.assertEquals(context.getContainers().size(), 3); } } catch (AssertionError error) { error.printStackTrace(); @@ -676,6 +683,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); + nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM); return nhResponse; } } @@ -686,7 +694,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 
public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -694,7 +702,7 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNMTokenMasterKey(createMasterKey()); return response; } - + @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { @@ -767,11 +775,11 @@ public void deleteBaseDir() throws IOException { lfs.delete(new Path(basedir.getPath()), true); } - @Test(timeout = 90000) - public void testRecentlyFinishedContainers() throws Exception { - NodeManager nm = new NodeManager(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set( + @Test(timeout = 90000) + public void testRecentlyFinishedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, "10000"); nm.init(conf); @@ -780,9 +788,11 @@ public void testRecentlyFinishedContainers() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 0); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); - ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - - + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class)); + nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); @@ -790,17 +800,99 @@ public void testRecentlyFinishedContainers() throws Exception { int waitInterval = 15; while (waitInterval-- > 0 && nodeStatusUpdater.isContainerRecentlyStopped(cId)) { - 
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); + nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); Thread.sleep(1000); } - long time2 = System.currentTimeMillis(); + long time2 = System.currentTimeMillis(); // By this time the container will be removed from cache. need to verify. - Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); - } - + Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000); + } + @Test(timeout = 90000) + public void testPreviousCompletedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( + NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "10000"); + nm.init(conf); + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + ContainerId cId = ContainerId.newInstance(appAttemptId, 1); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().put(cId, anyCompletedContainer); + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + List ackedContainers = new ArrayList(); + ackedContainers.add(cId); + + 
nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers); + Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty()); + } + + @Test + public void testCleanedupApplicationContainerCleanup() throws IOException { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "1000000"); + nm.init(conf); + + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + + ContainerId cId = ContainerId.newInstance(appAttemptId, 1); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().put(cId, anyCompletedContainer); + + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + nm.getNMContext().getApplications().remove(appId); + nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList + ()); + Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); + } + @Test public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @@ -860,7 +952,7 @@ public void run() { nm.stop(); } - + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); @@ -875,7 +967,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, 
myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } - + @Override protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, @@ -897,7 +989,7 @@ public void cleanUpApplicationsOnNMShutDown() { YarnConfiguration conf = createNMConfig(); nm.init(conf); nm.start(); - + int waitCount = 0; while (heartBeatID < 1 && waitCount++ != 200) { Thread.sleep(500); @@ -906,7 +998,7 @@ public void cleanUpApplicationsOnNMShutDown() { // Meanwhile call stop directly as the shutdown hook would nm.stop(); - + // NM takes a while to reach the STOPPED state. waitCount = 0; while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { @@ -1172,9 +1264,13 @@ protected NMContext createNMContext( nm.start(); int waitCount = 0; - while (heartBeatID <= 3 && waitCount++ != 20) { + while (heartBeatID <= 4 && waitCount++ != 20) { Thread.sleep(500); } + if (heartBeatID <= 4) { + Assert.fail("Failed to get all heartbeats in time, " + + "heartbeatID:" + heartBeatID); + } if(assertionFailedInThread.get()) { Assert.fail("ContainerStatus Backup failed"); } @@ -1182,7 +1278,7 @@ protected NMContext createNMContext( } @Test(timeout = 200000) - public void testNodeStatusUpdaterRetryAndNMShutdown() + public void testNodeStatusUpdaterRetryAndNMShutdown() throws Exception { final long connectionWaitSecs = 1000; final long connectionRetryIntervalMs = 1000; @@ -1190,7 +1286,7 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, connectionWaitSecs); conf.setLong(YarnConfiguration - .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, connectionRetryIntervalMs); conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); @@ -1281,30 +1377,36 @@ public MyNMContext( } else if (heartBeatID == 1) { ContainerStatus containerStatus2 = 
createContainerStatus(2, ContainerState.RUNNING); - Container container2 = getMockContainer(containerStatus2); - containers.put(containerStatus2.getContainerId(), container2); + putMockContainer(containerStatus2); ContainerStatus containerStatus3 = createContainerStatus(3, ContainerState.COMPLETE); - Container container3 = getMockContainer(containerStatus3); - containers.put(containerStatus3.getContainerId(), container3); + putMockContainer(containerStatus3); return containers; } else if (heartBeatID == 2) { ContainerStatus containerStatus4 = createContainerStatus(4, ContainerState.RUNNING); - Container container4 = getMockContainer(containerStatus4); - containers.put(containerStatus4.getContainerId(), container4); + putMockContainer(containerStatus4); ContainerStatus containerStatus5 = createContainerStatus(5, ContainerState.COMPLETE); - Container container5 = getMockContainer(containerStatus5); - containers.put(containerStatus5.getContainerId(), container5); + putMockContainer(containerStatus5); + return containers; + } else if (heartBeatID == 3 || heartBeatID == 4) { return containers; } else { containers.clear(); return containers; } } + + private void putMockContainer(ContainerStatus containerStatus) { + Container container = getMockContainer(containerStatus); + containers.put(containerStatus.getContainerId(), container); + applications.putIfAbsent(containerStatus.getContainerId() + .getApplicationAttemptId().getApplicationId(), + mock(Application.class)); + } } public static ContainerStatus createContainerStatus(int id, @@ -1345,7 +1447,7 @@ private void verifyNodeStartFailure(String errMessage) throws Exception { throw e; } } - + // the service should be stopped Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm .getServiceState()); @@ -1364,7 +1466,7 @@ private YarnConfiguration createNMConfig() { } conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345"); - 
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath()); @@ -1372,7 +1474,7 @@ private YarnConfiguration createNMConfig() { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); return conf; } - + private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 4798120..4222888 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -198,7 +198,7 @@ protected void serviceStop() throws Exception { */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleNMContainerStatus(NMContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -229,7 +229,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) { containerStatus.getContainerExitStatus()); // sending master container finished event. 
RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, status); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status, + nodeId); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -324,7 +325,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getNMContainerStatuses()); for (NMContainerStatus status : request.getNMContainerStatuses()) { - handleNMContainerStatus(status); + handleNMContainerStatus(status, nodeId); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..c211427 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -120,12 +122,12 @@ List pullJustFinishedContainers(); /** - * Return the list 
of last set of finished containers. This does not reset the - * finished containers. + * Return the map of last set of finished containers to the corresponding + * node. This does not reset the finished containers. - * @return the list of just finished contianers, this does not reset the - * finished containers. + * @return the map of just finished containers to their corresponding nodes; + * this does not reset the finished containers. */ - List getJustFinishedContainers(); + Map getJustFinishedContainers(); /** * The container on which the Application Master is running. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1489696..3447069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,7 +26,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -39,10 +41,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.Node; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import
org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -84,8 +87,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; + + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -131,9 +136,14 @@ private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - private List justFinishedContainers = - new ArrayList(); + + private Map justFinishedContainers = + new HashMap(); + // Tracks the previous finished containers that are waiting to be + // verified as received by the AM. If the AM sends the next allocate + // request it implicitly acks this list. 
+ private Map> finishedContainersSentToAM = + new HashMap>(); private Container masterContainer; private float progress = 0; @@ -629,7 +639,7 @@ public float getProgress() { } @Override - public List getJustFinishedContainers() { + public Map getJustFinishedContainers() { this.readLock.lock(); try { return this.justFinishedContainers; @@ -645,7 +655,32 @@ public float getProgress() { try { List returnList = new ArrayList( this.justFinishedContainers.size()); - returnList.addAll(this.justFinishedContainers); + returnList.addAll(this.justFinishedContainers.keySet()); + for (Map.Entry> finishedContainerStatuses: + this.finishedContainersSentToAM.entrySet()) { + // Implicitly acks the previous list as being received by the AM + + List containerIdList = new ArrayList(); + for (ContainerStatus containerStatus : finishedContainerStatuses + .getValue()) { + containerIdList.add(containerStatus.getContainerId()); + } + + eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent( + finishedContainerStatuses.getKey(), containerIdList)); + } + + finishedContainersSentToAM.clear(); + + for (Map.Entry entry:this + .justFinishedContainers.entrySet()) { + if (!finishedContainersSentToAM.containsKey(entry.getValue())) { + finishedContainersSentToAM.put(entry.getValue(), new ArrayList + ()); + } + finishedContainersSentToAM.get(entry.getValue()).add(entry.getKey()); + } + this.justFinishedContainers.clear(); return returnList; } finally { @@ -733,7 +768,7 @@ public void recover(RMState state) throws Exception { } setMasterContainer(attemptState.getMasterContainer()); recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), - attemptState.getState()); + attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -1149,6 +1184,7 @@ public void transition(RMAppAttemptImpl appAttempt, break; } + 
appAttempt.finishedContainersSentToAM.clear(); appAttempt.eventHandler.handle(appEvent); appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts)); @@ -1516,7 +1552,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } // Normal container.Put it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return this.currentState; } } @@ -1532,7 +1569,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); // Normal container. Add it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } @@ -1573,7 +1611,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.FINISHED; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return RMAppAttemptState.FINISHING; } } @@ -1608,7 +1647,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return; } // Normal container. 
- appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java index 3660597..39c6f29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java @@ -20,21 +20,27 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { private final ContainerStatus containerStatus; + private final NodeId nodeId; public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, - ContainerStatus containerStatus) { + ContainerStatus containerStatus, NodeId nodeId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.nodeId = nodeId; } public ContainerStatus getContainerStatus() { return this.containerStatus; } + public NodeId getNodeId() { + return this.nodeId; + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 885e864..479734a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -78,13 +78,13 @@ RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state - .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, + .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, RMContainerEventType.START, new ContainerStartedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, + .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, RMContainerEventType.KILL) // nothing to do - .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, RMContainerEventType.RELEASED) // nothing to do @@ -100,7 +100,7 @@ .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED, new LaunchedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) + RMContainerEventType.FINISHED, new 
ContainerFinishedAtAcquiredState()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, @@ -495,7 +495,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { updateAttemptMetrics(container); container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + container.appAttemptId, finishedEvent.getRemoteContainerStatus(), + container.getAllocatedNode())); container.rmContext.getRMApplicationHistoryWriter().containerFinished( container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index c0096b9..b4d0b8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -40,6 +40,9 @@ CONTAINER_ALLOCATED, CLEANUP_CONTAINER, + // Source: RMAppAttempt + FINISHED_CONTAINERS_PULLED_BY_AM, + // Source: NMLivelinessMonitor EXPIRE } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java new 
file mode 100644 index 0000000..a4fb707 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.List; + +// Dispatched to an RMNode after the AM has implicitly acked that it was +// notified of the completion of the given containers +public class RMNodeFinishedContainersPulledByAMEvent extends RMNodeEvent { + + private final List containers; + + public RMNodeFinishedContainersPulledByAMEvent(NodeId nodeId, + List containers) { + super(nodeId, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM); + this.containers = containers; + } + + public List getContainers() { + return this.containers; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3ce6416..75e1061 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -112,6 +112,10 @@ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); + /* set of containers that were notified to AM about their completion */ + private final Set finishedContainersPulledByAM = + new HashSet(); + /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); @@ -135,7 +139,7 @@ new UpdateNodeResourceWhenUnusableTransition()) //Transitions from RUNNING state - .addTransition(NodeState.RUNNING, + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING,
NodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, @@ -152,29 +156,33 @@ .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) //Transitions from REBOOTED state .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, - RMNodeEventType.RESOURCE_UPDATE, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) //Transitions from DECOMMISSIONED state .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, - RMNodeEventType.RESOURCE_UPDATE, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) //Transitions from LOST state .addTransition(NodeState.LOST, NodeState.LOST, - RMNodeEventType.RESOURCE_UPDATE, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) //Transitions from UNHEALTHY state - .addTransition(NodeState.UNHEALTHY, + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), - RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) + RMNodeEventType.STATUS_UPDATE, + new StatusUpdateWhenUnHealthyTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, RMNodeEventType.DECOMMISSION, new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) @@ -192,7 +200,20 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.RESOURCE_UPDATE, new 
UpdateNodeResourceWhenUnusableTransition()) - + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) + + //Transitions from DECOMMISSIONED state + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) + + //Transitions from LOST state + .addTransition(NodeState.LOST, NodeState.LOST, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) + // create the topology tables .installTopology(); @@ -365,8 +386,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); + response.addFinishedContainersPulledByAM( + new ArrayList(this.finishedContainersPulledByAM)); this.containersToClean.clear(); this.finishedApplications.clear(); + this.finishedContainersPulledByAM.clear(); } finally { this.writeLock.unlock(); } @@ -617,6 +641,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class FinishedContainersPulledByAMTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.finishedContainersPulledByAM.addAll((( + RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); + } + } + public static class DeactivateNodeTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3817637..87cd562 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; @@ -157,7 +158,8 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId) public void waitForContainerToComplete(RMAppAttempt attempt, NMContainerStatus completedContainer) throws InterruptedException { while (true) { - List containers = attempt.getJustFinishedContainers(); + Set containers = attempt.getJustFinishedContainers() + .keySet(); System.out.println("Received completed containers " + containers); for (ContainerStatus container : containers) { if (container.getContainerId().equals( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4827620..78b69cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -490,7 +490,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), ContainerState.COMPLETE, 
Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null @@ -501,7 +501,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM @@ -514,7 +514,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } @@ -529,7 +529,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..20cf46a 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -197,7 +197,8 @@ public void testAMRestartWithExistingContainers() throws Exception { waitForContainersToFinish(4, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = false, container6Exists = false; - for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { + for(ContainerStatus status : newAttempt.getJustFinishedContainers() + .keySet()) { if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 6608ccd..1a6cef9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -203,7 +203,7 @@ public void handle(AMLauncherEvent event) { applicationMasterLauncher.handle(event); } } - + private static int appId = 1; private ApplicationSubmissionContext submissionContext = null; @@ -705,7 +705,8 @@ private void 
testUnmanagedAMSuccess(String url) { application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class), + container.getNodeId())); // complete AM String diagnostics = "Successful"; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; @@ -756,7 +757,7 @@ public void testUsageReport() { applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( attemptId, ContainerStatus.newInstance( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null); @@ -859,7 +860,7 @@ public void testAMCrashAtScheduled() { // send CONTAINER_FINISHED event at SCHEDULED state, // The state should be FINAL_SAVING with previous state SCHEDULED applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); // createApplicationAttemptState will return previous state (SCHEDULED), // if the current state is FINAL_SAVING. 
assertEquals(YarnApplicationAttemptState.SCHEDULED, @@ -906,7 +907,7 @@ public void testAMCrashAtAllocated() { BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -930,7 +931,7 @@ public void testRunningToFailed() { ContainerState.COMPLETE, containerDiagMsg, exitCode); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs)); + appAttemptId, cs, null)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Failed state. @@ -938,7 +939,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -975,7 +976,7 @@ public void testRunningToKilled() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -1151,7 +1152,7 @@ 
public void testFinishingToFinishing() { BuilderUtils.newContainerStatus( BuilderUtils.newContainerId( applicationAttempt.getAppAttemptId(), 42), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, diagnostics); } @@ -1170,7 +1171,7 @@ public void testSuccessfulFinishingToFinished() { new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(amContainer.getId(), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, diagnostics, 0, false); } @@ -1198,7 +1199,7 @@ public void testSuccessfulFinishingToFinished() { // Container_finished event comes before Attempt_Saved event. applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // send attempt_saved @@ -1283,7 +1284,7 @@ public void testFailedToFailed() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -1299,10 +1300,10 @@ public void testFailedToFailed() { ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs2)); + appAttemptId, cs2, null)); assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); 
assertEquals(cs2.getContainerId(), applicationAttempt - .getJustFinishedContainers().get(0).getContainerId()); + .getJustFinishedContainers().keySet().iterator().next().getContainerId()); } @@ -1324,7 +1325,7 @@ public void testContainersCleanupForLastAttempt() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b3dc35f..a0d5d84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -161,7 +161,7 @@ public void testTokenExpiry() throws Exception { .getEventHandler() .handle( new RMAppAttemptContainerFinishedEvent(applicationAttemptId, - containerStatus)); + containerStatus, nm1.getNodeId())); // Make sure the RMAppAttempt is at Finished State. // Both AMRMToken and ClientToAMToken have been removed.