Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; /** @@ -71,5 +74,7 @@ ContainerState getContainerState(); ContainerReport createContainerReport(); - + + List getResourceRequests(); + } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.EnumSet; +import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -155,17 +157,18 @@ private long creationTime; private long finishTime; private ContainerStatus finishedStatus; + private List resourceRequests; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, - RMContext rmContext) { + RMContext rmContext, List resourceRequests) { this(container, appAttemptId, nodeId, user, rmContext, System - .currentTimeMillis()); + .currentTimeMillis(), resourceRequests); } public RMContainerImpl(Container container, - ApplicationAttemptId appAttemptId, NodeId nodeId, - String user, RMContext rmContext, long creationTime) { + ApplicationAttemptId appAttemptId, NodeId nodeId, String user, + RMContext rmContext, long creationTime, List resourceRequests) { this.stateMachine = stateMachineFactory.make(this); this.containerId = container.getId(); this.nodeId = nodeId; @@ -176,6 +179,7 @@ this.rmContext = rmContext; this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); + this.resourceRequests = resourceRequests; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -307,6 +311,20 @@ readLock.unlock(); } } + + @Override + public List getResourceRequests() { + try { + readLock.lock(); + return resourceRequests; + } finally { + readLock.unlock(); + } + } + + public void clearResourceRequest() { + this.resourceRequests = null; + } @Override public String toString() { @@ -409,6 +427,9 @@ @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Clear ResourceRequest stored in RMContainer + container.clearResourceRequest(); + // Register with containerAllocationExpirer. container.containerAllocationExpirer.register(container.getContainerId()); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -256,10 +257,26 @@ RMContainer rmContainer = new RMContainerImpl(container, attemptId, node.getNodeID(), applications.get(attemptId.getApplicationId()).getUser(), rmContext, - status.getCreationTime()); + status.getCreationTime(), null); return rmContainer; } + + public void recoverResourceRequest(RMContainer rmContainer) { + List requests = rmContainer.getResourceRequests(); + // If container state is moved to ACQUIRED, request will be null. + if (requests == null) { + return; + } + // Add resource request back to Scheduler. + ApplicationId appId = rmContainer.getContainerId() + .getApplicationAttemptId().getApplicationId(); + SchedulerApplication schedulerApp = applications.get(appId); + SchedulerApplicationAttempt schedulerAttempt = schedulerApp + .getCurrentAppAttempt(); + schedulerAttempt.recoverResourceRequests(requests); + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (working copy) @@ -127,9 +127,10 @@ * by the application. * * @param requests resources to be acquired + * @param recoverPreemptedRequest recover Resource Request on preemption */ synchronized public void updateResourceRequests( - List requests) { + List requests, boolean recoverPreemptedRequest) { QueueMetrics metrics = queue.getMetrics(); // Update resource requests @@ -163,11 +164,21 @@ asks = new HashMap(); this.requests.put(priority, asks); this.priorities.add(priority); - } else if (updatePendingResources) { - lastRequest = asks.get(resourceName); } + lastRequest = asks.get(resourceName); + if (recoverPreemptedRequest) { + if (lastRequest != null) { + // Increment the number of containers to 1, as it is recovering a + // single container. + request.setNumContainers(lastRequest.getNumContainers() + 1); + } else { + // Set number of container to 1 if the entry is not found. + request.setNumContainers(1); + } + } asks.put(resourceName, request); + if (updatePendingResources) { // Similarly, deactivate application? @@ -239,6 +250,28 @@ return blacklist.contains(resourceName); } + synchronized public List getPerAskResourceRequests( + NodeType type, Priority priority, String hostName, String rackName) { + List list = new ArrayList(); + Map nodeRequests = requests.get(priority); + if (nodeRequests == null) { + return null; + } + + // Get NodeLocal resource request and add to list + if (type.equals(NodeType.NODE_LOCAL)) { + list.add(nodeRequests.get(hostName)); + } + // Include rack resource requests for both NodeLocal and RackLocal types + if (type.equals(NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL)) { + list.add(nodeRequests.get(rackName)); + } + + // For all cases, add OffRack resource request. + list.add(nodeRequests.get(ResourceRequest.ANY)); + return list; + } + /** * Resources have been allocated to this application by the resource * scheduler. Track them. Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (working copy) @@ -1089,6 +1089,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } + recoverResourceRequest(cont); completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (working copy) @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +78,9 @@ if (null == liveContainers.remove(rmContainer.getContainerId())) { return false; } + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); @@ -119,10 +123,13 @@ return null; } + List list = appSchedulingInfo.getPerAskResourceRequests( + type, priority, request.getResourceName(), node.getRackName()); + // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, this .getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), this.rmContext); + appSchedulingInfo.getUser(), this.rmContext, list); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (working copy) @@ -426,6 +426,7 @@ SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + recoverResourceRequest(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). completedContainer(container, status, RMContainerEventType.KILL); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; @@ -271,10 +272,13 @@ return null; } + List list = appSchedulingInfo.getPerAskResourceRequests( + type, priority, request.getResourceName(), node.getRackName()); + // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), rmContext); + appSchedulingInfo.getUser(), rmContext, list); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (working copy) @@ -241,10 +241,17 @@ public synchronized void updateResourceRequests( List requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests); + appSchedulingInfo.updateResourceRequests(requests, false); } } + public synchronized void recoverResourceRequests( + List requests) { + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests, true); + } + } + public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { // Cleanup all scheduling information isStopped = true; @@ -274,7 +281,7 @@ if (rmContainer == null) { rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + node.getNodeID(), appSchedulingInfo.getUser(), rmContext, null); Resources.addTo(currentReservation, container.getResource()); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (working copy) @@ -26,6 +26,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -83,7 +93,7 @@ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, "user", rmContext); + nodeId, "user", rmContext, null); assertEquals(RMContainerState.NEW, rmContainer.getState()); assertEquals(resource, rmContainer.getAllocatedResource()); @@ -168,7 +178,7 @@ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, "user", rmContext); + nodeId, "user", rmContext, null); assertEquals(RMContainerState.NEW, rmContainer.getState()); assertEquals(resource, rmContainer.getAllocatedResource()); @@ -204,4 +214,36 @@ assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); } + + @Test + public void testExistenceOfResourceRequestInRMContainer() throws Exception { + Configuration conf = new Configuration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + ResourceScheduler scheduler = rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // Verify whether list of ResourceRequest is present in RMContainer + // while moving to ALLOCATED state + Assert.assertNotNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + + // Allocate container + am1.allocate(new ArrayList(), new ArrayList()) + .getAllocatedContainers(); + rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED); + + // After RMContainer moving to ACQUIRED state, list of ResourceRequest will + // be empty + Assert.assertNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (working copy) @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -74,14 +77,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -828,4 +837,72 @@ cs.stop(); } + + @Test + public void testRecoverRequestUnderPreemption() throws Exception { + Configuration conf = new Configuration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId1 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); + + RMContainer rmContainer = cs.getRMContainer(containerId1); + List requests = rmContainer.getResourceRequests(); + FiCaSchedulerApp app = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + + FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode()); + for (ResourceRequest request : requests) { + // Skip the OffRack and RackLocal resource requests. + if (request.getResourceName().equals(node.getRackName()) + || request.getResourceName().equals(ResourceRequest.ANY)) { + Assert.assertEquals(request.getNumContainers(), 0); + continue; + } + + // Already the node local resource request is cleared from RM after + // allocation. + Assert.assertNull(app.getResourceRequest(request.getPriority(), + request.getResourceName())); + } + + // Raise a preempt event to the Allocated container and raised a recover + // ResourceRequest call. In real scenario, this part will be done by + // CapacityScheduler#killContainer or FairScheduler#warnOrKillContainer + rmContainer.handle(new RMContainerFinishedEvent(containerId1, + SchedulerUtils.createPreemptedContainerStatus( + rmContainer.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL)); + cs.recoverResourceRequest(rmContainer); + + for (ResourceRequest request : requests) { + // Skip the OffRack and RackLocal resource requests. + if (request.getResourceName().equals(node.getRackName()) + || request.getResourceName().equals(ResourceRequest.ANY)) { + Assert.assertEquals(request.getNumContainers(), 1); + continue; + } + + // Resource request must have added back in RM after preempt event handling. + Assert.assertNotNull(app.getResourceRequest(request.getPriority(), + request.getResourceName())); + } + + rm1.waitForState(nm1, containerId1, RMContainerState.KILLED); + + // allocate container + List containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. + Assert.assertTrue(containers.size() == 1); + } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java (working copy) @@ -259,7 +259,7 @@ Container container=TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priority); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - node_0.getNodeID(), "user", rmContext); + node_0.getNodeID(), "user", rmContext, null); // Assign {1,2,3,4} 1GB containers respectively to queues stubQueueAllocation(a, clusterResource, node_0, 1*GB); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (revision 1606717) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (working copy) @@ -947,7 +947,7 @@ Container container = Container.newInstance( ContainerId.newInstance(attemptId, 1), null, "", null, null, null); RMContainerImpl containerimpl = spy(new RMContainerImpl(container, - attemptId, null, "", rmContext)); + attemptId, null, "", rmContext, null)); Map attempts = new HashMap(); attempts.put(attemptId, rmAppAttemptImpl);