From 226649a2053817d69f7486cf759ed32be5a68676 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 19 Jan 2015 23:34:44 +0530 Subject: [PATCH] YARN-2693 --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 + .../CommonPriorityLabelsManager.java | 706 +++++++++++++++++++++ .../FileSystemPriorityLabelsStore.java | 282 ++++++++ .../yarn/prioritylabels/PriorityLabelsStore.java | 69 ++ .../event/AddToClusterPriorityLabels.java | 34 + .../event/PriorityLabelsStoreEvent.java | 28 + .../event/PriorityLabelsStoreEventType.java | 25 + .../event/RemoveFromClusterPriorityLabels.java | 34 + .../event/SetPriorityLabelPerQueue.java | 42 ++ .../DummyCommonPriorityLabelsManager.java | 85 +++ .../yarn/prioritylabels/PriorityLabelTestBase.java | 62 ++ .../TestCommonPriorityLabelsManager.java | 243 +++++++ .../TestFileSystemPriorityLabelsStore.java | 221 +++++++ .../MemoryRMPriorityLabelsManager.java | 79 +++ .../prioritylabels/RMPriorityLabelsManager.java | 70 ++ 15 files changed, 1991 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddToClusterPriorityLabels.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveFromClusterPriorityLabels.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/SetPriorityLabelPerQueue.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/DummyCommonPriorityLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelTestBase.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestCommonPriorityLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestFileSystemPriorityLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/MemoryRMPriorityLabelsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/RMPriorityLabelsManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9ac5438..11983e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1664,6 +1664,17 @@ private static void addDeprecatedKeys() { NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + + public static final String PRIORITY_LABELS_PREFIX = YARN_PREFIX + + "priority-labels."; + + /** URI for PriorityLabelManager */ + public static final String FS_PRIORITY_LABELS_STORE_ROOT_DIR = PRIORITY_LABELS_PREFIX + + "fs-store.root-dir"; + public static final String FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC = + PRIORITY_LABELS_PREFIX + "fs-store.retry-policy-spec"; + public static final String DEFAULT_FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC = + "2000, 500"; public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java new file mode 100644 index 0000000..942a4df --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java @@ -0,0 +1,706 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationPriority; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.prioritylabels.event.AddToClusterPriorityLabels; +import org.apache.hadoop.yarn.prioritylabels.event.PriorityLabelsStoreEvent; +import org.apache.hadoop.yarn.prioritylabels.event.PriorityLabelsStoreEventType; +import org.apache.hadoop.yarn.prioritylabels.event.RemoveFromClusterPriorityLabels; +import org.apache.hadoop.yarn.prioritylabels.event.SetPriorityLabelPerQueue; + +import com.google.common.annotations.VisibleForTesting; + +public class CommonPriorityLabelsManager extends AbstractService { + + protected static final Log LOG = LogFactory + .getLog(CommonPriorityLabelsManager.class); + + public static final String EMPTY_PRIORITY_LABEL_NAME = ""; + public static final String EMPTY_QUEUE_NAME = ""; + public static final Integer EMPTY_PRIORITY_VALUE = -1; + public static final ApplicationPriority EMPTY_PRIORITY_LABEL = ApplicationPriority + .newInstance(EMPTY_PRIORITY_LABEL_NAME); + + protected Dispatcher dispatcher; + + protected ConcurrentMap clusterPriorityLabelCollections + = new ConcurrentHashMap(); + + // Map to queueName vs Queue object which has priority labels + protected ConcurrentMap queueCollections = new ConcurrentHashMap(); + + protected final ReadLock readLock; + protected final WriteLock writeLock; + + protected PriorityLabelsStore store; + private static Integer MAX_PRIORITY_VALUE = -1; + + protected static class PriorityLabel { + private String priorityLabelName; + private Integer priorityValue; + + protected PriorityLabel() { + this.setPriorityLabelName(EMPTY_PRIORITY_LABEL_NAME); + this.setPriorityValue(EMPTY_PRIORITY_VALUE); + } + + protected PriorityLabel(String labelName, Integer priorityValue) { + this.setPriorityLabelName(labelName); + this.setPriorityValue(priorityValue); + } + + public String getPriorityLabelName() { + return priorityLabelName; + } + + public void setPriorityLabelName(String priorityLabelName) { + this.priorityLabelName = priorityLabelName; + } + + public Integer getPriorityValue() { + return priorityValue; + } + + public void setPriorityValue(Integer priorityValue) { + this.priorityValue = priorityValue; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + PriorityLabel other = (PriorityLabel) obj; + String priorityLabelName = getPriorityLabelName(); + if (priorityLabelName == null) { + if (other.getPriorityLabelName() != null) + return false; + } else if (!priorityLabelName.equals(other.getPriorityLabelName())) + return false; + + Integer priorityLabelValue = getPriorityValue(); + if (priorityLabelValue == null) { + if (other.getPriorityValue() != null) + return false; + } else if (!priorityLabelValue.equals(other.getPriorityValue())) + return false; + + return true; + } + + @Override + public int hashCode() { + return getPriorityLabelName().hashCode() + getPriorityValue(); + } + + @Override + public String toString() { + return getPriorityLabelName() + ":" + getPriorityValue(); + } + } + + protected static class Queue { + public String queueName; + public PriorityLabelsPerQueue priorityLabelPerQueue; + + protected Queue() { + this.queueName = EMPTY_QUEUE_NAME; + this.priorityLabelPerQueue = null; + } + + public Queue(String queueName, PriorityLabelsPerQueue priroityLabelPerQueue) { + this.queueName = queueName; + this.priorityLabelPerQueue = priroityLabelPerQueue; + } + } + + private final class ForwardingEventHandler implements + EventHandler { + + @Override + public void handle(PriorityLabelsStoreEvent event) { + if (isInState(STATE.STARTED)) { + handleStoreEvent(event); + } + } + } + + // Dispatcher related code + protected void handleStoreEvent(PriorityLabelsStoreEvent event) { + try { + switch (event.getType()) { + case SET_PRIORITY_LABEL_PER_QUEUE: + SetPriorityLabelPerQueue setPriorityLabelPerQueueEvent + = (SetPriorityLabelPerQueue) event; + store.setPriorityLabelsForQueue( + setPriorityLabelPerQueueEvent.getQueueName(), + setPriorityLabelPerQueueEvent.getPriorityLabelsPerQueue()); + break; + case ADD_TO_CLUSTER_PRIORITY_LABELS: + AddToClusterPriorityLabels addToClusterPriorityLabelsEvent + = (AddToClusterPriorityLabels) event; + store.addToClusterPriorityLabels(addToClusterPriorityLabelsEvent + .getPriorityLabels()); + break; + case REMOVE_FROM_CLUSTER_PRIORITY_LABELS: + RemoveFromClusterPriorityLabels removeFromClusterPriorityLabelsEvent + = (RemoveFromClusterPriorityLabels) event; + store + .removeFromClusterPriorityLabels(removeFromClusterPriorityLabelsEvent + .getPriorityLabels()); + break; + default: + break; + } + } catch (IOException e) { + LOG.error("Failed to store priority label modification to storage"); + throw new YarnRuntimeException(e); + } + } + + public CommonPriorityLabelsManager() { + super(CommonPriorityLabelsManager.class.getName()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + // UT purpose + protected void initDispatcher(Configuration conf) { + dispatcher = new AsyncDispatcher(); + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.init(conf); + asyncDispatcher.setDrainEventsOnStop(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + initPriorityLabelStore(conf); + clusterPriorityLabelCollections.put(EMPTY_PRIORITY_LABEL, + new PriorityLabel()); + } + + protected void initPriorityLabelStore(Configuration conf) throws Exception { + this.store = new FileSystemPriorityLabelsStore(this); + this.store.init(conf); + this.store.recover(); + } + + // for UT purpose + protected void startDispatcher() { + // start dispatcher + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.start(); + } + + @Override + protected void serviceStart() throws Exception { + // init dispatcher only when service start, because recover will happen in + // service init, we don't want to trigger any event handling at that time. + initDispatcher(getConfig()); + + if (null != dispatcher) { + dispatcher.register(PriorityLabelsStoreEventType.class, + new ForwardingEventHandler()); + } + + startDispatcher(); + } + + // for UT purpose + protected void stopDispatcher() { + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.stop(); + } + + @Override + protected void serviceStop() throws Exception { + // finalize store + stopDispatcher(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } + } + + /** + * Add cluster level priority labels to store + * + * @param labels + * new priority labels + */ + @SuppressWarnings("unchecked") + public void addToClusterPriorityLabels(Set priorityLabels) + throws IOException { + if (null == priorityLabels || priorityLabels.isEmpty()) { + return; + } + + Set priorityLabelMapping = + processLabelsForIntegerMapping(priorityLabels); + + // Verify labels and throw exception if not met + checkForInvalidPriorityLabels(priorityLabelMapping); + + for (PriorityLabel label : priorityLabelMapping) { + this.clusterPriorityLabelCollections.put( + ApplicationPriority.newInstance(label.getPriorityLabelName()), label); + } + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new AddToClusterPriorityLabels(priorityLabels)); + } + + LOG.info("Add to cluster priority labels: [" + + StringUtils.join(priorityLabels.iterator(), ",") + "]"); + } + + private void checkForInvalidPriorityLabels(Set priorityLabelMapping) + throws IOException { + if (null == priorityLabelMapping || priorityLabelMapping.isEmpty()) { + return; + } + + // Verify whether any of the incoming labels has already a same + // integer mapping in cluster label set. + for (PriorityLabel prLabel : priorityLabelMapping) { + for (PriorityLabel valueLabel : this.clusterPriorityLabelCollections + .values()) { + if (prLabel.getPriorityValue().equals(valueLabel.getPriorityValue())) { + throw new IOException("Priority label=" + prLabel + + " already exists in cluster " + + "priority labels collection. [Same priority value]"); + } + } + } + } + + /** + * Set multiple priority labels to Queue + * + * @param queueName, priorityLabelsPerQueue + * new priority labels + */ + @SuppressWarnings("unchecked") + public void setPriorityLabelPerQueue(String queueName, + PriorityLabelsPerQueue priorityLabelsPerQueue) throws IOException { + if (null == priorityLabelsPerQueue) { + return; + } + + // Validate whether configured priority labels are present in Cluster too. + if(!validatePerQueuePriorityLabelInCluster(priorityLabelsPerQueue)){ + throw new IOException("Priority label=" + priorityLabelsPerQueue + + " to be set to Queue = '"+ queueName + + "' doesn't exist in cluster " + + "priority labels collection."); + } + + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + currentQueue.priorityLabelPerQueue = priorityLabelsPerQueue; + } else { + currentQueue = new Queue(queueName, priorityLabelsPerQueue); + this.queueCollections.put(queueName, currentQueue); + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new SetPriorityLabelPerQueue(queueName, + currentQueue.priorityLabelPerQueue)); + } + + LOG.info("Add priority labels to Queue: QueueName - " + queueName + + "Max Priority label - " + + priorityLabelsPerQueue.getMaxPriorityLabel() + + "Default Priority Label - " + + priorityLabelsPerQueue.getDefaultPriorityLabel()); + } + + protected void checkRemoveFromClusterPriorityLabels( + Collection prLabelsToRemove) throws IOException { + if (null == prLabelsToRemove || prLabelsToRemove.isEmpty()) { + return; + } + + // Check if label to remove doesn't exist or null/empty, will throw + // exception if any of labels to remove doesn't meet requirement + for (String label : prLabelsToRemove) { + label = normalizeLabel(label); + if (label == null || label.isEmpty()) { + throw new IOException("PriorityLabel to be removed is null or empty"); + } + + ApplicationPriority appPrLabel = ApplicationPriority.newInstance(label); + if (!clusterPriorityLabelCollections.containsKey(appPrLabel)) { + throw new IOException("Priority label=" + label + + " to be removed doesn't exist in cluster " + + "priority labels collection."); + } + + // check if any queue contains this label + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + PriorityLabelsPerQueue queueLabels = entry.getValue().priorityLabelPerQueue; + if (queueLabels.getDefaultPriorityLabel().equals(label) + || queueLabels.getMaxPriorityLabel().equals(label)) { + throw new IOException("Cannot remove priority label=" + label + + ", because queue=" + queueName + " is using this label. " + + "Please remove label on queue before remove the label"); + } + } + } + } + + @SuppressWarnings("unchecked") + protected void internalRemoveFromClusterPriorityLabels( + Collection prLabelsToRemove) { + + // remove labels from queue labels collection + for (String labelToRemove : prLabelsToRemove) { + labelToRemove = normalizeLabel(labelToRemove); + clusterPriorityLabelCollections.remove(ApplicationPriority + .newInstance(labelToRemove)); + } + + // create event to remove labels + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemoveFromClusterPriorityLabels(prLabelsToRemove)); + } + + LOG.info("Remove priority labels: [" + + StringUtils.join(prLabelsToRemove.iterator(), ",") + "]"); + } + + /** + * Remove from cluster priority labels + * + * @param labelsToRemove + * priority labels to remove + * @throws IOException + */ + public void removeFromClusterPriorityLabels(Collection labelsToRemove) + throws IOException { + try { + writeLock.lock(); + + checkRemoveFromClusterPriorityLabels(labelsToRemove); + internalRemoveFromClusterPriorityLabels(labelsToRemove); + } finally { + writeLock.unlock(); + } + } + + protected void checkReplacePriorityLabelsOnQueue( + Map replacePriorityLabelsOnQueue) + throws IOException { + if (null == replacePriorityLabelsOnQueue + || replacePriorityLabelsOnQueue.isEmpty()) { + return; + } + + // check all labels being added existed + Set knownLabels = clusterPriorityLabelCollections + .keySet(); + for (Entry entry : replacePriorityLabelsOnQueue + .entrySet()) { + if (!knownLabels.contains(ApplicationPriority.newInstance(entry + .getValue().getMaxPriorityLabel().trim())) + || !knownLabels.contains(ApplicationPriority.newInstance(entry + .getValue().getDefaultPriorityLabel().trim()))) { + String msg = "Not all labels being replaced contained by known " + + "priority label collections, please check"; + LOG.error(msg); + throw new IOException(msg); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalReplacePriorityLabelsOnQueue( + Map replacePriorityLabelsOnQueue) + throws IOException { + // do replace labels to queues + for (Entry entry : replacePriorityLabelsOnQueue + .entrySet()) { + String queueName = entry.getKey(); + PriorityLabelsPerQueue priorityLabelsPerQueue = entry.getValue(); + + Queue queue = queueCollections.get(queueName); + if (queue == null) { + queue = new Queue(queueName, priorityLabelsPerQueue); + this.queueCollections.put(queueName, queue); + } + + queue.priorityLabelPerQueue = priorityLabelsPerQueue; + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new SetPriorityLabelPerQueue(queue.queueName, + queue.priorityLabelPerQueue)); + } + } + } + + /** + * replace labels to nodes + * + * @param replacePriorityLabelsOnQueue + * queue -> labels map + */ + public void replacePriorityLabelsOnQueue( + Map replacePriorityLabelsOnQueue) + throws IOException { + try { + writeLock.lock(); + checkReplacePriorityLabelsOnQueue(replacePriorityLabelsOnQueue); + + internalReplacePriorityLabelsOnQueue(replacePriorityLabelsOnQueue); + } finally { + writeLock.unlock(); + } + } + + /** + * Get cluster priority labels from Queue + * + * @param queueName + * @return PriorityLabelsPerQueue + * @throws IOException + */ + public PriorityLabelsPerQueue getPriorityLabelsFromQueue(String queueName) + throws IOException { + try { + readLock.lock(); + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + } else { + throw new IOException("Priority labels for Queue: '" + queueName + + "' doesn't exist in cluster."); + } + return currentQueue.priorityLabelPerQueue; + } finally { + readLock.unlock(); + } + } + + /** + * Get mapping of nodes to labels + * + * @return queues to labels map + */ + public Map getPriorityLabels() { + try { + readLock.lock(); + Map queueToPriorityLabels + = new HashMap(); + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Queue queue = entry.getValue(); + + queueToPriorityLabels.put(queueName, queue.priorityLabelPerQueue); + } + return Collections.unmodifiableMap(queueToPriorityLabels); + } finally { + readLock.unlock(); + } + } + + private boolean validatePerQueuePriorityLabelInCluster( + PriorityLabelsPerQueue priorityLabelsPerQueue) { + if (!isPriorityLabelExistsInCluster(priorityLabelsPerQueue + .getDefaultPriorityLabel())) { + return false; + } + + if (!isPriorityLabelExistsInCluster(priorityLabelsPerQueue + .getMaxPriorityLabel())) { + return false; + } + + return true; + } + + private Set getPriorityLabelsToSave( + Set priorityLabels) { + Set priorityLabelSet = new HashSet(); + for (ApplicationPriority priorityLabel : priorityLabels) { + priorityLabelSet.add(priorityLabel.getApplicationPriority()); + } + return priorityLabelSet; + } + + private Set getPriorityLabelsToSaveForRecovery( + Set priorityLabels) { + Set priorityLabelSet = new HashSet(); + for (ApplicationPriority priorityLabel : priorityLabels) { + priorityLabelSet + .add(priorityLabel.getApplicationPriority() + + ":" + + this.clusterPriorityLabelCollections.get(priorityLabel).priorityValue + .toString()); + } + return priorityLabelSet; + } + + /** + * Get existing valid labels from repository + * + * @return valid labels + */ + public PriorityLabel getPriorityLabelFromCluster(String labelName) { + try { + readLock.lock(); + ApplicationPriority priority = ApplicationPriority.newInstance(labelName); + return this.clusterPriorityLabelCollections.get(priority); + } finally { + readLock.unlock(); + } + } + + /** + * Verify valid priority labels in repository + * + * @return true or false + */ + public boolean isPriorityLabelExistsInCluster(String labelName) { + try { + readLock.lock(); + ApplicationPriority priority = ApplicationPriority.newInstance(labelName.trim()); + return this.clusterPriorityLabelCollections.containsKey(priority); + } finally { + readLock.unlock(); + } + } + + /** + * Get existing valid labels in cluster + * + * @return existing valid labels in cluster + */ + public Set getClusterPriorityLabels() { + try { + readLock.lock(); + Set labels = new HashSet( + clusterPriorityLabelCollections.keySet()); + labels.remove(EMPTY_PRIORITY_LABEL); + return Collections.unmodifiableSet(getPriorityLabelsToSave(labels)); + } finally { + readLock.unlock(); + } + } + + /** + * Get existing labels from cluster for recovery + * + * @return existing priority labels in cluster + */ + public Set getClusterPriorityLabelsForRecovery() { + try { + readLock.lock(); + Set labels = new HashSet( + clusterPriorityLabelCollections.keySet()); + labels.remove(EMPTY_PRIORITY_LABEL); + return Collections.unmodifiableSet(getPriorityLabelsToSaveForRecovery(labels)); + } finally { + readLock.unlock(); + } + } + + protected String normalizeLabel(String label) { + if (label != null) { + return label.trim(); + } + return EMPTY_PRIORITY_LABEL_NAME; + } + + protected Collection normalizePriorityLabelForRemoval( + Collection priorityLabels) { + List priorityLabelCollection = new ArrayList(); + for (String prLabel : priorityLabels) { + PriorityLabel currentLabel = getPriorityLabelFromCluster(prLabel.trim()); + if (currentLabel != null) { + priorityLabelCollection.add(currentLabel); + } + } + return Collections.unmodifiableList(priorityLabelCollection); + } + + private Set processLabelsForIntegerMapping( + Set priorityLabels) { + Set priorityLabelMapping = new HashSet(); + + for (String priorityLabel : priorityLabels) { + String[] splittedOp = StringUtils.split(priorityLabel, ':'); + if (null != splittedOp && splittedOp.length > 1) { + int priorityValue = Integer.parseInt(splittedOp[1]); + priorityLabelMapping.add(new PriorityLabel(splittedOp[0].trim(), + priorityValue)); + if (priorityValue > MAX_PRIORITY_VALUE) { + MAX_PRIORITY_VALUE = priorityValue; + } + } else if (splittedOp.length == 1) { + // get the max priority value, and increase by 1 + priorityLabelMapping.add(new PriorityLabel(splittedOp[0].trim(), + ++MAX_PRIORITY_VALUE)); + } + } + return priorityLabelMapping; + } + + + @VisibleForTesting + public void setMax_Priority_Value(Integer max_Priority_Value) { + MAX_PRIORITY_VALUE = max_Priority_Value; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java new file mode 100644 index 0000000..7e0c546 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityLabelsPerQueuePBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityLabelsPerQueueProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterPriorityLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterPriorityLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplacePriorityLabelsOnQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SetPriorityLabelForQueueRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.SetPriorityLabelForQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterPriorityLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplacePriorityLabelsOnQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterPriorityLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterPriorityLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterPriorityLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplacePriorityLabelsOnQueueRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SetPriorityLabelForQueueRequestPBImpl; + +import com.google.common.collect.Sets; + +public class FileSystemPriorityLabelsStore extends PriorityLabelsStore { + + public FileSystemPriorityLabelsStore(CommonPriorityLabelsManager mgr) { + super(mgr); + } + + protected static final Log LOG = LogFactory + .getLog(FileSystemPriorityLabelsStore.class); + + protected static final String DEFAULT_DIR_NAME = "priority-labels"; + protected static final String MIRROR_FILENAME = "prioritylabel.mirror"; + protected static final String EDITLOG_FILENAME = "prioritylabel.editlog"; + + protected enum SerializedLogType { + ADD_TO_CLUSTER_PRIORITY_LABELS, SET_PRIORITY_LABEL_PER_QUEUE, REMOVE_FROM_CLUSTER_PRIORITY_LABELS + } + + Path fsWorkingPath; + FileSystem fs; + FSDataOutputStream editlogOs; + Path editLogPath; + + private String getDefaultFSPriorityLabelsRootDir() throws IOException { + // default is in local: /tmp/hadoop-yarn-${user}/priority-labels/ + return "file:///tmp/hadoop-yarn-" + + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + + DEFAULT_DIR_NAME; + } + + @Override + public void init(Configuration conf) throws Exception { + fsWorkingPath = new Path(conf.get( + YarnConfiguration.FS_PRIORITY_LABELS_STORE_ROOT_DIR, + getDefaultFSPriorityLabelsRootDir())); + + setFileSystem(conf); + + // mkdir of root dir path + fs.mkdirs(fsWorkingPath); + } + + @Override + public void close() throws IOException { + try { + fs.close(); + editlogOs.close(); + } catch (IOException e) { + LOG.warn("Exception happened whiling shutting down,", e); + } + } + + private void setFileSystem(Configuration conf) throws IOException { + Configuration confCopy = new Configuration(conf); + confCopy.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = confCopy.get( + YarnConfiguration.FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC); + confCopy.set("dfs.client.retry.policy.spec", retryPolicy); + fs = fsWorkingPath.getFileSystem(confCopy); + + // if it's local file system, use RawLocalFileSystem instead of + // LocalFileSystem, the latter one doesn't support append. + if (fs.getScheme().equals("file")) { + fs = ((LocalFileSystem) fs).getRaw(); + } + } + + private void ensureAppendEditlogFile() throws IOException { + editlogOs = fs.append(editLogPath); + } + + private void ensureCloseEditlogFile() throws IOException { + editlogOs.close(); + } + + private PriorityLabelsPerQueuePBImpl convertFromProtoFormat( + PriorityLabelsPerQueueProto p) { + return new PriorityLabelsPerQueuePBImpl(p); + } + + @Override + public void setPriorityLabelsForQueue(String queueName, + PriorityLabelsPerQueue priorityLabelsPerQueue) throws IOException { + ensureAppendEditlogFile(); + editlogOs + .writeInt(SerializedLogType.SET_PRIORITY_LABEL_PER_QUEUE.ordinal()); + ((SetPriorityLabelForQueueRequestPBImpl) SetPriorityLabelForQueueRequest + .newInstance(queueName, priorityLabelsPerQueue)).getProto() + .writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void addToClusterPriorityLabels(Set priorityLabels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.ADD_TO_CLUSTER_PRIORITY_LABELS + .ordinal()); + ((AddToClusterPriorityLabelsRequestPBImpl) AddToClusterPriorityLabelsRequest + .newInstance(priorityLabels)).getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void removeFromClusterPriorityLabels(Collection priorityLabels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.REMOVE_FROM_CLUSTER_PRIORITY_LABELS + .ordinal()); + ((RemoveFromClusterPriorityLabelsRequestPBImpl) RemoveFromClusterPriorityLabelsRequest + .newInstance(Sets.newHashSet(priorityLabels.iterator()))).getProto() + .writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void recover() throws IOException { + /* + * Steps of recover + * 1) Read from last mirror (from mirror or mirror.old) + * 2) Read from last edit log, and apply such edit log + * 3) Write new mirror to mirror.writing + * 4) Rename mirror to mirror.old + * 5) Move mirror.writing to mirror + * 6) Remove mirror.old + * 7) Remove edit log and create a new empty edit log + */ + + // Open mirror from serialized file + Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); + Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old"); + + FSDataInputStream is = null; + if (fs.exists(mirrorPath)) { + is = fs.open(mirrorPath); + } else if (fs.exists(oldMirrorPath)) { + is = fs.open(oldMirrorPath); + } + + if (null != is) { + Set labels = new AddToClusterPriorityLabelsRequestPBImpl( + AddToClusterPriorityLabelsRequestProto.parseDelimitedFrom(is)) + .getPriorityLabels(); + Map queueToLabels = new ReplacePriorityLabelsOnQueueRequestPBImpl( + ReplacePriorityLabelsOnQueueRequestProto.parseDelimitedFrom(is)) + .getQueueToLabels(); + prLabelManager.addToClusterPriorityLabels(labels); + prLabelManager.replacePriorityLabelsOnQueue(queueToLabels); + is.close(); + } + + // Open and process editlog + editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME); + if (fs.exists(editLogPath)) { + is = fs.open(editLogPath); + + while (true) { + try { + // read edit log one by one + SerializedLogType type = SerializedLogType.values()[is.readInt()]; + + switch (type) { + case SET_PRIORITY_LABEL_PER_QUEUE : { + SetPriorityLabelForQueueRequestProto proto = SetPriorityLabelForQueueRequestProto + .parseDelimitedFrom(is); + String queueName = proto.getQueueName(); + PriorityLabelsPerQueue priorityLabelsPerQueue = convertFromProtoFormat(proto + .getPriorityLabelsPerQueue()); + prLabelManager.setPriorityLabelPerQueue(queueName, + priorityLabelsPerQueue); + break; + } + case ADD_TO_CLUSTER_PRIORITY_LABELS: { + Collection labels = AddToClusterPriorityLabelsRequestProto + .parseDelimitedFrom(is).getPriorityLabelsList(); + prLabelManager.addToClusterPriorityLabels(Sets.newHashSet(labels + .iterator())); + break; + } + case REMOVE_FROM_CLUSTER_PRIORITY_LABELS: { + Collection labels = RemoveFromClusterPriorityLabelsRequestProto + .parseDelimitedFrom(is).getPriorityLabelsList(); + prLabelManager.removeFromClusterPriorityLabels(labels); + break; + } + default: + break; + } + } catch (EOFException e) { + // EOF hit, break + break; + } + } + } + + // Serialize current mirror to mirror.writing + Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + + ".writing"); + FSDataOutputStream os = fs.create(writingMirrorPath, true); + ((AddToClusterPriorityLabelsRequestPBImpl) AddToClusterPriorityLabelsRequest + .newInstance(prLabelManager.getClusterPriorityLabelsForRecovery())).getProto() + .writeDelimitedTo(os); + ((ReplacePriorityLabelsOnQueueRequestPBImpl) ReplacePriorityLabelsOnQueueRequest + .newInstance(prLabelManager.getPriorityLabels())).getProto() + .writeDelimitedTo(os); + os.close(); + + // Move mirror to mirror.old + if (fs.exists(mirrorPath)) { + fs.delete(oldMirrorPath, false); + fs.rename(mirrorPath, oldMirrorPath); + } + + // move mirror.writing to mirror + fs.rename(writingMirrorPath, mirrorPath); + fs.delete(writingMirrorPath, false); + + // remove mirror.old + fs.delete(oldMirrorPath, false); + + // create a new editlog file + editlogOs = fs.create(editLogPath, true); + editlogOs.close(); + + LOG.info("Finished write mirror at:" + mirrorPath.toString()); + LOG.info("Finished create editlog file at:" + editLogPath.toString()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java new file mode 100644 index 0000000..e6efbfc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; + +public abstract class PriorityLabelsStore implements Closeable { + protected final CommonPriorityLabelsManager prLabelManager; + protected Configuration conf; + + public PriorityLabelsStore(CommonPriorityLabelsManager prLabelManager) { + this.prLabelManager = prLabelManager; + } + + /** + * Set priority label per queue + */ + public abstract void setPriorityLabelsForQueue(String queueName, + PriorityLabelsPerQueue priorityLabelsPerQueue) throws IOException; + + /** + * Store new priority labels in cluster + */ + public abstract void addToClusterPriorityLabels(Set priorityLabels) + throws IOException; + + /** + * Remove priority labels from cluster + */ + public abstract void removeFromClusterPriorityLabels( + Collection priorityLabels) throws IOException; + + /** + * Recover labels and node to labels mappings from store + * + * @param conf + */ + public abstract void recover() throws IOException; + + public void init(Configuration conf) throws Exception { + this.conf = conf; + } + + public CommonPriorityLabelsManager getPriorityLabelsManager() { + return prLabelManager; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddToClusterPriorityLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddToClusterPriorityLabels.java new file mode 100644 index 0000000..b86dec1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddToClusterPriorityLabels.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import java.util.Set; + +public class AddToClusterPriorityLabels extends PriorityLabelsStoreEvent { + private Set priorityLabels; + + public AddToClusterPriorityLabels(Set priorityLabels) { + super(PriorityLabelsStoreEventType.ADD_TO_CLUSTER_PRIORITY_LABELS); + this.priorityLabels = priorityLabels; + } + + public Set getPriorityLabels() { + return priorityLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java new file mode 100644 index 0000000..6d0896f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class PriorityLabelsStoreEvent extends + AbstractEvent { + public PriorityLabelsStoreEvent(PriorityLabelsStoreEventType type) { + super(type); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java new file mode 100644 index 0000000..6b3b615 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +public enum PriorityLabelsStoreEventType { + ADD_TO_CLUSTER_PRIORITY_LABELS, + SET_PRIORITY_LABEL_PER_QUEUE, + REMOVE_FROM_CLUSTER_PRIORITY_LABELS +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveFromClusterPriorityLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveFromClusterPriorityLabels.java new file mode 100644 index 0000000..71718bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveFromClusterPriorityLabels.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import java.util.Collection; + +public class RemoveFromClusterPriorityLabels extends PriorityLabelsStoreEvent { + private Collection priorityLabels; + + public RemoveFromClusterPriorityLabels(Collection priorityLabels) { + super(PriorityLabelsStoreEventType.REMOVE_FROM_CLUSTER_PRIORITY_LABELS); + this.priorityLabels = priorityLabels; + } + + public Collection getPriorityLabels() { + return priorityLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/SetPriorityLabelPerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/SetPriorityLabelPerQueue.java new file mode 100644 index 0000000..0bc645a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/SetPriorityLabelPerQueue.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; + + +public class SetPriorityLabelPerQueue extends PriorityLabelsStoreEvent { + private String queueName; + private PriorityLabelsPerQueue priorityLabelsPerQueue; + + public SetPriorityLabelPerQueue(String queueName, + PriorityLabelsPerQueue priorityLabelsPerQueue) { + super(PriorityLabelsStoreEventType.SET_PRIORITY_LABEL_PER_QUEUE); + this.queueName = queueName; + this.priorityLabelsPerQueue = priorityLabelsPerQueue; + } + + public String getQueueName() { + return queueName; + } + + public PriorityLabelsPerQueue getPriorityLabelsPerQueue() { + return priorityLabelsPerQueue; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/DummyCommonPriorityLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/DummyCommonPriorityLabelsManager.java new file mode 100644 index 0000000..6821435 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/DummyCommonPriorityLabelsManager.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.apache.hadoop.yarn.event.InlineDispatcher; + +public class DummyCommonPriorityLabelsManager extends CommonPriorityLabelsManager { + + Collection lastAddedPrioritylabels = null; + Collection lastRemovedPrioritylabels = null; + + @Override + public void initPriorityLabelStore(Configuration conf) { + this.store = new PriorityLabelsStore(this) { + + @Override + public void recover() throws IOException { + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public void setPriorityLabelsForQueue(String queueName, + PriorityLabelsPerQueue priorityLabelsPerQueue) throws IOException { + } + + @Override + public void addToClusterPriorityLabels(Set priorityLabels) + throws IOException { + lastAddedPrioritylabels = priorityLabels; + } + + @Override + public void removeFromClusterPriorityLabels( + Collection priorityLabels) throws IOException { + lastRemovedPrioritylabels = priorityLabels; + } + }; + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelTestBase.java new file mode 100644 index 0000000..da476e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelTestBase.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class PriorityLabelTestBase { + public static void assertMapEquals(Map m1, + ImmutableMap m2) { + Assert.assertEquals(m1.size(), m2.size()); + for (String k : m1.keySet()) { + Assert.assertTrue(m2.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertMapContains(Map m1, + ImmutableMap m2) { + for (String k : m2.keySet()) { + Assert.assertTrue(m1.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertCollectionEquals(Collection c1, + Collection c2) { + Set s1 = new HashSet(c1); + Set s2 = new HashSet(c2); + Assert.assertEquals(s1, s2); + Assert.assertTrue(s1.containsAll(s2)); + } + + public static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestCommonPriorityLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestCommonPriorityLabelsManager.java new file mode 100644 index 0000000..a0218f2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestCommonPriorityLabelsManager.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestCommonPriorityLabelsManager extends PriorityLabelTestBase { + DummyCommonPriorityLabelsManager mgr = null; + + @Before + public void before() { + mgr = new DummyCommonPriorityLabelsManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test + public void testAddRemovePrioritylabel() throws Exception { + mgr.setMax_Priority_Value(-1); + // Add some label + mgr.addToClusterPriorityLabels(ImmutableSet.of("low:1")); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList("low:1")); + + // Add a label w/o integer mapping, it should take incremented value from + // last updated priority integer value. + mgr.addToClusterPriorityLabels(ImmutableSet.of("medium")); + mgr.addToClusterPriorityLabels(toSet("high:3", "v_high:4")); + assertCollectionEquals(mgr.lastAddedPrioritylabels, + Sets.newHashSet("high:3", "v_high:4")); + + Assert.assertTrue(mgr.getClusterPriorityLabelsForRecovery().containsAll( + Sets.newHashSet("low:1", "medium:2", "high:3", "v_high:4"))); + + // try to remove null, empty and non-existed label, should fail + for (String p : Arrays.asList(null, + CommonPriorityLabelsManager.EMPTY_PRIORITY_LABEL_NAME, "dummy:5")) { + boolean caught = false; + try { + mgr.removeFromClusterPriorityLabels(Arrays.asList(p)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("remove label should fail " + + "when label is null/empty/non-existed", caught); + } + + // Remove some label + mgr.removeFromClusterPriorityLabels(Arrays.asList("medium")); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, + Arrays.asList("medium")); + Assert.assertTrue(mgr.getClusterPriorityLabelsForRecovery().containsAll( + Arrays.asList("low:1", "high:3", "v_high:4"))); + + mgr.removeFromClusterPriorityLabels(Arrays.asList("low", "high", "v_high")); + Assert.assertTrue(mgr.lastRemovedPrioritylabels.containsAll(Sets + .newHashSet("low", "high", "v_high"))); + Assert.assertTrue(mgr.getClusterPriorityLabels().isEmpty()); + } + + @Test + public void testAddandSetPrioritylabel() throws Exception { + // Add some label + mgr.addToClusterPriorityLabels(ImmutableSet.of("low:1")); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList("low:1")); + + mgr.addToClusterPriorityLabels(toSet("medium:2", "high:3")); + assertCollectionEquals(mgr.lastAddedPrioritylabels, + Sets.newHashSet("medium:2", "high:3")); + + Assert.assertTrue(mgr.getClusterPriorityLabelsForRecovery().containsAll( + Sets.newHashSet("low:1", "medium:2", "high:3"))); + + // Set non-exiting labels to a queue + boolean caught = false; + try { + mgr.setPriorityLabelPerQueue("queueA", + PriorityLabelsPerQueue.newInstance("v_high", "low")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Cannot set label to a Queue " + + "when label is non-existing in Queue", caught); + + // Set some labels to a queue + mgr.setPriorityLabelPerQueue("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low")); + + Assert.assertTrue(mgr.getPriorityLabelsFromQueue("queueA").equals( + PriorityLabelsPerQueue.newInstance("medium", "low"))); + } + + @Test + public void testAddReplaceRemovePriorityLabelsInQueue() throws Exception { + // set a label on a queue, but label doesn't exist + boolean caught = false; + try { + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to replace priority label to a queue but " + + "label doesn't exist in repository should fail", caught); + + // replace queue->pr label one by one with newly added label in cluster + mgr.addToClusterPriorityLabels(toSet("low:1", "medium:2", "high:3")); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("medium", "high"))); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"))); + assertMapEquals(mgr.getPriorityLabels(), ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueA", + PriorityLabelsPerQueue.newInstance("medium", "high"))); + } + + @Test(timeout = 5000) + public void testRemovePrioritylabelAddedToQueue() throws Exception { + mgr.addToClusterPriorityLabels(toSet("low:1", "medium:2", "high:3")); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"))); + + boolean caught = false; + try { + mgr.removeFromClusterPriorityLabels(ImmutableSet.of("medium")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to remove priority label from cluster but " + + "label exists in a queue, should fail", caught); + + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("low", "high"))); + + // Now removal should be a success + mgr.removeFromClusterPriorityLabels(ImmutableSet.of("medium")); + assertMapEquals( + mgr.getPriorityLabels(), + ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), + "queueA", + PriorityLabelsPerQueue.newInstance("low", "high"))); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, + Arrays.asList("medium")); + + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("", ""))); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("", ""))); + + // remove all labels from cluster + mgr.removeFromClusterPriorityLabels(ImmutableSet.of("high", "low")); + Assert.assertTrue(mgr.getClusterPriorityLabels().isEmpty()); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, + Arrays.asList("high", "low")); + } + + @Test + public void testDuplicateInvalidWhenAddPriorityLabels() throws IOException { + mgr.addToClusterPriorityLabels(toSet("low:1")); + assertCollectionEquals(mgr.getClusterPriorityLabelsForRecovery(), + toSet("low:1")); + mgr.addToClusterPriorityLabels(toSet("low:2")); + assertCollectionEquals(mgr.getClusterPriorityLabelsForRecovery(), + toSet("low:2")); + + // try to add medium with same integer map key + boolean caught = false; + try { + mgr.addToClusterPriorityLabels(toSet("medium:2")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to add priority label to cluster but " + + "another label exists with same priority value, should fail", caught); + assertCollectionEquals(mgr.getClusterPriorityLabelsForRecovery(), + toSet("low:2")); + } + + @Test + public void testTrimLabelsWhenAddRemovePriorityLabels() throws IOException { + mgr.addToClusterPriorityLabels(toSet(" low:1")); + assertCollectionEquals(mgr.getClusterPriorityLabels(), toSet("low")); + mgr.removeFromClusterPriorityLabels(toSet("low ")); + Assert.assertTrue(mgr.getClusterPriorityLabels().isEmpty()); + } + + @Test + public void testTrimLabelsWhenModifyLabelsInQueue() throws IOException { + mgr.addToClusterPriorityLabels(toSet(" low:1", "medium ")); + mgr.setPriorityLabelPerQueue("queueA", + PriorityLabelsPerQueue.newInstance("medium ", " low")); + assertMapEquals( + mgr.getPriorityLabels(), + ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("low ", " medium"))); + assertMapEquals( + mgr.getPriorityLabels(), + ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("low", "medium"))); + + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueA", + PriorityLabelsPerQueue.newInstance("", ""))); + mgr.removeFromClusterPriorityLabels(toSet("low ", " medium")); + Assert.assertTrue(mgr.getClusterPriorityLabels().isEmpty()); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestFileSystemPriorityLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestFileSystemPriorityLabelsStore.java new file mode 100644 index 0000000..974e183 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/prioritylabels/TestFileSystemPriorityLabelsStore.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestFileSystemPriorityLabelsStore extends PriorityLabelTestBase { + MockPriorityLabelManager mgr = null; + Configuration conf = null; + + private static class MockPriorityLabelManager + extends + CommonPriorityLabelsManager { + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } + } + + private FileSystemPriorityLabelsStore getStore() { + return (FileSystemPriorityLabelsStore) mgr.store; + } + + @Before + public void before() throws IOException { + mgr = new MockPriorityLabelManager(); + conf = new Configuration(); + File tempDir = File.createTempFile("nlb", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_PRIORITY_LABELS_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + mgr.init(conf); + mgr.start(); + } + + @After + public void after() throws IOException { + getStore().fs.delete(getStore().fsWorkingPath, true); + mgr.stop(); + } + + @Test + public void testRecoverWithMirror() throws Exception { + mgr.addToClusterPriorityLabels(toSet("v_low", "low", "medium")); + mgr.addToClusterPriorityLabels(toSet("high")); + mgr.addToClusterPriorityLabels(toSet("v_high")); + mgr.setPriorityLabelPerQueue("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low")); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueC", + PriorityLabelsPerQueue.newInstance("v_high", "medium"))); + Assert.assertEquals(5, mgr.getClusterPriorityLabels().size()); + + mgr.removeFromClusterPriorityLabels(toSet("v_low")); + + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorityLabels().size()); + Assert.assertTrue(mgr.getClusterPriorityLabels().containsAll( + Arrays.asList("low", "medium", "high", "v_high"))); + + assertMapContains(mgr.getPriorityLabels(), ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueC", + PriorityLabelsPerQueue.newInstance("v_high", "medium"), "queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + + // shutdown mgr and start a new mgr + mgr.stop(); + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorityLabels().size()); + Assert.assertTrue(mgr.getClusterPriorityLabels().containsAll( + Arrays.asList("low", "medium", "high", "v_high"))); + + assertMapContains(mgr.getPriorityLabels(), ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueC", + PriorityLabelsPerQueue.newInstance("v_high", "medium"), "queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + mgr.stop(); + } + + @Test(timeout = 10000) + public void testEditlogRecover() throws Exception { + mgr.addToClusterPriorityLabels(toSet("v_low", "low", "medium")); + mgr.addToClusterPriorityLabels(toSet("high")); + mgr.addToClusterPriorityLabels(toSet("v_high")); + mgr.setPriorityLabelPerQueue("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low")); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueC", + PriorityLabelsPerQueue.newInstance("v_high", "medium"))); + Assert.assertEquals(5, mgr.getClusterPriorityLabels().size()); + + mgr.removeFromClusterPriorityLabels(toSet("v_low")); + /* + * After removed queueA: medium,low queueB: high,low queueC: v_high,medium + */ + + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorityLabels().size()); + Assert.assertTrue(mgr.getClusterPriorityLabels().containsAll( + Arrays.asList("low", "medium", "high", "v_high"))); + + assertMapContains(mgr.getPriorityLabels(), ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueC", + PriorityLabelsPerQueue.newInstance("v_high", "medium"), "queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + mgr.stop(); + } + + @Test + // (timeout = 10000) + public void testSerilizationAfterRecovery() throws Exception { + mgr.addToClusterPriorityLabels(toSet("v_low", "low", "medium")); + mgr.addToClusterPriorityLabels(toSet("high")); + mgr.setPriorityLabelPerQueue("queueA", + PriorityLabelsPerQueue.newInstance("medium", "low")); + mgr.replacePriorityLabelsOnQueue(ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"))); + + mgr.removeFromClusterPriorityLabels(toSet("v_low")); + + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(3, mgr.getClusterPriorityLabels().size()); + Assert.assertTrue(mgr.getClusterPriorityLabels().containsAll( + Arrays.asList("low", "medium", "high"))); + + assertMapContains(mgr.getPriorityLabels(), ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"))); + + /* + * Add label v_high and set to queueC then shutdown + */ + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + mgr.start(); + mgr.addToClusterPriorityLabels(toSet("v_high")); + mgr.setPriorityLabelPerQueue("queueC", + PriorityLabelsPerQueue.newInstance("v_high", "low")); + mgr.stop(); + + /* + * Recovery, and see if v_high is added in cluter and queueC + */ + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorityLabels().size()); + Assert.assertTrue(mgr.getClusterPriorityLabels().containsAll( + Arrays.asList("low", "medium", "high", "v_high"))); + assertMapContains(mgr.getPriorityLabels(), ImmutableMap.of("queueB", + PriorityLabelsPerQueue.newInstance("high", "low"), "queueA", + PriorityLabelsPerQueue.newInstance("medium", "low"), "queueC", + PriorityLabelsPerQueue.newInstance("v_high", "low"))); + mgr.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/MemoryRMPriorityLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/MemoryRMPriorityLabelsManager.java new file mode 100644 index 0000000..9dc865f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/MemoryRMPriorityLabelsManager.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.prioritylabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.PriorityLabelsPerQueue; +import org.apache.hadoop.yarn.prioritylabels.PriorityLabelsStore; + +public class MemoryRMPriorityLabelsManager extends RMPriorityLabelsManager { + + @Override + public void initPriorityLabelStore(Configuration conf) { + this.store = new PriorityLabelsStore(this) { + + @Override + public void recover() throws IOException { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public void setPriorityLabelsForQueue(String queueName, + PriorityLabelsPerQueue priorityLabelsPerQueue) throws IOException { + // do nothing + } + + @Override + public void addToClusterPriorityLabels(Set priorityLabels) + throws IOException { + // do nothing + } + + @Override + public void removeFromClusterPriorityLabels( + Collection priorityLabels) throws IOException { + // do nothing + } + }; + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = null; + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/RMPriorityLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/RMPriorityLabelsManager.java new file mode 100644 index 0000000..870c7e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/prioritylabels/RMPriorityLabelsManager.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.prioritylabels; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ApplicationPriority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.prioritylabels.CommonPriorityLabelsManager; + + +public class RMPriorityLabelsManager extends CommonPriorityLabelsManager { + + protected AccessControlList adminAcl; + protected AccessControlList priorityLabelAcl; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + adminAcl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + } + + /* + * Following method is used to get the integer mapping associated + * with a PriorityLabel for a submitted application + */ + public Integer getIntegerPriorityLabelMapping( + ApplicationPriority priorityLabel) throws IOException { + try { + readLock.lock(); + PriorityLabel label = clusterPriorityLabelCollections.get(priorityLabel); + if (null == label) { + throw new IOException("Priority label=" + label + + " is not present in cluster level"); + } + return label.getPriorityValue(); + } finally { + readLock.unlock(); + } + } + + public boolean checkAdminAccess(UserGroupInformation user) { + // make sure only admin can invoke + // this method + if (adminAcl.isUserAllowed(user)) { + return true; + } + return false; + } +} -- 1.9.4.msysgit.1