diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c94c3795963..4b7632741cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -980,11 +980,11 @@ public SchedulerNode getNode(NodeId nodeId) { /** * Get lists of new containers from NodeManager and process them. * @param nm The RMNode corresponding to the NodeManager + * @param schedulerNode schedulerNode * @return list of completed containers */ - protected List updateNewContainerInfo(RMNode nm) { - SchedulerNode node = getNode(nm.getNodeID()); - + private List updateNewContainerInfo(RMNode nm, + SchedulerNode schedulerNode) { List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList<>(); @@ -999,14 +999,15 @@ public SchedulerNode getNode(NodeId nodeId) { // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); + containerLaunchedOnNode(launchedContainer.getContainerId(), + schedulerNode); } // Processing the newly increased containers List newlyIncreasedContainers = nm.pullNewlyIncreasedContainers(); for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); + containerIncreasedOnNode(container.getId(), schedulerNode, container); } 
return completedContainers; @@ -1017,12 +1018,12 @@ public SchedulerNode getNode(NodeId nodeId) { * @param completedContainers Extracted list of completed containers * @param releasedResources Reference resource object for completed containers * @param nodeId NodeId corresponding to the NodeManager + * @param schedulerNode schedulerNode * @return The total number of released containers */ - protected int updateCompletedContainers(List - completedContainers, Resource releasedResources, NodeId nodeId) { + private int updateCompletedContainers(List completedContainers, + Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) { int releasedContainers = 0; - SchedulerNode node = getNode(nodeId); List untrackedContainerIdList = new ArrayList(); for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); @@ -1030,8 +1031,8 @@ protected int updateCompletedContainers(List RMContainer container = getRMContainer(containerId); completedContainer(container, completedContainer, RMContainerEventType.FINISHED); - if (node != null) { - node.releaseContainer(containerId, true); + if (schedulerNode != null) { + schedulerNode.releaseContainer(containerId, true); } if (container != null) { @@ -1076,14 +1077,14 @@ protected void updateSchedulerHealthInformation(Resource releasedResources, /** * Update container and utilization information on the NodeManager. 
* @param nm The NodeManager to update + * @param schedulerNode schedulerNode */ - protected void updateNodeResourceUtilization(RMNode nm) { - SchedulerNode node = getNode(nm.getNodeID()); + protected void updateNodeResourceUtilization(RMNode nm, + SchedulerNode schedulerNode) { // Updating node resource utilization - node.setAggregatedContainersUtilization( + schedulerNode.setAggregatedContainersUtilization( nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - + schedulerNode.setNodeUtilization(nm.getNodeUtilization()); } /** @@ -1097,12 +1098,17 @@ protected void nodeUpdate(RMNode nm) { } // Process new container information - List completedContainers = updateNewContainerInfo(nm); + SchedulerNode schedulerNode = getNode(nm.getNodeID()); + List completedContainers = updateNewContainerInfo(nm, + schedulerNode); + + // Notify Scheduler Node updated. + schedulerNode.notifyNodeUpdate(); // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); int releasedContainers = updateCompletedContainers(completedContainers, - releasedResources, nm.getNodeID()); + releasedResources, nm.getNodeID(), schedulerNode); // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to @@ -1115,18 +1121,17 @@ protected void nodeUpdate(RMNode nm) { .getEventHandler() .handle( new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); + .newInstance(schedulerNode.getAllocatedResource(), 0))); } updateSchedulerHealthInformation(releasedResources, releasedContainers); - updateNodeResourceUtilization(nm); + updateNodeResourceUtilization(nm, schedulerNode); // Now node data structures are up-to-date and ready for scheduling. 
if(LOG.isDebugEnabled()) { - SchedulerNode node = getNode(nm.getNodeID()); - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); + LOG.debug( + "Node being looked for scheduling " + nm + " availableResource: " + + schedulerNode.getUnallocatedResource()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 05dbf1e51a5..89f748d4181 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -76,6 +77,9 @@ private volatile Set labels = null; + // Last updated time + private volatile long lastHeartbeatMonotonicTime; + public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; @@ -87,6 +91,7 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName, nodeName = rmNode.getHostName(); } this.labels = ImmutableSet.copyOf(labels); + this.lastHeartbeatMonotonicTime = Time.monotonicNow(); } public SchedulerNode(RMNode node, boolean usePortForNodeName) { @@ 
-453,6 +458,17 @@ public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + public long getLastHeartbeatMonotonicTime() { + return lastHeartbeatMonotonicTime; + } + + /** + * This will be called for each node heartbeat. + */ + public void notifyNodeUpdate() { + this.lastHeartbeatMonotonicTime = Time.monotonicNow(); + } + private static class ContainerInfo { private final RMContainer container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ba2f85a4fbf..4e1324e7f18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -140,7 +140,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -179,8 +178,6 @@ private CSConfigurationProvider csConfProvider; - protected Clock monotonicClock; - @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -237,6 +234,7 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private 
List asyncSchedulerThreads; + private AsyncScheduleAMRescheduleThread asyncScheduleAMRescheduleThread; private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; @@ -358,6 +356,8 @@ void initScheduler(Configuration configuration) throws asyncSchedulerThreads.add(new AsyncScheduleThread(this)); } resourceCommitterService = new ResourceCommitterService(this); + asyncScheduleAMRescheduleThread = new AsyncScheduleAMRescheduleThread( + this); } // Setup how many containers we can allocate for each round @@ -386,6 +386,7 @@ private void startSchedulerThreads() { } resourceCommitterService.start(); + asyncScheduleAMRescheduleThread.start(); } } finally { writeLock.unlock(); @@ -418,6 +419,8 @@ public void serviceStop() throws Exception { } resourceCommitterService.interrupt(); resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS); + asyncScheduleAMRescheduleThread.interrupt(); + asyncScheduleAMRescheduleThread.join(THREAD_JOIN_TIMEOUT_MS); } } finally { writeLock.unlock(); @@ -479,19 +482,76 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ Collection nodes = cs.nodeTracker.getAllNodes(); int start = random.nextInt(nodes.size()); + // Allocate containers of node [start, end) for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { + // Skip node which missed 2 heartbeats since the node might be dead and + // we should not continue allocate containers on that. 
+ if (Time.monotonicNow() - node.getLastHeartbeatMonotonicTime() + > cs.nmHeartbeatInterval * 2) { + continue; + } cs.allocateContainersToNode(node.getNodeID(), false); } } - // Now, just get everyone to be safe + + current = 0; + + // Allocate containers of node [0, start) for (FiCaSchedulerNode node : nodes) { + if (current++ > start) { + break; + } + if (Time.monotonicNow() - node.getLastHeartbeatMonotonicTime() + > cs.nmHeartbeatInterval * 2) { + continue; + } cs.allocateContainersToNode(node.getNodeID(), false); } Thread.sleep(cs.getAsyncScheduleInterval()); } + static class AsyncScheduleAMRescheduleThread extends Thread { + private final CapacityScheduler cs; + + public AsyncScheduleAMRescheduleThread(CapacityScheduler cs) { + this.cs = cs; + setDaemon(true); + setName(AsyncScheduleAMRescheduleThread.class.getSimpleName()); + } + + @Override + public void run() { + long longestWaitingTime = cs.nmHeartbeatInterval * 10; + + while (true) { + for (FiCaSchedulerNode node : this.cs.getAllNodes()) { + for (RMContainer rmContainer : node + .getCopiedListOfRunningContainers()) { + if (rmContainer.isAMContainer() && rmContainer.getState().equals( + RMContainerState.ALLOCATED) + && Time.monotonicNow() - node.getLastHeartbeatMonotonicTime() + > longestWaitingTime) { + cs.completedContainer(rmContainer, SchedulerUtils + .createAbnormalContainerStatus(rmContainer.getContainerId(), + "To avoid AM got stuck in ACCEPTED state, Kill AM " + + "container which allocated but not launched on " + + "a possibly lost node. 
" + + "(which missed several node heartbeats)"), + RMContainerEventType.KILL); + } + } + } + try { + Thread.sleep(longestWaitingTime); + } catch (InterruptedException e) { + LOG.warn(e, e); + } + } + } + } + static class AsyncScheduleThread extends Thread { private final CapacityScheduler cs; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 77596e25be5..960aba0a010 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; @@ -72,6 +73,8 @@ RMNodeLabelsManager mgr; + private NMHeartbeatThread nmHeartbeatThread = null; + @Before public void setUp() throws Exception { conf = new YarnConfiguration(); @@ -122,9 +125,11 @@ public RMNodeLabelsManager 
createNodeLabelManager() { List nms = new ArrayList<>(); // Add 10 nodes to the cluster, in the cluster we have 200 GB resource for (int i = 0; i < 10; i++) { - nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB)); + nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB)); } + keepNMHeartbeat(nms, 1000); + List ams = new ArrayList<>(); // Add 3 applications to the cluster, one app in one queue // the i-th app ask (20 * i) containers. So in total we will have @@ -185,8 +190,8 @@ public void testCommitProposalForFailedAppAttempt() // init RM & NMs & Nodes final MockRM rm = new MockRM(disableAsyncConf); rm.start(); - final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); - final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB); List nmLst = new ArrayList<>(); nmLst.add(nm1); nmLst.add(nm2); @@ -277,8 +282,8 @@ public void testCommitOutdatedReservedProposal() throws Exception { // init RM & NMs & Nodes final MockRM rm = new MockRM(disableAsyncConf); rm.start(); - final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); - final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB); + final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB); // init scheduler nodes int waitTime = 1000; @@ -416,8 +421,8 @@ public void testNodeResourceOverAllocated() // init RM & NMs & Nodes final MockRM rm = new MockRM(disableAsyncConf); rm.start(); - final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB); - final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB); + final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB); + final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB); List nmLst = new ArrayList<>(); nmLst.add(nm1); nmLst.add(nm2); @@ -476,6 +481,221 @@ public void testNodeResourceOverAllocated() rm.stop(); } + /** + * Make sure scheduler skips NMs which haven't heartbeat for a 
while. + * @throws Exception + */ + @Test + public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception { + int heartbeatInterval = 100; + conf.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 1); + conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 100); + // Heartbeat interval is 100 ms. + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + List nms = new ArrayList<>(); + // Add 10 nodes to the cluster, in the cluster we have 200 GB resource + for (int i = 0; i < 10; i++) { + nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB)); + } + + List ams = new ArrayList<>(); + + keepNMHeartbeat(nms, heartbeatInterval); + + for (int i = 0; i < 3; i++) { + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, + Character.toString((char) (i % 34 + 97)), 1, null, null, false); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + ams.add(am); + } + + pauseNMHeartbeat(); + + Thread.sleep(heartbeatInterval * 3); + + // Applications request containers. + for (int i = 0; i < 3; i++) { + ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>()); + } + + for (int i = 0; i < 5; i++) { + // Do heartbeat for NM 0-4 + nms.get(i).nodeHeartbeat(true); + } + + // Wait for 2000 ms. + Thread.sleep(2000); + + // Make sure that NM5-9 don't have non-AM containers. 
+ for (int i = 0; i < 9; i++) { + if (i < 5) { + Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0); + } else { + Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0); + } + } + + rm.close(); + } + + /** + * Make sure an AM container stuck in the ALLOCATED state on a node that + * has stopped heartbeating gets killed so the AM can be rescheduled. + * @throws Exception + */ + @Test + public void testAMContainerRescheduledWhenStuckedInAllocatedState() + throws Exception { + int heartbeatInterval = 100; + conf.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 1); + conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 100); + // Heartbeat interval is 100 ms. + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + // inject node label manager + MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + List nms = new ArrayList<>(); + // Add 10 nodes to the cluster, in the cluster we have 200 GB resource + for (int i = 0; i < 10; i++) { + nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB)); + } + + keepNMHeartbeat(nms, heartbeatInterval); + pauseNMHeartbeat(); + + // After NM heartbeats stop, the AM container should still be allocated + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, "a", 1, null, + null, false); + int i = 0; + while (rmApp.getCurrentAppAttempt() == null) { + if (i < 100) { + i++; + } + Thread.sleep(50); + } + RMAppAttempt attempt = rmApp.getCurrentAppAttempt(); + rm.drainEvents(); + + // Make sure the AM container gets allocated. 
+ FiCaSchedulerApp app = null; + i = 0; + while (i < 200) { + i++; + app = cs.getApplicationAttempt(attempt.getAppAttemptId()); + if (app != null && app.getLiveContainers().size() > 0) { + break; + } + Thread.sleep(100); + } + Assert.assertTrue(app != null && app.getLiveContainers().size() > 0); + + // Sleep 20 heartbeat and we should find AM container get killed. + Thread.sleep(heartbeatInterval * 30); + + Assert.assertEquals(0, app.getLiveContainers().size()); + + rm.close(); + } + + private static class NMHeartbeatThread extends Thread { + private List mockNMS; + private int interval; + private volatile boolean shouldStop = false; + + public NMHeartbeatThread(List mockNMs, int interval) { + this.mockNMS = mockNMs; + this.interval = interval; + } + + public void run() { + while (true) { + if (shouldStop) { + break; + } + for (MockNM nm : mockNMS) { + try { + nm.nodeHeartbeat(true); + } catch (Exception e) { + e.printStackTrace(); + } + } + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void setShouldStop() { + shouldStop = true; + } + } + + private void keepNMHeartbeat(List mockNMs, int interval) { + if (nmHeartbeatThread != null) { + nmHeartbeatThread.setShouldStop(); + nmHeartbeatThread = null; + } + nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval); + nmHeartbeatThread.start(); + } + + private void pauseNMHeartbeat() { + if (nmHeartbeatThread != null) { + nmHeartbeatThread.setShouldStop(); + nmHeartbeatThread = null; + } + } + + private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) { + SchedulerNode node = cs.getNode(nm.getNodeId()); + int nonAMContainer = 0; + for (RMContainer c : node.getCopiedListOfRunningContainers()) { + if (!c.isAMContainer()) { + nonAMContainer++; + } + } + return nonAMContainer; + } + private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, Resource resource, int priority, int startContainerId) throws 
Exception {