diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index ee6eb7b..440779c 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -62,7 +62,8 @@ public static NodeId newNodeID(String host, int port) { private NodeState state; private List toCleanUpContainers; private List toCleanUpApplications; - + private List runningApplications; + public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, int cmdPort, String hostName, NodeState state) { @@ -77,6 +78,7 @@ public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.state = state; toCleanUpApplications = new ArrayList(); toCleanUpContainers = new ArrayList(); + runningApplications = new ArrayList(); } public NodeId getNodeID() { @@ -135,6 +137,10 @@ public NodeState getState() { return toCleanUpApplications; } + public List getRunningApps() { + return runningApplications; + } + public void updateNodeHeartbeatResponseForCleanup( NodeHeartbeatResponse response) { } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index b64be1b..a6633ae 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -119,6 +119,11 @@ public NodeState getState() { } @Override + public List getRunningApps() { + return node.getRunningApps(); + } + + @Override public void updateNodeHeartbeatResponseForCleanup( NodeHeartbeatResponse nodeHeartbeatResponse) { node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 95eeaf6..0386be6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -119,6 +119,8 @@ public List getAppsToCleanup(); + List getRunningApps(); + /** * Update a {@link NodeHeartbeatResponse} with the list of containers and * applications to clean up for this node. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d1e6190..9bc91c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -123,11 +123,16 @@ new HashSet(); /* the list of applications that have finished and need to be purged */ - private final List finishedApplications = new ArrayList(); + private final List finishedApplications = + new ArrayList(); + + /* the list of applications that are running on this node */ + private final List runningApplications = + new ArrayList(); private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); - + private static final StateMachineFactory(NodeState.NEW) - + //Transitions from NEW state .addTransition(NodeState.NEW, NodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) @@ -383,6 +388,16 @@ public NodeState getState() { } @Override + public List getRunningApps() { + this.readLock.lock(); + try { + return new ArrayList(this.runningApplications); + } finally { + this.readLock.unlock(); + } + } + + @Override public List getContainersToCleanUp() { this.readLock.lock(); @@ -519,9 +534,12 @@ private static void handleRunningAppOnNode(RMNodeImpl rmNode, LOG.warn("Cannot get RMApp by appId=" + appId + ", just added it to finishedApplications list for cleanup"); rmNode.finishedApplications.add(appId); + rmNode.runningApplications.remove(appId); return; } + // Add running applications back due to Node add or Node reconnection. + rmNode.runningApplications.add(appId); context.getDispatcher().getEventHandler() .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); } @@ -707,8 +725,9 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - rmNode.finishedApplications.add((( - RMNodeCleanAppEvent) event).getAppId()); + ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId(); + rmNode.finishedApplications.add(appId); + rmNode.runningApplications.remove(appId); } } @@ -910,12 +929,22 @@ private void handleContainerStatus(List containerStatuses) { + "cleanup, no further processing"); continue; } - if (finishedApplications.contains(containerId.getApplicationAttemptId() - .getApplicationId())) { + + ApplicationId containerAppId = + containerId.getApplicationAttemptId().getApplicationId(); + + if (finishedApplications.contains(containerAppId)) { LOG.info("Container " + containerId + " belongs to an application that is already killed," + " no further processing"); continue; + } else if (!runningApplications.contains(containerAppId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Container " + containerId + + " is the first container get launched for application " + + containerAppId); + } + runningApplications.add(containerAppId); } // Process running containers diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 2d863d1..095fe28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -187,6 +187,11 @@ public NodeState getState() { } @Override + public List getRunningApps() { + return null; + } + + @Override public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 01f4357..ece896b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -33,6 +33,7 @@ import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -485,9 +486,9 @@ public void testUpdateHeartbeatResponseForCleanup() { NodeId nodeId = node.getNodeID(); // Expire a container - ContainerId completedContainerId = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(0, 0), 0), 0); + ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId)); Assert.assertEquals(1, node.getContainersToCleanUp().size()); @@ -512,6 +513,35 @@ public void testUpdateHeartbeatResponseForCleanup() { Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0)); } + @Test(timeout=20000) + public void testUpdateHeartbeatResponseForAppLifeCycle() { + RMNodeImpl node = getRunningNode(); + NodeId nodeId = node.getNodeID(); + + ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1); + // Create a running container + ContainerId runningContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + runningAppId, 0), 0); + + ContainerStatus status = ContainerStatus.newInstance(runningContainerId, + ContainerState.RUNNING, "", 0); + List statusList = new ArrayList(); + statusList.add(status); + NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, + "", System.currentTimeMillis()); + node.handle(new RMNodeStatusEvent(nodeId, nodeHealth, + statusList, null, null)); + + Assert.assertEquals(1, node.getRunningApps().size()); + + // Finish an application + ApplicationId finishedAppId = runningAppId; + node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId)); + Assert.assertEquals(1, node.getAppsToCleanup().size()); + Assert.assertEquals(0, node.getRunningApps().size()); + } + private RMNodeImpl getRunningNode() { return getRunningNode(null, 0); }