diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 5e82f401b6e..c21cd66b162 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -48,6 +48,10 @@ import org.apache.hadoop.yarn.security.AccessRequest; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; + +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -361,8 +365,20 @@ private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery, long startTime) throws YarnException { + ApplicationPlacementContext placementContext = null; + if (!isRecovery) { // fail the submission if configured application timeout value is invalid + try { + placementContext = placeApplication(rmContext, + submissionContext, user); + replaceQueueFromPlacementContext(placementContext, + submissionContext); + } catch (YarnException e) { + String msg = "Failed to place application to queue :" + e.getMessage(); + LOG.error(msg, e); + } + RMServerUtils.validateApplicationTimeouts( submissionContext.getApplicationTimeouts()); } @@ -413,7 +429,8 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReqs, startTime); + submissionContext.getApplicationTags(), amReqs, placementContext, + startTime); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other @@ -758,4 +775,35 @@ private void updateAppDataToStateStore(String queue, RMApp app, + "' with below exception:" + ex.getMessage()); } } + + @VisibleForTesting + static ApplicationPlacementContext placeApplication(RMContext rmContext, + ApplicationSubmissionContext context, String user) throws YarnException { + ApplicationPlacementContext placementContext = null; + PlacementManager placementManager = rmContext.getQueuePlacementManager(); + + if (placementManager != null) { + placementContext = placementManager.placeApplication(context, user); + } else{ + LOG.error( + "Queue Placement Manager is null. Cannot place application :" + " " + + context.getApplicationId() + " to queue "); + } + + return placementContext; +} + + static void replaceQueueFromPlacementContext( + ApplicationPlacementContext placementContext, + ApplicationSubmissionContext context) { + // Set it to ApplicationSubmissionContext + //apply queue mapping only to new application submissions + if (placementContext != null && !StringUtils.equalsIgnoreCase(context.getQueue(), + placementContext.getQueue())) { + LOG.info("Placed application=" + context.getApplicationId() + " to queue=" + + placementContext.getQueue() + ", original queue=" + context + .getQueue()); + context.setQueue(placementContext.getQueue()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 85837890919..b357d91cbfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -303,4 +305,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return True/False to confirm whether app is in final states */ boolean isAppInCompletedStates(); + + /** + * Get the application -> queue placement context + * @return ApplicationPlacementContext + */ + ApplicationPlacementContext getApplicationPlacementContext(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0266b83445c..6ac245b2ecf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -83,8 +83,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; -import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; + +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -204,6 +205,8 @@ private CallerContext callerContext; + private ApplicationPlacementContext placementContext; + Object transitionTodo; private Priority applicationPriority; @@ -417,7 +420,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, List amReqs) { this(applicationId, rmContext, config, name, user, queue, submissionContext, scheduler, masterService, submitTime, applicationType, applicationTags, - amReqs, -1); + amReqs, null, -1); } public RMAppImpl(ApplicationId applicationId, RMContext rmContext, @@ -425,7 +428,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, String applicationType, Set applicationTags, - List amReqs, long startTime) { + List amReqs, ApplicationPlacementContext + placementContext, long startTime) { this.systemClock = SystemClock.getInstance(); @@ -484,6 +488,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.callerContext = CallerContext.getCurrent(); + this.placementContext = placementContext; + long localLogAggregationStatusTimeout = conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); @@ -1098,22 +1104,12 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } - ApplicationPlacementContext placementContext = null; - try { - placementContext = placeApplication(app.rmContext, - app.submissionContext, app.user); - } catch (Exception e) { - String msg = "Failed to place application to queue :" + e.getMessage(); - app.diagnostics.append(msg); - LOG.error(msg, e); - } - // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { app.scheduler.handle( new AppAddedSchedulerEvent(app.user, app.submissionContext, false, - app.applicationPriority, placementContext)); + app.applicationPriority, app.placementContext)); return RMAppState.SUBMITTED; } @@ -1121,7 +1117,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // knows applications before AM or NM re-registers. app.scheduler.handle( new AppAddedSchedulerEvent(app.user, app.submissionContext, true, - app.applicationPriority, placementContext)); + app.applicationPriority, app.placementContext)); // recover attempts app.recoverAppAttempts(); @@ -1137,20 +1133,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - ApplicationPlacementContext placementContext = null; - try { - placementContext = placeApplication(app.rmContext, - app.submissionContext, app.user); - replaceQueueFromPlacementContext(placementContext, - app.submissionContext); - } catch (YarnException e) { - String msg = "Failed to place application to queue :" + e.getMessage(); - app.diagnostics.append(msg); - LOG.error(msg, e); - } app.handler.handle( new AppAddedSchedulerEvent(app.user, app.submissionContext, false, - app.applicationPriority, placementContext)); + app.applicationPriority, app.placementContext)); // send the ATS create Event app.sendATSCreateEvent(); } @@ -1624,6 +1609,11 @@ public boolean isAppInCompletedStates() { || appState == RMAppState.KILLING; } + @Override + public ApplicationPlacementContext getApplicationPlacementContext() { + return placementContext; + } + public RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } @@ -2046,37 +2036,4 @@ private void clearUnusedFields() { this.submissionContext.setAMContainerSpec(null); this.submissionContext.setLogAggregationContext(null); } - - @VisibleForTesting - static ApplicationPlacementContext placeApplication(RMContext rmContext, - ApplicationSubmissionContext context, String user) throws YarnException { - - ApplicationPlacementContext placementContext = null; - PlacementManager placementManager = rmContext.getQueuePlacementManager(); - - if (placementManager != null) { - placementContext = placementManager.placeApplication(context, user); - } else{ - LOG.error( - "Queue Placement Manager is null. Cannot place application :" + " " - + context.getApplicationId() + " to queue "); - } - - return placementContext; - } - - static void replaceQueueFromPlacementContext( - ApplicationPlacementContext placementContext, - ApplicationSubmissionContext context) { - // Set it to ApplicationSubmissionContext - //apply queue mapping only to new application submissions - if (placementContext != null && !StringUtils.equals(context.getQueue(), - placementContext.getQueue())) { - LOG.info("Placed application=" + context.getApplicationId() + " to queue=" - + placementContext.getQueue() + ", original queue=" + context - .getQueue()); - context.setQueue(placementContext.getQueue()); - } - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 9ef48dbf883..2aca3752013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -243,6 +245,11 @@ public boolean isAppInCompletedStates() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public ApplicationPlacementContext getApplicationPlacementContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 6c64a6709e7..756759957e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -327,6 +329,11 @@ public boolean isAppInCompletedStates() { return false; } + @Override + public ApplicationPlacementContext getApplicationPlacementContext() { + return null; + } + @Override public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet.");