diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 76bc14b248c..728b10400e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -87,6 +87,8 @@
     .addTransition(RMContainerState.NEW,
         EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
         RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
+    .addTransition(RMContainerState.NEW, RMContainerState.RELEASED,
+        RMContainerEventType.RELEASED) // nothing to do
 
     // Transitions from RESERVED state
     .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
@@ -106,6 +108,8 @@
         RMContainerEventType.EXPIRE, new FinishedTransition())
     .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED,
         RMContainerEventType.KILL, new FinishedTransition())
+    .addTransition(RMContainerState.ALLOCATED, RMContainerState.RELEASED,
+        RMContainerEventType.RELEASED) // nothing to do
 
     // Transitions from ACQUIRED state
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index a798b97af5f..898369bd20c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
@@ -1230,6 +1231,25 @@ protected void normalizeResourceRequests(List<ResourceRequest> asks,
     }
   }
 
+  protected void normalizeSchedulingRequests(List<SchedulingRequest> asks) {
+    normalizeSchedulingRequests(asks, null);
+  }
+
+  protected void normalizeSchedulingRequests(List<SchedulingRequest> asks,
+      String queueName) {
+    if (asks == null) {
+      return;
+    }
+    Resource maxAllocation = getMaximumResourceCapability(queueName);
+    for (SchedulingRequest ask: asks) {
+      ResourceSizing sizing = ask.getResourceSizing();
+      if (sizing != null && sizing.getResources() != null) {
+        sizing.setResources(
+            getNormalizedResource(sizing.getResources(), maxAllocation));
+      }
+    }
+  }
+
   protected void handleContainerUpdates(
       SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) {
     List<UpdateContainerRequest> promotionRequests =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index ca7d9ce712e..9061db3ab2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -159,6 +159,19 @@ private void clearRequests() {
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
+  /**
+   * Clear the requests for the specific key from this application.
+   * This is used when a container increase is cancelled to clean up the
+   * node and rack requests.
+   * @param schedulerKey the key to clean up.
+   */
+  public void clearRequests(SchedulerRequestKey schedulerKey) {
+    schedulerKeys.remove(schedulerKey);
+    schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
+    LOG.info("Application " + applicationId + " key " + schedulerKey
+        + " cleared");
+  }
+
   public ContainerUpdateContext getUpdateContext() {
     return updateContext;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index ab4fc1eb245..e486997fd34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -35,7 +35,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -153,21 +152,17 @@ public synchronized boolean checkAndAddToOutstandingIncreases(
 
   private void cancelPreviousRequest(SchedulerNode schedulerNode,
       SchedulerRequestKey schedulerKey) {
-    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
-        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
-    if (appPlacementAllocator != null) {
-      PendingAsk pendingAsk = appPlacementAllocator.getPendingAsk(
-          ResourceRequest.ANY);
-      // Decrement the pending using a dummy RR with
-      // resource = prev update req capability
-      if (pendingAsk != null && pendingAsk.getCount() > 0) {
-        appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
-            schedulerKey, Container.newInstance(UNDEFINED,
-                schedulerNode.getNodeID(), "host:port",
-                pendingAsk.getPerAllocationResource(),
-                schedulerKey.getPriority(), null));
-      }
+    PendingAsk pendingAsk = appSchedulingInfo.getPendingAsk(schedulerKey);
+    // Decrement the pending using a dummy RR with
+    // resource = prev update req capability
+    if (pendingAsk != null && pendingAsk.getCount() > 0) {
+      appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
+          schedulerKey, Container.newInstance(UNDEFINED,
+              schedulerNode.getNodeID(), "host:port",
+              pendingAsk.getPerAllocationResource(),
+              schedulerKey.getPriority(), null));
     }
+    appSchedulingInfo.clearRequests(schedulerKey);
   }
 
   private Map<SchedulerRequestKey, Map<String, ResourceRequest>>
       createResourceRequests(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 618ee20cfe0..d3d76c3ebb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -63,7 +63,6 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -1150,25 +1149,6 @@ private void doneApplicationAttempt(
     }
   }
 
-  /**
-   * Normalize a list of SchedulingRequest.
-   *
-   * @param asks scheduling request
-   */
-  private void normalizeSchedulingRequests(List<SchedulingRequest> asks) {
-    if (asks == null) {
-      return;
-    }
-    Resource maxAllocation = getMaximumResourceCapability();
-    for (SchedulingRequest ask: asks) {
-      ResourceSizing sizing = ask.getResourceSizing();
-      if (sizing != null && sizing.getResources() != null) {
-        sizing.setResources(
-            getNormalizedResource(sizing.getResources(), maxAllocation));
-      }
-    }
-  }
-
   @Override
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 2a6657aa5df..c03393629d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -971,6 +971,10 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     // Scheduler can choose to use various/pluggable delay-scheduling
     // implementation.
     for (SchedulerRequestKey schedulerKey : keysToTry) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Application " + getName() + " SchedulerKey processed: " +
+            schedulerKey);
+      }
       // Skip it for reserved container, since
       // we already check it in isValidReservation.
       if (!reserved && !hasContainerForNode(schedulerKey, node)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 282367edbaa..1aafc5f116f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -92,6 +91,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -768,6 +768,12 @@ protected void completedContainerInternal(
       updateRootQueueMetrics();
     }
+    // Remove outstanding update requests for this container
+    SchedulerRequestKey contSchedKey = new SchedulerRequestKey(
+        container.getPriority(), container.getAllocationRequestId(),
+        container.getId());
+    application.getAppSchedulingInfo().clearRequests(contSchedKey);
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Application attempt " + application.getApplicationAttemptId()
           + " released container " + container.getId() + " on node: " +
@@ -924,13 +930,14 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
             applicationId, queue.getName(), invalidAsks));
       }
 
-      // Handle promotions and demotions
+      // Handle promotions, demotions and resizing of containers
       handleContainerUpdates(application, updateRequests);
 
       // Sanity check
       normalizeResourceRequests(ask, queue.getName());
 
-      // TODO, normalize SchedulingRequest
+      // Normalize scheduling requests
+      normalizeSchedulingRequests(schedulingRequests, queue.getName());
 
       // Record container allocation start time
       application.recordContainerRequestTime(getClock().getTime());
@@ -951,8 +958,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
 
         // Update application requests
         application.updateResourceRequests(ask);
+        application.updateSchedulingRequests(schedulingRequests);
 
-        // TODO, handle SchedulingRequest
         application.showRequests();
       }
     } finally {
@@ -983,15 +990,14 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
     Resource headroom = application.getHeadroom();
     application.setApplicationHeadroomForMetrics(headroom);
 
-    List<Container> previousAttemptContainers = application
-        .pullPreviousAttemptContainers();
-    List<NMToken> updatedNMTokens = application.pullUpdatedNMTokens();
     return new Allocation(newlyAllocatedContainers, headroom,
         preemptionContainerIds, null, null,
-        updatedNMTokens, null, null,
+        application.pullUpdatedNMTokens(),
+        application.pullNewlyIncreasedContainers(),
+        application.pullNewlyDecreasedContainers(),
         application.pullNewlyPromotedContainers(),
         application.pullNewlyDemotedContainers(),
-        previousAttemptContainers);
+        application.pullPreviousAttemptContainers());
   }
 
   private List<MaxResourceValidationResult> validateResourceRequests(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContainerResizing.java
new file mode 100644
index 00000000000..77d6d14bebb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContainerResizing.java
@@ -0,0 +1,914 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Container resizing tests for the FairScheduler.
+ */
+public class TestContainerResizing extends FairSchedulerTestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerResizing.class);
+  private static final int GB = 1024;
+  // expect user placement rule to be active
+  private static final String QUEUE_NAME = "root.user";
+  private static final String SUBMIT_QUEUE = "default";
+  private static final String USER = "user";
+  private static final String APPNAME = "testapp";
+
+  private MockRM rm;
+  private FairScheduler fs;
+  private FSLeafQueue queue;
+  private DrainDispatcher dispatcher;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+    // Disable relaunching the app attempt on failure, in order to check
+    // resource usage for the current attempt only.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    // Need this to make sure the priority assignments work
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    conf.setBoolean(FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, false);
+    conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 10);
+
+    dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+    fs = (FairScheduler) rm.getResourceScheduler();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    rm.stop();
+  }
+
+  /**
+   * Application has a container running, and the node has enough available
+   * resources. Add an increase request to see if the container will be
+   * increased.
+   */
+  @Test
+  public void testSimpleIncreaseContainer() throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 20 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appAttemptId, 1);
+    sentRMContainerLaunched(containerId1);
+    // appMaster asks to change its AM container from 1GB to 3GB
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId1,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(3 * GB), null)));
+
+    // Force an update: if we don't, the demand might not be updated when
+    // the checks are done
+    fs.update();
+
+    checkPendingResource(appAttemptId, 2 * GB);
+    checkPendingResource(2 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Pending resource should be deducted
+    checkPendingResource(appAttemptId, 0);
+    checkPendingResource(0);
+    // Check the container and node
+    verifyContainerIncreased(appMaster.allocate(null, null), containerId1,
+        3 * GB);
+    verifyAvailableResourceOfSchedulerNode(node.getNodeId(), 17 * GB);
+  }
+
+  /**
+   * Application has a container running, try to decrease the container and
+   * check that the queue's usage and the container resource are updated.
+   */
+  @Test
+  public void testSimpleDecreaseContainer() throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 8 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(3 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    checkUsedResource(3 * GB);
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appAttemptId, 1);
+    sentRMContainerLaunched(containerId1);
+    // appMaster asks to change its AM container from 3GB to 1GB
+    AllocateResponse response = appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId1,
+                ContainerUpdateType.DECREASE_RESOURCE,
+                Resources.createResource(GB), null)));
+
+    // NM does one heartbeat
+    RMNodeImpl rmNode = (RMNodeImpl) rm.getRMContext().getRMNodes().
+        get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    dispatcher.waitForEventThreadToWait();
+    fs.update();
+
+    // Check the container and node
+    verifyContainerDecreased(response, containerId1, GB);
+
+    verifyAvailableResourceOfSchedulerNode(node.getNodeId(), 7 * GB);
+    checkUsedResource(GB);
+    checkUsedResource(appAttemptId, GB);
+
+    verifyNodeContainer(rmNode, containerId1, GB);
+  }
+
+  /**
+   * Application has two containers running, try to increase one of them. The
+   * node doesn't have enough resources, so the increase request will be
+   * reserved. Check resource usage after the container is reserved, then
+   * finish the non-increased container; the reserved container should be
+   * allocated.
+   */
+  @Test
+  public void testSimpleIncreaseRequestReservation() throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 5 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(2 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    checkUsedResource(2 * GB);
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appAttemptId, 1);
+    sentRMContainerLaunched(containerId1);
+
+    // Allocate one more container
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)), null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
+    assertTrue(rm.waitForState(node, containerId2,
+        RMContainerState.ALLOCATED));
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    sentRMContainerLaunched(containerId2);
+
+    // appMaster asks to change its AM container from 2GB to 4GB
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId1,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(4 * GB), null)));
+
+    // Force an update
+    fs.update();
+
+    checkPendingResource(2 * GB);
+    checkPendingResource(appAttemptId, 2 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should be reserved
+    FSAppAttempt fsAppAttempt = fs.getSchedulerApp(appAttemptId);
+    assertNotEquals("No reservations found for FS app",
+        0, fsAppAttempt.getNumReservations(null, true));
+    assertNotNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(2 * GB);
+    checkPendingResource(appAttemptId, 2 * GB);
+
+    // Check the queue reservation
+    checkReservedResource(2 * GB);
+    checkReservedResource(appAttemptId, 2 * GB);
+
+    // Complete one container and do another allocation
+    appMaster.allocate(null, Collections.singletonList(containerId2));
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Now container should be increased
+    verifyContainerIncreased(appMaster.allocate(null, null), containerId1,
+        4 * GB);
+
+    // Increase request should be unreserved
+    checkReservedResource(appAttemptId, 0);
+    assertNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will be changed since it's satisfied
+    checkPendingResource(0);
+    checkPendingResource(appAttemptId, 0);
+    // Queue & application's usage will not be updated
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+    verifyAvailableResourceOfSchedulerNode(node.getNodeId(), GB);
+  }
+
+  /**
+   * Application has two containers running, try to increase one of them; the
+   * requested amount exceeds the headroom/limit for the queue.
+   */
+  @Test
+  public void testIncreaseRequestOverQueueLimit() throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 20 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(2 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+    queue.setMaxShare(
+        new ConfigurableResource(Resources.createResource(5 * GB, 5)));
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(containerId1);
+
+    // Allocate one more container
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)), null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
+    assertTrue(rm.waitForState(node, containerId2,
+        RMContainerState.ALLOCATED));
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    sentRMContainerLaunched(containerId2);
+
+    // appMaster asks to change its 2nd container from 2GB to 4GB, which will
+    // exceed the queue limit
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId2,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(4 * GB), null)));
+
+    // Force an update
+    fs.update();
+
+    // The queue is limited: demand will never go above the queue size
+    checkPendingResource(GB);
+    // The app should show a pending request
+    checkPendingResource(appAttemptId, 2 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should *NOT* be reserved as it exceeds the queue limit
+    checkReservedResource(appAttemptId, 0);
+    assertNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Nothing should change as it goes above the limits
+    checkPendingResource(GB);
+    checkPendingResource(appAttemptId, 2 * GB);
+
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+    checkReservedResource(appAttemptId, 0);
+  }
+
+  /**
+   * Application has two containers running, try to increase one of them. The
+   * node doesn't have enough resources, so the increase request will be
+   * reserved. Check resource usage after the container is reserved, then
+   * cancel the increase request; the reservation should be cancelled.
+   */
+  @Test
+  public void testCancelIncreaseRequest() throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 5 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(2 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(containerId1);
+
+    // Allocate one more container
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)), null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
+    assertTrue(rm.waitForState(node, containerId2,
+        RMContainerState.ALLOCATED));
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    sentRMContainerLaunched(containerId2);
+
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+
+    // appMaster asks to change its 2nd container from 2GB to 4GB
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId2,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(4 * GB), null)));
+
+    // Force an update
+    fs.update();
+
+    checkPendingResource(2 * GB);
+    checkPendingResource(appAttemptId, 2 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+
+    // Increase request should be reserved
+    checkReservedResource(2 * GB);
+    checkReservedResource(appAttemptId, 2 * GB);
+    assertNotNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(2 * GB);
+    checkPendingResource(appAttemptId, 2 * GB);
+    // Queue & application's usage will not be updated
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+
+    // Cancel the increase request: send an increase request which makes
+    // target_capacity = existing_capacity
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId2,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(2 * GB), null)));
+    // Trigger a node heartbeat
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should be unreserved
+    checkReservedResource(0);
+    checkReservedResource(appAttemptId, 0);
+    assertNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will be changed since it's cancelled
+    checkPendingResource(0);
+    checkPendingResource(appAttemptId, 0);
+    // Queue & application's usage will not be updated
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+  }
+
+  /**
+   * Very similar to testCancelIncreaseRequest: after the increase request
+   * is reserved, the increase is replaced with a smaller increase. The
+   * reserved container should be decreased and the original reservation
+   * should be cancelled.
+   */
+  @Test
+  public void testDecreaseAfterIncreaseOneContainer() throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 6 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(2 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(containerId1);
+    // Allocate one more container
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)), null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
+    assertTrue(rm.waitForState(node, containerId2,
+        RMContainerState.ALLOCATED));
+
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    sentRMContainerLaunched(containerId2);
+
+    // appMaster asks to change its container from 2GB to 5GB
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId2,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(5 * GB), null)));
+
+    // Force an update
+    fs.update();
+
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should be reserved
+    checkReservedResource(3 * GB);
+    checkReservedResource(appAttemptId, 3 * GB);
+    assertNotNull(fs.getNode(node.getNodeId()).getReservedContainer());
+
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+    // Queue & application's usage will not be updated
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+
+    // Complete one container and cancel the increase request (via a decrease
+    // request, making target_capacity < current)
+    appMaster.allocate(null, Collections.singletonList(containerId2));
+    // appMaster asks to change its AM container from 2GB to 1GB (decrease)
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId1,
+                ContainerUpdateType.DECREASE_RESOURCE,
+                Resources.createResource(GB), null)));
+    // Trigger a node heartbeat
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    dispatcher.waitForEventThreadToWait();
+
+    // Increase request should be unreserved
+    checkReservedResource(0);
+    checkReservedResource(appAttemptId, 0);
+    assertNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Queue & application's usage will be updated
+    fs.update();
+    checkUsedResource(GB);
+    checkUsedResource(appAttemptId, GB);
+    // Pending resource will be changed since it's satisfied
+    checkPendingResource(0);
+    checkPendingResource(appAttemptId, 0);
+  }
+
+  /**
+   * A reservation for an increase should be released when the container
+   * finishes.
+   * Application with two containers on the same node: increase one container
+   * to a size larger than what is available on the node, which reserves the
+   * node. Then release the container with the outstanding reservation for
+   * the increase; the container and the reserved part should both be
+   * released.
+   */
+  @Test
+  public void testReleaseReservationOnContComplete()
+      throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 6 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(2 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(containerId1);
+
+    // Allocate one more container
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)), null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
+    assertTrue(rm.waitForState(node, containerId2,
+        RMContainerState.ALLOCATED));
+
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    sentRMContainerLaunched(containerId2);
+
+    // appMaster asks to change its 2nd container from 2GB to 5GB
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId2,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(5 * GB), null)));
+
+    // Force an update
+    fs.update();
+
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should be reserved
+    checkReservedResource(3 * GB);
+    checkReservedResource(appAttemptId, 3 * GB);
+    assertNotNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+    // Queue & application's usage will not be updated
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+
+    // Complete container2, container will be unreserved and completed
+    appMaster.allocate(null, Collections.singletonList(containerId2));
+
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should be unreserved
+    checkReservedResource(0);
+    checkReservedResource(appAttemptId, 0);
+    assertNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will be changed since it's satisfied
+    checkPendingResource(0);
+    checkPendingResource(appAttemptId, 0);
+    // Queue & application's usage will be updated
+    checkUsedResource(2 * GB);
+    checkUsedResource(appAttemptId, 2 * GB);
+  }
+
+  /**
+   * Similar to testReleaseReservationOnContComplete, but the reservation
+   * should be cancelled when the attempt finishes.
+   */
+  @Test
+  public void testReleaseReservationOnAttemptComplete()
+      throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 6 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(2 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(containerId1);
+
+    // Allocate one more container
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)), null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
+    assertTrue(rm.waitForState(node, containerId2,
+        RMContainerState.ALLOCATED));
+
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    sentRMContainerLaunched(containerId2);
+
+    // appMaster asks to change its 2nd container from 2GB to 5GB
+    appMaster.sendContainerResizingRequest(
+        Collections.singletonList(
+            UpdateContainerRequest.newInstance(0, containerId2,
+                ContainerUpdateType.INCREASE_RESOURCE,
+                Resources.createResource(5 * GB), null)));
+
+    fs.update();
+
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+
+    // Increase request should be reserved
+    checkReservedResource(3 * GB);
+    checkReservedResource(appAttemptId, 3 * GB);
+    assertNotNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+    // Queue & application's usage will not be updated
+    checkUsedResource(4 * GB);
+    checkUsedResource(appAttemptId, 4 * GB);
+
+    // Kill the application by killing the AM container
+    fs.killContainer(fs.getRMContainer(containerId1));
+    rm.waitForState(appMaster.getApplicationAttemptId(),
+        RMAppAttemptState.FAILED);
+    rm.waitForState(appMaster.getApplicationAttemptId().getApplicationId(),
+        RMAppState.FAILED);
+
+    // Force an update
+    fs.update();
+
+    // Increase request should be unreserved
+    checkReservedResource(0);
+    assertNull(fs.getNode(node.getNodeId()).getReservedContainer());
+    // Pending resource will be changed since the app is done
+    checkPendingResource(0);
+    // Queue & application's usage will be updated
+    checkUsedResource(0);
+  }
+
+  /**
+   * There are multiple containers that need to be increased. Check that
+   * containers are increased in priority order; when priorities are equal,
+   * the container with the lower containerId (allocated earlier) is
+   * preferred.
+   */
+  @Test
+  public void testOrderOfIncrease()
+      throws Exception {
+    MockNM node = rm.registerNode("node1:1234", 10 * GB);
+
+    // Submit app to queue
+    RMApp rmApp = submitApp(1 * GB);
+    MockAM appMaster = MockRM.launchAndRegisterAM(rmApp, rm, node);
+    ApplicationAttemptId appAttemptId = appMaster.getApplicationAttemptId();
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(containerId1);
+
+    // Containers 2, 3 (priority=3)
+    allocateAndLaunchContainers(appMaster, node, 2, 1 * GB, 3, 2);
+
+    // Containers 4, 5 (priority=2)
+    allocateAndLaunchContainers(appMaster, node, 2, 1 * GB, 2, 4);
+
+    // Containers 6, 7 (priority=4)
+    allocateAndLaunchContainers(appMaster, node, 2, 1 * GB, 4, 6);
+
+    // AM asks to change its containers [2-7] from 1GB to 2GB
+    List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
+    for (int cId = 2; cId <= 7; cId++) {
+      ContainerId containerId =
+          ContainerId.newContainerId(appAttemptId, cId);
+      increaseRequests.add(
+          UpdateContainerRequest.newInstance(0, containerId,
+              ContainerUpdateType.INCREASE_RESOURCE,
+              Resources.createResource(2 * GB), null));
+    }
+    appMaster.sendContainerResizingRequest(increaseRequests);
+
+    // Force an update
+    fs.update();
+
+    checkPendingResource(6 * GB);
+    checkPendingResource(appAttemptId, 6 * GB);
+
+    // assignContainer: containers 4/5/2 are increased (highest priority
+    // first; among equal priorities, the earlier-allocated container wins)
+    // NM does one heartbeat
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(node.getNodeId());
+    fs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    fs.update();
+    AllocateResponse allocateResponse = appMaster.allocate(null, null);
+
+    assertEquals(3, allocateResponse.getUpdatedContainers().size());
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(appAttemptId, 4), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(appAttemptId, 5), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(appAttemptId, 2), 2 * GB);
+
+    // There are still 3 GB of pending increase requests
+    checkPendingResource(3 * GB);
+    checkPendingResource(appAttemptId, 3 * GB);
+    // Queue & application's usage will be updated
+    checkUsedResource(10 * GB);
+    checkUsedResource(appAttemptId, 10 * GB);
+    checkReservedResource(0);
+    checkReservedResource(appAttemptId, 0);
+  }
+
+  private RMApp submitApp(int contMemory) throws Exception {
+    RMApp newApp = rm.submitApp(contMemory, APPNAME, USER, null, SUBMIT_QUEUE);
+    queue = fs.getQueueManager().getLeafQueue(QUEUE_NAME, false);
+    return newApp;
+  }
+
+  private void checkReservedResource(long expectedMem) {
+    assertEquals("Reserved queue resource is not correct size", expectedMem,
+        queue.getReservedResource().getMemorySize());
+    LOG.debug("Checked reserved queue resources: " +
+        queue.getReservedResource().getMemorySize());
+  }
+
+  private void checkReservedResource(ApplicationAttemptId appAttemptId,
+      long expectedMem) {
+    FSAppAttempt fsAppAttempt = fs.getSchedulerApp(appAttemptId);
+    assertNotNull("Scheduler application not found", fsAppAttempt);
+    List<RMContainer> reserved = fsAppAttempt.getReservedContainers();
+    long actual = 0;
+    for (RMContainer cont: reserved) {
+      actual += cont.getReservedResource().getMemorySize();
+    }
+    assertEquals("Reserved app resource is not correct size", expectedMem,
+        actual);
+    LOG.debug("Checked reserved app resources: " + actual);
+  }
+
+  private void checkPendingResource(long expectedMem) {
+    // Pending in the test is any demand minus used
+    long used = queue.getResourceUsage().getMemorySize();
+    long demand = queue.getDemand().getMemorySize();
+    LOG.debug("Checked pending queue resources: used " + used + " demand " +
+        demand);
+    assertEquals("Pending queue resource is not correct size", expectedMem,
+        demand - used);
+  }
+
+  private void checkPendingResource(ApplicationAttemptId appAttemptId,
+      long expectedMem) {
+    List<ResourceRequest> resReqList =
+        fs.getPendingResourceRequestsForAttempt(appAttemptId);
+    long actual = 0;
+    long total = 0;
+    LOG.debug("Request list size: " + resReqList.size());
+    for (ResourceRequest resReq: resReqList) {
+      if (resReq.getResourceName().equalsIgnoreCase(ResourceRequest.ANY)) {
+        actual += resReq.getCapability().getMemorySize();
+      }
+      total += resReq.getCapability().getMemorySize();
+    }
+    assertEquals("Pending app resource is not correct size", expectedMem,
+        actual);
+    LOG.debug("Checked pending app resources: only any " + actual + ", all " +
+        total);
+  }
+
+  private void checkUsedResource(long expectedMem) {
+    assertEquals("Used queue resources is not correct size", expectedMem,
+        queue.getResourceUsage().getMemorySize());
+    LOG.debug("Checked used queue resources " +
+        queue.getResourceUsage().getMemorySize());
+  }
+
+  private void checkUsedResource(ApplicationAttemptId appAttemptId,
+      long expectedMem) {
+    FSAppAttempt fsAppAttempt = fs.getSchedulerApp(appAttemptId);
+    assertNotNull("Scheduler application not found", fsAppAttempt);
+    assertEquals("Used app resources is not correct size", expectedMem,
+        fsAppAttempt.getCurrentConsumption().getMemorySize());
+    LOG.debug("Checked used app resources: " +
+        fsAppAttempt.getCurrentConsumption().getMemorySize());
+  }
+
+  private void verifyContainerDecreased(AllocateResponse response,
+      ContainerId containerId,
+      long expectedMem) {
+    verifyContainer(ContainerUpdateType.DECREASE_RESOURCE, response,
+        containerId, expectedMem);
+  }
+
+  private void verifyContainerIncreased(AllocateResponse response,
+      ContainerId containerId,
+      long expectedMem) {
+    verifyContainer(ContainerUpdateType.INCREASE_RESOURCE, response,
+        containerId, expectedMem);
+  }
+
+  private void verifyContainer(ContainerUpdateType type,
+      AllocateResponse response,
+      ContainerId containerId,
+      long expectedMem) {
+    List<UpdatedContainer> updatedContainers =
+        response.getUpdatedContainers();
+    boolean found = false;
+    for (UpdatedContainer cont : updatedContainers) {
+      if (cont.getContainer().getId().equals(containerId)) {
+        found = true;
+        assertEquals("Wrong update type found ", type, cont.getUpdateType());
+        assertEquals("Used resources is not correct size", expectedMem,
+            cont.getContainer().getResource().getMemorySize());
+      }
+    }
+    if (!found) {
+      fail("Container not changed: containerId = " + containerId);
+    }
+  }
+
+  private void allocateAndLaunchContainers(MockAM appMaster, MockNM node,
+      int nContainer, int mem,
+      int priority, int startContainerId)
+      throws Exception {
+
+    // setup what is needed
+    int lastId = startContainerId + nContainer - 1;
+    ApplicationAttemptId attemptId = appMaster.getApplicationAttemptId();
+    ContainerId tempId = ContainerId.newContainerId(attemptId, lastId);
+
+    // allocate the container(s)
+    appMaster.allocate(Collections.singletonList(
+        ResourceRequest.newInstance(Priority.newInstance(priority), "*",
+            Resources.createResource(mem), nContainer)), null);
+    assertTrue(rm.waitForState(node, tempId, RMContainerState.ALLOCATED));
+
+    // Acquire them, and the NM reports RUNNING
+    appMaster.allocate(null, null);
+    for (int cId = startContainerId; cId <= lastId; cId++) {
+      tempId = ContainerId.newContainerId(attemptId, cId);
+      sentRMContainerLaunched(tempId);
+      rm.waitForState(node, tempId, RMContainerState.RUNNING);
+    }
+  }
+
+  private void sentRMContainerLaunched(ContainerId containerId) {
+    RMContainer rmContainer = fs.getRMContainer(containerId);
+    if (rmContainer != null) {
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+    } else {
+      fail("Cannot find RMContainer");
+    }
+  }
+
+  private void verifyAvailableResourceOfSchedulerNode(NodeId nodeId,
+      long expectedMem) {
+    SchedulerNode node = fs.getNode(nodeId);
+    assertEquals("Available node resource is not correct size", expectedMem,
+        node.getUnallocatedResource().getMemorySize());
+  }
+
+  private void verifyNodeContainer(RMNodeImpl rmNode, ContainerId containerId1,
+      int expectedMem) {
+    Collection<Container> updatedContainers =
+        rmNode.getToBeUpdatedContainers();
+    boolean receivedContainer = false;
+    for (Container cont : updatedContainers) {
+      if (cont.getId().equals(containerId1)) {
+        receivedContainer =
+            (cont.getResource().getMemorySize() == expectedMem);
+      }
+    }
+    assertTrue("Node did not receive updated container", receivedContainer);
+  }
+}
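Note for reviewers: the MockAM round trip the tests drive (sendContainerResizingRequest, then reading the UpdatedContainer out of the next AllocateResponse) corresponds to what a real AM does through the async client. A minimal sketch, assuming an already registered AMRMClientAsync named amRMClient and a running Container named container (both names illustrative, not part of this patch):

    // Ask the RM to grow a running container from 1 GB to 3 GB.
    Resource target = Resource.newInstance(3 * 1024, 1);
    amRMClient.requestContainerUpdate(container,
        UpdateContainerRequest.newInstance(container.getVersion(),
            container.getId(), ContainerUpdateType.INCREASE_RESOURCE,
            target, null));
    // The resized container is delivered to the callback handler's
    // onContainersUpdated(List<UpdatedContainer>) once the scheduler
    // (here: the FairScheduler changes above) satisfies the request.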