diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 0305702fb5f..b3aa0278002 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -459,7 +459,6 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, // Add it to allContainers list. addToNewlyAllocatedContainers(node, rmContainer); liveContainers.put(container.getId(), rmContainer); - // Update consumption and track allocations ContainerRequest containerRequest = appSchedulingInfo.allocate( type, node, schedulerKey, container); @@ -867,6 +866,12 @@ private Resource assignContainer( if (reserved) { unreserve(schedulerKey, node); } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Resource ask %s fits in available node resources %s, " + + "but the allocated container was null!", + capability, available)); + } return Resources.none(); } @@ -1096,7 +1101,8 @@ private boolean hasContainerForNode(SchedulerRequestKey key, } else if (!getQueue().fitsInMaxShare(resource)) { // The requested container must fit in queue maximum share updateAMDiagnosticMsg(resource, - " exceeds current queue or its parents maximum resource allowed)."); + " exceeds current queue or its parents maximum resource allowed). " + + "Max share of queue: " + getQueue().getMaxShare()); ret = false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index a8e53fc26f2..c1612f0ccae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -182,6 +182,9 @@ public Resource assignContainer(FSSchedulerNode node) { // If this queue is over its limit, reject if (!assignContainerPreCheck(node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Assign container precheck was false on node: " + node); + } return assigned; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1f85814adac..0faeb0b90d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -94,6 +94,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; @@ -449,10 +450,7 @@ protected void addApplication(ApplicationId applicationId, String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; - LOG.info(message); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return; } @@ -461,10 +459,7 @@ protected void addApplication(ApplicationId applicationId, "Reject application " + applicationId + " submitted by user " + user + " with an illegal queue name " + queueName + ". " + "The queue name cannot start/end with period."; - LOG.info(message); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return; } @@ -476,6 +471,31 @@ protected void addApplication(ApplicationId applicationId, return; } + if (rmApp == null || rmApp.getAMResourceRequests() == null) { + LOG.debug("rmApp or rmApp.AMResourceRequests was null!"); + } + + if (rmApp != null && rmApp.getAMResourceRequests() != null) { + for (ResourceRequest amResourceRequest : rmApp + .getAMResourceRequests()) { + final Resource queueMaxShare = queue.getMaxShare(); + if (!Resources.fitsIn(amResourceRequest.getCapability(), + queueMaxShare) + && Resources.isAnyMajorResourceZero(DOMINANT_RESOURCE_CALCULATOR, + queueMaxShare)) { + String msg = String.format( + "Cannot submit application %s to queue %s because " + + "it has zero amount of resource for a requested " + + "resource! " + "Requested AM resource: %s, " + + "maximum queue resources: %s", + applicationId, queue.getName(), + amResourceRequest.getCapability(), queueMaxShare); + rejectApplicationWithMessage(applicationId, msg); + return; + } + } + } + // Enforce ACLs UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( user); @@ -485,9 +505,7 @@ protected void addApplication(ApplicationId applicationId, String msg = "User " + userUgi.getUserName() + " cannot submit applications to queue " + queue.getName() + "(requested queuename is " + queueName + ")"; - LOG.info(msg); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg)); + rejectApplicationWithMessage(applicationId, msg); return; } @@ -604,10 +622,7 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { } if (appRejectMsg != null && rmApp != null) { - LOG.error(appRejectMsg); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(rmApp.getApplicationId(), - RMAppEventType.APP_REJECTED, appRejectMsg)); + rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg); return null; } @@ -834,7 +849,6 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List schedulingRequests, List release, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { - // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { @@ -854,6 +868,29 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, return EMPTY_ALLOCATION; } + ApplicationId applicationId = application.getApplicationId(); + FSLeafQueue queue = application.getQueue(); + + for (ResourceRequest resourceRequest : ask) { + // An application can also hang if 0 mb memory or 0 vcores are + // requested using either of + // -Dmapreduce.map.resource.memory-mb=0 or + // -Dmapreduce.map.resource.vcores=0 + // This is why Resources.isAnyMajorResourceZero is used here. + if (Resources.isAnyMajorResourceZero(DOMINANT_RESOURCE_CALCULATOR, + queue.getMaxShare())) { + String msg = String.format( + "Cannot submit application %s to queue %s because " + + "it has zero amount of resource for a requested " + + "resource! Requested resources: %s, " + + "maximum queue resources: %s", + applicationId, queue.getName(), resourceRequest.getCapability(), + queue.getMaxShare()); + rejectApplicationWithMessage(applicationId, msg); + return EMPTY_ALLOCATION; + } + } + // Handle promotions and demotions handleContainerUpdates(application, updateRequests); @@ -1060,9 +1097,15 @@ void attemptScheduling(FSSchedulerNode node) { Resource assignedResource = Resources.clone(Resources.none()); Resource maxResourcesToAssign = Resources.multiply( node.getUnallocatedResource(), 0.5f); + while (node.getReservedContainer() == null) { Resource assignment = queueMgr.getRootQueue().assignContainer(node); + if (assignment.equals(Resources.none())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Assignment of container on node " + node + + " is zero!"); + } break; } @@ -1254,9 +1297,7 @@ private String resolveReservationQueueName(String queueName, String message = "Application " + applicationId + " submitted to a reservation which is not yet " + "currently active: " + resQName; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return null; } if (!queue.getParent().getQueueName().equals(queueName)) { @@ -1264,9 +1305,7 @@ private String resolveReservationQueueName(String queueName, "Application: " + applicationId + " submitted to a reservation " + resQName + " which does not belong to the specified queue: " + queueName; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return null; } // use the reservation queue to run the app @@ -1279,7 +1318,13 @@ private String resolveReservationQueueName(String queueName, } finally { readLock.unlock(); } + } + private void rejectApplicationWithMessage(ApplicationId applicationId, + String msg) { + LOG.info(msg); + rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( + applicationId, RMAppEventType.APP_REJECTED, msg)); } private String getDefaultQueueForPlanQueue(String queueName) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index b99856467cf..b00b64a2abc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -252,13 +252,36 @@ protected void createSchedulingRequestExistingApplication( protected void createApplicationWithAMResource(ApplicationAttemptId attId, String queue, String user, Resource amResource) { + createApplicationWithAMResourceInternal(attId, queue, user, amResource, + null); + ApplicationId appId = attId.getApplicationId(); + processEvents(queue, user, appId); + processAttempAddedEvent(attId); + } + + protected void createApplicationWithAMResource(ApplicationAttemptId attId, + String queue, String user, Resource amResource, + List amReqs) { + createApplicationWithAMResourceInternal(attId, queue, user, amResource, + amReqs); + ApplicationId appId = attId.getApplicationId(); + processEvents(queue, user, appId); + } + + private void createApplicationWithAMResourceInternal( + ApplicationAttemptId attId, String queue, String user, + Resource amResource, List amReqs) { RMContext rmContext = resourceManager.getRMContext(); ApplicationId appId = attId.getApplicationId(); RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null, ApplicationSubmissionContext.newInstance(appId, null, queue, null, mock(ContainerLaunchContext.class), false, false, 0, amResource, - null), scheduler, null, 0, null, null, null); + null), + scheduler, null, 0, null, null, amReqs); rmContext.getRMApps().put(appId, rmApp); + } + + private void processEvents(String queue, String user, ApplicationId appId) { RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START); resourceManager.getRMContext().getRMApps().get(appId).handle(event); event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED); @@ -268,8 +291,11 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId, AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( appId, queue, user); scheduler.handle(appAddedEvent); + } + + private void processAttempAddedEvent(ApplicationAttemptId attId) { AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(attId, false); + new AppAttemptAddedSchedulerEvent(attId, false); scheduler.handle(attempAddedEvent); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index d9c06a79db2..79716411c8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -44,6 +44,7 @@ import javax.xml.parsers.ParserConfigurationException; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -5414,4 +5416,161 @@ public void testCompletedContainerOnRemovedNode() throws IOException { SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.EXPIRE); } + + @Test + public void testAppRejectedToQueueZeroCapacityOfResourceVcores() + throws IOException { + testAppRejectedToQueueZeroCapacityOfResource( + ResourceInformation.VCORES_URI); + } + + @Test + public void testAppRejectedToQueueZeroCapacityOfResourceMemory() + throws IOException { + testAppRejectedToQueueZeroCapacityOfResource( + ResourceInformation.MEMORY_URI); + } + + private void testAppRejectedToQueueZeroCapacityOfResource(String resource) + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFileWithZeroResource(resource); + + final List recordedEvents = Lists.newArrayList(); + + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn((EventHandler) event -> { + if (event instanceof RMAppEvent) { + recordedEvents.add(event); + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((AsyncDispatcher) mockDispatcher).start(); + + scheduler.setRMContext(spyContext); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // submit app with queue name (queueA) + ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1); + + ResourceRequest amReqs = ResourceRequest.newBuilder() + .capability(Resource.newInstance(5 * GB, 3)).build(); + createApplicationWithAMResource(appAttemptId1, "queueA", "user1", + Resource.newInstance(GB, 1), Lists.newArrayList(amReqs)); + scheduler.update(); + + assertEquals("Exactly one APP_REJECTED event is expected", 1, + recordedEvents.size()); + Event event = recordedEvents.get(0); + RMAppEvent rmAppEvent = (RMAppEvent) event; + assertEquals(RMAppEventType.APP_REJECTED, rmAppEvent.getType()); + assertTrue(rmAppEvent.getDiagnosticMsg() + .matches("Cannot submit application application[\\d_]+ to queue " + + "root.queueA because it has zero amount of resource " + + "for a requested resource! " + "Requested AM resource: .+, " + + "maximum queue resources: .+")); + } + + private void generateAllocationFileWithZeroResource(String resource) + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + + String resources = ""; + if (resource.equals(ResourceInformation.MEMORY_URI)) { + resources = "0 mb,2vcores"; + } else if (resource.equals(ResourceInformation.VCORES_URI)) { + resources = "10000 mb,0vcores"; + } + out.println("" + resources + ""); + out.println("" + resources + ""); + out.println("2.0"); + out.println(""); + out.println(""); + out.println("1 mb 1 vcores"); + out.println("0.0"); + out.println(""); + out.println(""); + out.close(); + } + + @Test + public void testSchedulingRejectedToQueueZeroCapacityOfMemory1() + throws IOException { + testSchedulingRejectedToQueueZeroCapacityOfResource( + ResourceInformation.MEMORY_URI, 2048, 2); + } + + @Test + public void testSchedulingRejectedToQueueZeroCapacityOfMemory2() + throws IOException { + testSchedulingRejectedToQueueZeroCapacityOfResource( + ResourceInformation.MEMORY_URI, 2048, 2); + } + + @Test + public void testSchedulingRejectedToQueueZeroCapacityOfVcores1() + throws IOException { + testSchedulingRejectedToQueueZeroCapacityOfResource( + ResourceInformation.VCORES_URI, 0, 0); + } + + @Test + public void testSchedulingRejectedToQueueZeroCapacityOfVcores2() + throws IOException { + testSchedulingRejectedToQueueZeroCapacityOfResource( + ResourceInformation.VCORES_URI, 0, 0); + } + + private void testSchedulingRejectedToQueueZeroCapacityOfResource( + String resource, int memory, int vCores) throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFileWithZeroResource(resource); + + final List recordedEvents = Lists.newArrayList(); + + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn((EventHandler) event -> { + if (event instanceof RMAppEvent) { + recordedEvents.add(event); + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((AsyncDispatcher) mockDispatcher).start(); + + scheduler.setRMContext(spyContext); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + + // Create another app and reserve at a lower priority first + createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2); + + assertEquals( + "Exactly one APP_ACCEPTED and one APP_REJECTED event is expected", 2, + recordedEvents.size()); + Event event = recordedEvents.get(1); + RMAppEvent rmAppEvent = (RMAppEvent) event; + assertEquals(RMAppEventType.APP_REJECTED, rmAppEvent.getType()); + assertTrue(rmAppEvent.getDiagnosticMsg() + .matches("Cannot submit application application[\\d_]+ to queue " + + "root\\.queueA because it has zero amount of resource " + + "for a requested resource! Requested resources: <.*>, " + + "maximum queue resources: <.*>")); + + } }