diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java new file mode 100644 index 0000000..f4fc72a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class AllocationExpirationInfo implements + Comparable { + + private final ContainerId containerId; + private final boolean increase; + + public AllocationExpirationInfo(ContainerId containerId) { + this(containerId, false); + } + + public AllocationExpirationInfo( + ContainerId containerId, boolean increase) { + this.containerId = containerId; + this.increase = increase; + } + + public ContainerId getContainerId() { + return this.containerId; + } + + public boolean isIncrease() { + return this.increase; + } + + @Override + public int hashCode() { + return (getContainerId().hashCode() << 16); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof AllocationExpirationInfo)) { + return false; + } + return compareTo((AllocationExpirationInfo)other) == 0; + } + + @Override + public int compareTo(AllocationExpirationInfo other) { + if (other == null) { + return -1; + } + // Only need to compare containerId. + return getContainerId().compareTo(other.getContainerId()); + } + + @Override + public String toString() { + return ""; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java index c393f4e..d8198f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -28,7 +27,7 @@ @SuppressWarnings({"unchecked", "rawtypes"}) public class ContainerAllocationExpirer extends - AbstractLivelinessMonitor { + AbstractLivelinessMonitor { private EventHandler dispatcher; @@ -47,7 +46,9 @@ public void serviceInit(Configuration conf) throws Exception { } @Override - protected void expire(ContainerId containerId) { - dispatcher.handle(new ContainerExpiredSchedulerEvent(containerId)); + protected void expire(AllocationExpirationInfo allocationExpirationInfo) { + dispatcher.handle(new ContainerExpiredSchedulerEvent( + allocationExpirationInfo.getContainerId(), + allocationExpirationInfo.isIncrease())); } -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index dc0d9ba..5d26931 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -57,6 +57,8 @@ Resource getAllocatedResource(); + Resource getLastConfirmedResource(); + NodeId getAllocatedNode(); Priority getAllocatedPriority(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 83876d0..d994ddb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -119,9 +120,6 @@ RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED, - RMContainerEventType.EXPIRE, - new ContainerExpiredWhileRunningTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, @@ -177,7 +175,10 @@ private List resourceRequests; private volatile boolean hasIncreaseReservation = false; - + // Only used for container resource increase and decrease. This is the + // resource to rollback to should container resource increase token expires. + private Resource lastConfirmedResource; + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { @@ -210,6 +211,7 @@ public RMContainerImpl(Container container, this.isAMContainer = false; this.resourceRequests = null; this.nodeLabelExpression = nodeLabelExpression; + this.lastConfirmedResource = container.getResource(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -284,6 +286,16 @@ public Resource getAllocatedResource() { } @Override + public Resource getLastConfirmedResource() { + try { + readLock.lock(); + return this.lastConfirmedResource; + } finally { + readLock.unlock(); + } + } + + @Override public NodeId getAllocatedNode() { return container.getNodeId(); } @@ -525,7 +537,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.setResourceRequests(null); // Register with containerAllocationExpirer. - container.containerAllocationExpirer.register(container.getContainerId()); + container.containerAllocationExpirer.register( + new AllocationExpirationInfo(container.getContainerId())); // Tell the app container.eventHandler.handle(new RMAppRunningOnNodeEvent(container @@ -543,7 +556,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { if (acquiredEvent.isIncreasedContainer()) { // If container is increased but not acquired by AM, we will start // containerAllocationExpirer for this container in this transition. - container.containerAllocationExpirer.register(event.getContainerId()); + container.containerAllocationExpirer.register( + new AllocationExpirationInfo(event.getContainerId(), true)); } } } @@ -553,22 +567,10 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Unregister the allocation expirer, it is already increased.. - container.containerAllocationExpirer.unregister(event.getContainerId()); - } - } - - private static final class ContainerExpiredWhileRunningTransition extends - BaseTransition { - - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - // When the container expired, and it has a pending increased request, we - // will kill the container. - // TODO, we can do better for this: roll back container resource to the - // resource before increase, and notify scheduler about this decrease as - // well. Will do that in a separated JIRA. - new KillTransition().transition(container, event); + // Unregister the allocation expirer, it is already increased. + container.lastConfirmedResource = container.getAllocatedResource(); + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); } } @@ -577,20 +579,22 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event; - - // Register with containerAllocationExpirer. - // For now, we assume timeout for increase is as same as container - // allocation. + + Resource targetResource = changeEvent.getTargetResource(); + Resource lastConfirmedResource = container.lastConfirmedResource; + if (!changeEvent.isIncrease()) { - // if this is a decrease request, if container was increased but not - // told to NM, we can consider previous increase is cancelled, - // unregister from the containerAllocationExpirer - container.containerAllocationExpirer.unregister(container - .getContainerId()); + // Only unregister from the containerAllocationExpirer when target + // resource is less than or equal to the last confirmed resource. + if (Resources.fitsIn(targetResource, lastConfirmedResource)) { + container.lastConfirmedResource = targetResource; + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); + } } - - container.container.setResource(changeEvent.getTargetResource()); - + + container.container.setResource(targetResource); + // We reach here means we either allocated increase reservation OR // decreased container, reservation will be cancelled anyway. container.hasIncreaseReservation = false; @@ -662,8 +666,8 @@ private static void updateAttemptMetrics(RMContainerImpl container) { public void transition(RMContainerImpl container, RMContainerEvent event) { // Unregister from containerAllocationExpirer. - container.containerAllocationExpirer.unregister(container - .getContainerId()); + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(container.getContainerId())); // Inform node container.eventHandler.handle(new RMNodeCleanContainerEvent( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 433e189..f4e483b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1306,14 +1307,16 @@ private void handleContainerStatus(List containerStatuses) { launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister(containerId); + containerAllocationExpirer.unregister( + new AllocationExpirationInfo(containerId)); } } else { // A finished container launchedContainers.remove(containerId); completedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister(containerId); + containerAllocationExpirer.unregister( + new AllocationExpirationInfo(containerId)); } } if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 27d4f91..220471b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -289,11 +289,16 @@ protected synchronized void containerIncreasedOnNode(ContainerId containerId, // when rmContainerResource >= nmContainerResource, we won't do anything, // it is possible a container increased is issued by RM, but AM hasn't // told NM. + LOG.debug("Container size reported by NM: " + nmContainerResource + + " is smaller than allocated size in RM: " + rmContainerResource + + ". ContainerID=" + containerId + ". Ignore."); } else if (Resources.fitsIn(getResourceCalculator(), clusterResource, rmContainerResource, nmContainerResource)) { // When rmContainerResource <= nmContainerResource, it could happen when a // container decreased by RM before it is increased in NM. - + LOG.debug("Container size reported by NM: " + nmContainerResource + + " is greater than allocated size in RM: " + rmContainerResource + + ". ContainerID=" + containerId + ". Notify NM to decrease."); // Tell NM to decrease the container this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeDecreaseContainerEvent(node.getNodeID(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index dcb60fc..ee3a3f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -1392,11 +1393,15 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerId), - SchedulerUtils.createAbnormalContainerStatus( - containerId, - SchedulerUtils.EXPIRED_CONTAINER), - RMContainerEventType.EXPIRE); + if (containerExpiredEvent.isIncrease()) { + rollbackContainerResource(containerId); + } else { + completedContainer(getRMContainer(containerId), + SchedulerUtils.createAbnormalContainerStatus( + containerId, + SchedulerUtils.EXPIRED_CONTAINER), + RMContainerEventType.EXPIRE); + } } break; case KILL_RESERVED_CONTAINER: @@ -1498,7 +1503,33 @@ private synchronized void removeNode(RMNode nodeInfo) { LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } - + + private void rollbackContainerResource( + ContainerId containerId) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The container does not exist."); + return; + } + FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); + if (application == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The application that the container belongs to does not exist."); + return; + } + LOG.info("Roll back resource for container " + containerId); + LeafQueue leafQueue = (LeafQueue) application.getQueue(); + synchronized(leafQueue) { + SchedulerNode schedulerNode = + getSchedulerNode(rmContainer.getAllocatedNode()); + SchedContainerChangeRequest decreaseRequest = + new SchedContainerChangeRequest(this.rmContext, schedulerNode, + rmContainer, rmContainer.getLastConfirmedResource()); + decreaseContainer(decreaseRequest, application); + } + } + @Lock(CapacityScheduler.class) @Override protected synchronized void completedContainerInternal( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java index 4a999c8..c80fc4f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java @@ -29,14 +29,24 @@ public class ContainerExpiredSchedulerEvent extends SchedulerEvent { private final ContainerId containerId; - + private final boolean increase; + public ContainerExpiredSchedulerEvent(ContainerId containerId) { + this(containerId, false); + } + + public ContainerExpiredSchedulerEvent( + ContainerId containerId, boolean increase) { super(SchedulerEventType.CONTAINER_EXPIRED); this.containerId = containerId; + this.increase = increase; } public ContainerId getContainerId() { return containerId; } + public boolean isIncrease() { + return increase; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4233cd4..4407fe9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -20,12 +20,14 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -103,6 +105,17 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception { nodeHeartbeat(conts, true); } + public void containerIncreaseStatus(Container container) throws Exception { + Map> conts = new HashMap<>(); + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + container.getId(), ContainerState.RUNNING, "Success", 0, + container.getResource()); + conts.put(container.getId().getApplicationAttemptId().getApplicationId(), + Collections.singletonList(containerStatus)); + List increasedConts = Collections.singletonList(container); + nodeHeartbeat(conts, increasedConts, true, ++responseId); + } + public RegisterNodeManagerResponse registerNode() throws Exception { return registerNode(null, null); } @@ -159,6 +172,12 @@ public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { + return nodeHeartbeat(conts, new ArrayList(), isHealthy, resId); + } + + public NodeHeartbeatResponse nodeHeartbeat(Map> conts, List increasedConts, + boolean isHealthy, int resId) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setResponseId(resId); @@ -167,6 +186,7 @@ public NodeHeartbeatResponse nodeHeartbeat(Map appAttemptEventHandler = - mock(EventHandler.class); - EventHandler generic = mock(EventHandler.class); - drainDispatcher.register(RMAppAttemptEventType.class, - appAttemptEventHandler); - drainDispatcher.register(RMNodeEventType.class, generic); - drainDispatcher.init(new YarnConfiguration()); - drainDispatcher.start(); - NodeId nodeId = BuilderUtils.newNodeId("host", 3425); - ApplicationId appId = BuilderUtils.newApplicationId(1, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); - ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); - - Resource resource = BuilderUtils.newResource(512, 1); - Priority priority = BuilderUtils.newPriority(5); - - Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); - - RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); - RMContext rmContext = mock(RMContext.class); - when(rmContext.getDispatcher()).thenReturn(drainDispatcher); - when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); - when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); - when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); - when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); - ConcurrentMap apps = - new ConcurrentHashMap(); - apps.put(appId, mock(RMApp.class)); - when(rmContext.getRMApps()).thenReturn(apps); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, "user", rmContext); - - assertEquals(RMContainerState.NEW, rmContainer.getState()); - assertEquals(resource, rmContainer.getAllocatedResource()); - assertEquals(nodeId, rmContainer.getAllocatedNode()); - assertEquals(priority, rmContainer.getAllocatedPriority()); - verify(writer).containerStarted(any(RMContainer.class)); - verify(publisher).containerCreated(any(RMContainer.class), anyLong()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.START)); - drainDispatcher.await(); - assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.ACQUIRED)); - drainDispatcher.await(); - assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); - drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - assertEquals( - "http://host:3465/node/containerlogs/container_1_0001_01_000001/user", - rmContainer.getLogURL()); - - // newResource is more than the old resource - Resource newResource = BuilderUtils.newResource(1024, 2); - rmContainer.handle(new RMContainerChangeResourceEvent(containerId, - newResource, true)); - - if (acquired) { - rmContainer - .handle(new RMContainerUpdatesAcquiredEvent(containerId, true)); - drainDispatcher.await(); - // status is still RUNNING since this is a increased container acquired by - // AM - assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - } - - // In RUNNING state. Verify EXPIRE and associated actions. - reset(appAttemptEventHandler); - ContainerStatus containerStatus = SchedulerUtils - .createAbnormalContainerStatus(containerId, - SchedulerUtils.EXPIRED_CONTAINER); - rmContainer.handle(new RMContainerFinishedEvent(containerId, - containerStatus, RMContainerEventType.EXPIRE)); - drainDispatcher.await(); - assertEquals(RMContainerState.EXPIRED, rmContainer.getState()); - - // Container will be finished only when it is acquired by AM after increase, - // we will only notify expirer when it is acquired by AM. - verify(writer, times(1)).containerFinished(any(RMContainer.class)); - verify(publisher, times(1)).containerFinished(any(RMContainer.class), + verify(writer, never()).containerFinished(any(RMContainer.class)); + verify(publisher, never()).containerFinished(any(RMContainer.class), anyLong()); } @Test - public void testExpireAfterContainerResourceIncreased() throws Exception { - // expire after increased and acquired by AM - testExpireAfterIncreased(true); - // expire after increased but not acquired by AM - testExpireAfterIncreased(false); - } - - @Test public void testExistenceOfResourceRequestInRMContainer() throws Exception { Configuration conf = new Configuration(); MockRM rm1 = new MockRM(conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index c08af9d..9e29842 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; @@ -143,7 +142,8 @@ public RMNodeLabelsManager createNodeLabelManager() { .newInstance(containerId1, Resources.createResource(3 * GB))), null); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); checkPendingResource(rm1, "default", 2 * GB, null); Assert.assertEquals(2 * GB, @@ -183,7 +183,8 @@ public RMNodeLabelsManager createNodeLabelManager() { // app1 -> a1 RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); checkUsedResource(rm1, "default", 3 * GB, null); Assert.assertEquals(3 * GB, @@ -242,7 +243,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -346,7 +348,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate 1 container am1.allocate( @@ -422,7 +425,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -532,7 +536,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -593,8 +598,8 @@ public RMNodeLabelsManager createNodeLabelManager() { am1.allocate(null, Arrays.asList(containerId2)); // am1 asks to change its AM container from 2G to 1G (decrease) am1.sendContainerResizingRequest(null, Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(1 * GB)))); + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(1 * GB)))); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -643,7 +648,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -740,7 +746,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -862,7 +869,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); ApplicationAttemptId attemptId = am1.getApplicationAttemptId(); // Container 2, 3 (priority=3) @@ -942,7 +950,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); ApplicationAttemptId attemptId = am1.getApplicationAttemptId(); // Container 2, 3 (priority=3) @@ -1021,7 +1030,8 @@ public ResourceScheduler createScheduler() { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); // making sure resource is allocated checkUsedResource(rm, "default", 3 * GB, null); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm, app1.getApplicationId()); Assert.assertEquals(3 * GB, app.getAppAttemptResourceUsage().getUsed().getMemory()); // making sure container is launched @@ -1113,10 +1123,4 @@ private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId, Assert .assertEquals(expectedMemory, node.getAvailableResource().getMemory()); } - - private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, - ApplicationId appId) { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java new file mode 100644 index 0000000..a703124 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -0,0 +1,411 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestIncreaseAllocationExpirer { + private final int GB = 1024; + private YarnConfiguration conf; + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @Test + public void testContainerIsRemovedFromAllocationExpirer() + throws Exception { + /** + * 1. Allocate 1 container: containerId2 (1G) + * 2. Increase resource of containerId2: 1G -> 3G + * 3. AM acquires the token + * 4. AM uses the token + * 5. Verify containerId2 is removed from allocation expirer such + * that it still runs fine after allocation expiration interval + */ + // Set the allocation expiration to 5 seconds + conf.setLong( + YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + // Submit an application + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + // Report AM container status RUNNING to remove it from expirer + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + // AM request a new container + am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + // AM acquire a new container to start container allocation expirer + List containers = + am1.allocate(null, null).getAllocatedContainers(); + Assert.assertEquals(containerId2, containers.get(0).getId()); + Assert.assertNotNull(containers.get(0).getContainerToken()); + checkUsedResource(rm1, "default", 2 * GB, null); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + // Report container status + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 2, ContainerState.RUNNING); + // Wait until container status is RUNNING, and is removed from + // allocation expirer + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // am1 asks to increase containerId2 from 1GB to 3GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(3 * GB))), null); + // Kick off scheduling and sleep for 1 second; + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer" + am1.allocate(null, null); + // Remember the resource in order to report status + Resource resource = Resources.clone( + rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource()); + nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource)); + // Wait long enough and verify that the container was removed + // from allocation expirer, and the container is still running + Thread.sleep(10000); + Assert.assertEquals(RMContainerState.RUNNING, + rm1.getResourceScheduler().getRMContainer(containerId2).getState()); + // Verify container size is 3G + Assert.assertEquals( + 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + // Verify total resource usage + checkUsedResource(rm1, "default", 4 * GB, null); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); + rm1.stop(); + } + + @Test + public void testContainerIncreaseAllocationExpiration() + throws Exception { + /** + * 1. Allocate 1 container: containerId2 (1G) + * 2. Increase resource of containerId2: 1G -> 3G + * 3. AM acquires the token + * 4. AM does not use the token + * 5. Verify containerId2's resource usage falls back to + * 1G after the increase token expires + */ + // Set the allocation expiration to 5 seconds + conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + List containers = + am1.allocate(null, null).getAllocatedContainers(); + Assert.assertEquals(containerId2, containers.get(0).getId()); + Assert.assertNotNull(containers.get(0).getContainerToken()); + checkUsedResource(rm1, "default", 2 * GB, null); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 2, ContainerState.RUNNING); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // am1 asks to increase containerId2 from 1GB to 3GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(3 * GB))), null); + // Kick off scheduling and wait for 1 second; + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer + am1.allocate(null, null); + // Verify resource usage + checkUsedResource(rm1, "default", 4 * GB, null); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); + // Wait long enough for the increase token to expire, and for the roll + // back action to complete + Thread.sleep(10000); + // Verify container size is 1G + Assert.assertEquals( + 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + // Verify total resource usage is 2G + checkUsedResource(rm1, "default", 2 * GB, null); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource is rolled back to 18GB + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + rm1.stop(); + } + + @Test + public void testConsecutiveContainerIncreaseAllocationExpiration() + throws Exception { + /** + * 1. Allocate 1 container: containerId2 (1G) + * 2. Increase resource of containerId2: 1G -> 3G + * 3. AM acquires the token + * 4. Increase resource of containerId2 again: 3G -> 6G + * 5. AM acquires the token + * 6. AM uses the 1st token to increase the container in NM to 3G + * 7. AM does NOT use the second token + * 8. Verify containerId2 eventually uses 1G after token expires + * 9. Verify NM receives the decrease message to decrease to 1G + */ + // Set the allocation expiration to 5 seconds + conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + // Submit an application + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + // AM request a new container + am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + // AM acquire a new container to start container allocation expirer + am1.allocate(null, null).getAllocatedContainers(); + // Report container status + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 2, ContainerState.RUNNING); + // Wait until container status is RUNNING, and is removed from + // allocation expirer + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // am1 asks to change containerId2 from 1GB to 3GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(3 * GB))), null); + // Kick off scheduling and sleep for 1 second to + // make sure the allocation is done + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer + am1.allocate(null, null); + // Remember the resource in order to report status + Resource resource = Resources.clone( + rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource()); + // am1 asks to change containerId2 from 3GB to 6GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(6 * GB))), null); + // Kick off scheduling and sleep for 1 second to + // make sure the allocation is done + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Reset container increase allocation expirer + am1.allocate(null, null); + checkUsedResource(rm1, "default", 7 * GB, null); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); + Assert.assertEquals(7 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource is now reduced to 13GB + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 13 * GB); + // Use the first token + nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource)); + // Wait long enough for the second token to expire, and verify that + // the roll back action is completed + Thread.sleep(10000); + // Verify container size is 1G + Assert.assertEquals( + 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + // Verify total resource usage is 2G + checkUsedResource(rm1, "default", 2 * GB, null); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource is rolled back to 18GB + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + // Verify NM receives the decrease message + List containersToDecrease = + nm1.nodeHeartbeat(true).getContainersToDecrease(); + Assert.assertEquals(1, containersToDecrease.size()); + Assert.assertEquals( + 1 * GB, containersToDecrease.get(0).getResource().getMemory()); + rm1.stop(); + } + + @Test + public void testDecreaseAfterIncreaseAllocationExpiration() + throws Exception { + /** + * 1. Allocate two containers: containerId2 (3G), containerId3 (3G) + * 2. Increase resource of containerId2: 3G -> 6G + * 3. Increase resource of containerId3: 3G -> 6G + * 4. Do NOT use the increase tokens + * 5. Decrease containerId2: 6G -> 2G (i.e., below last confirmed resource) + * 6. Decrease containerId3: 6G -> 4G (i.e., above last confirmed resource) + * 7. Verify containerId2 eventually uses 2G (removed from expirer) + * 8. verify containerId3 eventually uses 3G (increase token expires) + */ + // Set the allocation expiration to 5 seconds + conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + // Submit an application + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + // AM request two new continers + am1.allocate("127.0.0.1", 3 * GB, 2, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + ContainerId containerId3 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId3, RMContainerState.ALLOCATED); + // AM acquires tokens to start container allocation expirer + List containers = + am1.allocate(null, null).getAllocatedContainers(); + Assert.assertEquals(2, containers.size()); + Assert.assertNotNull(containers.get(0).getContainerToken()); + Assert.assertNotNull(containers.get(1).getContainerToken()); + // Report container status + nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), + 2, ContainerState.RUNNING); + nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), + 3, ContainerState.RUNNING); + // Wait until container status becomes RUNNING + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING); + // am1 asks to change containerId2 and containerId3 from 1GB to 3GB + List increaseRequests = new ArrayList<>(); + increaseRequests.add(ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(6 * GB))); + increaseRequests.add(ContainerResourceChangeRequest.newInstance( + containerId3, Resources.createResource(6 * GB))); + am1.sendContainerResizingRequest(increaseRequests, null); + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer + am1.allocate(null, null); + // Decrease containers + List decreaseRequests = new ArrayList<>(); + decreaseRequests.add(ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(2 * GB))); + decreaseRequests.add(ContainerResourceChangeRequest.newInstance( + containerId3, Resources.createResource(4 * GB))); + AllocateResponse response = + am1.sendContainerResizingRequest(null, decreaseRequests); + // Verify containers are decreased in scheduler + Assert.assertEquals(2, response.getDecreasedContainers().size()); + // Wait for containerId3 token to expire, + Thread.sleep(10000); + Assert.assertEquals( + 2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + Assert.assertEquals( + 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId3) + .getAllocatedResource().getMemory()); + rm1.stop(); + } + + private void checkUsedResource(MockRM rm, String queueName, int memory, + String label) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = cs.getQueue(queueName); + Assert.assertEquals(memory, + queue.getQueueResourceUsage() + .getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label) + .getMemory()); + } + + private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId, + int expectedMemory) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + SchedulerNode node = cs.getNode(nodeId); + Assert + .assertEquals(expectedMemory, node.getAvailableResource().getMemory()); + } + + private Container getContainer( + MockRM rm, ContainerId containerId, Resource resource) { + RMContainer rmContainer = rm.getResourceScheduler() + .getRMContainer(containerId); + return Container.newInstance( + containerId, rmContainer.getAllocatedNode(), null, + resource, null, null); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 489ef77..1786069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; @@ -91,10 +92,10 @@ public EventHandler getEventHandler() { } }; - // No op - ContainerAllocationExpirer cae = + // No op + ContainerAllocationExpirer cae = new ContainerAllocationExpirer(nullDispatcher); - + Configuration conf = new Configuration(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContextImpl rmContext = @@ -122,7 +123,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { return (Resource) args[1]; } }); - + rmContext.setNodeLabelManager(nlm); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); @@ -349,4 +350,10 @@ public static Configuration getConfigurationWithDefaultQueueLabels( conf.setDefaultNodeLabelExpression(B, "y"); return conf; } + + public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, + ApplicationId appId) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); + } }