diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4b4f581..7af6181 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1587,6 +1587,17 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String PRIORITY_LABELS_PREFIX = YARN_PREFIX + + "priority-labels."; + + /** URI for PriorityLabelManager */ + public static final String FS_PRIORITY_LABELS_STORE_ROOT_DIR = PRIORITY_LABELS_PREFIX + + "fs-store.root-dir"; + public static final String FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC + = PRIORITY_LABELS_PREFIX + "fs-store.retry-policy-spec"; + public static final String DEFAULT_FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC + = "2000, 500"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java new file mode 100644 index 0000000..697c99b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/CommonPriorityLabelsManager.java @@ -0,0 +1,651 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationPriority; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.prioritylabels.event.AddNewPriorityLabels; +import org.apache.hadoop.yarn.prioritylabels.event.PriorityLabelsStoreEvent; +import org.apache.hadoop.yarn.prioritylabels.event.PriorityLabelsStoreEventType; +import org.apache.hadoop.yarn.prioritylabels.event.RemoveClusterPriorityLabels; +import org.apache.hadoop.yarn.prioritylabels.event.RemovePriorityLabels; +import org.apache.hadoop.yarn.prioritylabels.event.StoreClusterPriorityLabels; + + +public class CommonPriorityLabelsManager extends AbstractService { + + protected static final Log LOG = LogFactory + .getLog(CommonPriorityLabelsManager.class); + + public static final String EMPTY_PRIORITY_LABEL_NAME = ""; + public static final String EMPTY_QUEUE_NAME = ""; + public static final Integer EMPTY_PRIORITY_VALUE = -1; + public static final ApplicationPriority EMPTY_PRIORITY_LABEL = ApplicationPriority + .newInstance(EMPTY_PRIORITY_LABEL_NAME); + + protected Dispatcher dispatcher; + + protected ConcurrentMap clusterPriorityLabelCollections + = new ConcurrentHashMap(); + + // Map to queuName vs Queue object which has priotity labels + protected ConcurrentMap queueCollections + = new ConcurrentHashMap(); + + protected final ReadLock readLock; + protected final WriteLock writeLock; + + protected PriorityLabelsStore store; + private static Integer MAX_PRIORITY_VALUE = -1; + + protected static class PriorityLabel { + private String priorityLabelName; + private Integer priorityValue; + + protected PriorityLabel() { + this.setPriorityLabelName(EMPTY_PRIORITY_LABEL_NAME); + this.setPriorityValue(EMPTY_PRIORITY_VALUE); + } + + protected PriorityLabel(String labelName, Integer priorityValue) { + this.setPriorityLabelName(labelName); + this.setPriorityValue(priorityValue); + } + + public String getPriorityLabelName() { + return priorityLabelName; + } + + public void setPriorityLabelName(String priorityLabelName) { + this.priorityLabelName = priorityLabelName; + } + + public Integer getPriorityValue() { + return priorityValue; + } + + public void setPriorityValue(Integer priorityValue) { + this.priorityValue = priorityValue; + } + } + + protected static class Queue { + public String queueName; + public Set priorityLabels; + + protected Queue() { + this.queueName = EMPTY_QUEUE_NAME; + this.priorityLabels = null; + } + + public Queue(String queueName) { + this.queueName = queueName; + this.priorityLabels = new HashSet(); + } + } + + private final class ForwardingEventHandler implements + EventHandler { + + @Override + public void handle(PriorityLabelsStoreEvent event) { + if (isInState(STATE.STARTED)) { + handleStoreEvent(event); + } + } + } + + // Dispatcher related code + protected void handleStoreEvent(PriorityLabelsStoreEvent event) { + try { + switch (event.getType()) { + case ADD_PRIORITY_LABELS_TO_QUEUE: + AddNewPriorityLabels addNewPriorityLabelsEvent = (AddNewPriorityLabels) event; + store.addPriorityLabelsToQueue( + addNewPriorityLabelsEvent.getQueueName(), + addNewPriorityLabelsEvent.getPriorityLabels()); + break; + case REMOVE_PRIORITY_LABELS_FROM_QUEUE: + RemovePriorityLabels removePriorityLabelsEvent = (RemovePriorityLabels) event; + store.removePriorityLabelsFromQueue( + removePriorityLabelsEvent.getQueueName(), + removePriorityLabelsEvent.getPriorityLabels()); + break; + case STORE_CLUSTER_PRIORITY_LABELS: + StoreClusterPriorityLabels storeClusterPriorityLabels = (StoreClusterPriorityLabels) event; + store.storePriorityLabelsInCluster(storeClusterPriorityLabels + .getPriorityLabels()); + break; + case REMOVE_CLUSTER_PRIORITY_LABELS: + RemoveClusterPriorityLabels removeClusterPriorityLabelsEvent = (RemoveClusterPriorityLabels) event; + store.removePriorityLabelsFromCluster(removeClusterPriorityLabelsEvent + .getPriorityLabels()); + break; + default: + break; + } + } catch (IOException e) { + LOG.error("Failed to store priority label modification to storage"); + throw new YarnRuntimeException(e); + } + } + + public CommonPriorityLabelsManager() { + super(CommonPriorityLabelsManager.class.getName()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + // UT purpose + protected void initDispatcher(Configuration conf) { + dispatcher = new AsyncDispatcher(); + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.init(conf); + asyncDispatcher.setDrainEventsOnStop(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + initPriorityLabelStore(conf); + clusterPriorityLabelCollections.put(EMPTY_PRIORITY_LABEL, + new PriorityLabel()); + } + + protected void initPriorityLabelStore(Configuration conf) throws Exception { + this.store = new FileSystemPriorityLabelsStore(this); + this.store.init(conf); + this.store.recover(); + } + + // for UT purpose + protected void startDispatcher() { + // start dispatcher + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.start(); + } + + @Override + protected void serviceStart() throws Exception { + // init dispatcher only when service start, because recover will happen in + // service init, we don't want to trigger any event handling at that time. + initDispatcher(getConfig()); + + if (null != dispatcher) { + dispatcher.register(PriorityLabelsStoreEventType.class, + new ForwardingEventHandler()); + } + + startDispatcher(); + } + + // for UT purpose + protected void stopDispatcher() { + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.stop(); + } + + @Override + protected void serviceStop() throws Exception { + // finalize store + stopDispatcher(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } + } + + /** + * Add cluster level priority labels to store + * + * @param labels + * new priority labels + */ + @SuppressWarnings("unchecked") + public void storeClusterPriorityLabels(Set priorityLabels) + throws IOException { + if (null == priorityLabels || priorityLabels.isEmpty()) { + return; + } + + Set priorityLabelMapping + = processLabelsForIntegerMapping(priorityLabels); + + for (PriorityLabel label : priorityLabelMapping) { + this.clusterPriorityLabelCollections.put( + ApplicationPriority.newInstance(label.getPriorityLabelName()), label); + } + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new StoreClusterPriorityLabels(priorityLabels)); + } + + LOG.info("Add priority labels: [" + + StringUtils.join(priorityLabels.iterator(), ",") + "]"); + } + + /** + * Add multiple node labels to repository + * + * @param labels + * new node labels added + */ + @SuppressWarnings("unchecked") + public void addPriorityLabelsToQueue(String queueName, + Set priorityLabels) throws IOException { + if (null == priorityLabels || priorityLabels.isEmpty()) { + return; + } + + Set queueLevelLabels + = verifyAndNormalizeLabels(priorityLabels); + + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + currentQueue.priorityLabels.addAll(queueLevelLabels); + } else { + currentQueue = new Queue(queueName); + currentQueue.priorityLabels.addAll(queueLevelLabels); + this.queueCollections.put(queueName, currentQueue); + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new AddNewPriorityLabels(queueName, + getPriorityLabelsToSave(currentQueue.priorityLabels))); + } + + LOG.info("Add priority labels: [" + StringUtils.join(priorityLabels.iterator(), ",") + + "]"); + } + + protected void checkRemoveFromClusterPriorityLabels( + Collection prLabelsToRemove) throws IOException { + if (null == prLabelsToRemove || prLabelsToRemove.isEmpty()) { + return; + } + + // Check if label to remove doesn't existed or null/empty, will throw + // exception if any of labels to remove doesn't meet requirement + for (String label : prLabelsToRemove) { + label = normalizeLabel(label); + if (label == null || label.isEmpty()) { + throw new IOException("PriorityLabel to be removed is null or empty"); + } + + ApplicationPriority appPrLabel = ApplicationPriority.newInstance(label); + if (!clusterPriorityLabelCollections.containsKey(appPrLabel)) { + throw new IOException("Priority label=" + label + + " to be removed doesn't existed in cluster " + + "priority labels collection."); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalRemoveFromClusterPriorityLabels( + Collection prLabelsToRemove) { + // remove priority labels from queue + for (String queueName : queueCollections.keySet()) { + Queue currentQueue = queueCollections.get(queueName); + if (null != currentQueue) { + currentQueue.priorityLabels + .removeAll(normalizePriorityLabelForRemoval(prLabelsToRemove)); + } + } + + // remove labels from queue labels collection + for (String labelToRemove : prLabelsToRemove) { + clusterPriorityLabelCollections.remove(ApplicationPriority + .newInstance(labelToRemove)); + } + + // create event to remove labels + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemoveClusterPriorityLabels(prLabelsToRemove)); + } + + LOG.info("Remove priority labels: [" + + StringUtils.join(prLabelsToRemove.iterator(), ",") + "]"); + } + + /** + * Remove from cluster priority labels + * + * @param labelsToRemove + * priority labels to remove + * @throws IOException + */ + public void removeFromClusterPriorityLabels(Collection labelsToRemove) + throws IOException { + checkRemoveFromClusterPriorityLabels(labelsToRemove); + + internalRemoveFromClusterPriorityLabels(labelsToRemove); + } + + protected void checkRemovePriorityLabelsFromQueue(String queueName, + Set priorityLabels) throws IOException { + // check all labels being added existed + Set knownLabels = clusterPriorityLabelCollections + .keySet(); + + if (!knownLabels.containsAll(priorityLabels)) { + String msg = "Not all priority being removed contained by known " + + "prioritylabel collections, please check"; + LOG.error(msg); + throw new IOException(msg); + } + + Set originalLabels = null; + + Queue currentQueue = queueCollections.get(queueName); + + if (currentQueue == null) { + String msg = "Try to remove labels from Queue=" + queueName + + ", but the Queue doesn't existed"; + LOG.error(msg); + throw new IOException(msg); + } else { + originalLabels = currentQueue.priorityLabels; + } + + if (priorityLabels == null || priorityLabels.isEmpty()) { + return; + } + + if (!originalLabels.containsAll(priorityLabels)) { + String msg = "Try to remove labels , but not all labels contained by Queue=" + + queueName; + LOG.error(msg); + throw new IOException(msg); + } + } + + @SuppressWarnings("unchecked") + protected void internalRemovePriorityLabelsFromQueue(String queueName, + Set priorityLabels) { + Queue queue = queueCollections.get(queueName); + if (queue.priorityLabels != null) { + queue.priorityLabels.removeAll(priorityLabels); + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemovePriorityLabels(queue.queueName, + getPriorityLabelsToSave(queue.priorityLabels))); + } + } + + LOG.info("removePriorityLabelsOnQueue:"); + LOG.info(" Queue=" + queueName + ", priority labels=[" + + StringUtils.join(priorityLabels.iterator(), ",") + "]"); + } + + /** + * remove labels from queue, labels being removed most be contained by these + * queues + * + * @param priorityLabelsToRemove + * queueName + */ + public void removePriorityLabelsFromQueue(String queueName, + Set priorityLabelsToRemove) + throws IOException { + Set priorityLabels + = verifyAndNormalizeLabels(priorityLabelsToRemove); + checkRemovePriorityLabelsFromQueue(queueName, priorityLabels); + + internalRemovePriorityLabelsFromQueue(queueName, priorityLabels); + } + + protected void checkReplacePriorityLabelsOnQueue( + Map> replacePriorityLabelsOnQueue) + throws IOException { + if (null == replacePriorityLabelsOnQueue + || replacePriorityLabelsOnQueue.isEmpty()) { + return; + } + + // check all labels being added existed + Set knownLabels = clusterPriorityLabelCollections + .keySet(); + for (Entry> entry : replacePriorityLabelsOnQueue + .entrySet()) { + if (!knownLabels.containsAll(normalizeLabels(entry.getValue()))) { + String msg = "Not all labels being replaced contained by known " + + "priority label collections, please check" + ", new labels=[" + + StringUtils.join(entry.getValue().iterator(), ",") + "]"; + LOG.error(msg); + throw new IOException(msg); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalReplacePriorityLabelsOnQueue( + Map> replacePriorityLabelsOnQueue) + throws IOException { + // do replace labels to nodes + for (Entry> entry : replacePriorityLabelsOnQueue + .entrySet()) { + String queueName = entry.getKey(); + Set labels = entry.getValue(); + + Queue queue = queueCollections.get(queueName); + if (queue.priorityLabels != null) { + queue.priorityLabels.clear(); + queue.priorityLabels.addAll(normalizeLabels(labels)); + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemovePriorityLabels(queue.queueName, + getPriorityLabelsToSave(queue.priorityLabels))); + } + } + } + + LOG.info("replacePriorityLabelsInQueue:"); + for (Entry> entry : replacePriorityLabelsOnQueue + .entrySet()) { + LOG.info(" Queue=" + entry.getKey() + ", priority labels=[" + + StringUtils.join(entry.getValue(), ",") + "]"); + } + } + + /** + * replace labels to nodes + * + * @param replacePriorityLabelsOnQueue + * queue -> labels map + */ + public void replacePriorityLabelsOnQueue( + Map> replacePriorityLabelsOnQueue) + throws IOException { + checkReplacePriorityLabelsOnQueue(replacePriorityLabelsOnQueue); + + internalReplacePriorityLabelsOnQueue(replacePriorityLabelsOnQueue); + } + + /** + * Get mapping of nodes to labels + * + * @return queues to labels map + */ + public Map> getPriorityLabels() { + try { + readLock.lock(); + Map> queueToPriorityLabels + = new HashMap>(); + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Queue queue = entry.getValue(); + + Set priorityLabels = queue.priorityLabels; + if (priorityLabels == null || priorityLabels.isEmpty()) { + continue; + } + queueToPriorityLabels.put(queueName, + getPriorityLabelsToSave(priorityLabels)); + } + return Collections.unmodifiableMap(queueToPriorityLabels); + } finally { + readLock.unlock(); + } + } + + private Set normalizeLabels(Set priorityLabels) { + Set priorityLabelSet = new HashSet(); + for (String priorityLabel : priorityLabels) { + priorityLabelSet.add(ApplicationPriority.newInstance(priorityLabel)); + } + return priorityLabelSet; + } + + private Set verifyAndNormalizeLabels( + Set priorityLabels) { + Set priorityLabelSet = new HashSet(); + for (String priorityLabel : priorityLabels) { + if (isPriorityLabelExistsInCluster(priorityLabel)) { + priorityLabelSet.add(ApplicationPriority.newInstance(priorityLabel)); + } + } + return priorityLabelSet; + } + + private Set getPriorityLabelsToSave( + Set priorityLabels) { + Set priorityLabelSet = new HashSet(); + for (ApplicationPriority priorityLabel : priorityLabels) { + priorityLabelSet.add(priorityLabel.getApplicationPriority()); + } + return priorityLabelSet; + } + + /** + * Get existing valid labels in repository + * + * @return existing valid labels in repository + */ + public PriorityLabel getPriorityLabelFromCluster(String labelName) { + try { + readLock.lock(); + ApplicationPriority priority = ApplicationPriority.newInstance(labelName); + return this.clusterPriorityLabelCollections.get(priority); + } finally { + readLock.unlock(); + } + } + + /** + * Verify valid priority labels in repository + * + * @return true or false + */ + public boolean isPriorityLabelExistsInCluster(String labelName) { + try { + readLock.lock(); + ApplicationPriority priority = ApplicationPriority.newInstance(labelName); + return this.clusterPriorityLabelCollections.containsKey(priority); + } finally { + readLock.unlock(); + } + } + + /** + * Get existing valid labels in repository + * + * @return existing valid labels in repository + */ + public Set getClusterPriorityLabels() { + try { + readLock.lock(); + Set labels = new HashSet( + clusterPriorityLabelCollections.keySet()); + labels.remove(EMPTY_PRIORITY_LABEL); + return Collections.unmodifiableSet(getPriorityLabelsToSave(labels)); + } finally { + readLock.unlock(); + } + } + + protected String normalizeLabel(String label) { + if (label != null) { + return label.trim(); + } + return EMPTY_PRIORITY_LABEL_NAME; + } + + protected Collection normalizePriorityLabelForRemoval( + Collection priorityLabels) { + List priorityLabelCollection = new ArrayList(); + for (String prLabel : priorityLabels) { + PriorityLabel currentLabel = getPriorityLabelFromCluster(prLabel.trim()); + if (currentLabel != null) { + priorityLabelCollection.add(currentLabel); + } + } + return Collections.unmodifiableList(priorityLabelCollection); + } + + private Set processLabelsForIntegerMapping( + Set priorityLabels) { + Set priorityLabelMapping = new HashSet(); + + for (String priorityLabel : priorityLabels) { + String[] splittedOp = StringUtils.split(priorityLabel, ':'); + if (null != splittedOp && splittedOp.length != 0) { + int priorityValue = Integer.parseInt(splittedOp[1]); + priorityLabelMapping.add(new PriorityLabel(splittedOp[0].trim(), + priorityValue)); + if (priorityValue > MAX_PRIORITY_VALUE) { + MAX_PRIORITY_VALUE = priorityValue; + } + } else if (splittedOp.length == 1) { + // get the max priority value, and increase by 1 + priorityLabelMapping.add(new PriorityLabel(splittedOp[0].trim(), + ++MAX_PRIORITY_VALUE)); + } + } + return priorityLabelMapping; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java new file mode 100644 index 0000000..8cf9f3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/FileSystemPriorityLabelsStore.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddPriorityLabelsToQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterPriorityLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemovePriorityLabelsFromQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplacePriorityLabelsOnQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.StorePriorityLabelsInClusterRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddPriorityLabelsToQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterPriorityLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemovePriorityLabelsFromQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplacePriorityLabelsOnQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.StorePriorityLabelsInClusterRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddPriorityLabelsToQueueRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterPriorityLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemovePriorityLabelsFromQueueRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplacePriorityLabelsOnQueueRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.StorePriorityLabelsInClusterRequestPBImpl; + +import com.google.common.collect.Sets; + +public class FileSystemPriorityLabelsStore extends PriorityLabelsStore { + + public FileSystemPriorityLabelsStore(CommonPriorityLabelsManager mgr) { + super(mgr); + } + + protected static final Log LOG = LogFactory + .getLog(FileSystemPriorityLabelsStore.class); + + protected static final String DEFAULT_DIR_NAME = "priority-labels"; + protected static final String MIRROR_FILENAME = "prioritylabel.mirror"; + protected static final String EDITLOG_FILENAME = "prioritylabel.editlog"; + + protected enum SerializedLogType { + ADD_LABELS_TO_QUEUE, STORE_CLUSTER_LABELS, + REMOVE_LABELS_FROM_QUEUE, REMOVE_CLUSTER_PRIORITY_LABELS + } + + Path fsWorkingPath; + FileSystem fs; + FSDataOutputStream editlogOs; + Path editLogPath; + + private String getDefaultFSPriorityLabelsRootDir() throws IOException { + // default is in local: /tmp/hadoop-yarn-${user}/priority-labels/ + return "file:///tmp/hadoop-yarn-" + + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + + DEFAULT_DIR_NAME; + } + + @Override + public void init(Configuration conf) throws Exception { + fsWorkingPath = new Path(conf.get( + YarnConfiguration.FS_PRIORITY_LABELS_STORE_ROOT_DIR, + getDefaultFSPriorityLabelsRootDir())); + + setFileSystem(conf); + + // mkdir of root dir path + fs.mkdirs(fsWorkingPath); + } + + @Override + public void close() throws IOException { + try { + fs.close(); + editlogOs.close(); + } catch (IOException e) { + LOG.warn("Exception happened whiling shutting down,", e); + } + } + + private void setFileSystem(Configuration conf) throws IOException { + Configuration confCopy = new Configuration(conf); + confCopy.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = confCopy.get( + YarnConfiguration.FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_PRIORITY_LABELS_STORE_RETRY_POLICY_SPEC); + confCopy.set("dfs.client.retry.policy.spec", retryPolicy); + fs = fsWorkingPath.getFileSystem(confCopy); + + // if it's local file system, use RawLocalFileSystem instead of + // LocalFileSystem, the latter one doesn't support append. + if (fs.getScheme().equals("file")) { + fs = ((LocalFileSystem) fs).getRaw(); + } + } + + private void ensureAppendEditlogFile() throws IOException { + editlogOs = fs.append(editLogPath); + } + + private void ensureCloseEditlogFile() throws IOException { + editlogOs.close(); + } + + @Override + public void storePriorityLabelsInCluster(Set priortyLabels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.STORE_CLUSTER_LABELS.ordinal()); + ((StorePriorityLabelsInClusterRequestPBImpl) StorePriorityLabelsInClusterRequest + .newInstance(priortyLabels)).getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void addPriorityLabelsToQueue(String queueName, + Set priorityLabels) throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.ADD_LABELS_TO_QUEUE.ordinal()); + ((AddPriorityLabelsToQueueRequestPBImpl) AddPriorityLabelsToQueueRequest + .newInstance(queueName, priorityLabels)).getProto().writeDelimitedTo( + editlogOs); + ensureCloseEditlogFile(); + + } + + @Override + public void removePriorityLabelsFromQueue(String queueName, + Collection priorityLabels) throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.REMOVE_LABELS_FROM_QUEUE.ordinal()); + ((RemovePriorityLabelsFromQueueRequestPBImpl) RemovePriorityLabelsFromQueueRequest + .newInstance(queueName, Sets.newHashSet(priorityLabels.iterator()))) + .getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void removePriorityLabelsFromCluster(Collection priortyLabels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.REMOVE_CLUSTER_PRIORITY_LABELS + .ordinal()); + ((RemoveFromClusterPriorityLabelsRequestPBImpl) RemoveFromClusterPriorityLabelsRequest + .newInstance(Sets.newHashSet(priortyLabels.iterator()))).getProto() + .writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void recover() throws IOException { + /* + * Steps of recover + * 1) Read from last mirror (from mirror or mirror.old) + * 2) Read from last edit log, and apply such edit log + * 3) Write new mirror to mirror.writing + * 4) Rename mirror to mirror.old + * 5) Move mirror.writing to mirror + * 6) Remove mirror.old + * 7) Remove edit log and create a new empty edit log + */ + + // Open mirror from serialized file + Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); + Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old"); + + FSDataInputStream is = null; + if (fs.exists(mirrorPath)) { + is = fs.open(mirrorPath); + } else if (fs.exists(oldMirrorPath)) { + is = fs.open(oldMirrorPath); + } + + if (null != is) { + Set labels = new StorePriorityLabelsInClusterRequestPBImpl( + StorePriorityLabelsInClusterRequestProto.parseDelimitedFrom(is)) + .getPriorityLabels(); + Map> queueToLabels + = new ReplacePriorityLabelsOnQueueRequestPBImpl( + ReplacePriorityLabelsOnQueueRequestProto.parseDelimitedFrom(is)) + .getQueueToLabels(); + prLabelManager.storeClusterPriorityLabels(labels); + prLabelManager.replacePriorityLabelsOnQueue(queueToLabels); + is.close(); + } + + // Open and process editlog + editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME); + if (fs.exists(editLogPath)) { + is = fs.open(editLogPath); + + while (true) { + try { + // read edit log one by one + SerializedLogType type = SerializedLogType.values()[is.readInt()]; + + switch (type) { + case ADD_LABELS_TO_QUEUE: { + Collection labels = AddPriorityLabelsToQueueRequestProto + .parseDelimitedFrom(is).getPriorityLabelsList(); + String queueName = AddPriorityLabelsToQueueRequestProto + .parseDelimitedFrom(is).getQueueName(); + prLabelManager.addPriorityLabelsToQueue(queueName, + Sets.newHashSet(labels.iterator())); + break; + } + case REMOVE_LABELS_FROM_QUEUE: { + Collection labels = RemovePriorityLabelsFromQueueRequestProto + .parseDelimitedFrom(is).getPriorityLabelsList(); + String queueName = RemovePriorityLabelsFromQueueRequestProto + .parseDelimitedFrom(is).getQueueName(); + prLabelManager.removePriorityLabelsFromQueue(queueName, + Sets.newHashSet(labels.iterator())); + break; + } + case STORE_CLUSTER_LABELS: { + Collection labels = StorePriorityLabelsInClusterRequestProto + .parseDelimitedFrom(is).getPriorityLabelsList(); + prLabelManager.storeClusterPriorityLabels(Sets.newHashSet(labels + .iterator())); + break; + } + case REMOVE_CLUSTER_PRIORITY_LABELS: { + Collection labels = RemoveFromClusterPriorityLabelsRequestProto + .parseDelimitedFrom(is).getPriorityLabelsList(); + prLabelManager.removeFromClusterPriorityLabels(labels); + break; + } + default: + break; + } + } catch (EOFException e) { + // EOF hit, break + break; + } + } + } + + // Serialize current mirror to mirror.writing + Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + + ".writing"); + FSDataOutputStream os = fs.create(writingMirrorPath, true); + ((StorePriorityLabelsInClusterRequestPBImpl) StorePriorityLabelsInClusterRequestPBImpl + .newInstance(prLabelManager.getClusterPriorityLabels())).getProto() + .writeDelimitedTo(os); + ((ReplacePriorityLabelsOnQueueRequestPBImpl) ReplacePriorityLabelsOnQueueRequest + .newInstance(prLabelManager.getPriorityLabels())).getProto() + .writeDelimitedTo(os); + os.close(); + + // Move mirror to mirror.old + if (fs.exists(mirrorPath)) { + fs.delete(oldMirrorPath, false); + fs.rename(mirrorPath, oldMirrorPath); + } + + // move mirror.writing to mirror + fs.rename(writingMirrorPath, mirrorPath); + fs.delete(writingMirrorPath, false); + + // remove mirror.old + fs.delete(oldMirrorPath, false); + + // create a new editlog file + editlogOs = fs.create(editLogPath, true); + editlogOs.close(); + + LOG.info("Finished write mirror at:" + mirrorPath.toString()); + LOG.info("Finished create editlog file at:" + editLogPath.toString()); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java new file mode 100644 index 0000000..da293b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/PriorityLabelsStore.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationPriority; + +public abstract class PriorityLabelsStore implements Closeable { + protected final CommonPriorityLabelsManager prLabelManager; + protected Configuration conf; + + public PriorityLabelsStore(CommonPriorityLabelsManager prLabelManager) { + this.prLabelManager = prLabelManager; + } + + /** + * Store priority labels to queue + */ + public abstract void addPriorityLabelsToQueue(String queueName, + Set priorityLabels) throws IOException; + + /** + * Store new priority labels in cluster + */ + public abstract void storePriorityLabelsInCluster(Set priorityLabels) + throws IOException; + + /** + * Remove priority labels from cluster + */ + public abstract void removePriorityLabelsFromCluster(Collection priorityLabels) + throws IOException; + + /** + * Remove labels from a queue + */ + public abstract void removePriorityLabelsFromQueue(String queueName, + Collection priorityLabels) throws IOException; + + /** + * Recover labels and node to labels mappings from store + * + * @param conf + */ + public abstract void recover() throws IOException; + + public void init(Configuration conf) throws Exception { + this.conf = conf; + } + + public CommonPriorityLabelsManager getPriorityLabelsManager() { + return prLabelManager; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddNewPriorityLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddNewPriorityLabels.java new file mode 100644 index 0000000..b04632a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/AddNewPriorityLabels.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import java.util.Set; + + +public class AddNewPriorityLabels extends PriorityLabelsStoreEvent { + private String queueName; + private Set priorityLabel; + + public AddNewPriorityLabels(String queueName, + Set priorityLabel) { + super(PriorityLabelsStoreEventType.ADD_PRIORITY_LABELS_TO_QUEUE); + this.queueName = queueName; + this.priorityLabel = priorityLabel; + } + + public String getQueueName() { + return queueName; + } + + public Set getPriorityLabels() { + return priorityLabel; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java new file mode 100644 index 0000000..6d0896f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class PriorityLabelsStoreEvent extends + AbstractEvent { + public PriorityLabelsStoreEvent(PriorityLabelsStoreEventType type) { + super(type); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java new file mode 100644 index 0000000..e23d847 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/PriorityLabelsStoreEventType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +public enum PriorityLabelsStoreEventType { + REMOVE_PRIORITY_LABELS_FROM_QUEUE, + ADD_PRIORITY_LABELS_TO_QUEUE, + STORE_CLUSTER_PRIORITY_LABELS, + REMOVE_CLUSTER_PRIORITY_LABELS +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveClusterPriorityLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveClusterPriorityLabels.java new file mode 100644 index 0000000..e482fca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemoveClusterPriorityLabels.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import java.util.Collection; + +public class RemoveClusterPriorityLabels extends PriorityLabelsStoreEvent { + private Collection priorityLabels; + + public RemoveClusterPriorityLabels(Collection priorityLabels) { + super(PriorityLabelsStoreEventType.REMOVE_CLUSTER_PRIORITY_LABELS); + this.priorityLabels = priorityLabels; + } + + public Collection getPriorityLabels() { + return priorityLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemovePriorityLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemovePriorityLabels.java new file mode 100644 index 0000000..9b8213e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/RemovePriorityLabels.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import java.util.Collection; + +public class RemovePriorityLabels extends PriorityLabelsStoreEvent { + private Collection priorityLabels; + private String queueName; + + public RemovePriorityLabels(String queueName, + Collection priorityLabels) { + super(PriorityLabelsStoreEventType.REMOVE_PRIORITY_LABELS_FROM_QUEUE); + this.priorityLabels = priorityLabels; + this.queueName = queueName; + } + + public Collection getPriorityLabels() { + return priorityLabels; + } + + public String getQueueName() { + return queueName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/StoreClusterPriorityLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/StoreClusterPriorityLabels.java new file mode 100644 index 0000000..3015021 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/prioritylabels/event/StoreClusterPriorityLabels.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.prioritylabels.event; + +import java.util.Set; + +public class StoreClusterPriorityLabels extends PriorityLabelsStoreEvent { + private Set priorityLabels; + + public StoreClusterPriorityLabels(Set priorityLabels) { + super(PriorityLabelsStoreEventType.STORE_CLUSTER_PRIORITY_LABELS); + this.priorityLabels = priorityLabels; + } + + public Set getPriorityLabels() { + return priorityLabels; + } +} -- 1.9.4.msysgit.1