diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58..b418d47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -30,6 +30,7 @@ NodeAction getNodeAction(); List getContainersToCleanup(); + List getCleanedupContainersNotified(); List getApplicationsToCleanup(); @@ -43,6 +44,7 @@ void setNMTokenMasterKey(MasterKey secretKey); void addAllContainersToCleanup(List containers); + void addCleanedupContainersNotified(List containers); void addAllApplicationsToCleanup(List applications); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95a..095c368 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,6 +46,7 @@ boolean viaProto = false; private List containersToCleanup = null; + private List cleanedupContainersNotified = null; private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -73,6 +74,9 @@ private void mergeLocalToBuilder() { if (this.applicationsToCleanup != null) { addApplicationsToCleanupToProto(); } + if (this.cleanedupContainersNotified != null) { + addCleanedupContainersNotifiedToProto(); + } if (this.containerTokenMasterKey != null) { builder.setContainerTokenMasterKey( convertToProtoFormat(this.containerTokenMasterKey)); @@ -199,6 +203,12 @@ public void setDiagnosticsMessage(String diagnosticsMessage) { return this.containersToCleanup; } + @Override + public List getCleanedupContainersNotified() { + initCleanedupContainersNotified(); + return this.cleanedupContainersNotified; + } + private void initContainersToCleanup() { if (this.containersToCleanup != null) { return; @@ -212,6 +222,19 @@ private void initContainersToCleanup() { } } + private void initCleanedupContainersNotified() { + if (this.cleanedupContainersNotified != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getCleanedupContainersNotifiedList(); + this.cleanedupContainersNotified = new ArrayList(); + + for (ContainerIdProto c : list) { + this.cleanedupContainersNotified.add(convertFromProtoFormat(c)); + } + } + @Override public void addAllContainersToCleanup( final List containersToCleanup) { @@ -221,6 +244,15 @@ public void addAllContainersToCleanup( this.containersToCleanup.addAll(containersToCleanup); } + @Override + public void addCleanedupContainersNotified( + final List cleanedupContainersNotified) { + if (cleanedupContainersNotified == null) + return; + initCleanedupContainersNotified(); + this.cleanedupContainersNotified.addAll(cleanedupContainersNotified); + } + private void addContainersToCleanupToProto() { maybeInitBuilder(); builder.clearContainersToCleanup(); @@ -256,6 +288,41 @@ public void remove() { builder.addAllContainersToCleanup(iterable); } + private void addCleanedupContainersNotifiedToProto() { + maybeInitBuilder(); + builder.clearCleanedupContainersNotified(); + if (cleanedupContainersNotified == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = cleanedupContainersNotified.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllCleanedupContainersNotified(iterable); + } + @Override public List getApplicationsToCleanup() { initApplicationsToCleanup(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 29cd64e..8918376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated ContainerIdProto cleanedup_containers_notified = 9; } message NMContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a479be2..43770c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -311,7 +311,7 @@ public void run() { public static class NMContext implements Context { private NodeId nodeId = null; - private final ConcurrentMap applications = + protected final ConcurrentMap applications = new ConcurrentHashMap(); protected final ConcurrentMap containers = new ConcurrentSkipListMap(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index f0b99ec..875ef7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; public interface NodeStatusUpdater extends Service { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b52b0fb..27c4ef0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -107,7 +107,7 @@ // This is used to track the current completed containers when nodeheartBeat // is called. These completed containers will be removed from NM context after // nodeHeartBeat succeeds and the response from the nodeHeartBeat is - // processed. + // processed and RM acks that the AM has received the notification. private final Set previousCompletedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -406,21 +406,46 @@ public void addCompletedContainer(ContainerId containerId) { previousCompletedContainers.add(containerId); } synchronized (recentlyStoppedContainers) { - removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); + removeVeryOldStoppedContainersFromContext(); + if (!recentlyStoppedContainers.containsKey(containerId)) { + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } } } - private void removeCompletedContainersFromContext() { + @VisibleForTesting + @Private + public void removeCompletedContainersFromContext(List + containerIds) throws + IOException { + Set removedContainers = new HashSet(); synchronized (previousCompletedContainers) { - if (!previousCompletedContainers.isEmpty()) { - for (ContainerId containerId : previousCompletedContainers) { - this.context.getContainers().remove(containerId); + // Either if the RM acks the completedContainer it can be removed + for (ContainerId cleanedUpContainerId : containerIds) { + if (previousCompletedContainers.contains(cleanedUpContainerId)) { + previousCompletedContainers.remove(cleanedUpContainerId); + context.getContainers().remove(cleanedUpContainerId); + } + } + + // Or if the application no longer exists in can be removed + Iterator iterator = + previousCompletedContainers.iterator(); + while(iterator.hasNext()) { + ContainerId cid = iterator.next(); + ApplicationId applicationId = cid.getApplicationAttemptId() + .getApplicationId(); + if (!this.context.getApplications().containsKey(applicationId)) { + iterator.remove(); + context.getContainers().remove(cid); + removedContainers.add(cid); } - LOG.info("Removed completed containers from NM context: " - + previousCompletedContainers); - previousCompletedContainers.clear(); + } + + if (!removedContainers.isEmpty()) { + LOG.info("Removed completed containers from NM context that have " + + "been acked by AM: " + removedContainers); } } } @@ -454,7 +479,7 @@ public boolean isContainerRecentlyStopped(ContainerId containerId) { return recentlyStoppedContainers.containsKey(containerId); } } - + @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -464,7 +489,7 @@ public void clearFinishedContainersFromCache() { @Private @VisibleForTesting - public void removeVeryOldStoppedContainersFromCache() { + public void removeVeryOldStoppedContainersFromContext() { synchronized (recentlyStoppedContainers) { long currentTime = System.currentTimeMillis(); Iterator i = @@ -474,6 +499,7 @@ public void removeVeryOldStoppedContainersFromCache() { if (recentlyStoppedContainers.get(cid) < currentTime) { i.remove(); try { + context.getContainers().remove(cid); context.getNMStateStore().removeContainer(cid); } catch (IOException e) { LOG.error("Unable to remove container " + cid + " in store", e); @@ -542,7 +568,9 @@ public void run() { // don't want to remove the completed containers before resync // because these completed containers will be reported back to RM // when NM re-registers with RM. - removeCompletedContainersFromContext(); + // Only remove the cleanedup containers that are acked + removeCompletedContainersFromContext(response + .getCleanedupContainersNotified()); lastHeartBeatID = response.getResponseId(); List containersToCleanup = response diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index acda2a9..3ed9550 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -382,9 +382,17 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { if (containersShouldBePreserved) { Assert.assertFalse(containers.isEmpty()); Assert.assertTrue(containers.containsKey(existingCid)); + Assert.assertEquals(ContainerState.RUNNING, + containers.get(existingCid) + .cloneAndGetContainerStatus().getState()); } else { - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); + // ensure that containers are empty or are completed before + // restart nodeStatusUpdater + if (!containers.isEmpty()) { + Assert.assertEquals(ContainerState.COMPLETE, + containers.get(existingCid) + .cloneAndGetContainerStatus().getState()); + } } super.rebootNodeStatusUpdaterAndRegisterWithRM(); } @@ -465,7 +473,11 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { try { // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); + if (!containers.isEmpty()) { + for (Container container: containers.values()) + Assert.assertEquals(ContainerState.COMPLETE, + container.cloneAndGetContainerStatus().getState()); + } super.rebootNodeStatusUpdaterAndRegisterWithRM(); // After this point new containers are free to be launched, except // containers from previous RM diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f2a3a4a..7ee3cfa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -180,7 +181,7 @@ public RegisterNodeManagerResponse registerNodeManager( Map> map = new HashMap>(); for (ContainerStatus cs : containers) { - ApplicationId applicationId = + ApplicationId applicationId = cs.getContainerId().getApplicationAttemptId().getApplicationId(); List appContainers = map.get(applicationId); if (appContainers == null) { @@ -205,10 +206,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); - + ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); - + if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); @@ -419,7 +420,7 @@ protected void stopRMProxy() { } private class MyNodeManager extends NodeManager { - + private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -433,7 +434,7 @@ public MyNodeStatusUpdater3 getNodeStatusUpdater() { return this.nodeStatusUpdater; } } - + private class MyNodeManager2 extends NodeManager { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; @@ -467,7 +468,7 @@ protected void serviceStop() throws Exception { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } - // + // private class MyResourceTracker2 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -478,7 +479,7 @@ protected void serviceStop() throws Exception { public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -493,7 +494,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus nodeStatus = request.getNodeStatus(); nodeStatus.setResponseId(heartBeatID++); - + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); @@ -501,7 +502,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nhResponse; } } - + private class MyResourceTracker3 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -513,7 +514,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) MyResourceTracker3(Context context) { this.context = context; } - + @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, @@ -564,6 +565,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL; private Context context; + private final ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + private final ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + private final ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.RUNNING); + private final ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.COMPLETE); public MyResourceTracker4(Context context) { this.context = context; @@ -583,6 +592,8 @@ public RegisterNodeManagerResponse registerNodeManager( @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { + List cleanedUpContainersNotified = new ArrayList + (); try { if (heartBeatID == 0) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() @@ -594,10 +605,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) Assert.assertEquals(statuses.size(), 2); Assert.assertEquals(context.getContainers().size(), 2); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); boolean container2Exist = false, container3Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( @@ -619,23 +626,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // test passes. throw new YarnRuntimeException("Lost the heartbeat response"); - } else if (heartBeatID == 2) { + } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); Assert.assertEquals(statuses.size(), 4); Assert.assertEquals(context.getContainers().size(), 4); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); - ContainerStatus containerStatus4 = - createContainerStatus(4, ContainerState.RUNNING); - ContainerStatus containerStatus5 = - createContainerStatus(5, ContainerState.COMPLETE); - - boolean container2Exist = false, container3Exist = false, container4Exist = - false, container5Exist = false; + boolean container2Exist = false, container3Exist = false, + container4Exist = false, container5Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( containerStatus2.getContainerId())) { @@ -664,6 +662,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } Assert.assertTrue(container2Exist && container3Exist && container4Exist && container5Exist); + + if (heartBeatID == 3) { + cleanedUpContainersNotified.add(containerStatus3.getContainerId()); + } + } else if (heartBeatID == 4) { + List statuses = + request.getNodeStatus().getContainersStatuses(); + Assert.assertEquals(statuses.size(), 3); + Assert.assertEquals(context.getContainers().size(), 3); } } catch (AssertionError error) { error.printStackTrace(); @@ -676,6 +683,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); + nhResponse.addCleanedupContainersNotified(cleanedUpContainersNotified); return nhResponse; } } @@ -686,7 +694,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -694,7 +702,7 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNMTokenMasterKey(createMasterKey()); return response; } - + @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { @@ -767,11 +775,11 @@ public void deleteBaseDir() throws IOException { lfs.delete(new Path(basedir.getPath()), true); } - @Test(timeout = 90000) - public void testRecentlyFinishedContainers() throws Exception { - NodeManager nm = new NodeManager(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set( + @Test(timeout = 90000) + public void testRecentlyFinishedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, "10000"); nm.init(conf); @@ -780,9 +788,11 @@ public void testRecentlyFinishedContainers() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 0); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); - ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - - + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class)); + nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); @@ -790,17 +800,100 @@ public void testRecentlyFinishedContainers() throws Exception { int waitInterval = 15; while (waitInterval-- > 0 && nodeStatusUpdater.isContainerRecentlyStopped(cId)) { - nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); + nodeStatusUpdater.removeVeryOldStoppedContainersFromContext(); Thread.sleep(1000); } - long time2 = System.currentTimeMillis(); + long time2 = System.currentTimeMillis(); // By this time the container will be removed from cache. need to verify. - Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); - } - + Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + Assert.assertFalse(nm.getNMContext().getContainers().containsKey(cId)); + Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000); + } + @Test(timeout = 90000) + public void testPreviousCompletedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( + NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "10000"); + nm.init(conf); + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + ContainerId cId = ContainerId.newInstance(appAttemptId, 1); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().put(cId, anyCompletedContainer); + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + List ackedContainers = new ArrayList(); + ackedContainers.add(cId); + + nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers); + Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty()); + } + + @Test + public void testCleanedupApplicationContainerCleanup() throws IOException { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "1000000"); + nm.init(conf); + + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + + ContainerId cId = ContainerId.newInstance(appAttemptId, 1); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().put(cId, anyCompletedContainer); + + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + nm.getNMContext().getApplications().remove(appId); + nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList + ()); + Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); + } + @Test public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @@ -860,7 +953,7 @@ public void run() { nm.stop(); } - + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); @@ -875,7 +968,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } - + @Override protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, @@ -897,7 +990,7 @@ public void cleanUpApplicationsOnNMShutDown() { YarnConfiguration conf = createNMConfig(); nm.init(conf); nm.start(); - + int waitCount = 0; while (heartBeatID < 1 && waitCount++ != 200) { Thread.sleep(500); @@ -906,7 +999,7 @@ public void cleanUpApplicationsOnNMShutDown() { // Meanwhile call stop directly as the shutdown hook would nm.stop(); - + // NM takes a while to reach the STOPPED state. waitCount = 0; while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { @@ -1172,9 +1265,13 @@ protected NMContext createNMContext( nm.start(); int waitCount = 0; - while (heartBeatID <= 3 && waitCount++ != 20) { + while (heartBeatID <= 4 && waitCount++ != 20) { Thread.sleep(500); } + if (heartBeatID <= 4) { + Assert.fail("Failed to get all heartbeats in time, " + + "heartbeatID:" + heartBeatID); + } if(assertionFailedInThread.get()) { Assert.fail("ContainerStatus Backup failed"); } @@ -1182,7 +1279,7 @@ protected NMContext createNMContext( } @Test(timeout = 200000) - public void testNodeStatusUpdaterRetryAndNMShutdown() + public void testNodeStatusUpdaterRetryAndNMShutdown() throws Exception { final long connectionWaitSecs = 1000; final long connectionRetryIntervalMs = 1000; @@ -1190,7 +1287,7 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, connectionWaitSecs); conf.setLong(YarnConfiguration - .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, connectionRetryIntervalMs); conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); @@ -1281,30 +1378,36 @@ public MyNMContext( } else if (heartBeatID == 1) { ContainerStatus containerStatus2 = createContainerStatus(2, ContainerState.RUNNING); - Container container2 = getMockContainer(containerStatus2); - containers.put(containerStatus2.getContainerId(), container2); + putMockContainer(containerStatus2); ContainerStatus containerStatus3 = createContainerStatus(3, ContainerState.COMPLETE); - Container container3 = getMockContainer(containerStatus3); - containers.put(containerStatus3.getContainerId(), container3); + putMockContainer(containerStatus3); return containers; } else if (heartBeatID == 2) { ContainerStatus containerStatus4 = createContainerStatus(4, ContainerState.RUNNING); - Container container4 = getMockContainer(containerStatus4); - containers.put(containerStatus4.getContainerId(), container4); + putMockContainer(containerStatus4); ContainerStatus containerStatus5 = createContainerStatus(5, ContainerState.COMPLETE); - Container container5 = getMockContainer(containerStatus5); - containers.put(containerStatus5.getContainerId(), container5); + putMockContainer(containerStatus5); + return containers; + } else if (heartBeatID == 3 || heartBeatID == 4) { return containers; } else { containers.clear(); return containers; } } + + private void putMockContainer(ContainerStatus containerStatus) { + Container container = getMockContainer(containerStatus); + containers.put(containerStatus.getContainerId(), container); + applications.putIfAbsent(containerStatus.getContainerId() + .getApplicationAttemptId().getApplicationId(), + mock(Application.class)); + } } public static ContainerStatus createContainerStatus(int id, @@ -1345,7 +1448,7 @@ private void verifyNodeStartFailure(String errMessage) throws Exception { throw e; } } - + // the service should be stopped Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm .getServiceState()); @@ -1364,7 +1467,7 @@ private YarnConfiguration createNMConfig() { } conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345"); - conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath()); @@ -1372,7 +1475,7 @@ private YarnConfiguration createNMConfig() { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); return conf; } - + private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 4798120..4222888 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -198,7 +198,7 @@ protected void serviceStop() throws Exception { */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleNMContainerStatus(NMContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -229,7 +229,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) { containerStatus.getContainerExitStatus()); // sending master container finished event. RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, status); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status, + nodeId); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -324,7 +325,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getNMContainerStatuses()); for (NMContainerStatus status : request.getNMContainerStatuses()) { - handleNMContainerStatus(status); + handleNMContainerStatus(status, nodeId); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..c211427 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -120,12 +122,12 @@ List pullJustFinishedContainers(); /** - * Return the list of last set of finished containers. This does not reset the - * finished containers. + * Return the map of last set of finished containers to the corresponding + * node. This does not reset the finished containers. * @return the list of just finished contianers, this does not reset the * finished containers. */ - List getJustFinishedContainers(); + Map getJustFinishedContainers(); /** * The container on which the Application Master is running. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..253aa69 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,7 +26,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -42,7 +44,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -84,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanedupContainerNotifiedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -130,9 +133,14 @@ private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - private List justFinishedContainers = - new ArrayList(); + + private Map justFinishedContainers = + new HashMap(); + // Tracks the previous finished containers that are waiting to be + // verified as received by the AM. If the AM sends the next allocate + // request it implicitly acks this list. + private Map finishedContainersSentToAM = + new HashMap(); private Container masterContainer; private float progress = 0; @@ -627,7 +635,7 @@ public float getProgress() { } @Override - public List getJustFinishedContainers() { + public Map getJustFinishedContainers() { this.readLock.lock(); try { return this.justFinishedContainers; @@ -643,7 +651,16 @@ public float getProgress() { try { List returnList = new ArrayList( this.justFinishedContainers.size()); - returnList.addAll(this.justFinishedContainers); + returnList.addAll(this.justFinishedContainers.keySet()); + for (Map.Entry finishedContainerStatus: this + .finishedContainersSentToAM.entrySet()) { + // Implicitly acks the previous list as being received by the AM + eventHandler.handle(new RMNodeCleanedupContainerNotifiedEvent( + finishedContainerStatus.getValue(), finishedContainerStatus + .getKey().getContainerId())); + } + finishedContainersSentToAM.clear(); + finishedContainersSentToAM.putAll(this.justFinishedContainers); this.justFinishedContainers.clear(); return returnList; } finally { @@ -1501,7 +1518,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } // Normal container.Put it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return this.currentState; } } @@ -1517,7 +1535,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); // Normal container. Add it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } @@ -1558,7 +1577,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.FINISHED; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return RMAppAttemptState.FINISHING; } } @@ -1593,7 +1613,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java index 3660597..b1feae2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java @@ -20,21 +20,25 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { private final ContainerStatus containerStatus; + private final NodeId nodeId; public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, - ContainerStatus containerStatus) { + ContainerStatus containerStatus, NodeId nodeId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.nodeId = nodeId; } public ContainerStatus getContainerStatus() { return this.containerStatus; } + public NodeId getNodeId() { return this.nodeId; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index eef361f..0c867be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -76,13 +76,13 @@ RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state - .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, + .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, RMContainerEventType.START, new ContainerStartedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, + .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, RMContainerEventType.KILL) // nothing to do - .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, RMContainerEventType.RELEASED) // nothing to do @@ -98,7 +98,7 @@ .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED, new LaunchedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) + RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, @@ -491,7 +491,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { updateMetricsIfPreempted(container); container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + container.appAttemptId, finishedEvent.getRemoteContainerStatus(), + container.getAllocatedNode())); container.rmContext.getRMApplicationHistoryWriter().containerFinished( container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java new file mode 100644 index 0000000..ab8cdbd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +// Happens after an implicit ack from AM that the container completion has +// been notified successfully to the AM +public class RMNodeCleanedupContainerNotifiedEvent extends RMNodeEvent { + + private ContainerId contId; + + public RMNodeCleanedupContainerNotifiedEvent(NodeId nodeId, + ContainerId contId) { + super(nodeId, RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED); + this.contId = contId; + } + + public ContainerId getContainerId() { + return this.contId; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index c0096b9..47f4ed7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -39,6 +39,7 @@ // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, + CLEANEDUP_CONTAINER_NOTIFIED, // Source: NMLivelinessMonitor EXPIRE diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3ce6416..c20ddd0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -112,6 +112,10 @@ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); + /* set of containers that were notified to AM about their completion */ + private final Set cleanupContainersNotified = + new HashSet(); + /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); @@ -135,7 +139,7 @@ new UpdateNodeResourceWhenUnusableTransition()) //Transitions from RUNNING state - .addTransition(NodeState.RUNNING, + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, @@ -152,6 +156,9 @@ .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) @@ -172,7 +179,7 @@ new UpdateNodeResourceWhenUnusableTransition()) //Transitions from UNHEALTHY state - .addTransition(NodeState.UNHEALTHY, + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, @@ -192,7 +199,20 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) - + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + + //Transitions from DECOMMISSIONED state + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + + //Transitions from LOST state + .addTransition(NodeState.LOST, NodeState.LOST, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + // create the topology tables .installTopology(); @@ -365,8 +385,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); + response.addCleanedupContainersNotified( + new ArrayList(this.cleanupContainersNotified)); this.containersToClean.clear(); this.finishedApplications.clear(); + this.cleanupContainersNotified.clear(); } finally { this.writeLock.unlock(); } @@ -617,6 +640,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class CleanedupContainerNotifiedTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.cleanupContainersNotified.add((( + RMNodeCleanedupContainerNotifiedEvent) event).getContainerId()); + } + } + public static class DeactivateNodeTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3817637..87cd562 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; @@ -157,7 +158,8 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId) public void waitForContainerToComplete(RMAppAttempt attempt, NMContainerStatus completedContainer) throws InterruptedException { while (true) { - List containers = attempt.getJustFinishedContainers(); + Set containers = attempt.getJustFinishedContainers() + .keySet(); System.out.println("Received completed containers " + containers); for (ContainerStatus container : containers) { if (container.getContainerId().equals( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4827620..78b69cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -490,7 +490,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null @@ -501,7 +501,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM @@ -514,7 +514,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } @@ -529,7 +529,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..20cf46a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -197,7 +197,8 @@ public void testAMRestartWithExistingContainers() throws Exception { waitForContainersToFinish(4, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = false, container6Exists = false; - for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { + for(ContainerStatus status : newAttempt.getJustFinishedContainers() + .keySet()) { if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index efcecd9..83132ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -192,7 +192,7 @@ public void handle(AMLauncherEvent event) { applicationMasterLauncher.handle(event); } } - + private static int appId = 1; private ApplicationSubmissionContext submissionContext = null; @@ -678,7 +678,8 @@ private void testUnmanagedAMSuccess(String url) { application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class), + container.getNodeId())); // complete AM String diagnostics = "Successful"; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; @@ -792,7 +793,7 @@ public void testAMCrashAtScheduled() { // send CONTAINER_FINISHED event at SCHEDULED state, // The state should be FINAL_SAVING with previous state SCHEDULED applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); // createApplicationAttemptState will return previous state (SCHEDULED), // if the current state is FINAL_SAVING. assertEquals(YarnApplicationAttemptState.SCHEDULED, @@ -839,7 +840,7 @@ public void testAMCrashAtAllocated() { BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -863,7 +864,7 @@ public void testRunningToFailed() { ContainerState.COMPLETE, containerDiagMsg, exitCode); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs)); + appAttemptId, cs, null)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Failed state. @@ -871,7 +872,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -908,7 +909,7 @@ public void testRunningToKilled() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -1084,7 +1085,7 @@ public void testFinishingToFinishing() { BuilderUtils.newContainerStatus( BuilderUtils.newContainerId( applicationAttempt.getAppAttemptId(), 42), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, diagnostics); } @@ -1103,7 +1104,7 @@ public void testSuccessfulFinishingToFinished() { new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(amContainer.getId(), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, diagnostics, 0, false); } @@ -1131,7 +1132,7 @@ public void testSuccessfulFinishingToFinished() { // Container_finished event comes before Attempt_Saved event. applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // send attempt_saved @@ -1216,7 +1217,7 @@ public void testFailedToFailed() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -1232,10 +1233,10 @@ public void testFailedToFailed() { ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs2)); + appAttemptId, cs2, null)); assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); assertEquals(cs2.getContainerId(), applicationAttempt - .getJustFinishedContainers().get(0).getContainerId()); + .getJustFinishedContainers().keySet().iterator().next().getContainerId()); } @@ -1257,7 +1258,7 @@ public void testContainersCleanupForLastAttempt() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b3dc35f..a0d5d84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -161,7 +161,7 @@ public void testTokenExpiry() throws Exception { .getEventHandler() .handle( new RMAppAttemptContainerFinishedEvent(applicationAttemptId, - containerStatus)); + containerStatus, nm1.getNodeId())); // Make sure the RMAppAttempt is at Finished State. // Both AMRMToken and ClientToAMToken have been removed.