From caaea5cc16f21c90f31ec15cb0e0d12b96806415 Mon Sep 17 00:00:00 2001 From: Sunil Date: Mon, 14 Nov 2016 15:52:04 +0530 Subject: [PATCH] YARN-5865 --- .../resourcemanager/ApplicationMasterService.java | 2 +- .../server/resourcemanager/ClientRMService.java | 10 ++--- .../yarn/server/resourcemanager/RMAppManager.java | 48 ++++++++++++++++++++-- .../metrics/TimelineServiceV1Publisher.java | 4 +- .../metrics/TimelineServiceV2Publisher.java | 4 +- .../yarn/server/resourcemanager/rmapp/RMApp.java | 14 +++++++ .../server/resourcemanager/rmapp/RMAppImpl.java | 47 +++++++++++++++++---- .../scheduler/AbstractYarnScheduler.java | 6 ++- .../resourcemanager/scheduler/YarnScheduler.java | 9 +++- .../scheduler/capacity/CapacityScheduler.java | 28 ++++++------- .../scheduler/event/AppAddedSchedulerEvent.java | 5 ++- .../resourcemanager/webapp/RMWebServices.java | 6 +-- .../server/resourcemanager/webapp/dao/AppInfo.java | 4 +- .../TestApplicationMasterService.java | 2 +- .../resourcemanager/TestClientRMService.java | 3 +- .../applicationsmanager/MockAsm.java | 11 +++++ .../metrics/TestSystemMetricsPublisher.java | 2 + .../metrics/TestSystemMetricsPublisherForV2.java | 1 + .../server/resourcemanager/rmapp/MockRMApp.java | 10 +++++ .../capacity/TestApplicationPriority.java | 13 +++--- 20 files changed, 171 insertions(+), 58 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4f952b7..3d7b2b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -616,7 +616,7 @@ public AllocateResponse allocate(AllocateRequest request) // Set application priority allocateResponse.setApplicationPriority(app - .getApplicationSubmissionContext().getPriority()); + .getApplicationPriority()); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index c8af526..4e36b6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1602,14 +1602,14 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( .newRecordInstance(UpdateApplicationPriorityResponse.class); // Update priority only when app is tracked by the scheduler if (!ACTIVE_APP_STATES.contains(application.getState())) { - if (COMPLETED_APP_STATES.contains(application.getState())) { + if (application.isAppInCompletedStates()) { // If Application is in any of the final states, change priority // can be skipped rather throwing exception. RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId); response.setApplicationPriority(application - .getApplicationSubmissionContext().getPriority()); + .getApplicationPriority()); return response; } String msg = "Application in " + application.getState() @@ -1622,8 +1622,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( } try { - rmContext.getScheduler().updateApplicationPriority(newAppPriority, - applicationId); + rmAppManager.updateApplicationPriority(applicationId, newAppPriority); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", @@ -1633,8 +1632,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId); - response.setApplicationPriority(application - .getApplicationSubmissionContext().getPriority()); + response.setApplicationPriority(application.getApplicationPriority()); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e566243..e0183cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -398,10 +398,11 @@ private RMAppImpl createAndPopulateNewRMApp( RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, - submissionContext.getQueue(), - submissionContext, this.scheduler, this.masterService, - submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq, startTime); + submissionContext.getQueue(), submissionContext, this.scheduler, + this.masterService, submitTime, + submissionContext.getApplicationType(), + submissionContext.getApplicationTags(), amReq, startTime, + submissionContext.getPriority()); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other @@ -548,4 +549,43 @@ public void updateApplicationTimeout(RMApp app, ((RMAppImpl) app).updateApplicationTimeout(newExpireTime); } } + + /** + * updateApplicationPriority will invoke scheduler api to update the + * new priority to RM and StateStore. + * @param applicationId Application Id + * @param newAppPriority proposed new application priority + * @throws YarnException Handle exceptions + */ + public void updateApplicationPriority(ApplicationId applicationId, + Priority newAppPriority) throws YarnException { + RMApp app = this.rmContext.getRMApps().get(applicationId); + + synchronized (applicationId) { + // Create a future object to capture exceptions from StateStore. + SettableFuture future = SettableFuture.create(); + + if (app.isAppInCompletedStates()) { + return; + } + + // Invoke scheduler api to update priority in scheduler and to + // State Store. + Priority appPriority = rmContext.getScheduler() + .updateApplicationPriority(newAppPriority, applicationId, future); + + Futures.get(future, YarnException.class); + + if (newAppPriority.equals(appPriority)) { + return; + } + + // update in-memory + ((RMAppImpl) app).setApplicationPriority(appPriority); + } + + // Update the changed application state to timeline server + rmContext.getSystemMetricsPublisher().appUpdated(app, + System.currentTimeMillis()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java index aa3ef0a..73bb301 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -88,7 +88,7 @@ public void appCreated(RMApp app, long createdTime) { ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, app.getApplicationSubmissionContext().getUnmanagedAM()); entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - app.getApplicationSubmissionContext().getPriority().getPriority()); + app.getApplicationPriority().getPriority()); entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, app.getAmNodeLabelExpression()); entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, @@ -164,7 +164,7 @@ public void appUpdated(RMApp app, long updatedTime) { eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, app.getQueue()); eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - app.getApplicationSubmissionContext().getPriority().getPriority()); + app.getApplicationPriority().getPriority()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); tEvent.setTimestamp(updatedTime); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index 6eb6eb9..412d573 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -117,7 +117,7 @@ public void appCreated(RMApp app, long createdTime) { ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, app.getApplicationSubmissionContext().getUnmanagedAM()); entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - app.getApplicationSubmissionContext().getPriority().getPriority()); + app.getApplicationPriority().getPriority()); entity.getConfigs().put( ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, app.getAmNodeLabelExpression()); @@ -272,7 +272,7 @@ public void appUpdated(RMApp app, long currentTimeMillis) { eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, app.getQueue()); eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - app.getApplicationSubmissionContext().getPriority().getPriority()); + app.getApplicationPriority().getPriority()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); tEvent.setTimestamp(currentTimeMillis); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index cd08743..575a826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -283,4 +284,17 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, CallerContext getCallerContext(); Map getApplicationTimeouts(); + + /** + * Get priority of the application. + * @return priority + */ + Priority getApplicationPriority(); + + /** + * To verify whether app has reached in its completed/completed states. + * + * @return True/False to confirm whether app is in final states + */ + boolean isAppInCompletedStates(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a376311..3ce61b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -197,6 +198,8 @@ Object transitionTodo; + private Priority applicationPriority; + private static final StateMachineFactory applicationTags, ResourceRequest amReq, long startTime) { + this(applicationId, rmContext, config, name, user, queue, submissionContext, + scheduler, masterService, submitTime, applicationType, applicationTags, + amReq, startTime, Priority.newInstance(0)); + } + + public RMAppImpl(ApplicationId applicationId, RMContext rmContext, + Configuration config, String name, String user, String queue, + ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, + ApplicationMasterService masterService, long submitTime, + String applicationType, Set applicationTags, + ResourceRequest amReq, long startTime, Priority appPriority) { this.systemClock = SystemClock.getInstance(); @@ -461,6 +475,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.applicationType = applicationType; this.applicationTags = applicationTags; this.amReq = amReq; + this.setApplicationPriority(Priority.newInstance(appPriority.getPriority())); int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -533,8 +548,6 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; } } - - } /** @@ -777,7 +790,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, createApplicationState(), diags, trackingUrl, this.startTime, this.finishTime, finishState, appUsageReport, origTrackingUrl, progress, this.applicationType, amrmToken, applicationTags, - this.submissionContext.getPriority()); + this.getApplicationPriority()); report.setLogAggregationStatus(logAggregationStatus); report.setUnmanagedApp(submissionContext.getUnmanagedAM()); report.setAppNodeLabelExpression(getAppNodeLabelExpression()); @@ -1138,14 +1151,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // started or started but not yet saved. if (app.attempts.isEmpty()) { app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false)); + app.submissionContext, false, app.applicationPriority)); return RMAppState.SUBMITTED; } // Add application to scheduler synchronously to guarantee scheduler // knows applications before AM or NM re-registers. app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, true)); + app.submissionContext, true, app.applicationPriority)); // recover attempts app.recoverAppAttempts(); @@ -1162,7 +1175,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { @Override public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false)); + app.submissionContext, false, app.applicationPriority)); // send the ATS create Event app.sendATSCreateEvent(); } @@ -1619,7 +1632,16 @@ public static boolean isAppInFinalState(RMApp rmApp) { return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } - + + @Override + public boolean isAppInCompletedStates() { + RMAppState appState = getState(); + return appState == RMAppState.FINISHED || appState == RMAppState.FINISHING + || appState == RMAppState.FAILED || appState == RMAppState.KILLED + || appState == RMAppState.FINAL_SAVING + || appState == RMAppState.KILLING; + } + public RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } @@ -2018,4 +2040,13 @@ public void updateApplicationTimeout( this.writeLock.unlock(); } } + + @Override + public Priority getApplicationPriority() { + return applicationPriority; + } + + public void setApplicationPriority(Priority applicationPriority) { + this.applicationPriority = applicationPriority; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 93df1e7..24161ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -777,10 +777,12 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, } @Override - public void updateApplicationPriority(Priority newPriority, - ApplicationId applicationId) throws YarnException { + public Priority updateApplicationPriority(Priority newPriority, + ApplicationId applicationId, SettableFuture future) + throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. + return Priority.newInstance(0); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index c4f575f..fd45ec6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import com.google.common.util.concurrent.SettableFuture; + /** * This interface is used by the components to talk to the * scheduler for allocating of resources, cleaning up resources. @@ -318,9 +320,12 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, * @param newPriority Submitted Application priority. * * @param applicationId Application ID + * + * @return updated priority */ - public void updateApplicationPriority(Priority newPriority, - ApplicationId applicationId) throws YarnException; + public Priority updateApplicationPriority(Priority newPriority, + ApplicationId applicationId, SettableFuture future) + throws YarnException; /** * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 65a08c6..f68045b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -145,6 +145,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; @LimitedPrivate("yarn") @Evolving @@ -2405,8 +2407,9 @@ private Priority getDefaultPriorityForQueue(String queueName) { } @Override - public void updateApplicationPriority(Priority newPriority, - ApplicationId applicationId) throws YarnException { + public Priority updateApplicationPriority(Priority newPriority, + ApplicationId applicationId, SettableFuture future) + throws YarnException { Priority appPriority = null; SchedulerApplication application = applications .get(applicationId); @@ -2417,38 +2420,35 @@ public void updateApplicationPriority(Priority newPriority, } RMApp rmApp = rmContext.getRMApps().get(applicationId); + appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), rmApp.getQueue(), applicationId); if (application.getPriority().equals(appPriority)) { - return; + return appPriority; } - // Update new priority in Submission Context to keep track in HA + // Update new priority in Submission Context to update to StateStore. rmApp.getApplicationSubmissionContext().setPriority(appPriority); // Update to state store - ApplicationStateData appState = - ApplicationStateData.newInstance(rmApp.getSubmitTime(), - rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), - rmApp.getUser(), rmApp.getCallerContext()); + ApplicationStateData appState = ApplicationStateData.newInstance( + rmApp.getSubmitTime(), rmApp.getStartTime(), + rmApp.getApplicationSubmissionContext(), rmApp.getUser(), + rmApp.getCallerContext()); appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); rmContext.getStateStore().updateApplicationStateSynchronously(appState, - false, null); + false, future); // As we use iterator over a TreeSet for OrderingPolicy, once we change // priority then reinsert back to make order correct. LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); - queue.updateApplicationPriority(application, appPriority); - // Update the changed application state to timeline server - rmContext.getSystemMetricsPublisher().appUpdated(rmApp, - System.currentTimeMillis()); - LOG.info("Priority '" + appPriority + "' is updated in queue :" + rmApp.getQueue() + " for application: " + applicationId + " for the user: " + rmApp.getUser()); + return appPriority; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index 89d2f66..0a8d6fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -43,10 +43,11 @@ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, } public AppAddedSchedulerEvent(String user, - ApplicationSubmissionContext submissionContext, boolean isAppRecovering) { + ApplicationSubmissionContext submissionContext, boolean isAppRecovering, + Priority appPriority) { this(submissionContext.getApplicationId(), submissionContext.getQueue(), user, isAppRecovering, submissionContext.getReservationID(), - submissionContext.getPriority()); + appPriority); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 2c61339..852f937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -1248,7 +1248,7 @@ public AppPriority getAppPriority(@Context HttpServletRequest hsr, AppPriority ret = new AppPriority(); ret.setPriority( - app.getApplicationSubmissionContext().getPriority().getPriority()); + app.getApplicationPriority().getPriority()); return ret; } @@ -1289,7 +1289,7 @@ public Response updateApplicationPriority(AppPriority targetPriority, "Trying to update priority an absent application " + appId); throw e; } - Priority priority = app.getApplicationSubmissionContext().getPriority(); + Priority priority = app.getApplicationPriority(); if (priority == null || priority.getPriority() != targetPriority.getPriority()) { return modifyApplicationPriority(app, callerUGI, @@ -1336,7 +1336,7 @@ public Void run() throws IOException, YarnException { } } AppPriority ret = new AppPriority( - app.getApplicationSubmissionContext().getPriority().getPriority()); + app.getApplicationPriority().getPriority()); return Response.status(Status.OK).entity(ret).build(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 42449b0..d0a3294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -150,8 +150,8 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess, this.priority = 0; ApplicationSubmissionContext appSubmissionContext = app.getApplicationSubmissionContext(); - if (appSubmissionContext.getPriority() != null) { - this.priority = appSubmissionContext.getPriority() + if (app.getApplicationPriority() != null) { + this.priority = app.getApplicationPriority() .getPriority(); } this.progress = app.getProgress() * 100; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 93befcb..6bda6e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -466,7 +466,7 @@ public void testPriorityInAllocatedResponse() throws Exception { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); AllocateResponse response2 = am1.allocate(allocateRequest); Assert.assertEquals(appPriority2, response2.getApplicationPriority()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index e48d9d2..f82c8ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1675,8 +1675,7 @@ public void testUpdateApplicationPriorityRequest() throws Exception { RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); Assert.assertEquals("Incorrect priority has been set to application", - appPriority, app1.getApplicationSubmissionContext().getPriority() - .getPriority()); + appPriority, app1.getApplicationPriority().getPriority()); appPriority = 11; ClientRMService rmService = rm.getClientRMService(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index f21c7c0..9be52c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -235,6 +236,16 @@ public CallerContext getCallerContext() { public Map getApplicationTimeouts() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Priority getApplicationPriority() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isAppInCompletedStates() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 2b1740f..55e93c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -126,6 +126,7 @@ public void testPublishApplicationMetrics() throws Exception { .thenReturn(Collections.singletonList("java -Xmx1024m")); when(asc.getAMContainerSpec()).thenReturn(containerLaunchContext); when(app.getApplicationSubmissionContext()).thenReturn(asc); + when(app.getApplicationPriority()).thenReturn(Priority.newInstance(1)); metricsPublisher.appUpdated(app, 4L); } else { metricsPublisher.appUpdated(app, 4L); @@ -527,6 +528,7 @@ private static RMApp createRMApp(ApplicationId appId) { when(amReq.getNodeLabelExpression()).thenReturn("high-mem"); when(app.getAMResourceRequest()).thenReturn(amReq); when(app.getAmNodeLabelExpression()).thenCallRealMethod(); + when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10)); when(app.getCallerContext()) .thenReturn(new CallerContext.Builder("context").build()); return app; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 3ab2501..2d40c91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -362,6 +362,7 @@ private static RMApp createRMApp(ApplicationId appId) { when(appSubmissionContext.getPriority()) .thenReturn(Priority.newInstance(0)); + when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10)); ContainerLaunchContext containerLaunchContext = mock(ContainerLaunchContext.class); when(containerLaunchContext.getCommands()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bbfa60f..48a60b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -321,4 +321,14 @@ public void setCollectorAddr(String collectorAddr) { public Map getApplicationTimeouts() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Priority getApplicationPriority() { + return null; + } + + @Override + public boolean isAppInCompletedStates() { + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 2a346f8..164ca20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -344,7 +344,7 @@ public void testUpdatePriorityAtRuntime() throws Exception { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -378,7 +378,7 @@ public void testUpdateInvalidPriorityAtRuntime() throws Exception { // Change the priority of App1 to 15 Priority appPriority2 = Priority.newInstance(15); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -428,7 +428,7 @@ public void testRMRestartWithChangeInPriority() throws Exception { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); // let things settle down Thread.sleep(1000); @@ -449,8 +449,7 @@ public void testRMRestartWithChangeInPriority() throws Exception { .get(app1.getApplicationId()); // Verify whether priority 15 is reset to 10 - Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt() - .getSubmissionContext().getPriority()); + Assert.assertEquals(appPriority2, loadedApp.getApplicationPriority()); rm2.stop(); rm1.stop(); @@ -558,7 +557,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority() // Change the priority of App1 to 3 (lowest) Priority appPriority3 = Priority.newInstance(3); - cs.updateApplicationPriority(appPriority3, app2.getApplicationId()); + cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null); // add request for containers App2 am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList()); @@ -790,7 +789,7 @@ private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue, throws YarnException { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); cs.updateApplicationPriority(Priority.newInstance(2), - app.getApplicationId()); + app.getApplicationId(), null); SchedulerEvent removeAttempt; removeAttempt = new AppAttemptRemovedSchedulerEvent( app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, -- 2.7.4 (Apple Git-66)