From 6449b943b00d7ba827d41cb5c5879977cdf997ef Mon Sep 17 00:00:00 2001 From: ananyo Date: Tue, 2 Mar 2021 19:37:55 +0530 Subject: [PATCH] YARN-10663: Add runningApps stats in SLS --- .../yarn/sls/appmaster/AMSimulator.java | 14 +++ .../yarn/sls/appmaster/DAGAMSimulator.java | 3 +- .../yarn/sls/appmaster/MRAMSimulator.java | 6 +- .../yarn/sls/appmaster/StreamAMSimulator.java | 3 +- .../yarn/sls/nodemanager/NMSimulator.java | 21 ++++- .../sls/resourcemanager/MockAMLauncher.java | 3 +- .../yarn/sls/nodemanager/TestNMSimulator.java | 94 ++++++++++++++++++- 7 files changed, 136 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 1330e4d2f2b..031cecf1a34 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -25,7 +25,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -118,6 +121,8 @@ private Map appIdToAMSim; + private Set ranNodes = new ConcurrentSkipListSet(); + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } @@ -236,6 +241,11 @@ public void lastStep() throws Exception { LOG.info("AM container is null"); } + // Clear runningApps for ranNodes of this app + for(NodeId nodeId : ranNodes) { + se.getNmMap().get(nodeId).finishApplication(getApplicationId()); + } + if (null == appAttemptId) { // If appAttemptId == null, AM is not launched from RM's perspective, so // it's unnecessary to finish am as well @@ -497,4 +507,8 @@ public ApplicationId getApplicationId() { public ApplicationAttemptId getApplicationAttemptId() { return appAttemptId; } + + public Set getRanNodes() { + return this.ranNodes; + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java index f886a69e024..c67544ee381 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java @@ -189,7 +189,8 @@ protected void processResponseQueue() throws Exception { appId, container.getId()); assignedContainers.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, cs.getLifeTime()); + .addNewContainer(container, cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 586c671afed..5e6e6d6524a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -231,14 +231,16 @@ protected void processResponseQueue() throws Exception { appId, container.getId()); assignedMaps.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, cs.getLifeTime()); + .addNewContainer(container, cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } else if (! this.scheduledReduces.isEmpty()) { ContainerSimulator cs = scheduledReduces.remove(); LOG.debug("Application {} starts to launch a reducer ({}).", appId, container.getId()); assignedReduces.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()) - .addNewContainer(container, cs.getLifeTime()); + .addNewContainer(container, cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index 46bc90a337c..7e3545191f2 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -172,7 +172,8 @@ protected void processResponseQueue() throws Exception { container.getId()); assignedStreams.put(container.getId(), cs); se.getNmMap().get(container.getNodeId()).addNewContainer(container, - cs.getLifeTime()); + cs.getLifeTime(), appId); + getRanNodes().add(container.getNodeId()); } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 2ec39762b8b..a3473800556 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -250,7 +250,8 @@ public RMNode getNode() { /** * launch a new container with the given life time */ - public void addNewContainer(Container container, long lifeTimeMS) { + public void addNewContainer(Container container, long lifeTimeMS, + ApplicationId applicationId) { LOG.debug("NodeManager {} launches a new container ({}).", node.getNodeID(), container.getId()); if (lifeTimeMS != -1) { @@ -267,6 +268,15 @@ public void addNewContainer(Container container, long lifeTimeMS) { amContainerList.add(container.getId()); } } + + // update runningApplications on the node + if(applicationId != null + && !getNode().getRunningApps().contains(applicationId)) { + getNode().getRunningApps().add(applicationId); + } + LOG.debug("Adding running app: {} on node: {}. " + + "Updated runningApps on this node are: {}", + applicationId, getNode().getNodeID(), getNode().getRunningApps()); } /** @@ -296,4 +306,13 @@ public void cleanupContainer(ContainerId containerId) { List getCompletedContainers() { return completedContainerList; } + + public void finishApplication(ApplicationId applicationId) { + if(getNode().getRunningApps().contains(applicationId)) { + getNode().getRunningApps().remove(applicationId); + LOG.debug("Removed running app: {} from node: {}. " + + "Updated runningApps on this node are: {}", + applicationId, getNode().getNodeID(), getNode().getRunningApps()); + } + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index 37bf96afa05..6090f8e92a1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -104,7 +104,8 @@ public void handle(AMLauncherEvent event) { LOG.info("Notify AM launcher launched:" + amContainer.getId()); se.getNmMap().get(amContainer.getNodeId()) - .addNewContainer(amContainer, -1); + .addNewContainer(amContainer, -1, appId); + ams.getRanNodes().add(amContainer.getNodeId()); return; } catch (Exception e) { throw new YarnRuntimeException(e); diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java index 003417e6d21..a29043b8faa 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java @@ -19,6 +19,8 @@ import java.util.function.Supplier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -109,7 +111,7 @@ public void testNMSimulator() throws Exception { ContainerId cId1 = newContainerId(1, 1, 1); Container container1 = Container.newInstance(cId1, null, null, Resources.createResource(GB, 1), null, null); - node1.addNewContainer(container1, 100000l); + node1.addNewContainer(container1, 100000l, null); Assert.assertTrue("Node1 should have one running container.", node1.getRunningContainers().containsKey(cId1)); @@ -117,7 +119,7 @@ public void testNMSimulator() throws Exception { ContainerId cId2 = newContainerId(2, 1, 1); Container container2 = Container.newInstance(cId2, null, null, Resources.createResource(GB, 1), null, null); - node1.addNewContainer(container2, -1l); + node1.addNewContainer(container2, -1l, null); Assert.assertTrue("Node1 should have one running AM container", node1.getAMContainers().contains(cId2)); @@ -137,6 +139,94 @@ private ContainerId newContainerId(int appId, int appAttemptId, int cId) { appAttemptId), cId); } + @Test + public void testNMSimAppAddedAndRemoved() throws Exception { + // Register one node + NMSimulator node = new NMSimulator(); + node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000, + rm, -1f); + node.middleStep(); + + int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + int cumulativeSleepTime = 0; + int sleepInterval = 100; + + while(numClusterNodes != 1 && cumulativeSleepTime < 5000) { + Thread.sleep(sleepInterval); + cumulativeSleepTime = cumulativeSleepTime + sleepInterval; + numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + } + + GenericTestUtils.waitFor(new com.google.common.base.Supplier() { + @Override public Boolean get() { + return rm.getResourceScheduler().getRootQueueMetrics() + .getAvailableMB() > 0; + } + }, 500, 10000); + + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + + // Allocate one app container on node + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); + ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1); + Container container = Container.newInstance(cId, null, null, + Resources.createResource(GB, 1), null, null); + node.addNewContainer(container, 100000l, appId); + Assert.assertTrue("Node should have app: " + + appId + " in runningApps list.", + node.getNode().getRunningApps().contains(appId)); + + // Finish the app on the node. + node.finishApplication(appId); + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + } + + @Test + public void testNMSimNullAppAddedAndRemoved() throws Exception { + // Register one node + NMSimulator node = new NMSimulator(); + node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000, + rm, -1f); + node.middleStep(); + + int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + int cumulativeSleepTime = 0; + int sleepInterval = 100; + + while(numClusterNodes != 1 && cumulativeSleepTime < 5000) { + Thread.sleep(sleepInterval); + cumulativeSleepTime = cumulativeSleepTime + sleepInterval; + numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); + } + + GenericTestUtils.waitFor(new com.google.common.base.Supplier() { + @Override public Boolean get() { + return rm.getResourceScheduler().getRootQueueMetrics() + .getAvailableMB() > 0; + } + }, 500, 10000); + + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + + // Allocate null app container on node + ContainerId cId = newContainerId(1, 1, 1); + Container container = Container.newInstance(cId, null, null, + Resources.createResource(GB, 1), null, null); + node.addNewContainer(container, 100000l, null); + Assert.assertEquals("Node should have no runningApps if appId is null.", + node.getNode().getRunningApps().size(), 0); + + // Finish non-existent app on the node. + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + node.finishApplication(appId); + Assert.assertEquals("Node should have no runningApps.", + node.getNode().getRunningApps().size(), 0); + } + @After public void tearDown() throws Exception { rm.stop(); -- 2.23.0