diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index abd72bf..6fdeb3b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -573,30 +573,43 @@ protected void releaseContainers(List<ContainerId> containers,
           SchedulerUtils.RELEASED_CONTAINER),
           RMContainerEventType.RELEASED);
     }
   }
-
+
+  // Synchronize this call to prevent race condition when calculating the delta
+  // resource in SchedContainerChangeRequest
   protected void decreaseContainers(
-      List<SchedContainerChangeRequest> decreaseRequests,
+      List<ContainerResourceChangeRequest> decreaseRequests,
       SchedulerApplicationAttempt attempt) {
-    for (SchedContainerChangeRequest request : decreaseRequests) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing decrease request:" + request);
-      }
-
-      boolean hasIncreaseRequest =
-          attempt.removeIncreaseRequest(request.getNodeId(),
-              request.getPriority(), request.getContainerId());
-
-      if (hasIncreaseRequest) {
+
+    if (null == decreaseRequests || decreaseRequests.isEmpty()) {
+      return;
+    }
+
+    synchronized(this) {
+      // Pre-process decrease requests
+      List<SchedContainerChangeRequest> normalizedDecreaseRequests =
+          checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
+
+      for (SchedContainerChangeRequest request : normalizedDecreaseRequests) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("While processing decrease request, found a increase request "
-              + "for the same container "
-              + request.getContainerId()
-              + ", removed the increase request");
+          LOG.debug("Processing decrease request:" + request);
+        }
+
+        boolean hasIncreaseRequest =
+            attempt.removeIncreaseRequest(request.getNodeId(),
+                request.getPriority(), request.getContainerId());
+
+        if (hasIncreaseRequest) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("While processing decrease requests, found an increase"
+                + " request for the same container "
+                + request.getContainerId()
+                + ", removed the increase request");
+          }
         }
+
+        // handle decrease request
+        decreaseContainer(request, attempt);
       }
-
-      // handle decrease request
-      decreaseContainer(request, attempt);
     }
   }
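The synchronized block above exists because SchedContainerChangeRequest derives its
delta from the container's current resource. A minimal sketch of that race, using
hypothetical names rather than the real YARN classes:

// RaceSketch.java - illustrates why the delta must be computed and applied
// under one lock. Two unsynchronized decreases from 3072 MB to 1024 MB can
// both observe current == 3072 and hand 2048 MB back to the queue twice.
public class RaceSketch {
  private int currentMemoryMB = 3072; // stands in for the live container size
  private int releasedMB = 0;         // stands in for resource returned to queue

  // Unsafe: read-compute-write with no common monitor.
  void unsafeDecrease(int targetMB) {
    int delta = currentMemoryMB - targetMB; // both threads may see the same value
    currentMemoryMB = targetMB;
    releasedMB += delta;                    // double-counted under a race
  }

  // Safe: mirrors the patch; the delta is computed and applied while holding
  // a single monitor (in the patch, the scheduler's own lock).
  synchronized void safeDecrease(int targetMB) {
    int delta = currentMemoryMB - targetMB;
    currentMemoryMB = targetMB;
    releasedMB += delta;
  }
}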
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c7b73fb..c816e62 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -892,9 +892,39 @@ private synchronized void doneApplicationAttempt(
     }
   }
 
+  // It is crucial to acquire CS lock first before acquiring app lock
+  // to prevent:
+  // 1. Race condition when calculating the delta resource in
+  //    SchedContainerChangeRequest
+  // 2. Deadlock with the scheduling thread.
+  private LeafQueue updateIncreaseRequests(
+      List<ContainerResourceChangeRequest> increaseRequests,
+      FiCaSchedulerApp app) {
+    if (null == increaseRequests || increaseRequests.isEmpty()) {
+      return null;
+    }
+    synchronized(this) {
+      // Pre-process increase requests
+      List<SchedContainerChangeRequest> normalizedIncreaseRequests =
+          checkAndNormalizeContainerChangeRequests(increaseRequests, true);
+      synchronized (app) {
+        // make sure we aren't stopping/removing the application
+        // when the allocate comes in
+        if (app.isStopped()) {
+          return null;
+        }
+        // Process increase resource requests
+        if (app.updateIncreaseRequests(normalizedIncreaseRequests)) {
+          return (LeafQueue) app.getQueue();
+        }
+        return null;
+      }
+    }
+  }
+
   @Override
-  // Note: when AM asks to decrease container or release container, we will
-  // acquire scheduler lock
+  // Note: when AM asks to increase/decrease container or release container,
+  // we will acquire scheduler lock
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
@@ -906,26 +936,23 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
     if (application == null) {
       return EMPTY_ALLOCATION;
    }
-
-    // Sanity check
-    SchedulerUtils.normalizeRequests(
-        ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), getMaximumResourceCapability());
-
-    // Pre-process increase requests
-    List<SchedContainerChangeRequest> normalizedIncreaseRequests =
-        checkAndNormalizeContainerChangeRequests(increaseRequests, true);
-
-    // Pre-process decrease requests
-    List<SchedContainerChangeRequest> normalizedDecreaseRequests =
-        checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
 
     // Release containers
     releaseContainers(release, application);
 
-    Allocation allocation;
+    // update increase requests
+    LeafQueue updateDemandForQueue =
+        updateIncreaseRequests(increaseRequests, application);
 
-    LeafQueue updateDemandForQueue = null;
+    // Decrease containers
+    decreaseContainers(decreaseRequests, application);
+
+    // Sanity check for new allocation requests
+    SchedulerUtils.normalizeRequests(
+        ask, getResourceCalculator(), getClusterResource(),
+        getMinimumResourceCapability(), getMaximumResourceCapability());
+
+    Allocation allocation;
 
     synchronized (application) {
 
@@ -944,7 +971,8 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       }
 
       // Update application requests
-      if (application.updateResourceRequests(ask)) {
+      if (application.updateResourceRequests(ask)
+          && (updateDemandForQueue == null)) {
        updateDemandForQueue = (LeafQueue) application.getQueue();
       }
 
@@ -954,12 +982,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
         }
       }
 
-      // Process increase resource requests
-      if (application.updateIncreaseRequests(normalizedIncreaseRequests)
-          && (updateDemandForQueue == null)) {
-        updateDemandForQueue = (LeafQueue) application.getQueue();
-      }
-
       if (application.isWaitingForAMContainer()) {
         // Allocate is for AM and update AM blacklist for this
         application.updateAMBlacklist(
@@ -968,8 +990,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
         application.updateBlacklist(blacklistAdditions, blacklistRemovals);
       }
 
-      // Decrease containers
-      decreaseContainers(normalizedDecreaseRequests, application);
       allocation = application.getAllocation(getResourceCalculator(),
           clusterResource, getMinimumResourceCapability());
 
@@ -1163,7 +1183,8 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
         .getAssignmentInformation().getReserved());
   }
 
-  private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+  @VisibleForTesting
+  protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {
       return;
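The comment on updateIncreaseRequests() states the lock-ordering rule the whole
patch enforces: the scheduling thread already takes the scheduler lock and then
the application lock, so every other path must do the same. Before this change,
allocate() called decreaseContainers() inside synchronized (application) and only
then reached scheduler state, letting each thread hold one lock while waiting on
the other. A minimal sketch with hypothetical names:

// LockOrderSketch.java - two threads acquiring two monitors in the same order
// can never form a wait cycle; opposite orders can deadlock.
public class LockOrderSketch {
  private final Object schedulerLock = new Object(); // ~ CapacityScheduler monitor
  private final Object appLock = new Object();       // ~ FiCaSchedulerApp monitor

  // Scheduling thread: scheduler lock, then app lock.
  void allocateContainersToNode() {
    synchronized (schedulerLock) {
      synchronized (appLock) {
        // assign containers to the application
      }
    }
  }

  // allocate() path after the patch: same order, so no cycle is possible.
  void updateIncreaseRequests() {
    synchronized (schedulerLock) {
      synchronized (appLock) {
        // normalize and record the increase requests
      }
    }
  }
}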
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 672af64..c08af9d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -21,8 +21,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,8 +50,12 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
+    .FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -57,12 +64,48 @@
 import org.junit.Test;
 
 public class TestContainerResizing {
+  private static final Log LOG = LogFactory.getLog(TestContainerResizing.class);
   private final int GB = 1024;
 
   private YarnConfiguration conf;
 
   RMNodeLabelsManager mgr;
 
+  class MyScheduler extends CapacityScheduler {
+    /*
+     * A mock scheduler to simulate the potential deadlock between:
+     * 1. The AbstractYarnScheduler.decreaseContainers() call (from the
+     *    ApplicationMasterService thread)
+     * 2. The CapacityScheduler.allocateContainersToNode() call (from the
+     *    scheduler thread)
+     */
+    MyScheduler() {
+      super();
+    }
+
+    @Override
+    protected void decreaseContainers(
+        List<ContainerResourceChangeRequest> decreaseRequests,
+        SchedulerApplicationAttempt attempt) {
+      try {
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
+        LOG.debug("Thread interrupted.");
+      }
+      super.decreaseContainers(decreaseRequests, attempt);
+    }
+
+    @Override
+    public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+      try {
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
+        LOG.debug("Thread interrupted.");
+      }
+      super.allocateContainersToNode(node);
+    }
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -958,6 +1001,50 @@ public RMNodeLabelsManager createNodeLabelManager() {
     rm1.close();
   }
 
+  @Test (timeout = 60000)
+  public void testDecreaseContainerWillNotDeadlockContainerAllocation()
+      throws Exception {
+    // create and start MockRM with our MyScheduler
+    MockRM rm = new MockRM() {
+      @Override
+      public ResourceScheduler createScheduler() {
+        CapacityScheduler cs = new MyScheduler();
+        cs.setConf(conf);
+        return cs;
+      }
+    };
+    rm.start();
+    // register a node
+    MockNM nm = rm.registerNode("h1:1234", 20 * GB);
+    // submit an application -> app1
+    RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+    // making sure resource is allocated
+    checkUsedResource(rm, "default", 3 * GB, null);
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    // making sure container is launched
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm, containerId1);
+    // submit allocation request for a new container
+    am1.allocate(Collections.singletonList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)),
+        null);
+    // nm reports status update and triggers container allocation
+    nm.nodeHeartbeat(true);
+    // *In the meantime*, am1 asks to decrease its AM container resource from
+    // 3GB to 1GB
+    AllocateResponse response = am1.sendContainerResizingRequest(null,
+        Collections.singletonList(ContainerResourceChangeRequest
+            .newInstance(containerId1, Resources.createResource(GB))));
+    // verify that the container resource is decreased
+    verifyContainerDecreased(response, containerId1, GB);
+
+    rm.close();
+  }
+
   private void checkPendingResource(MockRM rm, String queueName, int memory,
       String label) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
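MyScheduler's overrides sleep for one second while the calling thread holds its
first lock, which widens the race window enough that a wrong lock order would
deadlock on every run instead of intermittently; the test's 60-second timeout then
catches it. A standalone sketch of the same technique (hypothetical, not part of
the patch):

// DeadlockWindowSketch.java - with both threads taking the locks in the
// patched order, main() always terminates; flip the nesting in one of the
// threads and the injected sleeps make them deadlock deterministically.
public class DeadlockWindowSketch {
  private static final Object SCHEDULER_LOCK = new Object();
  private static final Object APP_LOCK = new Object();

  public static void main(String[] args) throws InterruptedException {
    Thread amService = new Thread(() -> lockInOrder("decrease")); // AM service path
    Thread scheduler = new Thread(() -> lockInOrder("allocate")); // scheduler path
    amService.start();
    scheduler.start();
    amService.join(60_000); // mirrors the test's 60s timeout
    scheduler.join(60_000);
  }

  private static void lockInOrder(String what) {
    synchronized (SCHEDULER_LOCK) { // the rule: scheduler lock first
      sleepQuietly(1000);           // widen the window, as MyScheduler does
      synchronized (APP_LOCK) {
        System.out.println(what + " finished");
      }
    }
  }

  private static void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}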