diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 9197630..25afa67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -279,7 +279,7 @@ protected void submitApplication( ApplicationId applicationId = submissionContext.getApplicationId(); RMAppImpl application = - createAndPopulateNewRMApp(submissionContext, submitTime, user); + createAndPopulateNewRMApp(submissionContext, submitTime, user, false); ApplicationId appId = submissionContext.getApplicationId(); if (UserGroupInformation.isSecurityEnabled()) { @@ -316,16 +316,18 @@ protected void recoverApplication(ApplicationStateData appState, // create and recover app. RMAppImpl application = createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), - appState.getUser()); + appState.getUser(), true); + application.handle(new RMAppRecoverEvent(appId, rmState)); } private RMAppImpl createAndPopulateNewRMApp( - ApplicationSubmissionContext submissionContext, - long submitTime, String user) - throws YarnException { + ApplicationSubmissionContext submissionContext, long submitTime, + String user, boolean isRecovery) throws YarnException{ ApplicationId applicationId = submissionContext.getApplicationId(); - ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext); + ResourceRequest amReq = + validateAndCreateResourceRequest(submissionContext, isRecovery); + // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, @@ -343,7 +345,7 @@ private RMAppImpl createAndPopulateNewRMApp( String message = "Application with id " + applicationId + " is already present! Cannot add a duplicate!"; LOG.warn(message); - throw RPCUtil.getRemoteException(message); + throw new YarnException(message); } // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, @@ -356,7 +358,7 @@ private RMAppImpl createAndPopulateNewRMApp( } private ResourceRequest validateAndCreateResourceRequest( - ApplicationSubmissionContext submissionContext) + ApplicationSubmissionContext submissionContext, boolean isRecovery) throws InvalidResourceRequestException { // Validation of the ApplicationSubmissionContext needs to be completed // here. Only those fields that are dependent on RM's configuration are @@ -365,14 +367,11 @@ private ResourceRequest validateAndCreateResourceRequest( // Check whether AM resource requirements are within required limits if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq; - if (submissionContext.getAMContainerResourceRequest() != null) { - amReq = submissionContext.getAMContainerResourceRequest(); - } else { - amReq = - BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); + ResourceRequest amReq = submissionContext.getAMContainerResourceRequest(); + if (amReq == null) { + amReq = BuilderUtils + .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, + ResourceRequest.ANY, submissionContext.getResource(), 1); } // set label expression for AM container @@ -386,9 +385,13 @@ private ResourceRequest validateAndCreateResourceRequest( scheduler.getMaximumResourceCapability(), submissionContext.getQueue(), scheduler); } catch (InvalidResourceRequestException e) { - LOG.warn("RM app submission failed in validating AM resource request" - + " for application " + submissionContext.getApplicationId(), e); - throw e; + if (isRecovery) { + LOG.info("Ignore InvalidResourceRequestException on recovery."); + } else { + LOG.warn("RM app submission failed in validating AM resource request" + + " for application " + submissionContext.getApplicationId(), e); + throw e; + } } SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(), scheduler.getClusterResource(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 3033496..d66442b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1011,4 +1011,30 @@ public void addApplicationSync(ApplicationId applicationId, rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); } + + /** + * Test validateAndCreateResourceRequest fails on recovery, app should ignore + * this Exception and continue + */ + @Test (timeout = 30000) + public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{ + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Change the config so that validateAndCreateResourceRequest throws + // exception on recovery + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100); + + MockRM rm2 = new MockRM(conf, memStore); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.start(); + } }