diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java new file mode 100644 index 0000000..9afe16b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class AllocationExpirationInfo implements + Comparable<AllocationExpirationInfo> { + + private final ContainerId containerId; + private final boolean increase; + + public AllocationExpirationInfo(ContainerId containerId) { + this(containerId, false); + } + + public AllocationExpirationInfo( + ContainerId containerId, boolean increase) { + this.containerId = containerId; + this.increase = increase; + } + + public ContainerId getContainerId() { + return this.containerId; + } + + public boolean isIncrease() { + return this.increase; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof AllocationExpirationInfo)) { + return false; + } + return compareTo((AllocationExpirationInfo)other) == 0; + } + + @Override + public int compareTo(AllocationExpirationInfo other) { + if (other == null) { + return -1; + } + // Only need to compare containerId. + return getContainerId().compareTo(other.getContainerId()); + } + + @Override + public String toString() { + return ""; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java index 1bd64b4..8977e55 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java @@ -19,7 +19,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -29,7 +28,7 @@ @SuppressWarnings({"unchecked", "rawtypes"}) public class ContainerAllocationExpirer extends - AbstractLivelinessMonitor<ContainerId> { + AbstractLivelinessMonitor<AllocationExpirationInfo> { private EventHandler dispatcher; @@ -48,7 +47,9 @@ public void serviceInit(Configuration conf) throws Exception { } @Override - protected void expire(ContainerId containerId) { - dispatcher.handle(new ContainerExpiredSchedulerEvent(containerId)); + protected void expire(AllocationExpirationInfo allocationExpirationInfo) { + dispatcher.handle(new ContainerExpiredSchedulerEvent( + allocationExpirationInfo.getContainerId(), + allocationExpirationInfo.isIncrease())); } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index dc0d9ba..5d26931 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -57,6 +57,8 @@ Resource getAllocatedResource(); + Resource getLastConfirmedResource(); + NodeId getAllocatedNode(); Priority getAllocatedPriority(); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java index 920cfdb..888ac5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java @@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource; public class RMContainerChangeResourceEvent extends RMContainerEvent { - + final Resource targetResource; final boolean increase; @@ -33,11 +33,11 @@ public RMContainerChangeResourceEvent(ContainerId containerId, this.targetResource = targetResource; this.increase = increase; } - + public Resource getTargetResource() { return targetResource; } - + public boolean isIncrease() { return increase; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 8133657..e56bef3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -119,9 +119,8 @@ RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED, - RMContainerEventType.EXPIRE, - new ContainerExpiredWhileRunningTransition()) + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.EXPIRE) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, @@ -177,6 +176,9 @@ private List resourceRequests; private volatile boolean hasIncreaseReservation = false; + // Only used for container resource increase and decrease. This is the + // resource to roll back to if the container resource increase token expires. 
+ private Resource lastConfirmedResource; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -210,6 +212,7 @@ public RMContainerImpl(Container container, this.isAMContainer = false; this.resourceRequests = null; this.nodeLabelExpression = nodeLabelExpression; + this.lastConfirmedResource = container.getResource(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -284,6 +287,16 @@ public Resource getAllocatedResource() { } @Override + public Resource getLastConfirmedResource() { + try { + readLock.lock(); + return this.lastConfirmedResource; + } finally { + readLock.unlock(); + } + } + + @Override public NodeId getAllocatedNode() { return container.getNodeId(); } @@ -524,7 +537,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.setResourceRequests(null); // Register with containerAllocationExpirer. - container.containerAllocationExpirer.register(container.getContainerId()); + container.containerAllocationExpirer.register( + new AllocationExpirationInfo(container.getContainerId())); // Tell the app container.eventHandler.handle(new RMAppRunningOnNodeEvent(container @@ -542,7 +556,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { if (acquiredEvent.isIncreasedContainer()) { // If container is increased but not acquired by AM, we will start // containerAllocationExpirer for this container in this transition. - container.containerAllocationExpirer.register(event.getContainerId()); + container.containerAllocationExpirer.register( + new AllocationExpirationInfo(event.getContainerId(), true)); } } } @@ -552,22 +567,10 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Unregister the allocation expirer, it is already increased.. 
- container.containerAllocationExpirer.unregister(event.getContainerId()); - } - } - - private static final class ContainerExpiredWhileRunningTransition extends - BaseTransition { - - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - // When the container expired, and it has a pending increased request, we - // will kill the container. - // TODO, we can do better for this: roll back container resource to the - // resource before increase, and notify scheduler about this decrease as - // well. Will do that in a separated JIRA. - new KillTransition().transition(container, event); + // Unregister the allocation expirer, it is already increased. + container.lastConfirmedResource = container.getAllocatedResource(); + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); } } @@ -576,19 +579,25 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event; - - // Register with containerAllocationExpirer. - // For now, we assume timeout for increase is as same as container - // allocation. + + Resource targetResource = changeEvent.getTargetResource(); + Resource lastConfirmedResource = container.lastConfirmedResource; + if (!changeEvent.isIncrease()) { // if this is a decrease request, if container was increased but not - // told to NM, we can consider previous increase is cancelled, - // unregister from the containerAllocationExpirer - container.containerAllocationExpirer.unregister(container - .getContainerId()); + // told to NM, we can consider previous increase is cancelled. + // unregister from the containerAllocationExpirer only when target + // resource is less than or equal to the last confirmed resource. 
+ if (targetResource.getMemory() <= lastConfirmedResource.getMemory() + && targetResource.getVirtualCores() <= lastConfirmedResource + .getVirtualCores()) { + container.lastConfirmedResource = targetResource; + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); + } } - - container.container.setResource(changeEvent.getTargetResource()); + + container.container.setResource(targetResource); // We reach here means we either allocated increase reservation OR // decreased container, reservation will be cancelled anyway. @@ -672,8 +681,8 @@ private static void updateAttemptMetrics(RMContainerImpl container) { public void transition(RMContainerImpl container, RMContainerEvent event) { // Unregister from containerAllocationExpirer. - container.containerAllocationExpirer.unregister(container - .getContainerId()); + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(container.getContainerId())); // Inform node container.eventHandler.handle(new RMNodeCleanContainerEvent( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7a43598..ca49d5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1038,14 +1039,16 @@ private void handleContainerStatus(List containerStatuses) { launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister(containerId); + containerAllocationExpirer.unregister( + new AllocationExpirationInfo(containerId)); } } else { // A finished container launchedContainers.remove(containerId); completedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister(containerId); + containerAllocationExpirer.unregister( + new AllocationExpirationInfo(containerId)); } } if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cf7f6c0..2f5a238 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -102,6 +102,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; @@ -1365,11 +1366,15 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); - completedContainer(getRMContainer(containerId), - SchedulerUtils.createAbnormalContainerStatus( - containerId, - SchedulerUtils.EXPIRED_CONTAINER), - RMContainerEventType.EXPIRE); + if (containerExpiredEvent.isIncrease()) { + rollbackContainerResource(containerId); + } else { + completedContainer(getRMContainer(containerId), + SchedulerUtils.createAbnormalContainerStatus( + containerId, + SchedulerUtils.EXPIRED_CONTAINER), + RMContainerEventType.EXPIRE); + } } break; case DROP_RESERVATION: @@ -1465,10 +1470,10 @@ private synchronized void removeNode(RMNode nodeInfo) { // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - completedContainer(reservedContainer, + completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( - reservedContainer.getContainerId(), - 
SchedulerUtils.LOST_CONTAINER), + reservedContainer.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } @@ -1478,7 +1483,24 @@ private synchronized void removeNode(RMNode nodeInfo) { LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } - + + private void rollbackContainerResource(ContainerId containerId) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The container does not exist."); + return; + } + SchedulerNode schedulerNode = + getSchedulerNode(rmContainer.getAllocatedNode()); + SchedContainerChangeRequest decreaseRequest = + new SchedContainerChangeRequest( + schedulerNode, rmContainer, + rmContainer.getLastConfirmedResource()); + decreaseContainer(decreaseRequest, + getCurrentAttemptForContainer(containerId)); + } + @Lock(CapacityScheduler.class) @Override protected synchronized void completedContainer(RMContainer rmContainer, @@ -1547,7 +1569,7 @@ protected synchronized void decreaseContainer( // Save resource before decrease Resource resourceBeforeDecrease = - Resources.clone(rmContainer.getContainer().getResource()); + Resources.clone(rmContainer.getAllocatedResource()); FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; LeafQueue queue = (LeafQueue) attempt.getQueue(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java index 4a999c8..c80fc4f 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java @@ -29,14 +29,24 @@ public class ContainerExpiredSchedulerEvent extends SchedulerEvent { private final ContainerId containerId; - + private final boolean increase; + public ContainerExpiredSchedulerEvent(ContainerId containerId) { + this(containerId, false); + } + + public ContainerExpiredSchedulerEvent( + ContainerId containerId, boolean increase) { super(SchedulerEventType.CONTAINER_EXPIRED); this.containerId = containerId; + this.increase = increase; } public ContainerId getContainerId() { return containerId; } + public boolean isIncrease() { + return increase; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 4964c59..b9eef1d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; @@ -724,10 +725,14 @@ public void testContainerExpire() throws Exception { ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L); ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L); - mockExpirer.register(containerId1); - mockExpirer.register(containerId2); - verify(mockExpirer).register(containerId1); - verify(mockExpirer).register(containerId2); + AllocationExpirationInfo expirationInfo1 = + new AllocationExpirationInfo(containerId1); + AllocationExpirationInfo expirationInfo2 = + new AllocationExpirationInfo(containerId2); + mockExpirer.register(expirationInfo1); + mockExpirer.register(expirationInfo2); + verify(mockExpirer).register(expirationInfo1); + verify(mockExpirer).register(expirationInfo2); ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer); RMNodeImpl rmNode = getRunningNode(); ContainerStatus status1 = @@ -741,7 +746,7 @@ public void testContainerExpire() throws Exception { statusList.add(status2); RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList); rmNode.handle(statusEvent); - verify(mockExpirer).unregister(containerId1); - verify(mockExpirer).unregister(containerId2); + verify(mockExpirer).unregister(expirationInfo1); + verify(mockExpirer).unregister(expirationInfo2); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 415e891..035a9d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -239,120 +240,13 @@ public void testExpireWhileRunning() { rmContainer.handle(new RMContainerFinishedEvent(containerId, containerStatus, RMContainerEventType.EXPIRE)); drainDispatcher.await(); - assertEquals(RMContainerState.EXPIRED, rmContainer.getState()); - verify(writer, times(1)).containerFinished(any(RMContainer.class)); - verify(publisher, times(1)).containerFinished(any(RMContainer.class), - anyLong()); - } - - private void testExpireAfterIncreased(boolean acquired) { - /* - * Similar to previous test, a container is increased but not acquired by - * AM. In this case, if a container is expired, the container should be - * finished. 
- */ - DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler appAttemptEventHandler = - mock(EventHandler.class); - EventHandler generic = mock(EventHandler.class); - drainDispatcher.register(RMAppAttemptEventType.class, - appAttemptEventHandler); - drainDispatcher.register(RMNodeEventType.class, generic); - drainDispatcher.init(new YarnConfiguration()); - drainDispatcher.start(); - NodeId nodeId = BuilderUtils.newNodeId("host", 3425); - ApplicationId appId = BuilderUtils.newApplicationId(1, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); - ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); - - Resource resource = BuilderUtils.newResource(512, 1); - Priority priority = BuilderUtils.newPriority(5); - - Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); - - RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); - RMContext rmContext = mock(RMContext.class); - when(rmContext.getDispatcher()).thenReturn(drainDispatcher); - when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); - when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); - when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); - when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); - ConcurrentMap apps = - new ConcurrentHashMap(); - apps.put(appId, mock(RMApp.class)); - when(rmContext.getRMApps()).thenReturn(apps); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, "user", rmContext); - - assertEquals(RMContainerState.NEW, rmContainer.getState()); - assertEquals(resource, rmContainer.getAllocatedResource()); - assertEquals(nodeId, rmContainer.getAllocatedNode()); - assertEquals(priority, 
rmContainer.getAllocatedPriority()); - verify(writer).containerStarted(any(RMContainer.class)); - verify(publisher).containerCreated(any(RMContainer.class), anyLong()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.START)); - drainDispatcher.await(); - assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.ACQUIRED)); - drainDispatcher.await(); - assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); - drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - assertEquals( - "http://host:3465/node/containerlogs/container_1_0001_01_000001/user", - rmContainer.getLogURL()); - - // newResource is more than the old resource - Resource newResource = BuilderUtils.newResource(1024, 2); - rmContainer.handle(new RMContainerChangeResourceEvent(containerId, - newResource, true)); - - if (acquired) { - rmContainer - .handle(new RMContainerUpdatesAcquiredEvent(containerId, true)); - drainDispatcher.await(); - // status is still RUNNING since this is a increased container acquired by - // AM - assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - } - - // In RUNNING state. Verify EXPIRE and associated actions. - reset(appAttemptEventHandler); - ContainerStatus containerStatus = SchedulerUtils - .createAbnormalContainerStatus(containerId, - SchedulerUtils.EXPIRED_CONTAINER); - rmContainer.handle(new RMContainerFinishedEvent(containerId, - containerStatus, RMContainerEventType.EXPIRE)); - drainDispatcher.await(); - assertEquals(RMContainerState.EXPIRED, rmContainer.getState()); - - // Container will be finished only when it is acquired by AM after increase, - // we will only notify expirer when it is acquired by AM. 
- verify(writer, times(1)).containerFinished(any(RMContainer.class)); - verify(publisher, times(1)).containerFinished(any(RMContainer.class), + verify(writer, never()).containerFinished(any(RMContainer.class)); + verify(publisher, never()).containerFinished(any(RMContainer.class), anyLong()); } @Test - public void testExpireAfterContainerResourceIncreased() throws Exception { - // expire after increased and acquired by AM - testExpireAfterIncreased(true); - // expire after increased but not acquired by AM - testExpireAfterIncreased(false); - } - - @Test public void testExistenceOfResourceRequestInRMContainer() throws Exception { Configuration conf = new Configuration(); MockRM rm1 = new MockRM(conf);