diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58..b418d47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -30,6 +30,7 @@ NodeAction getNodeAction(); List getContainersToCleanup(); + List getCleanedupContainersNotified(); List getApplicationsToCleanup(); @@ -43,6 +44,7 @@ void setNMTokenMasterKey(MasterKey secretKey); void addAllContainersToCleanup(List containers); + void addCleanedupContainersNotified(List containers); void addAllApplicationsToCleanup(List applications); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95a..095c368 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,6 +46,7 @@ boolean viaProto = false; private List containersToCleanup = null; + private List cleanedupContainersNotified = null; private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -73,6 +74,9 @@ private void mergeLocalToBuilder() { if (this.applicationsToCleanup != null) { addApplicationsToCleanupToProto(); } + if (this.cleanedupContainersNotified != null) { + addCleanedupContainersNotifiedToProto(); + } if (this.containerTokenMasterKey != null) { builder.setContainerTokenMasterKey( convertToProtoFormat(this.containerTokenMasterKey)); @@ -199,6 +203,12 @@ public void setDiagnosticsMessage(String diagnosticsMessage) { return this.containersToCleanup; } + @Override + public List getCleanedupContainersNotified() { + initCleanedupContainersNotified(); + return this.cleanedupContainersNotified; + } + private void initContainersToCleanup() { if (this.containersToCleanup != null) { return; @@ -212,6 +222,19 @@ private void initContainersToCleanup() { } } + private void initCleanedupContainersNotified() { + if (this.cleanedupContainersNotified != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getCleanedupContainersNotifiedList(); + this.cleanedupContainersNotified = new ArrayList(); + + for (ContainerIdProto c : list) { + this.cleanedupContainersNotified.add(convertFromProtoFormat(c)); + } + } + @Override public void addAllContainersToCleanup( final List containersToCleanup) { @@ -221,6 +244,15 @@ public void addAllContainersToCleanup( this.containersToCleanup.addAll(containersToCleanup); } + @Override + public void addCleanedupContainersNotified( + final List cleanedupContainersNotified) { + if (cleanedupContainersNotified == null) + return; + initCleanedupContainersNotified(); + this.cleanedupContainersNotified.addAll(cleanedupContainersNotified); + } + private void addContainersToCleanupToProto() { maybeInitBuilder(); builder.clearContainersToCleanup(); @@ -256,6 +288,41 @@ public void remove() { builder.addAllContainersToCleanup(iterable); } + private void addCleanedupContainersNotifiedToProto() { + maybeInitBuilder(); + builder.clearCleanedupContainersNotified(); + if (cleanedupContainersNotified == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = cleanedupContainersNotified.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllCleanedupContainersNotified(iterable); + } + @Override public List getApplicationsToCleanup() { initApplicationsToCleanup(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 29cd64e..8918376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated ContainerIdProto cleanedup_containers_notified = 9; } message NMContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index f0b99ec..270c6d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; public interface NodeStatusUpdater extends Service { @@ -46,8 +47,10 @@ /** * Add a container to the list of containers that have recently completed * @param containerId the ID of the completed container + * @param exitStatus the exit code of the container */ - public void addCompletedContainer(ContainerId containerId); + public void addCompletedContainer(ContainerId containerId, + int exitStatus); /** * Clear the list of recently completed containers diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b52b0fb..c7b6a6b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -107,8 +108,9 @@ // This is used to track the current completed containers when nodeheartBeat // is called. These completed containers will be removed from NM context after // nodeHeartBeat succeeds and the response from the nodeHeartBeat is - // processed. - private final Set previousCompletedContainers; + // processed and RM acks that the AM has received the notification. + // Otherwise its removed after a timeout. + private final Map previousCompletedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -125,7 +127,7 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); - this.previousCompletedContainers = new HashSet(); + this.previousCompletedContainers = new LinkedHashMap(); } @Override @@ -364,7 +366,8 @@ private NodeStatus getNodeStatus(int responseId) { // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - addCompletedContainer(container.getContainerId()); + addCompletedContainer(container.getContainerId(), + container.getExitCode()); } } if (LOG.isDebugEnabled()) { @@ -392,7 +395,8 @@ private NodeStatus getNodeStatus(int responseId) { // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - addCompletedContainer(container.getContainerId()); + addCompletedContainer(container.getContainerId(), + container.getExitCode()); } } LOG.info("Sending out " + containerStatuses.size() @@ -401,26 +405,65 @@ private NodeStatus getNodeStatus(int responseId) { } @Override - public void addCompletedContainer(ContainerId containerId) { + public void addCompletedContainer(ContainerId containerId, + int exitStatus) { synchronized (previousCompletedContainers) { - previousCompletedContainers.add(containerId); - } - synchronized (recentlyStoppedContainers) { - removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); + if (!previousCompletedContainers.containsKey(containerId)) { + long expiryTime = System.currentTimeMillis() + + durationToTrackStoppedContainers; + if (exitStatus == ContainerExitStatus + .KILLED_AFTER_APP_COMPLETION) { + expiryTime = 0; + } + + previousCompletedContainers.put(containerId, expiryTime); + } + synchronized (recentlyStoppedContainers) { + removeVeryOldStoppedContainersFromCache(); + if (!recentlyStoppedContainers.containsKey(containerId)) { + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } + } } } - private void removeCompletedContainersFromContext() { + @VisibleForTesting + @Private + public void removeCompletedContainersFromContext(List + containerIds) { synchronized (previousCompletedContainers) { - if (!previousCompletedContainers.isEmpty()) { - for (ContainerId containerId : previousCompletedContainers) { - this.context.getContainers().remove(containerId); + Set removedContainers = new HashSet(); + + // Either if the RM acks the completedContainer can be removed + for (ContainerId cleanedUpContainerId : containerIds) { + if (previousCompletedContainers.containsKey(cleanedUpContainerId)) { + previousCompletedContainers.remove(cleanedUpContainerId); + removedContainers.add(cleanedUpContainerId); + } + } + + // Or if its expires + long currentTime = System.currentTimeMillis(); + Iterator> iterator = + previousCompletedContainers.entrySet().iterator(); + while(iterator.hasNext()) { + Entry entry = iterator.next(); + if (entry.getValue() < currentTime) { + removedContainers.add(entry.getKey()); + iterator.remove(); + } else { + break; } + } + + for (ContainerId containerId : removedContainers) { + this.context.getContainers().remove(containerId); + } + + if (!removedContainers.isEmpty()) { LOG.info("Removed completed containers from NM context: " - + previousCompletedContainers); - previousCompletedContainers.clear(); + + removedContainers); } } } @@ -454,7 +497,7 @@ public boolean isContainerRecentlyStopped(ContainerId containerId) { return recentlyStoppedContainers.containsKey(containerId); } } - + @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -542,7 +585,9 @@ public void run() { // don't want to remove the completed containers before resync // because these completed containers will be reported back to RM // when NM re-registers with RM. - removeCompletedContainersFromContext(); + // Only remove the cleanedup containers that are acked + removeCompletedContainersFromContext(response + .getCleanedupContainersNotified()); lastHeartBeatID = response.getResponseId(); List containersToCleanup = response diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 12166e0..cc06cfa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -310,7 +310,8 @@ private void recoverContainer(RecoveredContainerState rcs) LOG.warn(containerId + " has no corresponding application!"); } LOG.info("Adding " + containerId + " to recently stopped containers"); - nodeStatusUpdater.addCompletedContainer(containerId); + nodeStatusUpdater.addCompletedContainer(containerId, + ContainerExitStatus.SUCCESS); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 56b4fdd..ed21c0b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -55,4 +55,5 @@ String toString(); + int getExitCode(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index fa54ee1..3d95d7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -1015,6 +1015,11 @@ public String toString() { } } + @Override + public int getExitCode() { + return exitCode; + } + private boolean hasDefaultExitCode() { return (this.exitCode == ContainerExitStatus.INVALID); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f2a3a4a..2101252 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -52,12 +52,14 @@ import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -180,7 +182,7 @@ public RegisterNodeManagerResponse registerNodeManager( Map> map = new HashMap>(); for (ContainerStatus cs : containers) { - ApplicationId applicationId = + ApplicationId applicationId = cs.getContainerId().getApplicationAttemptId().getApplicationId(); List appContainers = map.get(applicationId); if (appContainers == null) { @@ -205,10 +207,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); - + ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); - + if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); @@ -419,7 +421,7 @@ protected void stopRMProxy() { } private class MyNodeManager extends NodeManager { - + private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -433,7 +435,7 @@ public MyNodeStatusUpdater3 getNodeStatusUpdater() { return this.nodeStatusUpdater; } } - + private class MyNodeManager2 extends NodeManager { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; @@ -467,7 +469,7 @@ protected void serviceStop() throws Exception { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } - // + // private class MyResourceTracker2 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -478,7 +480,7 @@ protected void serviceStop() throws Exception { public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -493,7 +495,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus nodeStatus = request.getNodeStatus(); nodeStatus.setResponseId(heartBeatID++); - + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); @@ -501,7 +503,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nhResponse; } } - + private class MyResourceTracker3 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -513,7 +515,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) MyResourceTracker3(Context context) { this.context = context; } - + @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, @@ -564,6 +566,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL; private Context context; + private final ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + private final ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + private final ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.RUNNING); + private final ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.COMPLETE); public MyResourceTracker4(Context context) { this.context = context; @@ -583,6 +593,9 @@ public RegisterNodeManagerResponse registerNodeManager( @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { + List cleanedUpContainersNotified = new ArrayList + (); + System.out.println("received heartBeatID:" + heartBeatID); try { if (heartBeatID == 0) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() @@ -594,10 +607,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) Assert.assertEquals(statuses.size(), 2); Assert.assertEquals(context.getContainers().size(), 2); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); boolean container2Exist = false, container3Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( @@ -619,23 +628,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // test passes. throw new YarnRuntimeException("Lost the heartbeat response"); - } else if (heartBeatID == 2) { + } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); Assert.assertEquals(statuses.size(), 4); Assert.assertEquals(context.getContainers().size(), 4); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); - ContainerStatus containerStatus4 = - createContainerStatus(4, ContainerState.RUNNING); - ContainerStatus containerStatus5 = - createContainerStatus(5, ContainerState.COMPLETE); - - boolean container2Exist = false, container3Exist = false, container4Exist = - false, container5Exist = false; + boolean container2Exist = false, container3Exist = false, + container4Exist = false, container5Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( containerStatus2.getContainerId())) { @@ -664,6 +664,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } Assert.assertTrue(container2Exist && container3Exist && container4Exist && container5Exist); + + if (heartBeatID == 3) { + cleanedUpContainersNotified.add(containerStatus3.getContainerId()); + } + } else if (heartBeatID == 4) { + List statuses = + request.getNodeStatus().getContainersStatuses(); + Assert.assertEquals(statuses.size(), 3); + Assert.assertEquals(context.getContainers().size(), 3); } } catch (AssertionError error) { error.printStackTrace(); @@ -676,6 +685,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); + nhResponse.addCleanedupContainersNotified(cleanedUpContainersNotified); return nhResponse; } } @@ -686,7 +696,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -694,7 +704,7 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNMTokenMasterKey(createMasterKey()); return response; } - + @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { @@ -767,11 +777,11 @@ public void deleteBaseDir() throws IOException { lfs.delete(new Path(basedir.getPath()), true); } - @Test(timeout = 90000) - public void testRecentlyFinishedContainers() throws Exception { - NodeManager nm = new NodeManager(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set( + @Test(timeout = 90000) + public void testRecentlyFinishedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, "10000"); nm.init(conf); @@ -781,9 +791,8 @@ public void testRecentlyFinishedContainers() throws Exception { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - - - nodeStatusUpdater.addCompletedContainer(cId); + + nodeStatusUpdater.addCompletedContainer(cId, 0); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); long time1 = System.currentTimeMillis(); @@ -793,14 +802,84 @@ public void testRecentlyFinishedContainers() throws Exception { nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); Thread.sleep(1000); } - long time2 = System.currentTimeMillis(); + long time2 = System.currentTimeMillis(); // By this time the container will be removed from cache. need to verify. - Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); - } - + Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); + } + + @Test(timeout = 90000) + public void testPreviousCompletedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( + NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "10000"); + nm.init(conf); + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container containerKilledAfterAppCompletion = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + + @Override + public int getExitCode() { + return ContainerExitStatus.KILLED_AFTER_APP_COMPLETION; + } + }; + + ContainerId cId2 = ContainerId.newInstance(appAttemptId, 1); + Token containerToken2 = + BuilderUtils.newContainerToken(cId2, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken2)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getContainers().put(cId, + containerKilledAfterAppCompletion); + nm.getNMContext().getContainers().put(cId2, anyCompletedContainer); + Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size()); + + nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList + ()); + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + long time1 = System.currentTimeMillis(); + int waitInterval = 15; + while (waitInterval-- > 0 + && !nodeStatusUpdater.getContainerStatuses().isEmpty()) { + nodeStatusUpdater.removeCompletedContainersFromContext(new + ArrayList()); + Thread.sleep(1000); + } + long time2 = System.currentTimeMillis(); + // By this time the container will be removed from cache. need to verify. + Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty()); + Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); + } - @Test public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @@ -860,7 +939,7 @@ public void run() { nm.stop(); } - + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); @@ -875,7 +954,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } - + @Override protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, @@ -897,7 +976,7 @@ public void cleanUpApplicationsOnNMShutDown() { YarnConfiguration conf = createNMConfig(); nm.init(conf); nm.start(); - + int waitCount = 0; while (heartBeatID < 1 && waitCount++ != 200) { Thread.sleep(500); @@ -906,7 +985,7 @@ public void cleanUpApplicationsOnNMShutDown() { // Meanwhile call stop directly as the shutdown hook would nm.stop(); - + // NM takes a while to reach the STOPPED state. waitCount = 0; while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { @@ -1172,9 +1251,13 @@ protected NMContext createNMContext( nm.start(); int waitCount = 0; - while (heartBeatID <= 3 && waitCount++ != 20) { + while (heartBeatID <= 4 && waitCount++ != 20) { Thread.sleep(500); } + if (heartBeatID <= 4) { + Assert.fail("Failed to get all heartbeats in time, " + + "heartbeatID:" + heartBeatID); + } if(assertionFailedInThread.get()) { Assert.fail("ContainerStatus Backup failed"); } @@ -1182,7 +1265,7 @@ protected NMContext createNMContext( } @Test(timeout = 200000) - public void testNodeStatusUpdaterRetryAndNMShutdown() + public void testNodeStatusUpdaterRetryAndNMShutdown() throws Exception { final long connectionWaitSecs = 1000; final long connectionRetryIntervalMs = 1000; @@ -1300,6 +1383,8 @@ public MyNMContext( Container container5 = getMockContainer(containerStatus5); containers.put(containerStatus5.getContainerId(), container5); return containers; + } else if (heartBeatID == 3 || heartBeatID == 4) { + return containers; } else { containers.clear(); return containers; @@ -1345,7 +1430,7 @@ private void verifyNodeStartFailure(String errMessage) throws Exception { throw e; } } - + // the service should be stopped Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm .getServiceState()); @@ -1364,7 +1449,7 @@ private YarnConfiguration createNMConfig() { } conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345"); - conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath()); @@ -1372,7 +1457,7 @@ private YarnConfiguration createNMConfig() { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); return conf; } - + private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index b2ccb61..970a04d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -118,6 +118,11 @@ public String toString() { } @Override + public int getExitCode() { + return 0; + } + + @Override public void handle(ContainerEvent event) { } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index a7006e0..3e75e48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -216,7 +216,13 @@ public boolean isPmemCheckEnabled() { @Override public ContainerState getContainerState() { return ContainerState.RUNNING; - }; + } + + @Override + public int getExitCode() { + return 0; + } + }; nmContext.getContainers().put(containerId, container); //TODO: Gross hack. Fix in code. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b532dd5..5019f34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -199,7 +199,7 @@ protected void serviceStop() throws Exception { */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleNMContainerStatus(NMContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -230,7 +230,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) { containerStatus.getContainerExitStatus()); // sending master container finished event. RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, status); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status, + nodeId); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -326,7 +327,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getNMContainerStatuses()); for (NMContainerStatus status : request.getNMContainerStatuses()) { - handleNMContainerStatus(status); + handleNMContainerStatus(status, nodeId); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..c211427 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -120,12 +122,12 @@ List pullJustFinishedContainers(); /** - * Return the list of last set of finished containers. This does not reset the - * finished containers. + * Return the map of last set of finished containers to the corresponding + * node. This does not reset the finished containers. * @return the list of just finished contianers, this does not reset the * finished containers. */ - List getJustFinishedContainers(); + Map getJustFinishedContainers(); /** * The container on which the Application Master is running. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 19fc800..48d96f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,7 +26,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanedupContainerNotifiedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -130,9 +134,14 @@ private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - private List justFinishedContainers = - new ArrayList(); + + private Map justFinishedContainers = + new HashMap(); + // Tracks the previous finished containers that are waiting to be + // verified as received by the AM. If the AM sends the next allocate + // request it implicitly acks this list. + private Map previousJustFinishedContainers = + new HashMap(); private Container masterContainer; private float progress = 0; @@ -627,7 +636,7 @@ public float getProgress() { } @Override - public List getJustFinishedContainers() { + public Map getJustFinishedContainers() { this.readLock.lock(); try { return this.justFinishedContainers; @@ -643,7 +652,16 @@ public float getProgress() { try { List returnList = new ArrayList( this.justFinishedContainers.size()); - returnList.addAll(this.justFinishedContainers); + returnList.addAll(this.justFinishedContainers.keySet()); + for (Map.Entry finishedContainerStatus: this + .previousJustFinishedContainers.entrySet()) { + // Implicitly acks the previous list as being received by the AM + eventHandler.handle(new RMNodeCleanedupContainerNotifiedEvent( + finishedContainerStatus.getValue(), finishedContainerStatus + .getKey().getContainerId())); + } + previousJustFinishedContainers.clear(); + previousJustFinishedContainers.putAll(this.justFinishedContainers); this.justFinishedContainers.clear(); return returnList; } finally { @@ -1501,7 +1519,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } // Normal container.Put it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return this.currentState; } } @@ -1517,7 +1536,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); // Normal container. Add it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } @@ -1558,7 +1578,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.FINISHED; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return RMAppAttemptState.FINISHING; } } @@ -1593,7 +1614,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java index 3660597..b1feae2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java @@ -20,21 +20,25 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { private final ContainerStatus containerStatus; + private final NodeId nodeId; public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, - ContainerStatus containerStatus) { + ContainerStatus containerStatus, NodeId nodeId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.nodeId = nodeId; } public ContainerStatus getContainerStatus() { return this.containerStatus; } + public NodeId getNodeId() { return this.nodeId; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index eef361f..0c867be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -76,13 +76,13 @@ RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state - .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, + .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, RMContainerEventType.START, new ContainerStartedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, + .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, RMContainerEventType.KILL) // nothing to do - .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, RMContainerEventType.RELEASED) // nothing to do @@ -98,7 +98,7 @@ .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED, new LaunchedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) + RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, @@ -491,7 +491,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { updateMetricsIfPreempted(container); container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + container.appAttemptId, finishedEvent.getRemoteContainerStatus(), + container.getAllocatedNode())); container.rmContext.getRMApplicationHistoryWriter().containerFinished( container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java new file mode 100644 index 0000000..ab8cdbd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +// Happens after an implicit ack from AM that the container completion has +// been notified successfully to the AM +public class RMNodeCleanedupContainerNotifiedEvent extends RMNodeEvent { + + private ContainerId contId; + + public RMNodeCleanedupContainerNotifiedEvent(NodeId nodeId, + ContainerId contId) { + super(nodeId, RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED); + this.contId = contId; + } + + public ContainerId getContainerId() { + return this.contId; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index ef644be..69a5d39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -36,6 +36,7 @@ // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, + CLEANEDUP_CONTAINER_NOTIFIED, // Source: NMLivelinessMonitor EXPIRE diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9ead898..4ff2e38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -111,6 +111,10 @@ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); + /* set of containers that were notified to AM about their completion */ + private final Set cleanupContainersNotified = + new HashSet(); + /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); @@ -131,7 +135,7 @@ RMNodeEventType.STARTED, new AddNodeTransition()) //Transitions from RUNNING state - .addTransition(NodeState.RUNNING, + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, @@ -148,10 +152,13 @@ .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) //Transitions from UNHEALTHY state - .addTransition(NodeState.UNHEALTHY, + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, @@ -169,7 +176,10 @@ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) - + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + // create the topology tables .installTopology(); @@ -352,8 +362,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); + response.addCleanedupContainersNotified( + new ArrayList(this.cleanupContainersNotified)); this.containersToClean.clear(); this.finishedApplications.clear(); + this.cleanupContainersNotified.clear(); } finally { this.writeLock.unlock(); } @@ -563,6 +576,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class CleanedupContainerNotifiedTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.cleanupContainersNotified.add((( + RMNodeCleanedupContainerNotifiedEvent) event).getContainerId()); + } + } + public static class DeactivateNodeTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a7f6240..db3f9e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; @@ -157,7 +158,8 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId) public void waitForContainerToComplete(RMAppAttempt attempt, NMContainerStatus completedContainer) throws InterruptedException { while (true) { - List containers = attempt.getJustFinishedContainers(); + Set containers = attempt.getJustFinishedContainers() + .keySet(); System.out.println("Received completed containers " + containers); for (ContainerStatus container : containers) { if (container.getContainerId().equals( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4827620..78b69cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -490,7 +490,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null @@ -501,7 +501,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM @@ -514,7 +514,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } @@ -529,7 +529,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..20cf46a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -197,7 +197,8 @@ public void testAMRestartWithExistingContainers() throws Exception { waitForContainersToFinish(4, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = false, container6Exists = false; - for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { + for(ContainerStatus status : newAttempt.getJustFinishedContainers() + .keySet()) { if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index efcecd9..1791376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -678,7 +678,8 @@ private void testUnmanagedAMSuccess(String url) { application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class), + container.getNodeId())); // complete AM String diagnostics = "Successful"; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; @@ -792,7 +793,7 @@ public void testAMCrashAtScheduled() { // send CONTAINER_FINISHED event at SCHEDULED state, // The state should be FINAL_SAVING with previous state SCHEDULED applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); // createApplicationAttemptState will return previous state (SCHEDULED), // if the current state is FINAL_SAVING. assertEquals(YarnApplicationAttemptState.SCHEDULED, @@ -839,7 +840,7 @@ public void testAMCrashAtAllocated() { BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -863,7 +864,7 @@ public void testRunningToFailed() { ContainerState.COMPLETE, containerDiagMsg, exitCode); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs)); + appAttemptId, cs, null)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Failed state. @@ -871,7 +872,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -908,7 +909,7 @@ public void testRunningToKilled() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -1084,7 +1085,7 @@ public void testFinishingToFinishing() { BuilderUtils.newContainerStatus( BuilderUtils.newContainerId( applicationAttempt.getAppAttemptId(), 42), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, diagnostics); } @@ -1103,7 +1104,7 @@ public void testSuccessfulFinishingToFinished() { new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(amContainer.getId(), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, diagnostics, 0, false); } @@ -1131,7 +1132,7 @@ public void testSuccessfulFinishingToFinished() { // Container_finished event comes before Attempt_Saved event. applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // send attempt_saved @@ -1216,7 +1217,7 @@ public void testFailedToFailed() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -1232,10 +1233,10 @@ public void testFailedToFailed() { ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs2)); + appAttemptId, cs2, null)); assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); assertEquals(cs2.getContainerId(), applicationAttempt - .getJustFinishedContainers().get(0).getContainerId()); + .getJustFinishedContainers().keySet().iterator().next().getContainerId()); } @@ -1257,7 +1258,7 @@ public void testContainersCleanupForLastAttempt() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b3dc35f..a0d5d84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -161,7 +161,7 @@ public void testTokenExpiry() throws Exception { .getEventHandler() .handle( new RMAppAttemptContainerFinishedEvent(applicationAttemptId, - containerStatus)); + containerStatus, nm1.getNodeId())); // Make sure the RMAppAttempt is at Finished State. // Both AMRMToken and ClientToAMToken have been removed.