From cd0e743797e83d957cb0cdf71f8ae9fa6fae6d0c Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 5 Mar 2015 21:10:28 +0530 Subject: [PATCH] YARN-2004 --- .../scheduler/SchedulerApplication.java | 18 ++++++++- .../scheduler/SchedulerApplicationAttempt.java | 9 +++++ .../scheduler/capacity/CapacityScheduler.java | 16 ++++++-- .../capacity/CapacitySchedulerConfiguration.java | 22 +++++++++++ .../scheduler/capacity/LeafQueue.java | 33 +++++++++++++++- .../hadoop/yarn/server/resourcemanager/MockRM.java | 44 ++++++++++++++++++++-- 6 files changed, 133 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 2c788aa..ce9787c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @Private @@ -28,16 +29,24 @@ private Queue queue; private final String user; private T currentAttempt; + private Priority priority; public SchedulerApplication(Queue queue, String user) { this.queue = queue; this.user = user; + this.priority = null; + } + + public SchedulerApplication(Queue queue, String user, Priority priority) { + this.queue = queue; + this.user = user; + this.priority = priority; } public Queue getQueue() { return queue; } - + public void setQueue(Queue queue) { this.queue = queue; } @@ -58,4 +67,11 @@ public void stop(RMAppState rmAppFinalState) { queue.getMetrics().finishApp(user, rmAppFinalState); } + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ed78097..bf3275d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -94,6 +94,7 @@ private boolean unmanagedAM = true; private boolean amRunning = false; private LogAggregationContext logAggregationContext; + private Priority appPriority = null; protected List newlyAllocatedContainers = new ArrayList(); @@ -632,4 +633,12 @@ public void incNumAllocatedContainers(NodeType containerType, requestType); } } + + public Priority getApplicationPriority() { + return appPriority; + } + + public void setApplicationPriority(Priority appPriority) { + this.appPriority = appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 28ce264..bcca9e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -148,6 +149,11 @@ public int compare(CSQueue q1, CSQueue q2) { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + if (a1.getApplicationPriority() != null + && !a1.getApplicationPriority().equals(a2.getApplicationPriority())) { + return a1.getApplicationPriority().compareTo( + a2.getApplicationPriority()); + } return a1.getApplicationId().compareTo(a2.getApplicationId()); } }; @@ -668,7 +674,7 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, Priority priority) { if (mappings != null && mappings.size() > 0) { try { @@ -737,7 +743,7 @@ private synchronized void addApplication(ApplicationId applicationId, // update the metrics queue.getMetrics().submitApp(user); SchedulerApplication application = - new SchedulerApplication(queue, user); + new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -767,6 +773,9 @@ private synchronized void addApplicationAttempt( .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); + if (application.getPriority() != null) { + attempt.setApplicationPriority(application.getPriority()); + } queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId @@ -1158,7 +1167,8 @@ public void handle(SchedulerEvent event) { addApplication(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getIsAppRecovering(), + appAddedEvent.getApplicatonPriority()); } } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 102e553..b16f324 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -191,6 +191,15 @@ @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + + @Private + public static final String MAX_APPLICATION_PRIORITY = "max_application_priority"; + + @Private + public static final String DEFAULT_APPLICATION_PRIORITY = "default_application_priority"; + + @Private + public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = -1; @Private public static class QueueMapping { @@ -859,4 +868,17 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { defaultVal); return preemptionDisabled; } + + public Integer getMaxApplicationPriorityConfPerQueue(String queue) { + Integer maxPriority = getInt(getQueuePrefix(queue) + + MAX_APPLICATION_PRIORITY, DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return maxPriority; + } + + public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { + Integer dfltPriority = getInt(getQueuePrefix(queue) + + DEFAULT_APPLICATION_PRIORITY, + DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return dfltPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a607a62..8d61ceb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityPerQueue; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -93,6 +94,9 @@ private float maxAMResourcePerQueuePercent; private int nodeLocalityDelay; + + private Integer maxAppPriorityPerQueue; + private Integer dfltAppPriorityPerQueue; Set activeApplications; Map applicationAttemptMap = @@ -209,6 +213,23 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) labelStrBuilder.append(","); } } + + maxAppPriorityPerQueue = conf + .getMaxApplicationPriorityConfPerQueue(getQueuePath()); + dfltAppPriorityPerQueue = conf + .getDefaultApplicationPriorityConfPerQueue(getQueuePath()); + + // Set the priority level configuration of a queue to ApplicationPriority + // Manager + if (maxAppPriorityPerQueue != -1 || dfltAppPriorityPerQueue != -1) { + this.scheduler + .getRMContext() + .getApplicationPriorityManager() + .setApplicationPriorityPerQueue( + this.queueName, + ApplicationPriorityPerQueue.newInstance(maxAppPriorityPerQueue, + dfltAppPriorityPerQueue)); + } LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + @@ -255,7 +276,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n"); + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + + "maxAppPriorityPerQueue = " + maxAppPriorityPerQueue + "\n" + + "dfltAppPriorityPerQueue = " + dfltAppPriorityPerQueue); } @Override @@ -1981,6 +2004,14 @@ public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } + public Integer getMaxAppPriorityPerQueue() { + return maxAppPriorityPerQueue; + } + + public Integer getDefaultAppPriorityPerQueue() { + return dfltAppPriorityPerQueue; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 06c6b32..72d2237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -283,6 +284,15 @@ public RMApp submitApp(int masterMemory, String name, String user, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, waitForAccepted); } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, String queue, + boolean waitForAccepted, Integer priority) throws Exception { + return submitApp(masterMemory, name, user, acls, false, queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + waitForAccepted, false, priority); + } public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, @@ -313,7 +323,16 @@ public RMApp submitApp(int masterMemory, String name, String user, boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null, 0, null, true); + false, null, 0, null, true, -1); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, Integer priority) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + false, null, 0, null, true, priority); } public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) @@ -322,7 +341,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, attemptFailuresValidityInterval, null, true); + false, null, attemptFailuresValidityInterval, null, true, -1); } public RMApp submitApp(int masterMemory, String name, String user, @@ -332,7 +351,7 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - isAppIdProvided, applicationId, 0, null, true); + isAppIdProvided, applicationId, 0, null, true, -1); } public RMApp submitApp(int masterMemory, @@ -341,7 +360,7 @@ public RMApp submitApp(int masterMemory, .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, logAggregationContext, true); + false, null, 0, logAggregationContext, true, -1); } public RMApp submitApp(int masterMemory, String name, String user, @@ -351,6 +370,22 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete) throws Exception { + return submitApp(masterMemory, name, user, + acls, unmanaged, queue, + maxAppAttempts, ts, appType, + waitForAccepted, keepContainers, isAppIdProvided, + applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, -1); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, + Integer priority) + throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -373,6 +408,7 @@ public RMApp submitApp(int masterMemory, String name, String user, sub.setQueue(queue); } sub.setApplicationType(appType); + sub.setPriority(Priority.newInstance(priority)); ContainerLaunchContext clc = Records .newRecord(ContainerLaunchContext.class); final Resource capability = Records.newRecord(Resource.class); -- 1.9.4.msysgit.1