diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 9197630..a6fef8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -29,10 +29,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; @@ -279,7 +276,7 @@ protected void submitApplication( ApplicationId applicationId = submissionContext.getApplicationId(); RMAppImpl application = - createAndPopulateNewRMApp(submissionContext, submitTime, user); + createAndPopulateNewRMApp(submissionContext, submitTime, user, false); ApplicationId appId = submissionContext.getApplicationId(); if (UserGroupInformation.isSecurityEnabled()) { @@ -316,16 +313,18 @@ protected void recoverApplication(ApplicationStateData appState, // create and recover app. RMAppImpl application = createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), - appState.getUser()); + appState.getUser(), true); + application.handle(new RMAppRecoverEvent(appId, rmState)); } private RMAppImpl createAndPopulateNewRMApp( - ApplicationSubmissionContext submissionContext, - long submitTime, String user) - throws YarnException { + ApplicationSubmissionContext submissionContext, long submitTime, + String user, boolean isRecovery) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); - ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext); + ResourceRequest amReq = + validateAndCreateResourceRequest(submissionContext, isRecovery); + // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, @@ -343,7 +342,7 @@ private RMAppImpl createAndPopulateNewRMApp( String message = "Application with id " + applicationId + " is already present! Cannot add a duplicate!"; LOG.warn(message); - throw RPCUtil.getRemoteException(message); + throw new YarnException(message); } // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, @@ -356,7 +355,7 @@ private RMAppImpl createAndPopulateNewRMApp( } private ResourceRequest validateAndCreateResourceRequest( - ApplicationSubmissionContext submissionContext) + ApplicationSubmissionContext submissionContext, boolean isRecovery) throws InvalidResourceRequestException { // Validation of the ApplicationSubmissionContext needs to be completed // here. Only those fields that are dependent on RM's configuration are @@ -365,14 +364,11 @@ private ResourceRequest validateAndCreateResourceRequest( // Check whether AM resource requirements are within required limits if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq; - if (submissionContext.getAMContainerResourceRequest() != null) { - amReq = submissionContext.getAMContainerResourceRequest(); - } else { - amReq = - BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); + ResourceRequest amReq = submissionContext.getAMContainerResourceRequest(); + if (amReq == null) { + amReq = BuilderUtils + .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, + ResourceRequest.ANY, submissionContext.getResource(), 1); } // set label expression for AM container @@ -382,14 +378,15 @@ private ResourceRequest validateAndCreateResourceRequest( } try { - SchedulerUtils.validateResourceRequest(amReq, + SchedulerUtils.normalizeAndValidateRequest(amReq, scheduler.getMaximumResourceCapability(), - submissionContext.getQueue(), scheduler); + submissionContext.getQueue(), scheduler, isRecovery); } catch (InvalidResourceRequestException e) { LOG.warn("RM app submission failed in validating AM resource request" + " for application " + submissionContext.getApplicationId(), e); throw e; } + SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(), scheduler.getClusterResource(), scheduler.getMinimumResourceCapability(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 7a1a528..9ef3518 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -189,6 +189,49 @@ public static void normalizeRequest( ask.setCapability(normalized); } + public static void normalizeNodeLabelForRequest(ResourceRequest resReq, + QueueInfo queueInfo) { + + String labelExp = resReq.getNodeLabelExpression(); + + // if queue has default label expression, and RR doesn't have, use the + // default label expression of queue + if (labelExp == null && queueInfo != null && ResourceRequest.ANY + .equals(resReq.getResourceName())) { + labelExp = queueInfo.getDefaultNodeLabelExpression(); + } + + // If labelExp still equals to null, set it to be NO_LABEL + if (labelExp == null) { + resReq.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + } + } + + public static void normalizeAndValidateRequest(ResourceRequest resReq, + Resource maximumResource, String queueName, YarnScheduler scheduler, + boolean isRecovery) + throws InvalidResourceRequestException { + + QueueInfo queueInfo = null; + try { + queueInfo = scheduler.getQueueInfo(queueName, false, false); + } catch (IOException e) { + // it is possible queue cannot get when queue mapping is set, just ignore + // the queueInfo here, and move forward + } + SchedulerUtils.normalizeNodeLabelForRequest(resReq, queueInfo); + if (!isRecovery) { + validateResourceRequest(resReq, maximumResource, queueInfo); + } + } + + public static void validateResourceRequest(ResourceRequest resReq, + Resource maximumResource, String queueName, YarnScheduler scheduler) + throws InvalidResourceRequestException { + normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, + false); + } + /** * Utility method to validate a resource request, by insuring that the * requested memory/vcore is non-negative and not greater than max @@ -196,7 +239,7 @@ public static void normalizeRequest( * @throws InvalidResourceRequestException when there is invalid request */ public static void validateResourceRequest(ResourceRequest resReq, - Resource maximumResource, String queueName, YarnScheduler scheduler) + Resource maximumResource, QueueInfo queueInfo) throws InvalidResourceRequestException { if (resReq.getCapability().getMemory() < 0 || resReq.getCapability().getMemory() > maximumResource.getMemory()) { @@ -216,31 +259,7 @@ public static void validateResourceRequest(ResourceRequest resReq, + resReq.getCapability().getVirtualCores() + ", maxVirtualCores=" + maximumResource.getVirtualCores()); } - - // Get queue from scheduler - QueueInfo queueInfo = null; - try { - queueInfo = scheduler.getQueueInfo(queueName, false, false); - } catch (IOException e) { - // it is possible queue cannot get when queue mapping is set, just ignore - // the queueInfo here, and move forward - } - - // check labels in the resource request. String labelExp = resReq.getNodeLabelExpression(); - - // if queue has default label expression, and RR doesn't have, use the - // default label expression of queue - if (labelExp == null && queueInfo != null - && ResourceRequest.ANY.equals(resReq.getResourceName())) { - labelExp = queueInfo.getDefaultNodeLabelExpression(); - } - - // If labelExp still equals to null, set it to be NO_LABEL - resReq - .setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL - : labelExp); - // we don't allow specify label expression other than resourceName=ANY now if (!ResourceRequest.ANY.equals(resReq.getResourceName()) && labelExp != null && !labelExp.trim().isEmpty()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 3033496..d66442b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1011,4 +1011,30 @@ public void addApplicationSync(ApplicationId applicationId, rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); } + + /** + * Test validateAndCreateResourceRequest fails on recovery, app should ignore + * this Exception and continue + */ + @Test (timeout = 30000) + public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{ + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Change the config so that validateAndCreateResourceRequest throws + // exception on recovery + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100); + + MockRM rm2 = new MockRM(conf, memStore); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.start(); + } }