diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 03fc40e..b282c80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.prioritylabels.RMPriorityLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -95,6 +96,7 @@ private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; private RMNodeLabelsManager nodeLabelManager; + private RMPriorityLabelsManager priorityLabelManager; private long epoch; private Clock systemClock = new SystemClock(); private long schedulerRecoveryStartTime = 0; @@ -420,6 +422,18 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { @Private @Unstable + public RMPriorityLabelsManager getPriorityLabelManager() { + return priorityLabelManager; + } + + @Private + @Unstable + public void setPriorityLabelManager(RMPriorityLabelsManager mgr) { + priorityLabelManager = mgr; + } + + @Private + @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { this.schedulerRecoveryStartTime = systemClock.getTime(); this.schedulerRecoveryWaitTime = waitTime; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index f38e128..5cb341b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -325,6 +326,16 @@ private RMAppImpl createAndPopulateNewRMApp( throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext); + Priority appPriority = null; + if (null != submissionContext.getApplicationPriority()) { + try { + appPriority = Priority.newInstance(rmContext + .getPriorityLabelManager().getIntegerPriorityLabelMapping( + submissionContext.getApplicationPriority())); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e.getMessage()); + } + } // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, @@ -332,7 +343,8 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq); + submissionContext.getApplicationTags(), amReq, + appPriority); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index ecf6166..75bbb19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.prioritylabels.RMPriorityLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -116,6 +117,10 @@ void setRMApplicationHistoryWriter( RMNodeLabelsManager getNodeLabelManager(); public void setNodeLabelManager(RMNodeLabelsManager mgr); + + RMPriorityLabelsManager getPriorityLabelManager(); + + public void setPriorityLabelManager(RMPriorityLabelsManager mgr); long getEpoch(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1d0d6c0..7162c61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.prioritylabels.RMPriorityLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -399,6 +400,16 @@ public RMNodeLabelsManager getNodeLabelManager() { public void setNodeLabelManager(RMNodeLabelsManager mgr) { activeServiceContext.setNodeLabelManager(mgr); } + + @Override + public RMPriorityLabelsManager getPriorityLabelManager() { + return activeServiceContext.getPriorityLabelManager(); + } + + @Override + public void setPriorityLabelManager(RMPriorityLabelsManager mgr) { + activeServiceContext.setPriorityLabelManager(mgr); + } public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index cceee2b..7dac5fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -61,6 +62,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.prioritylabels.MemoryRMPriorityLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.prioritylabels.RMPriorityLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -340,6 +343,14 @@ protected RMNodeLabelsManager createNodeLabelManager() return nlmCls.newInstance(); } + protected RMPriorityLabelsManager createPriorityLabelManager() + throws InstantiationException, IllegalAccessException { + Class plmCls = conf.getClass( + YarnConfiguration.RM_PRIORITY_LABELS_MANAGER_CLASS, + MemoryRMPriorityLabelsManager.class, RMPriorityLabelsManager.class); + return plmCls.newInstance(); + } + protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); } @@ -429,6 +440,11 @@ protected void serviceInit(Configuration configuration) throws Exception { nlm.setRMContext(rmContext); addService(nlm); rmContext.setNodeLabelManager(nlm); + + RMPriorityLabelsManager prManager = createPriorityLabelManager(); + prManager.setRMContext(rmContext); + addService(prManager); + rmContext.setPriorityLabelManager(prManager); boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fbcaab9..1a2ad7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -242,4 +243,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, ReservationId getReservationId(); ResourceRequest getAMResourceRequest(); + + /** + * Returns the application priority + * @return the application priority. + */ + Priority getApplicationPriority(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d1737a..7e1afb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -123,6 +124,7 @@ private final Set updatedNodes = new HashSet(); private final String applicationType; private final Set applicationTags; + private final Priority appPriority; private final long attemptFailuresValidityInterval; @@ -362,7 +364,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, String applicationType, Set applicationTags, - ResourceRequest amReq) { + ResourceRequest amReq, Priority appPriority) { this.systemClock = new SystemClock(); @@ -382,6 +384,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.applicationType = applicationType; this.applicationTags = applicationTags; this.amReq = amReq; + this.appPriority = appPriority; int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -872,7 +875,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (app.attempts.isEmpty()) { app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, - app.submissionContext.getReservationID())); + app.submissionContext.getReservationID(), app.getApplicationPriority())); return RMAppState.SUBMITTED; } @@ -880,7 +883,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // knows applications before AM or NM re-registers. app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, true, - app.submissionContext.getReservationID())); + app.submissionContext.getReservationID(), app.getApplicationPriority())); // recover attempts app.recoverAppAttempts(); @@ -908,7 +911,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, - app.submissionContext.getReservationID())); + app.submissionContext.getReservationID(), app.getApplicationPriority())); } } @@ -1356,4 +1359,9 @@ protected Credentials parseCredentials() throws IOException { } return credentials; } + + @Override + public Priority getApplicationPriority() { + return this.appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index a54e4bf..2ad5880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; public class AppAddedSchedulerEvent extends SchedulerEvent { @@ -28,25 +29,28 @@ private final String user; private final ReservationId reservationID; private final boolean isAppRecovering; - - public AppAddedSchedulerEvent( - ApplicationId applicationId, String queue, String user) { - this(applicationId, queue, user, false, null); + private final Priority appPriority; + + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user) { + this(applicationId, queue, user, false, null, null); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, ReservationId reservationID) { - this(applicationId, queue, user, false, reservationID); + String user, ReservationId reservationID, Priority appPriority) { + this(applicationId, queue, user, false, reservationID, appPriority); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, boolean isAppRecovering, ReservationId reservationID) { + String user, boolean isAppRecovering, ReservationId reservationID, + Priority appPriority) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; this.reservationID = reservationID; this.isAppRecovering = isAppRecovering; + this.appPriority = appPriority; } public ApplicationId getApplicationId() { @@ -64,6 +68,10 @@ public String getUser() { public boolean getIsAppRecovering() { return isAppRecovering; } + + public Priority getAppPriority() { + return appPriority; + } public ReservationId getReservationID() { return reservationID; -- 1.9.4.msysgit.1