diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ff06eea..49b55b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1694,6 +1694,15 @@ private static void addDeprecatedKeys() { "2000, 500"; /** + * Application Priority configurations + */ + public static final String APPLICATION_PRIORITY_PREFIX = YARN_PREFIX + + "application-priority."; + + public static final String YARN_CLUSTER_APP_PRIORITY = APPLICATION_PRIORITY_PREFIX + + "collection"; + + /** * Flag to indicate if the node labels feature enabled, by default it's * disabled */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java new file mode 100644 index 0000000..697725f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +public class ApplicationPriorityManager extends AbstractService { + + protected static final Log LOG = LogFactory + .getLog(ApplicationPriorityManager.class); + public static final String EMPTY_QUEUE_NAME = ""; + public static final Integer DEFAULT_PRIORITY = 0; + + protected Set clusterPriorityCollections = new HashSet(); + + protected final ReadLock readLock; + protected final WriteLock writeLock; + + private static Integer MAX_PRIORITY_VALUE = 65535; + private static Integer MIN_PRIORITY_VALUE = -65535; + + private RMContext rmContext = null; + + // Map to queueName vs Queue object which has priority labels + protected ConcurrentMap queueCollections = new ConcurrentHashMap(); + + public ApplicationPriorityManager() { + super(ApplicationPriorityManager.class.getName()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + // Adding DEFAULT_PRIORITY by default to cluster set to avoid default + // issues. + clusterPriorityCollections.add(Priority.newInstance(DEFAULT_PRIORITY)); + } + + protected static class Queue { + private String queueName; + public ApplicationPriorityPerQueue applicationPriorityPerQueue; + + protected Queue() { + this.setQueueName(EMPTY_QUEUE_NAME); + this.applicationPriorityPerQueue = null; + } + + public Queue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + this.setQueueName(queueName); + this.applicationPriorityPerQueue = applicationPriorityPerQueue; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + + // config file can have cluster priority label config, read and update. + String[] configPrLabels = conf + .getTrimmedStrings(YarnConfiguration.YARN_CLUSTER_APP_PRIORITY); + + getPriorityFromConfig(configPrLabels); + } + + private void getPriorityFromConfig(String[] configPrLabels) + throws IOException { + if (configPrLabels == null || configPrLabels.length == 0) { + LOG.info("Empty configuration for application priority collection in clustger level"); + return; + } + Set priorityList = new HashSet<>(); + try { + for (String priority : configPrLabels) { + priorityList.add(Integer.parseInt(priority)); + } + } catch (NumberFormatException e) { + LOG.error("Invalid priority configuration"); + return; + } + + if (!priorityList.isEmpty()) { + addToClusterApplicationPriorities(priorityList); + } + } + + public void authenticateApplicationPriority(Priority priority, String user, + String queueName, ApplicationId applicationId) throws IOException { + try { + readLock.lock(); + + // verify whether 'priority' is present in cluster + if (!clusterPriorityCollections.contains(priority)) { + throw new IOException("Priority =" + priority.getPriority() + + " is not present in cluster level"); + } + + Queue queue = queueCollections.get(queueName); + if (null == queue) { + throw new IOException("Invalid queue name " + queueName); + } + + // Verify whether submitted priority is lesser than max priority + // in the queue. + if (priority.getPriority() > queue.applicationPriorityPerQueue + .getMaxApplicationPriority()) { + throw new IOException("Invalid priority as Queue: " + queueName + + " cannot support more than priority '" + + queue.applicationPriorityPerQueue.getMaxApplicationPriority() + + "'"); + } + + LOG.info("Submitted priority '" + priority.getPriority() + + "' is acceptable in queue :" + queueName + "for application:" + + applicationId); + } finally { + readLock.unlock(); + } + } + + public Integer getDefaultApplicationPriorityFromQueue(String queueName) { + try { + readLock.lock(); + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + } else { + return DEFAULT_PRIORITY; + } + return currentQueue.applicationPriorityPerQueue + .getDefaultApplicationPriority(); + } finally { + readLock.unlock(); + } + } + + public RMContext getRMContext() { + return this.rmContext; + } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + /** + * Add cluster level priority to store + * + * @param priorityToAdd + * new application priority + */ + public void addToClusterApplicationPriorities(Set priorityToAdd) + throws IOException { + if (null == priorityToAdd || priorityToAdd.isEmpty()) { + return; + } + + try { + writeLock.lock(); + Set priorityList = normalizeApplicationPriorities(priorityToAdd); + + for (Priority priority : priorityList) { + this.clusterPriorityCollections.add(priority); + } + + LOG.info("Add to cluster priority: [" + + StringUtils.join(priorityToAdd.iterator(), ",") + "]"); + } finally { + writeLock.unlock(); + } + } + + protected void verifyAppPrioritiesToRemove(Collection prToRemove) + throws IOException { + if (null == prToRemove || prToRemove.isEmpty()) { + return; + } + + // Check if priority to remove doesn't exist or null/empty, will throw + // exception + for (Integer priority : prToRemove) { + if (priority == null) { + throw new IOException("Priority to be removed is null"); + } + + if (!isPriorityExistsInCluster(priority)) { + throw new IOException("Priority =" + priority + + " to be removed doesn't exist in cluster " + + "priority collection."); + } + + // check if any queue contains this label + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + ApplicationPriorityPerQueue queueLabels = entry.getValue().applicationPriorityPerQueue; + if (queueLabels.getDefaultApplicationPriority().equals(priority) + || queueLabels.getMaxApplicationPriority().equals(priority)) { + throw new IOException("Cannot remove priority label=" + priority + + ", because queue=" + queueName + " is using this label. " + + "Please remove label on queue before remove the label"); + } + } + } + } + + protected void internalRemoveFromClusterAppPriorityList( + Collection prToRemove) { + + // remove labels from queue labels collection + for (Integer priority : prToRemove) { + clusterPriorityCollections.remove(Priority.newInstance(priority)); + } + + LOG.info("Remove priority: [" + + StringUtils.join(prToRemove.iterator(), ",") + "]"); + } + + /** + * Remove from cluster priority list + * + * @param priorityToRemove + * app priority to remove + * @throws IOException + */ + public void removeFromClusterApplicationPriority( + Collection priorityToRemove) throws IOException { + try { + writeLock.lock(); + verifyAppPrioritiesToRemove(priorityToRemove); + internalRemoveFromClusterAppPriorityList(priorityToRemove); + } finally { + writeLock.unlock(); + } + } + + /** + * Set multiple priorities to Queue + * + * @param queueName + * , applicationPriorityPerQueue max and default priority + */ + public void setApplicationPriorityPerQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) + throws IOException { + if (null == applicationPriorityPerQueue) { + return; + } + + // Default priority also need to be in valid range + if (applicationPriorityPerQueue.getDefaultApplicationPriority() > applicationPriorityPerQueue + .getMaxApplicationPriority()) { + throw new IOException("Default Priority configured should be" + + "less than or equal to Max application priority '" + + applicationPriorityPerQueue.getMaxApplicationPriority()); + } + + // Validate whether configured priorities are present in Cluster too. + if (!validatePerQueueApplicationPriorityInCluster(applicationPriorityPerQueue)) { + throw new IOException("Application Priority=" + + applicationPriorityPerQueue + " to be set to Queue = '" + queueName + + "', doesn't exist in cluster " + "priority collection."); + } + + try { + writeLock.lock(); + + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + currentQueue.applicationPriorityPerQueue = applicationPriorityPerQueue; + } else { + currentQueue = new Queue(queueName, applicationPriorityPerQueue); + this.queueCollections.put(queueName, currentQueue); + } + } finally { + writeLock.unlock(); + } + + LOG.info("Set priority labels to Queue: QueueName - " + queueName + + "Max Priority - " + + applicationPriorityPerQueue.getMaxApplicationPriority() + + ", Default Priority - " + + applicationPriorityPerQueue.getDefaultApplicationPriority()); + } + + /** + * Get existing valid labels in cluster + * + * @return existing valid labels in cluster + */ + public Set getClusterPriorities() { + try { + readLock.lock(); + Set priorities = new HashSet( + clusterPriorityCollections); + priorities.remove(Priority.newInstance(DEFAULT_PRIORITY)); + return Collections.unmodifiableSet(getPrioritiesToSave(priorities)); + } finally { + readLock.unlock(); + } + } + + /** + * Get cluster priority from Queue + * + * @param queueName + * @return PriorityLabelsPerQueue + * @throws IOException + */ + public ApplicationPriorityPerQueue getApplicationPriorityFromQueue( + String queueName) throws IOException { + try { + readLock.lock(); + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + } else { + throw new IOException("Priority labels for Queue: '" + queueName + + "' doesn't exist in cluster."); + } + return currentQueue.applicationPriorityPerQueue; + } finally { + readLock.unlock(); + } + } + + /** + * Get mapping of queue to priority + * + * @return queues to priority map + */ + public Map getApplicationPrioritiesPerQueue() { + try { + readLock.lock(); + Map queueToPriorities = new HashMap(); + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Queue queue = entry.getValue(); + + queueToPriorities.put(queueName, queue.applicationPriorityPerQueue); + } + return Collections.unmodifiableMap(queueToPriorities); + } finally { + readLock.unlock(); + } + } + + private Set getPrioritiesToSave(Set priorityList) { + Set prioritySet = new HashSet(); + for (Priority priority : priorityList) { + prioritySet.add(priority.getPriority()); + } + return prioritySet; + } + + private Set normalizeApplicationPriorities( + Set applicationPriorities) { + Set priorityList = new HashSet(); + + for (Integer priority : applicationPriorities) { + if (priority < MIN_PRIORITY_VALUE || priority > MAX_PRIORITY_VALUE) { + continue; // skip invalid entries + } + priorityList.add(Priority.newInstance(priority)); + } + return priorityList; + } + + private boolean validatePerQueueApplicationPriorityInCluster( + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + if (!isPriorityExistsInCluster(applicationPriorityPerQueue + .getDefaultApplicationPriority())) { + return false; + } + + if (!isPriorityExistsInCluster(applicationPriorityPerQueue + .getMaxApplicationPriority())) { + return false; + } + + return true; + } + + /** + * Verify whether priority exists in repository + * + * @return true or false + */ + public boolean isPriorityExistsInCluster(Integer priorityValue) { + try { + readLock.lock(); + Priority priority = Priority.newInstance(priorityValue); + return this.clusterPriorityCollections.contains(priority); + } finally { + readLock.unlock(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java new file mode 100644 index 0000000..c99b084 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + + + +/** + *

PriorityLabelsPerQueue holds different types of priority labels in a queue.

+ * + *

It includes information such as: + *

    + *
  • Maximum Application Priority.
  • + *
  • Default Application Priority.
  • + *
+ *

+ * + */ +public class ApplicationPriorityPerQueue implements + Comparable { + + private Integer defaultAppPriority; + private Integer maxAppPriority; + + public static ApplicationPriorityPerQueue newInstance( + Integer maxPriorityLabel, Integer defaultPriorityLabel) { + ApplicationPriorityPerQueue perQueuePriorityLabels = new ApplicationPriorityPerQueue(); + perQueuePriorityLabels.setMaxApplicationPriority(maxPriorityLabel); + perQueuePriorityLabels.setDefaultApplicationPriority(defaultPriorityLabel); + return perQueuePriorityLabels; + } + + /** + * Get the max application priority of the queue. + * + * @return max application priority configured in the queue + */ + public Integer getMaxApplicationPriority() { + return maxAppPriority; + } + + public void setMaxApplicationPriority(Integer maxApplicationPriority) { + maxAppPriority = maxApplicationPriority; + } + + /** + * Get the default application priority of the queue. + * + * @return default application priority configured in the queue + */ + public Integer getDefaultApplicationPriority() { + return defaultAppPriority; + } + + public void setDefaultApplicationPriority(Integer defaultApplicationPriority) { + defaultAppPriority = defaultApplicationPriority; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ApplicationPriorityPerQueue other = (ApplicationPriorityPerQueue) obj; + if (!this.getDefaultApplicationPriority().equals( + other.getDefaultApplicationPriority())) + return false; + + if (!this.getMaxApplicationPriority().equals( + other.getMaxApplicationPriority())) + return false; + + return true; + } + + @Override + public String toString() { + return "Max application priority: " + this.getMaxApplicationPriority() + + ", " + "Default application priority: " + + this.getDefaultApplicationPriority(); + } + + @Override + public int hashCode() { + final int prime = 92821; + int result = 17; + result = result + * prime + + Math + .min(getMaxApplicationPriority(), getDefaultApplicationPriority()); + result = result + * prime + + Math + .max(getMaxApplicationPriority(), getDefaultApplicationPriority()); + return result; + } + + @Override + public int compareTo(ApplicationPriorityPerQueue priorityLabelsPerQueue) { + if (priorityLabelsPerQueue == null) { + return -1; + } + int defltLabelCompare = priorityLabelsPerQueue + .getDefaultApplicationPriority() - this.getDefaultApplicationPriority(); + if (defltLabelCompare == 0) { + return priorityLabelsPerQueue.getMaxApplicationPriority() + - this.getMaxApplicationPriority(); + } else { + return defltLabelCompare; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java new file mode 100644 index 0000000..cd752c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class ApplicationPriorityTestBase { + public static void assertMapEquals( + Map m1, + ImmutableMap m2) { + Assert.assertEquals(m1.size(), m2.size()); + for (String k : m1.keySet()) { + Assert.assertTrue(m2.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertMapContains( + Map m1, + ImmutableMap m2) { + for (String k : m2.keySet()) { + Assert.assertTrue(m1.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertCollectionEquals(Collection c1, + Collection c2) { + Set s1 = new HashSet(c1); + Set s2 = new HashSet(c2); + Assert.assertEquals(s1, s2); + Assert.assertTrue(s1.containsAll(s2)); + } + + public static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java new file mode 100644 index 0000000..94e426b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +public class DummyApplicationPriorityManager + extends + ApplicationPriorityManager { + + public DummyApplicationPriorityManager() { + super(); + } + + Collection lastAddedPrioritylabels = null; + Collection lastRemovedPrioritylabels = null; + + @Override + public void addToClusterApplicationPriorities(Set priorityToAdd) + throws IOException { + lastAddedPrioritylabels = priorityToAdd; + super.addToClusterApplicationPriorities(priorityToAdd); + } + + @Override + public void removeFromClusterApplicationPriority( + Collection priorityToRemove) throws IOException { + lastRemovedPrioritylabels = priorityToRemove; + super.removeFromClusterApplicationPriority(priorityToRemove); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java new file mode 100644 index 0000000..ba054e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestApplicationPriorityManager + extends + ApplicationPriorityTestBase { + DummyApplicationPriorityManager mgr = null; + + @Before + public void before() { + mgr = new DummyApplicationPriorityManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test + public void testAddRemovePrioritylabel() throws Exception { + // mgr.setMaxPriorityValue(-1); + // Add some label + mgr.addToClusterApplicationPriorities(ImmutableSet.of(1)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1)); + + // Add a label w/o integer mapping, it should take incremented value from + // last updated priority integer value. + mgr.addToClusterApplicationPriorities(ImmutableSet.of(3)); + mgr.addToClusterApplicationPriorities(toSet(5, 7)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(5, 7)); + + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Sets.newHashSet(1, 3, 5, 7))); + + // try to remove null, empty and non-existed label, should fail + for (Integer p : Arrays.asList(null, 9)) { + boolean caught = false; + try { + mgr.removeFromClusterApplicationPriority(Arrays.asList(p)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("remove priority should fail " + + "when label is null/empty/non-existed", caught); + } + + // Remove some label + mgr.removeFromClusterApplicationPriority(Arrays.asList(3)); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(3)); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(1, 5, 7))); + + mgr.removeFromClusterApplicationPriority(Arrays.asList(1, 5, 7)); + Assert.assertTrue(mgr.lastRemovedPrioritylabels.containsAll(Sets + .newHashSet(1, 5, 7))); + Assert.assertTrue(mgr.getClusterPriorities().isEmpty()); + } + + @Test + public void testAddandSetPrioritylabel() throws Exception { + // Add some label + mgr.addToClusterApplicationPriorities(ImmutableSet.of(1)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1)); + + mgr.addToClusterApplicationPriorities(toSet(2, 3)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(2, 3)); + + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Sets.newHashSet(1, 2, 3))); + + // Set non-exiting labels to a queue + boolean caught = false; + try { + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(4, 1)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Cannot set label to a Queue " + + "when label is non-existing in Queue", caught); + + // Set some labels to a queue + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1)); + + Assert.assertTrue(mgr.getApplicationPriorityFromQueue("queueA").equals( + ApplicationPriorityPerQueue.newInstance(2, 1))); + } + + + @Test(timeout = 5000) + public void testRemovePrioritylabelAddedToQueue() throws Exception { + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1)); + mgr.setApplicationPriorityPerQueue("queueB", + ApplicationPriorityPerQueue.newInstance(3, 1)); + + boolean caught = false; + try { + mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to remove priority from cluster but " + + "label exists in a queue, should fail", caught); + + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(1, 3)); + + // Now removal should be a success + mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2)); + assertMapEquals(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(3, 1), "queueA", + ApplicationPriorityPerQueue.newInstance(1, 3))); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(2)); + } +} \ No newline at end of file -- 1.9.4.msysgit.1